I have a class that pulls in RDDs from a Flume stream, and I'd like to test it by having the test populate the stream. I thought using the queueStream method on StreamingContext would work, but I'm running into problems:
- I get NullPointerExceptions when I call queueStream:
java.lang.NullPointerException
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
- The RDDs are not passed from the test into the system.
Any ideas? Specifically:
- Is my approach anywhere close to correct?
- Am I using queueStream correctly?
- Should I refactor my code so that I don't need to test methods that create a Flume stream (maybe using constructor dependency injection, as sketched after the SystemUnderTest class below)?
Here's the code:
System Under Test
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.flume.SparkFlumeEvent

class SystemUnderTest(ssc: StreamingContext, conf: SparkConf) {

  def start = {
    startFlumeStream
    startSparkStreaming
  }

  private def startFlumeStream = {
    // How do I test this function?
    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 2121)
    val reducedStream = flumeStream.reduceByWindow((key, value) => key, Seconds(30), Seconds(10))
    reducedStream.foreachRDD(rdd => simpleTransformFunction(rdd))
  }

  private def simpleTransformFunction(rdd: RDD[SparkFlumeEvent]) = {
    // This function is called every 10 seconds - but I can't seem to pass any
    // RDDs to it
    println("simpleTransformFunction called!")
    rdd.map(event => {
      val headers = event.event.getHeaders()
      println(headers.get("type"))
      println(headers.get("value"))
    })
  }

  private def startSparkStreaming = {
    ssc.start
  }
}
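To clarify what I mean by constructor dependency injection in the third question above: something like the sketch below, where the DStream is created outside the class and passed in through the constructor, so a test could hand in a queueStream instead of a real Flume stream. This is only an illustration of the idea (the InjectedSystemUnderTest name is mine and I haven't tried this), not working code from my project.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.flume.SparkFlumeEvent

// Hypothetical refactor: the stream is injected, so this class never touches FlumeUtils
class InjectedSystemUnderTest(ssc: StreamingContext, stream: DStream[SparkFlumeEvent]) {

  def start = {
    // Same windowing and output operation as startFlumeStream above
    val reducedStream = stream.reduceByWindow((key, value) => key, Seconds(30), Seconds(10))
    reducedStream.foreachRDD(rdd => simpleTransformFunction(rdd))
    ssc.start
  }

  private def simpleTransformFunction(rdd: RDD[SparkFlumeEvent]) = {
    println("simpleTransformFunction called!")
    // foreach is an action, so the body actually runs for each event in the RDD
    rdd.foreach(event => {
      val headers = event.event.getHeaders()
      println(headers.get("type"))
      println(headers.get("value"))
    })
  }
}

Production code would then construct it with new InjectedSystemUnderTest(ssc, FlumeUtils.createStream(ssc, "localhost", 2121)), while a test could pass ssc.queueStream(rddQueue) instead.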
Test driver
import scala.collection.mutable.Queue

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume.SparkFlumeEvent
import org.apache.spark.streaming.StreamingContext._

import java.util.Map
import java.util.HashMap

object Example {

  def main(args: Array[String]): Unit = {
    // Set the required values in the config
    val appName = "AppName"
    val master = "local[*]"
    val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Create a queue that will be populated with test data
    val rddQueue = new Queue[RDD[SparkFlumeEvent]]
    rddQueue += ssc.sparkContext.makeRDD(createSparkFlumeEvents)

    // Put the queue into the context (events added to the queue should show up in the context?)
    // This seems to cause NPEs
    ssc.queueStream(rddQueue)

    // Create and start the system under test
    val systemUnderTest = new SystemUnderTest(ssc, sparkConf)
    systemUnderTest.start
  }

  // Create a Sequence of valid SparkFlumeEvents
  private def createSparkFlumeEvents: Seq[SparkFlumeEvent] = {
    val events = new Queue[SparkFlumeEvent]
    for (a <- 1 to 10) {
      events += createSparkFlumeEvent(a)
    }
    events.toSeq
  }

  private def createSparkFlumeEvent(value: Int) = {
    val sparkFlumeEvent = new SparkFlumeEvent
    val map = new HashMap[CharSequence, CharSequence]
    map.put("type", "install")
    map.put("value", value.toString)
    sparkFlumeEvent.event.setHeaders(map)
    sparkFlumeEvent
  }
}
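For comparison, this is the minimal queueStream-only wiring I would expect to work, with no Flume involved: register an output operation on the queue-backed stream before starting the context, then push RDDs into the queue. It's just a sketch of my understanding (plain Ints instead of SparkFlumeEvents, and the QueueStreamSketch name is made up), not code from my project.

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object QueueStreamSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("QueueStreamSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Queue the test populates with RDDs
    val rddQueue = new Queue[RDD[Int]]

    // Register an output operation on the queue-backed stream before starting the context
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.foreachRDD(rdd => println("batch count = " + rdd.count()))

    ssc.start()

    // Feed a few batches, then stop
    for (_ <- 1 to 5) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(1 to 100)
      }
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}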