I'm using spark-testing-base to test a method that performs an action, via the "runAction" method.
I don't know why, but although I pass in some input data, the method doesn't seem to receive any of it. I added some logging, and nothing gets logged.
class MyFirstSparkTest extends FunSuite with StreamingActionBase {
  var cluster: Cluster = _
  var session: Session = _
  var service: MyService = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    ..
    service = MyService.apply(conf)
    // Init embedded C*
    ...
  }

  test("RunAction") {
    val inputInsert = MyData(...)
    val input = List(List(inputInsert))
    runAction(input, service.myMethod)
    // Execute some asserts here..
  }
}
The method I want to test maps the input data and stores it in an embedded Cassandra instance.
def myMethod(input: DStream[MyData]): Unit = {
  val transformRdd = transform(input)
  transformRdd.saveToCassandra(...)
}
The transform method called inside myMethod:
def transform(input: DStream[MyData]): DStream[MyDataToCassandra] = {
  println("Transforming")
  val rddToCassandra = input.map { record =>
    // It doesn't get here. Why?
    println("record " + record)
    ...
    newRecord
  }
  rddToCassandra
}
I run the test in local[*] mode.