Does anyone know how to test windowing functions in Flink
? I am using the dependency flink-test-utils_2.11
.
My steps are:
- Get the
StreamExecutionEnvironment
- Create objects and add to the invironment
- Do a
keyBy
- add a Session Window
- execute an aggregate function
public class AggregateVariantCEVTest extends AbstractTestBase {
@Test
public void testAggregateVariantCev() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromElements(objectOne, objectTwo)
.keyBy(new KeyedByMyCustomKey())
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.aggregate(new MyAgreggateFunction());
JobExecutionResult result = env.execute();
assertEquals(myExpectedResults, result.getAllAccumulatorResults());
}
}
The problem is that result.getAllAccumulatorResults()
size is 0.
Any ideas what I am doing wrong? Thanks in advance!
Aucun commentaire:
Enregistrer un commentaire