Skip to content

Instantly share code, notes, and snippets.

@alpinegizmo
Created August 29, 2020 09:52
Show Gist options
  • Select an option

  • Save alpinegizmo/f51e924319f134373b59cb9814cf3db5 to your computer and use it in GitHub Desktop.

Select an option

Save alpinegizmo/f51e924319f134373b59cb9814cf3db5 to your computer and use it in GitHub Desktop.
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
/**
 * Small Flink CEP job that matches a "start" event followed by a "next" event
 * over a fixed three-element input and logs every condition evaluation.
 *
 * <p>Each condition prints the element it is asked about; the "next" condition
 * additionally prints the events currently bound to the "start" stage, which
 * makes it possible to observe which partial matches are being extended.
 */
public class CEPPattern46 {

    /**
     * Builds and runs the job: ingestion time, parallelism 1, input
     * {@code "4a", "4b", "6"}; every completed match is printed as {@code "done"}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if a condition or the job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);

        DataStream<String> input = env.fromElements("4a", "4b", "6");
        PatternStream<String> matches = CEP.pattern(input, buildPattern());

        matches.process(new DoneEmitter()).print();
        env.execute();
    }

    /**
     * Assembles the two-stage pattern: stage "start" followed by stage "next".
     *
     * @return the pattern handed to {@code CEP.pattern}
     */
    private static Pattern<String, ?> buildPattern() {
        Pattern<String, String> startStage =
                Pattern.<String>begin("start").where(new StartCondition());
        return startStage.followedBy("next").where(new NextCondition());
    }

    /** Condition for stage "start": logs the candidate and accepts anything containing "4". */
    private static final class StartCondition extends IterativeCondition<String> {
        @Override
        public boolean filter(String candidate, Context<String> ctx) throws Exception {
            System.out.println("start: " + candidate);
            return candidate.contains("4");
        }
    }

    /**
     * Condition for stage "next": logs the candidate together with the events
     * already bound to "start", and accepts only the exact string "6".
     */
    private static final class NextCondition extends IterativeCondition<String> {
        @Override
        public boolean filter(String candidate, Context<String> ctx) throws Exception {
            System.out.println(" next: " + candidate);
            // Looked up after the first println so the output order is unchanged.
            Iterable<String> boundToStart = ctx.getEventsForPattern("start");
            System.out.println(" with start " + String.join(":", boundToStart));
            return "6".equals(candidate);
        }
    }

    /** Emits the constant string "done" once per match, ignoring the matched events. */
    private static final class DoneEmitter extends PatternProcessFunction<String, String> {
        @Override
        public void processMatch(
                Map<String, List<String>> match,
                Context ctx,
                Collector<String> out) throws Exception {
            out.collect("done");
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment