Sunday, March 1, 2020

Testing Flink with embedded Kafka

I have a simple Flink application that sums up the events with the same id and timestamp within the last minute:

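// addSource expects a SourceFunction rather than Properties. This assumes the universal
// Kafka connector (FlinkKafkaConsumer); topic is the input topic name, not shown in the original snippet.
DataStream<String> input = env
                .addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), consumerProps))
                .uid("app");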

DataStream<Event> events = input.map(record -> mapper.readValue(record, Event.class));

events
        .assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(simpleNotificationServiceSink);


env.execute(jobName);


private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
        public TimestampsAndWatermarks() {
            // tolerate events arriving up to 90 seconds out of order
            super(Time.seconds(90));
        }

        // timestampReadable is the timestamp rounded to the minute, in the format yyyyMMddhhmm
        @Override
        public long extractTimestamp(Event event) {
            return Long.parseLong(event.timestampReadable);
        }
    }
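
One thing worth checking when writing the test: Flink treats the value returned by extractTimestamp as milliseconds since the epoch, so a yyyyMMddhhmm string parsed directly as a long may not fall into the one-minute windows one would expect. Below is a minimal plain-Java sketch of converting such a string to epoch milliseconds and finding the start of its one-minute window. The class name MinuteTimestamp, the UTC time zone, and the 24-hour pattern (HH) are assumptions, not part of the original job.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class MinuteTimestamp {
    // Assumption: 24-hour clock (HH). In java.time, "hh" means the 12-hour clock.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    // Converts a string such as "202003011230" to epoch milliseconds, assuming UTC.
    static long toEpochMillis(String timestampReadable) {
        return LocalDateTime.parse(timestampReadable, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    // Start of the epoch-aligned one-minute window containing the timestamp
    // (what a one-minute tumbling window without an offset would use).
    static long windowStart(long epochMillis) {
        return epochMillis - Math.floorMod(epochMillis, 60_000L);
    }

    public static void main(String[] args) {
        long ts = toEpochMillis("202003011230");
        System.out.println(ts + " is in the window starting at " + windowStart(ts));
    }
}
```

Because timestampReadable is already rounded to the minute, the converted timestamp coincides with the start of its window.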

I would like to implement the following test scenario:

  1. Start embedded Kafka

  2. Publish a couple of messages to the topic

  3. Consume the messages with Flink

  4. Check the correctness of the output produced by Flink
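
For step 4, one possible way to get the expected values is to compute them independently of Flink and Kafka, from the same messages published in step 2. The sketch below is plain Java only; the Event class here is a hypothetical stand-in with just the fields the job uses, and the long type of constant and the epoch-millisecond timestamp field are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpectedSums {
    // Hypothetical stand-in for the job's Event class.
    static final class Event {
        final String id;
        final long timestampMillis; // event time in epoch milliseconds (assumed)
        final long constant;        // value being summed (type assumed)

        Event(String id, long timestampMillis, long constant) {
            this.id = id;
            this.timestampMillis = timestampMillis;
            this.constant = constant;
        }
    }

    // Sums constant per id and one-minute window; keys look like "id@windowStartMillis".
    static Map<String, Long> expected(List<Event> events) {
        Map<String, Long> sums = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMillis - Math.floorMod(e.timestampMillis, 60_000L);
            sums.merge(e.id + "@" + windowStart, e.constant, Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> published = Arrays.asList(
                new Event("a", 1_000L, 1L),
                new Event("a", 59_000L, 1L),
                new Event("a", 61_000L, 1L),
                new Event("b", 2_000L, 5L));
        System.out.println(expected(published));
    }
}
```

The test could then compare this map with whatever the job's sink collected, whichever way the job itself is run.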

Does Flink provide utilities for testing a job with embedded Kafka? If so, what is the recommended approach?

Thanks.
