Created
April 5, 2017 11:00
-
-
Save vdebergue/e9e7bddb5f773582a5fc0a1b0276eb9c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| sealed trait RedisOverflowStrategy | |
| object RedisOverflowStrategy { | |
| case object DropHead extends RedisOverflowStrategy | |
| case object DropBuffer extends RedisOverflowStrategy | |
| case object DropNew extends RedisOverflowStrategy | |
| case object Fail extends RedisOverflowStrategy | |
| } | |
| class RedisSourceStage(sedisPool: org.sedis.Pool, channel: String, bufferSize: Int, overflowStrategy: RedisOverflowStrategy) extends GraphStage[SourceShape[String]] { | |
| val out: Outlet[String] = Outlet("Redis.out") | |
| val logger = Logger("storage.redis.source") | |
| override val shape: SourceShape[String] = SourceShape(out) | |
| override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
| val pool = sedisPool.underlying | |
| val queue = collection.mutable.Queue.empty[String] | |
| val runner = new Thread(new Runnable { | |
| override def run(): Unit = { | |
| logger.debug(s"Subscribing to new channel '$channel'") | |
| val client = pool.getResource() | |
| try { | |
| client.subscribe(listener, channel) | |
| } catch { | |
| case NonFatal(e) => logger.warn(s"Error in redis thread $channel", e) | |
| } finally { | |
| pool.returnResourceObject(client) | |
| } | |
| } | |
| }) | |
| runner.setName(channel) | |
| runner.setDaemon(true) | |
| // wrap 'deliverMessageOrEnqueue' in a AsyncCallback beacause it will be called from the daemon thread | |
| val onMessageCallBack = getAsyncCallback(deliverMessageOrEnqueue) | |
| val listener = new JedisPubSub { | |
| override def onMessage(chan: String, message: String) { | |
| onMessageCallBack.invoke(message) | |
| } | |
| def onSubscribe(m: String, c: Int) {} | |
| def onUnsubscribe(m: String, c: Int) {} | |
| def onPSubscribe(m: String, c: Int) {} | |
| def onPUnsubscribe(m: String, c: Int) {} | |
| def onPMessage(pattern: String, channel: String, message: String) {} | |
| } | |
| override def preStart(): Unit = { | |
| runner.start() | |
| } | |
| def deliverMessageOrEnqueue(msg: String): Unit = { | |
| if (isAvailable(out)) { | |
| push(out, msg) | |
| } else { | |
| if (queue.size + 1 > bufferSize) { | |
| logger.warn(s"Buffer is full for channel '$channel', handling overflow with $overflowStrategy") | |
| handleOverflow(msg) | |
| } else { | |
| queue.enqueue(msg) | |
| } | |
| } | |
| } | |
| def handleOverflow(msg: String): Unit = overflowStrategy match { | |
| case RedisOverflowStrategy.DropHead => | |
| queue.dequeue() | |
| queue.enqueue(msg) | |
| case RedisOverflowStrategy.DropBuffer => | |
| queue.clear() | |
| queue.enqueue(msg) | |
| case RedisOverflowStrategy.DropNew => () | |
| case RedisOverflowStrategy.Fail => failStage(new RuntimeException(s"Reached maximum buffer size $bufferSize")) | |
| } | |
| setHandler(out, new OutHandler { | |
| override def onPull(): Unit = { | |
| if (queue.nonEmpty) { | |
| push(out, queue.dequeue()) | |
| } | |
| } | |
| }) | |
| override def postStop(): Unit = { | |
| logger.debug(s"Stoping redis source for channel $channel") | |
| listener.unsubscribe() | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment