mardi 25 juin 2019

How to properly test a Flink window function?

Does anyone know how to test windowing functions in Flink? I am using the dependency flink-test-utils_2.11.

My steps are:

  1. Get the StreamExecutionEnvironment
  2. Create objects and add to the invironment
  3. Do a keyBy
  4. add a Session Window
  5. execute an aggregate function
public class AggregateVariantCEVTest extends AbstractTestBase {

    public void testAggregateVariantCev() throws Exception  {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       env.fromElements(objectOne, objectTwo)
               .keyBy(new KeyedByMyCustomKey())
               .aggregate(new MyAgreggateFunction());

       JobExecutionResult result = env.execute();

       assertEquals(myExpectedResults, result.getAllAccumulatorResults());


The problem is that result.getAllAccumulatorResults() size is 0.

Any ideas what I am doing wrong? Thanks in advance!

Aucun commentaire:

Enregistrer un commentaire