Last active
August 26, 2016 04:25
-
-
Save smarthi/4848cfd76d7f964dd2d1276518599903 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| private void executeJob(final ParameterTool parameters) throws Exception { | |
| StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); | |
| KeyedStream<Tuple3<String, String, Long>, Tuple> keyedDS = ....; | |
| WindowedStream<Tuple3<String, String, Long>, Tuple, GlobalWindow> windowedStream = | |
| keyedDS.window(GlobalWindows.create()); | |
| windowedStream.trigger(TimeoutTrigger.of(CountTrigger.of(3), 10)); | |
| DataStream<Tuple3<String, String, Long>> result = windowedStream.sum(2); | |
| result.print(); | |
| execEnv.execute(); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| public class TimeoutTrigger<T, W extends Window> extends Trigger<T, W> { | |
| private Trigger<T, W> nestedTrigger; | |
| private final long sessionTimeout; | |
| private final ReducingStateDescriptor<Long> stateDesc = | |
| new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE); | |
| public TimeoutTrigger(Trigger<T, W> nestedTrigger, | |
| final Long sessionTimeout) { | |
| this.sessionTimeout = sessionTimeout; | |
| this.nestedTrigger = nestedTrigger; | |
| } | |
| @Override | |
| public TriggerResult onElement(T newsFeed, long timestamp, W window, | |
| TriggerContext triggerContext) throws Exception { | |
| triggerContext.registerProcessingTimeTimer(window.maxTimestamp() + sessionTimeout); | |
| TriggerResult triggerResult = nestedTrigger.onElement(newsFeed, timestamp, window, triggerContext); | |
| return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; | |
| } | |
| @Override | |
| public TriggerResult onProcessingTime(long timestamp, W window, | |
| TriggerContext triggerContext) throws Exception { | |
| return TriggerResult.FIRE_AND_PURGE; | |
| } | |
| @Override | |
| public TriggerResult onEventTime(long timestamp, W window, | |
| TriggerContext triggerContext) throws Exception { | |
| return TriggerResult.CONTINUE; | |
| } | |
| @Override | |
| public void clear(W window, TriggerContext triggerContext) throws Exception { | |
| triggerContext.deleteProcessingTimeTimer(window.maxTimestamp() + sessionTimeout); | |
| nestedTrigger.clear(window, triggerContext); | |
| triggerContext.getPartitionedState(stateDesc).clear(); | |
| } | |
| /** | |
| * Creates a new purging trigger from the given {@code Trigger}. | |
| * | |
| * @param nestedTrigger The trigger that is wrapped by this purging trigger | |
| */ | |
| public static <T, W extends Window> TimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, long sessionTimeout) { | |
| return new TimeoutTrigger<>(nestedTrigger, sessionTimeout); | |
| } | |
| @Override | |
| public String toString() { | |
| return "TimeoutTrigger()"; | |
| } | |
| private static class Sum implements ReduceFunction<Long> { | |
| private static final long serialVersionUID = 1L; | |
| @Override | |
| public Long reduce(Long value1, Long value2) throws Exception { | |
| return value1 + value2; | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment