Sunday, 1 April 2018

Testing with Spark and spark-testing-base

I'm using spark-testing-base to test a method that performs an action, via the runAction method.

Although I pass in some input data, the method does not seem to receive any of it, and I don't know why. I added some logging, but nothing is logged.

class MyFirstSparkTest extends FunSuite with StreamingActionBase {
  var cluster: Cluster = _
  var session: Session = _
  var service: MyService = _


  override def beforeAll(): Unit = {
    super.beforeAll()
    ..
    service = MyService.apply(conf)
    //Init embedded C*
    ...
  }

  test("RunAction") {

    val inputInsert = MyData(...)
    val input = List(List(inputInsert))
    runAction(input, service.myMethod)
    // Run some assertions here...
  }
}
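
One way to check whether runAction delivers the input at all, independently of Cassandra and MyService, might be a stripped-down test whose action only counts records in an accumulator. The sketch below follows the accumulator pattern from the spark-testing-base examples. The class name RunActionSmokeTest and the helper countRecords are hypothetical, and it assumes Spark 2.x (for sc.longAccumulator) and ScalaTest 3.0.x (for org.scalatest.FunSuite).

```scala
import com.holdenkarau.spark.testing.StreamingActionBase
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.LongAccumulator
import org.scalatest.FunSuite

// Hypothetical smoke test: no Cassandra, no MyService, only runAction itself.
class RunActionSmokeTest extends FunSuite with StreamingActionBase {

  // Builds an action that adds 1 to the accumulator for every record it sees.
  def countRecords(acc: LongAccumulator): DStream[String] => Unit =
    (stream: DStream[String]) =>
      stream.foreachRDD { rdd: RDD[String] =>
        rdd.foreach(_ => acc.add(1L))
      }

  test("runAction feeds the input to the action") {
    // sc is the shared SparkContext provided by the testing trait.
    val acc = sc.longAccumulator("records seen")
    val input = List(List("a", "b"), List("c")) // two batches, three records
    runAction(input, countRecords(acc))
    assert(acc.sum === 3L)
  }
}
```

If this test passes, the input is reaching the action, and the problem more likely lies in myMethod or in how the service is set up than in runAction.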

The method I want to test maps the input data and stores the result in an embedded Cassandra instance.

  def myMethod(input: DStream[MyData]): Unit = {
    val transformRdd = transform(input)
    transformRdd.saveToCassandra(...)
  }

The transform method called inside myMethod:

def transform(input: DStream[MyData]): DStream[MyDataToCassandra] = {
    println("Transforming")

    val rddToCassandra = input.
      map {
        record =>
          // Execution never reaches this point. Why?
          println("record " + record)
          ...
          newRecord
      }
    rddToCassandra
}

I run the test in local[*] mode.
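
For context, DStream transformations such as map are lazy: the closure passed to map runs only when a batch is actually processed by an output operation, while code outside the closure, such as println("Transforming"), runs once when the pipeline is defined. The sketch below is only a plain-Scala analogy using a collection view, not Spark code, and the name LazyMapDemo is hypothetical. It shows the same split between defining a mapping and executing it, and it is not meant as a diagnosis of the problem above.

```scala
object LazyMapDemo {
  // Returns (closure calls before forcing, closure calls after forcing, result).
  def run(): (Int, Int, List[Int]) = {
    var calls = 0
    println("Defining the pipeline") // runs immediately, like println("Transforming")

    // A view defers the map: the closure is not invoked on this line.
    val doubled = List(1, 2, 3).view.map { x =>
      calls += 1
      x * 2
    }
    val callsBefore = calls

    // Forcing the view plays the role of an output operation.
    val result = doubled.toList
    (callsBefore, calls, result)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The closure is invoked only once toList forces the view, in the same way that a DStream map closure is invoked only once a batch flows through an output operation.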
