Skip to content

Instantly share code, notes, and snippets.

@smarthi
Last active August 26, 2016 04:25
Show Gist options
  • Select an option

  • Save smarthi/4848cfd76d7f964dd2d1276518599903 to your computer and use it in GitHub Desktop.

Select an option

Save smarthi/4848cfd76d7f964dd2d1276518599903 to your computer and use it in GitHub Desktop.
/**
 * Assembles and runs the sample streaming job.
 *
 * <p>The keyed input is placed into a single global window whose trigger is
 * {@code TimeoutTrigger.of(CountTrigger.of(3), 10)}; the windowed stream is then
 * aggregated with {@code sum(2)} and the result is printed.
 *
 * @param parameters job parameters (not read by this snippet)
 * @throws Exception if building or executing the job fails
 */
private void executeJob(final ParameterTool parameters) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Source/keying is elided in the original snippet.
    KeyedStream<Tuple3<String, String, Long>, Tuple> keyedInput = ....;

    // Assign the global window and attach the count-or-timeout trigger in one fluent chain.
    WindowedStream<Tuple3<String, String, Long>, Tuple, GlobalWindow> globallyWindowed =
        keyedInput
            .window(GlobalWindows.create())
            .trigger(TimeoutTrigger.of(CountTrigger.of(3), 10));

    DataStream<Tuple3<String, String, Long>> summed = globallyWindowed.sum(2);
    summed.print();

    env.execute();
}
public class TimeoutTrigger<T, W extends Window> extends Trigger<T, W> {
private Trigger<T, W> nestedTrigger;
private final long sessionTimeout;
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
public TimeoutTrigger(Trigger<T, W> nestedTrigger,
final Long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
this.nestedTrigger = nestedTrigger;
}
@Override
public TriggerResult onElement(T newsFeed, long timestamp, W window,
TriggerContext triggerContext) throws Exception {
triggerContext.registerProcessingTimeTimer(window.maxTimestamp() + sessionTimeout);
TriggerResult triggerResult = nestedTrigger.onElement(newsFeed, timestamp, window, triggerContext);
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@Override
public TriggerResult onProcessingTime(long timestamp, W window,
TriggerContext triggerContext) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long timestamp, W window,
TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, TriggerContext triggerContext) throws Exception {
triggerContext.deleteProcessingTimeTimer(window.maxTimestamp() + sessionTimeout);
nestedTrigger.clear(window, triggerContext);
triggerContext.getPartitionedState(stateDesc).clear();
}
/**
* Creates a new purging trigger from the given {@code Trigger}.
*
* @param nestedTrigger The trigger that is wrapped by this purging trigger
*/
public static <T, W extends Window> TimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, long sessionTimeout) {
return new TimeoutTrigger<>(nestedTrigger, sessionTimeout);
}
@Override
public String toString() {
return "TimeoutTrigger()";
}
private static class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment