-
-
Save esthomw/fb82c79ee8712d147a0f4995d4a83299 to your computer and use it in GitHub Desktop.
Akka-http example of SSE using an Actor as the source of the events Stream.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.klarna.risk.linking.variables | |
| import akka.actor.{Actor, ActorSystem, Props} | |
| import akka.http.scaladsl.Http | |
| import akka.http.scaladsl.model.StatusCodes | |
| import akka.http.scaladsl.model.sse.ServerSentEvent | |
| import akka.http.scaladsl.server.Directives._ | |
| import akka.http.scaladsl.server.Route | |
| import akka.stream._ | |
| import akka.stream.scaladsl.{BroadcastHub, Keep, Source, SourceQueueWithComplete} | |
| import scala.concurrent.ExecutionContext.Implicits.global | |
| import scala.concurrent.duration._ | |
| object SseApp extends App { | |
| import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._ | |
| implicit val actorSystem = ActorSystem() | |
| implicit val mat = ActorMaterializer() | |
| val (sourceQueue, eventsSource) = Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure) | |
| .delay(1.seconds, DelayOverflowStrategy.backpressure) | |
| .map(message => ServerSentEvent(message)) | |
| .keepAlive(1.second, () => ServerSentEvent.heartbeat) | |
| .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both) | |
| .run() | |
| val streamingActor = actorSystem.actorOf(Props(classOf[StreamingActor], sourceQueue)) | |
| def route: Route = { | |
| path("events") { | |
| get { | |
| complete { | |
| eventsSource | |
| } | |
| } ~ put { | |
| entity(as[String]) { event => | |
| complete { | |
| streamingActor ! event | |
| StatusCodes.OK | |
| } | |
| } | |
| } | |
| } | |
| } | |
| Http().bindAndHandle(route, "0.0.0.0", 9999) | |
| class StreamingActor(source: SourceQueueWithComplete[String]) extends Actor { | |
| override def receive: Receive = { | |
| case msg: String => source.offer(msg) | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment