I have a simple Flink application that sums up the events with the same id and timestamp in one-minute tumbling windows:
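DataStream<String> input = env
    // consumerProps holds the Kafka consumer configuration; topic is the input topic name
    .addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), consumerProps))
    .uid("app");
// deserialize each JSON record into a Pixel
DataStream<Pixel> pixels = input.map(record -> mapper.readValue(record, Pixel.class));
pixels
    .assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
    .keyBy("id")
    .timeWindow(Time.minutes(1))
    .sum("constant")
    .addSink(simpleNotificationServiceSink);
env.execute(jobName);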
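private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Pixel> {
    // timestampReadable is the timestamp rounded to minutes, in format yyyyMMddHHmm
    // (assumed here to be a 24-hour clock in UTC); uses java.time.LocalDateTime,
    // java.time.ZoneOffset and java.time.format.DateTimeFormatter
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    public TimestampsAndWatermarks() {
        // tolerate events arriving up to 90 seconds out of order
        super(Time.seconds(90));
    }

    @Override
    public long extractTimestamp(Pixel pixel) {
        // Flink expects epoch milliseconds; parsing the raw string as a number
        // would yield a value like 202001011230, which is not a point in time
        return LocalDateTime.parse(pixel.timestampReadable, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }
}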
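To make the event-time behaviour of this job concrete, here is a minimal stdlib-only Java sketch (not Flink code) of the arithmetic involved, assuming timestampReadable is a 24-hour UTC value in yyyyMMddHHmm. It converts the string to epoch milliseconds, computes the start of the one-minute tumbling window, computes the watermark the way the legacy BoundedOutOfOrdernessTimestampExtractor does (highest timestamp seen minus the 90-second bound), and checks whether a window has fired (Flink's event-time trigger fires once the watermark reaches the window's last millisecond). The class EventTimeMath and its methods are hypothetical names.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EventTimeMath {
    // Assumption: timestampReadable uses a 24-hour clock (yyyyMMddHHmm) and is in UTC.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    // Minute-rounded string -> epoch milliseconds (the unit Flink uses for event time).
    static long toEpochMillis(String timestampReadable) {
        return LocalDateTime.parse(timestampReadable, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    // Start of the tumbling window (no offset) that contains the timestamp.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    // Bounded-out-of-orderness watermark: highest timestamp seen minus the allowed delay.
    static long watermark(long maxTimestampSeen, long maxOutOfOrderness) {
        return maxTimestampSeen - maxOutOfOrderness;
    }

    // Window [start, start + size) fires once the watermark reaches start + size - 1.
    static boolean windowFires(long windowStart, long windowSize, long watermark) {
        return watermark >= windowStart + windowSize - 1;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long outOfOrderness = 90_000L;

        long ts = toEpochMillis("202001011230");
        System.out.println("epoch millis: " + ts);          // 1577881800000
        long start = windowStart(ts, minute);

        // After an event stamped 12:31 the watermark is 12:30:30, so the 12:30 window stays open.
        long wm1231 = watermark(toEpochMillis("202001011231"), outOfOrderness);
        System.out.println(windowFires(start, minute, wm1231)); // false

        // After an event stamped 12:33 the watermark is 12:31:30, past 12:30:59.999.
        long wm1233 = watermark(toEpochMillis("202001011233"), outOfOrderness);
        System.out.println(windowFires(start, minute, wm1233)); // true
    }
}
```

Under these assumptions, this has practical consequences for a test. The 12:30 window fires only after an event stamped 12:33 or later has been processed and a periodic watermark emitted. A Kafka source never finishes, so it never sends the final watermark that a bounded source emits at end of input. Publishing a trailing event a few minutes after the last real one is one way to close the last window. Running the test with parallelism 1 also helps, because downstream operators take the minimum watermark over all their inputs, and an idle parallel instance would hold it back.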
I would like to implement the following test scenario:
- Start an embedded Kafka broker
- Publish a couple of messages to the topic
- Consume the messages with Flink
- Check the correctness of the output produced by Flink
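For the last step, one option is to compute the expected output independently of Flink and compare it with what the job emits. The sketch below is plain Java. PixelRecord is a hypothetical stand-in for Pixel that keeps only the fields the job reads, and it assumes constant is a long. Because timestampReadable is already rounded to the minute, each (id, timestampReadable) pair maps to exactly one one-minute tumbling window, provided the string is converted to epoch milliseconds. The expected sums therefore reduce to a group-by on that pair.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpectedSums {
    // Hypothetical stand-in for the Pixel POJO: only the fields the job uses.
    static final class PixelRecord {
        final String id;
        final String timestampReadable; // yyyyMMddHHmm, already rounded to the minute
        final long constant;            // assumption: a numeric field summed by the job

        PixelRecord(String id, String timestampReadable, long constant) {
            this.id = id;
            this.timestampReadable = timestampReadable;
            this.constant = constant;
        }
    }

    // Sum `constant` per (id, minute); the key "id@timestamp" identifies one window per id.
    static Map<String, Long> expectedSums(List<PixelRecord> input) {
        Map<String, Long> sums = new TreeMap<>(); // sorted keys give stable, readable output
        for (PixelRecord p : input) {
            sums.merge(p.id + "@" + p.timestampReadable, p.constant, Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<PixelRecord> published = new ArrayList<>();
        published.add(new PixelRecord("a", "202001011230", 1));
        published.add(new PixelRecord("a", "202001011230", 1));
        published.add(new PixelRecord("b", "202001011230", 1));
        published.add(new PixelRecord("a", "202001011231", 1));
        // prints {a@202001011230=2, a@202001011231=1, b@202001011230=1}
        System.out.println(expectedSums(published));
    }
}
```

In the test, the Pixel records captured from the job's output would be mapped to the same id@timestamp keys and compared with this map. One way to capture them is to replace simpleNotificationServiceSink with a sink that stores results in a list; that sink is not shown here. Only windows that the watermark has already closed will appear in the job's output.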
Does Flink provide utilities for testing a job against an embedded Kafka? If so, what is the recommended approach?
Thanks.