Skip to content

Instantly share code, notes, and snippets.

@vdebergue
Created April 5, 2017 11:00
Show Gist options
  • Select an option

  • Save vdebergue/e9e7bddb5f773582a5fc0a1b0276eb9c to your computer and use it in GitHub Desktop.

Select an option

Save vdebergue/e9e7bddb5f773582a5fc0a1b0276eb9c to your computer and use it in GitHub Desktop.
/**
  * What the Redis source stage does with an incoming message when its internal
  * buffer already holds the maximum number of messages.
  */
sealed trait RedisOverflowStrategy

/** The available [[RedisOverflowStrategy]] values. */
object RedisOverflowStrategy {

  /** Fail the stage with a `RuntimeException`. */
  case object Fail extends RedisOverflowStrategy

  /** Discard the incoming message and keep the buffer untouched. */
  case object DropNew extends RedisOverflowStrategy

  /** Discard every buffered message, then buffer the incoming one. */
  case object DropBuffer extends RedisOverflowStrategy

  /** Discard the oldest buffered message, then buffer the incoming one. */
  case object DropHead extends RedisOverflowStrategy
}
/**
  * Source stage emitting every message published on a Redis pub/sub channel.
  *
  * A dedicated daemon thread borrows a connection from the pool and blocks in `subscribe`.
  * Each received message is handed to the stage through an async callback and is either
  * pushed downstream immediately (when downstream is waiting) or buffered until the next pull.
  *
  * @param sedisPool        pool whose `underlying` pool provides the subscribing connection
  * @param channel          channel to subscribe to; also used as the name of the daemon thread
  * @param bufferSize       maximum number of messages buffered while downstream has no demand;
  *                         with a non-positive value the dropping strategies `DropHead` and
  *                         `DropBuffer` still retain the single most recent message
  * @param overflowStrategy what to do with a message that arrives while the buffer is full
  */
class RedisSourceStage(sedisPool: org.sedis.Pool, channel: String, bufferSize: Int, overflowStrategy: RedisOverflowStrategy) extends GraphStage[SourceShape[String]] {
  val out: Outlet[String] = Outlet("Redis.out")
  val logger = Logger("storage.redis.source")
  override val shape: SourceShape[String] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    val pool = sedisPool.underlying
    val queue = collection.mutable.Queue.empty[String]

    // Subscription life cycle, shared between the stage and the Redis thread.
    // The atomic transitions guarantee that exactly one side issues the unsubscribe,
    // whichever order "subscription confirmed" and "stage stopped" happen in.
    private val Idle = 0
    private val Subscribed = 1
    private val Stopped = 2
    private val state = new java.util.concurrent.atomic.AtomicInteger(Idle)

    // Daemon thread that blocks in `subscribe` for as long as the subscription is active.
    // NOTE(review): when the subscription ends with an error it is only logged; the stage is
    // neither failed nor completed, so downstream simply stops receiving elements.
    val runner = new Thread(new Runnable {
      override def run(): Unit = {
        logger.debug(s"Subscribing to new channel '$channel'")
        try {
          // Borrowing the connection is inside the try so that a pool failure is logged too.
          val client = pool.getResource()
          try {
            client.subscribe(listener, channel)
          } finally {
            // The subscription is over: a later postStop must not touch this connection.
            state.compareAndSet(Subscribed, Idle)
            pool.returnResourceObject(client)
          }
        } catch {
          case NonFatal(e) => logger.warn(s"Error in redis thread $channel", e)
        }
      }
    })
    runner.setName(channel)
    runner.setDaemon(true)

    // Wrap 'deliverMessageOrEnqueue' in an AsyncCallback because it is called from the daemon thread.
    val onMessageCallBack = getAsyncCallback(deliverMessageOrEnqueue)

    val listener = new JedisPubSub {
      override def onMessage(chan: String, message: String): Unit =
        onMessageCallBack.invoke(message)

      override def onSubscribe(m: String, c: Int): Unit = {
        // If the stage stopped before the subscription was confirmed, postStop could not
        // unsubscribe; do it here so the thread and its connection are released.
        if (!state.compareAndSet(Idle, Subscribed)) {
          unsubscribe()
        }
      }

      override def onUnsubscribe(m: String, c: Int): Unit = ()
      override def onPSubscribe(m: String, c: Int): Unit = ()
      override def onPUnsubscribe(m: String, c: Int): Unit = ()
      override def onPMessage(pattern: String, channel: String, message: String): Unit = ()
    }

    /** Starts the subscribing thread once the stage is materialized. */
    override def preStart(): Unit = {
      runner.start()
    }

    /**
      * Pushes `msg` downstream when there is demand, otherwise buffers it, applying the
      * overflow strategy when the buffer already holds `bufferSize` messages.
      */
    def deliverMessageOrEnqueue(msg: String): Unit = {
      if (isAvailable(out)) {
        push(out, msg)
      } else if (queue.size + 1 > bufferSize) {
        logger.warn(s"Buffer is full for channel '$channel', handling overflow with $overflowStrategy")
        handleOverflow(msg)
      } else {
        queue.enqueue(msg)
      }
    }

    /** Applies the configured overflow strategy to `msg`, which did not fit in the buffer. */
    def handleOverflow(msg: String): Unit = overflowStrategy match {
      case RedisOverflowStrategy.DropHead =>
        // The queue is empty here only when bufferSize <= 0; dequeuing would then throw.
        if (queue.nonEmpty) {
          queue.dequeue()
        }
        queue.enqueue(msg)
      case RedisOverflowStrategy.DropBuffer =>
        queue.clear()
        queue.enqueue(msg)
      case RedisOverflowStrategy.DropNew => ()
      case RedisOverflowStrategy.Fail => failStage(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
    }

    setHandler(out, new OutHandler {
      // Serve demand from the buffer; when it is empty the next incoming message is pushed
      // directly by deliverMessageOrEnqueue.
      override def onPull(): Unit = {
        if (queue.nonEmpty) {
          push(out, queue.dequeue())
        }
      }
    })

    /** Ends the subscription so that the daemon thread returns its connection and exits. */
    override def postStop(): Unit = {
      logger.debug(s"Stoping redis source for channel $channel")
      // Only unsubscribe when the subscription is confirmed; otherwise onSubscribe sees the
      // Stopped state and unsubscribes from the Redis thread itself.
      if (state.getAndSet(Stopped) == Subscribed) {
        try {
          listener.unsubscribe()
        } catch {
          case NonFatal(e) => logger.warn(s"Error while unsubscribing from channel '$channel'", e)
        }
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment