Skip to content

Instantly share code, notes, and snippets.

@alpinegizmo
Last active July 10, 2018 12:29
Show Gist options
  • Select an option

  • Save alpinegizmo/5e450fb7041537d7081c78bd7b985341 to your computer and use it in GitHub Desktop.

Select an option

Save alpinegizmo/5e450fb7041537d7081c78bd7b985341 to your computer and use it in GitHub Desktop.
Flink job to mark clickstream events with user-ids, connecting pre-registration sessions with later, authenticated sessions.
/*
* Copyright 2018 data Artisans GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dataartisans.flinktraining.examples.datastream_java.broadcast;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
/*
Given a stream of website clickstream events, tag every event with a user-id. In particular,
we aim to connect the unauthenticated (logged-out) sessions that users have before they ever
register and log in with the authenticated sessions that they have later.
Some events have only a cookie as an identifier, some have only an authentication token,
and only login events have both (which is what lets us connect the two kinds of sessions).
Events may arrive out of order, so authenticated (token-only) events that show up before
the corresponding login event must be buffered until that login event arrives.
*/
/**
 * Flink job that tags every clickstream event with a user-id.
 *
 * <p>Cookie-based events are keyed by cookie and given a per-cookie user-id. Login events,
 * which carry both a cookie and an auth token, carry that user-id over to the token-keyed
 * stream of authenticated events.
 */
public class UserSessions {
    /** Sentinel stored in {@code Event.user} for events not (yet) associated with a user. */
    public static final long NOT_A_USER = 0L;

    /**
     * An immutable clickstream event.
     *
     * <p>By the conventions of this job:
     * <ul>
     *   <li>an empty {@code cookie} means "no cookie";
     *   <li>an {@code authtoken} of {@code 0L} means "no token";
     *   <li>{@code user == NOT_A_USER} means the event has not been tagged yet.
     * </ul>
     */
    private static class Event {
        public final long timestamp;
        public final String payload;
        public final String cookie;
        public final long authtoken;
        public final long user;

        /** An event in a cookie-based, unauthenticated session. */
        Event(long t, String payload, String cookie) {
            this(t, payload, cookie, 0L, NOT_A_USER);
        }

        /** An event in a token-based, authenticated session. */
        Event(long t, String payload, long authtoken) {
            this(t, payload, "", authtoken, NOT_A_USER);
        }

        /**
         * A login event with both a cookie and a token.
         * Throws NullPointerException if {@code authtoken} is null (it is unboxed).
         */
        Event(long t, String payload, String cookie, Long authtoken) {
            this(t, payload, cookie, authtoken, NOT_A_USER);
        }

        /**
         * A copy of {@code event}, either unauthenticated or authenticated, tagged with
         * {@code user}. Throws NullPointerException if {@code user} is null (it is unboxed).
         */
        Event(Event event, Long user) {
            this(event.timestamp, event.payload, event.cookie, event.authtoken, user);
        }

        /** Canonical constructor that every public-facing constructor delegates to. */
        private Event(long timestamp, String payload, String cookie, long authtoken, long user) {
            this.timestamp = timestamp;
            this.payload = payload;
            this.cookie = cookie;
            this.authtoken = authtoken;
            this.user = user;
        }

        /** Returns true if this event carries a non-empty cookie. */
        public boolean hasCookie() {
            // Compare by content, not by reference. `cookie != ""` only works while the field
            // still holds the interned "" literal. That is not guaranteed once Flink has
            // serialized and deserialized the event between operators.
            return this.cookie != null && !this.cookie.isEmpty();
        }

        /** Returns true if this event carries an auth token (anything other than 0). */
        public boolean hasToken() {
            return this.authtoken != 0L;
        }

        /** A login event is the only kind that carries both a cookie and a token. */
        public boolean isLogin() {
            return hasCookie() && hasToken();
        }

        @Override
        public String toString() {
            return "Event{" +
                    "time=" + timestamp +
                    ", payload=" + payload +
                    ", cookie=" + cookie +
                    ", authoken=" + authtoken +
                    ", user=" + user +
                    '}';
        }
    }

    // Anonymous subclass ({}) so that Flink can capture the generic type Event.
    final static OutputTag<Event> loginTag = new OutputTag<Event>("loginTag") {};

    /**
     * Builds and runs the job on a small, hard-coded input in a local environment with the web
     * UI enabled. It prints the execution plan, then prints every event tagged with its user-id.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        final List<Event> input = new ArrayList<>();
        input.add(new Event(1, "homepage", "cookieA"));
        input.add(new Event(2, "click", "cookieA"));
        input.add(new Event(3, "homepage", "cookieB"));
        input.add(new Event(4, "browse", "cookieA"));
        input.add(new Event(5, "out-of-order-event", 1L));
        input.add(new Event(6, "login", "cookieA", 1L));
        input.add(new Event(7, "login", "cookieB", 2L));
        input.add(new Event(8, "transfer", 2L));
        input.add(new Event(9, "balance", 1L));

        /*
         * source --> split --(cookie)------> keyBy cookie: add user-id ----------> sink
         *              |                              |
         *              |                              | login events (side output),
         *              |                              v keyed by token
         *              +----(no cookie)---> keyBy token: add user-id -----------> sink
         */
        DataStream<Event> events = env
                .fromCollection(input)
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.milliseconds(10)) {
                    @Override
                    public long extractTimestamp(Event event) {
                        return event.timestamp;
                    }
                });

        // Split the stream:
        // all cookie-based events, including login events, go into the cookie split;
        // all other events go into the authenticated split.
        SplitStream<Event> splits = events.split(new SplitCookiesFromTokens());
        DataStream<Event> unauthenticated = splits.select("cookie");
        DataStream<Event> authenticated = splits.select("authenticated");

        // Mark the cookie-based events with a per-cookie user-id.
        SingleOutputStreamOperator<Event> cookieMarkedWithUser = unauthenticated
                .keyBy(e -> e.cookie)
                .process(new MarkCookieWithUser());

        // Grab just the login events (now marked with a user-id), key them by token, and
        // connect them to the token-keyed authenticated stream.
        DataStream<Event> userInfoStream = cookieMarkedWithUser
                .getSideOutput(loginTag)
                .keyBy(e -> e.authtoken);

        DataStream<Event> tokenMarkedWithUser = authenticated
                .keyBy(e -> e.authtoken)
                .connect(userInfoStream)
                .process(new MarkTokenWithUser());

        // Now that every event has been marked with the appropriate user-id, emit both sub-streams.
        cookieMarkedWithUser.union(tokenMarkedWithUser).print();

        System.out.println(env.getExecutionPlan());
        env.execute();
    }

    /** Routes events with a cookie to "cookie" and all other events to "authenticated". */
    public static class SplitCookiesFromTokens implements OutputSelector<Event> {
        @Override
        public Iterable<String> select(Event event) {
            List<String> output = new ArrayList<>(1);
            if (event.hasCookie()) {
                output.add("cookie");
            } else {
                output.add("authenticated");
            }
            return output;
        }
    }

    /**
     * Keyed by cookie. Assigns each cookie a random user-id the first time it is seen, keeping
     * the id in keyed state. Emits every event tagged with that id, and also emits tagged login
     * events to the {@code loginTag} side output.
     */
    public static class MarkCookieWithUser extends KeyedProcessFunction<String, Event, Event> {
        private final ValueStateDescriptor<Long> userForCookieDesc =
                new ValueStateDescriptor<Long>("usersForCookie", BasicTypeInfo.LONG_TYPE_INFO);

        // Acquired once in open() instead of on every element.
        private transient ValueState<Long> userState;

        @Override
        public void open(Configuration config) {
            userState = getRuntimeContext().getState(userForCookieDesc);
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
            Long user = userState.value();
            if (user == null) {
                user = newUserId();
                userState.update(user);
            }

            Event markedEvent = new Event(event, user);
            out.collect(markedEvent);

            // split off the login events to combine with the token-based events
            if (event.isLogin()) {
                ctx.output(loginTag, markedEvent);
            }
        }

        /**
         * Returns a random 64-bit user-id that is never the NOT_A_USER sentinel.
         * Uniqueness is probabilistic: collisions are unlikely but not prevented.
         */
        private static long newUserId() {
            long id;
            do {
                id = ThreadLocalRandom.current().nextLong();
            } while (id == NOT_A_USER);
            return id;
        }
    }

    /**
     * Operates on connected, token-keyed streams. Input 1 is the authenticated events; input 2
     * is the login events already tagged with a user-id. Authenticated events that arrive
     * before their login are buffered in keyed list state. When the login supplies the user-id,
     * they are flushed, tagged with it.
     *
     * <p>NOTE(review): buffered events for a token whose login never arrives are retained
     * indefinitely. Consider a timer or state TTL to bound this state.
     */
    public static class MarkTokenWithUser extends CoProcessFunction<Event, Event, Event> {
        private transient ListState<Event> eventBuffer;
        private transient ValueState<Long> usersForToken;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Long> userForTokenDesc =
                    new ValueStateDescriptor<Long>("usersForToken", BasicTypeInfo.LONG_TYPE_INFO);
            usersForToken = getRuntimeContext().getState(userForTokenDesc);

            ListStateDescriptor<Event> eventBufferDesc =
                    new ListStateDescriptor<Event>("unmarkedEvents", Event.class);
            eventBuffer = getRuntimeContext().getListState(eventBufferDesc);
        }

        @Override
        public void processElement1(Event event, Context ctx, Collector<Event> out) throws Exception {
            Long user = usersForToken.value();
            if (user == null) {
                // we don't yet know the user-id, so buffer this event
                eventBuffer.add(event);
            } else {
                out.collect(new Event(event, user));
            }
        }

        @Override
        public void processElement2(Event event, Context ctx, Collector<Event> out) throws Exception {
            final long user = event.user;
            usersForToken.update(user);

            // Now that we know the user-id, flush the buffer. The AppendingState#get contract
            // allows null for empty state on some backends/versions, so guard against it.
            Iterable<Event> buffered = eventBuffer.get();
            if (buffered != null) {
                for (Event e : buffered) {
                    out.collect(new Event(e, user));
                }
            }
            eventBuffer.clear();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment