mardi 25 juin 2019

How to properly test a Flink window function?

Does anyone know how to test windowing functions in Flink? I am using the dependency flink-test-utils_2.11.

My steps are:

  1. Get the StreamExecutionEnvironment
  2. Create objects and add to the invironment
  3. Do a keyBy
  4. add a Session Window
  5. execute an aggregate function
public class AggregateVariantCEVTest extends AbstractTestBase {

   @Test
    public void testAggregateVariantCev() throws Exception  {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);

       env.fromElements(objectOne, objectTwo)
               .keyBy(new KeyedByMyCustomKey())
               .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
               .aggregate(new MyAgreggateFunction());


       JobExecutionResult result = env.execute();

       assertEquals(myExpectedResults, result.getAllAccumulatorResults());

   }
}

The problem is that result.getAllAccumulatorResults() size is 0.

Any ideas what I am doing wrong? Thanks in advance!

Aucun commentaire:

Enregistrer un commentaire