mercredi 26 février 2020

Testing Flink window

I have a simple Flink application, which sums up the events with the same id and timestamp within the last minute:

DataStream<String> input = env
                .addSource(consumerProps)
                .uid("app");

DataStream<Pixel> pixels = input.map(record -> mapper.readValue(record, Pixel.class));

pixels
        .keyBy("id", "timestampRoundedToMinutes")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(dynamoDBSink);

env.execute(jobName);

I am trying to test this application with the recommended approach in documentation. I also have looked at this stackoverflow question, but adding the sink hadn't helped.

I do have a @ClassRule as recommended in my test class. The function looks like this:

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

CollectSink.values.clear();

Pixel testPixel1 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel2 = Pixel.builder().id(2).timestampRoundedToMinutes("202002261220").constant(1).build();
Pixel testPixel3 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel4 = Pixel.builder().id(3).timestampRoundedToMinutes("202002261220").constant(1).build();

env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
    .keyBy("id","timestampRoundedToMinutes")
    .timeWindow(Time.minutes(1))
    .sum("constant")
    .addSink(new CollectSink());

JobExecutionResult result = env.execute("AggregationTest");
assertNotEquals(0, CollectSink.values.size());

CollectSink is copied from documentation.

What am I doing wrong? Is there also a simple way to test the application with embedded kafka?

Thanks!

Aucun commentaire:

Enregistrer un commentaire