Friday, 1 July 2016

How to test Spark Streaming code

I have a class that pulls in RDDs from a Flume stream. I'd like to test it by having the test populate the stream. I thought using the queueStream method on StreamingContext would work, but I'm running into problems:

  1. I get NullPointerExceptions when I call queueStream:

java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

  2. The RDDs are not passed from the test into the system.

Any ideas?

Is my approach anywhere close to correct?

Am I using queueStream correctly?

Should I refactor my code so that I don't need to test methods that create a Flume stream directly (maybe using constructor dependency injection, as in the sketch below)?
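One refactoring I have in mind looks roughly like this (just a sketch with an illustrative class name, assuming the DStream is built by the caller, so a test could hand in a queueStream-backed DStream instead of a real Flume stream):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.flume.SparkFlumeEvent

// Hypothetical variant: the stream is injected, so production code would pass in
// FlumeUtils.createStream(ssc, "localhost", 2121) and a test would pass in
// ssc.queueStream(rddQueue)
class InjectableSystemUnderTest(stream : DStream[SparkFlumeEvent]) {

  def start = {
    val reducedStream = stream.reduceByWindow((key, value) => key, Seconds(30), Seconds(10))
    reducedStream.foreachRDD(rdd => simpleTransformFunction(rdd))
  }

  private def simpleTransformFunction(rdd : RDD[SparkFlumeEvent]) = {
    // foreach is an action, so these printlns actually execute
    rdd.foreach { event =>
      val headers = event.event.getHeaders()
      println(headers.get("type"))
      println(headers.get("value"))
    }
  }
}

The test would then create its own queueStream, pass it to the constructor, call start, and only then start the StreamingContext.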

Here's the current code:

System Under Test

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.flume.SparkFlumeEvent

class SystemUnderTest(ssc : StreamingContext, conf : SparkConf) {

  def start = {
    startFlumeStream
    startSparkStreaming
  }


  private def startFlumeStream = {
    // How do I test this function ?
    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 2121)
    val reducedStream = flumeStream.reduceByWindow((key, value) => key, Seconds(30), Seconds(10))
    reducedStream.foreachRDD(rdd => simpleTransformFunction(rdd))

  }

  private def simpleTransformFunction(rdd : RDD[SparkFlumeEvent]) = {
    // This function is called every 10 seconds - but I can't seem to pass any 
    // RDDs to it
    println("simpleTransformFunction called!")


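    // NB: map is lazy, so without an action (e.g. foreach, count, collect) this closure never runs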
    rdd.map(event => {
      val headers = event.event.getHeaders()
      println(headers.get("type"))
      println(headers.get("value"))
    })

  }

  private def startSparkStreaming = {
    ssc.start
  }

}

Test driver

import scala.collection.mutable.Queue

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume.SparkFlumeEvent
import org.apache.spark.streaming.StreamingContext._

import java.util.Map
import java.util.HashMap

object Example {

  def main(args:Array[String]): Unit = {
    // Set the required values in the config
    val appName = "AppName"
    val master = "local[*]"
    val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Create a queue that will be populated with test data
    val rddQueue = new Queue[RDD[SparkFlumeEvent]]

    rddQueue += ssc.sparkContext.makeRDD(createSparkFlumeEvents)
    // put the queue into the context (events added to the queue should show up in the context?)
    // this seems to cause NPEs
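    // (the DStream returned by queueStream isn't captured or given an output operation here)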
    ssc.queueStream(rddQueue) 

    // Create and start the system under test
    val systemUnderTest = new SystemUnderTest(ssc, sparkConf)
    systemUnderTest.start

  }

  // Create a Sequence of valid SparkFlumeEvents 
  private def createSparkFlumeEvents : Seq[SparkFlumeEvent] = {
    val events = new Queue[SparkFlumeEvent]
    for (a <- 1 to 10) {
      events += createSparkFlumeEvent(a)
    }
    events.toSeq
  }

  private def createSparkFlumeEvent(value :Int) = {
    val sparkFlumeEvent = new SparkFlumeEvent
    val map = new HashMap[CharSequence, CharSequence]
    map.put("type", "install")
    map.put("value", value.toString)
    sparkFlumeEvent.event.setHeaders(map)

    sparkFlumeEvent
  }
}
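For comparison, here's a stripped-down version of how I understand queueStream is meant to be used on its own (a sketch with illustrative names: the returned DStream gets an output operation before the context starts, and RDDs are pushed onto the queue afterwards):

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStreamSketch {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("QueueStreamSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val rddQueue = new Queue[RDD[Int]]

    // Keep a reference to the DStream returned by queueStream and attach an
    // output operation to it before starting the context
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.foreachRDD(rdd => println("batch size = " + rdd.count()))

    ssc.start()

    // Push test data onto the queue after the context has started;
    // each RDD is consumed as a batch
    for (i <- 1 to 3) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(1 to 10)
      }
      Thread.sleep(1000)
    }

    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop()
  }
}

If that pattern is right, my test driver above probably needs to keep hold of the DStream that queueStream returns and hook the transformations onto it (or inject it into the system under test) before calling ssc.start.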
