Last active
October 17, 2016 02:57
-
-
Save TanUkkii007/f3b5784df0f8f7c3b144c38279455a09 to your computer and use it in GitHub Desktop.
flatMapConcat modification of StreamDispatchers.scala from https://gist.github.com/johanandren/d55d022bff39ab65b170d8478219604a
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package streams | |
| import akka.actor.ActorSystem | |
| import akka.stream._ | |
| import akka.stream.scaladsl._ | |
| import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } | |
| import com.typesafe.config.ConfigFactory | |
| /** | |
| * Demo that prints the name of the thread each stream stage runs on, to show which | |
| * dispatcher attribute actually takes effect. In this variant the "mine" dispatcher is | |
| * attached to the inner source created inside flatMapConcat. | |
| * | |
| * Each println label names the dispatcher the author expected; the thread name printed | |
| * after it is the one really used. | |
| */ | |
| object StreamDispatchers extends App { | |
| // Three dedicated dispatchers with a pool of exactly one thread each, so the thread name identifies the dispatcher. | |
| implicit val system: ActorSystem = ActorSystem("dispatchers", ConfigFactory.parseString( | |
| """ | |
| another { | |
| type = Dispatcher | |
| executor = "thread-pool-executor" | |
| thread-pool-executor { | |
| fixed-pool-size = 1 | |
| } | |
| } | |
| yet-another { | |
| type = Dispatcher | |
| executor = "thread-pool-executor" | |
| thread-pool-executor { | |
| fixed-pool-size = 1 | |
| } | |
| } | |
| mine { | |
| type = Dispatcher | |
| executor = "thread-pool-executor" | |
| thread-pool-executor { | |
| fixed-pool-size = 1 | |
| } | |
| } | |
| """)) | |
| implicit val materializer: ActorMaterializer = ActorMaterializer() | |
| // Despite its name, this holds the Future returned by runForeach (the running stream), not a Source. | |
| val source = Source.single(1).flatMapConcat { _ => | |
| Source.fromGraph(new TestStage(20)).withAttributes(ActorAttributes.dispatcher("mine")) | |
| } | |
| .map { n => println("expected default (map): " + Thread.currentThread().getName); n } | |
| .map { n => println("expected another: " + Thread.currentThread().getName); n } | |
| .addAttributes(ActorAttributes.dispatcher("another")) | |
| .map { n => println("expected yet-another: " + Thread.currentThread().getName); n } | |
| .addAttributes(ActorAttributes.dispatcher("yet-another")) | |
| .runForeach(_ => println("expected default (runForeach): " + Thread.currentThread().getName)) | |
| // Terminate the actor system once the stream has finished (successfully or not); | |
| // without this the program never exits after the last element is printed. | |
| source.onComplete(_ => system.terminate())(system.dispatcher) | |
| } | |
| /** | |
| * Source stage that emits 1, 2, ..., n - 1 and then completes, printing the name of the | |
| * current thread on every pull. | |
| * | |
| * @param n exclusive upper bound of the emitted values; for n <= 1 the stage completes on | |
| * the first pull without emitting anything | |
| */ | |
| class TestStage(n: Int) extends GraphStage[SourceShape[Int]] { | |
| val out: Outlet[Int] = Outlet("TestStage") | |
| override def shape: SourceShape[Int] = SourceShape(out) | |
| override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
| // Number of pulls seen so far; kept inside the logic so every createLogic call starts from 0. | |
| var i = 0 | |
| setHandler(out, new OutHandler { | |
| override def onPull(): Unit = { | |
| i += 1 | |
| println("expected mine: " + Thread.currentThread().getName) | |
| // ">=" rather than "==": with n <= 0 an equality check never matches, so the stage | |
| // would keep emitting instead of completing. Identical behavior for n >= 1. | |
| if (i >= n) | |
| complete(out) | |
| else | |
| push(out, i) | |
| } | |
| }) | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package streams | |
| import akka.actor.ActorSystem | |
| import akka.stream._ | |
| import akka.stream.scaladsl._ | |
| import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } | |
| import com.typesafe.config.ConfigFactory | |
| /** | |
| * Demo that prints the name of the thread each stream stage runs on, to show which | |
| * dispatcher attribute actually takes effect. In this variant the inner source created | |
| * inside flatMapConcat gets the "mine" dispatcher and is additionally marked `.async`. | |
| * | |
| * Each println label names the dispatcher the author expected; the thread name printed | |
| * after it is the one really used. | |
| */ | |
| object StreamDispatchers extends App { | |
| // Three dedicated dispatchers with a pool of exactly one thread each, so the thread name identifies the dispatcher. | |
| implicit val system: ActorSystem = ActorSystem("dispatchers", ConfigFactory.parseString( | |
| """ | |
| another { | |
| type = Dispatcher | |
| executor = "thread-pool-executor" | |
| thread-pool-executor { | |
| fixed-pool-size = 1 | |
| } | |
| } | |
| yet-another { | |
| type = Dispatcher | |
| executor = "thread-pool-executor" | |
| thread-pool-executor { | |
| fixed-pool-size = 1 | |
| } | |
| } | |
| mine { | |
| type = Dispatcher | |
| executor = "thread-pool-executor" | |
| thread-pool-executor { | |
| fixed-pool-size = 1 | |
| } | |
| } | |
| """)) | |
| implicit val materializer: ActorMaterializer = ActorMaterializer() | |
| // Despite its name, this holds the Future returned by runForeach (the running stream), not a Source. | |
| val source = Source.single(1).flatMapConcat { _ => | |
| Source.fromGraph(new TestStage(20)).withAttributes(ActorAttributes.dispatcher("mine")).async // only difference from the first example: .async | |
| } | |
| .map { n => println("expected default (map): " + Thread.currentThread().getName); n } | |
| .map { n => println("expected another: " + Thread.currentThread().getName); n } | |
| .addAttributes(ActorAttributes.dispatcher("another")) | |
| .map { n => println("expected yet-another: " + Thread.currentThread().getName); n } | |
| .addAttributes(ActorAttributes.dispatcher("yet-another")) | |
| .runForeach(_ => println("expected default (runForeach): " + Thread.currentThread().getName)) | |
| // Terminate the actor system once the stream has finished (successfully or not); | |
| // without this the program never exits after the last element is printed. | |
| source.onComplete(_ => system.terminate())(system.dispatcher) | |
| } | |
| /** | |
| * Source stage that emits 1, 2, ..., n - 1 and then completes, printing the name of the | |
| * current thread on every pull. | |
| * | |
| * @param n exclusive upper bound of the emitted values; for n <= 1 the stage completes on | |
| * the first pull without emitting anything | |
| */ | |
| class TestStage(n: Int) extends GraphStage[SourceShape[Int]] { | |
| val out: Outlet[Int] = Outlet("TestStage") | |
| override def shape: SourceShape[Int] = SourceShape(out) | |
| override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
| // Number of pulls seen so far; kept inside the logic so every createLogic call starts from 0. | |
| var i = 0 | |
| setHandler(out, new OutHandler { | |
| override def onPull(): Unit = { | |
| i += 1 | |
| println("expected mine: " + Thread.currentThread().getName) | |
| // ">=" rather than "==": with n <= 0 an equality check never matches, so the stage | |
| // would keep emitting instead of completing. Identical behavior for n >= 1. | |
| if (i >= n) | |
| complete(out) | |
| else | |
| push(out, i) | |
| } | |
| }) | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package streams | |
| import akka.actor.ActorSystem | |
| import akka.stream._ | |
| import akka.stream.scaladsl._ | |
| import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } | |
| import com.typesafe.config.ConfigFactory | |
| /** | |
| * Demo that prints the name of the thread each stream stage runs on, to show which | |
| * dispatcher attribute actually takes effect. In this variant the "mine" dispatcher is | |
| * set on the result of flatMapConcat rather than on the inner source. | |
| * | |
| * Each println label names the dispatcher the author expected; the thread name printed | |
| * after it is the one really used. | |
| */ | |
| object StreamDispatchers extends App { | |
| // Three dedicated dispatchers with a pool of exactly one thread each, so the thread name identifies the dispatcher. | |
| implicit val system: ActorSystem = ActorSystem("dispatchers", ConfigFactory.parseString( | |
| """ | |
| another { | |
| type = Dispatcher | |
| executor = "thread-pool-executor" | |
| thread-pool-executor { | |
| fixed-pool-size = 1 | |
| } | |
| } | |
| yet-another { | |
| type = Dispatcher | |
| executor = "thread-pool-executor" | |
| thread-pool-executor { | |
| fixed-pool-size = 1 | |
| } | |
| } | |
| mine { | |
| type = Dispatcher | |
| executor = "thread-pool-executor" | |
| thread-pool-executor { | |
| fixed-pool-size = 1 | |
| } | |
| } | |
| """)) | |
| implicit val materializer: ActorMaterializer = ActorMaterializer() | |
| // Despite its name, this holds the Future returned by runForeach (the running stream), not a Source. | |
| val source = Source.single(1).flatMapConcat { _ => | |
| Source.fromGraph(new TestStage(20)) | |
| }.withAttributes(ActorAttributes.dispatcher("mine")) // dispatcher attribute attached to the flatMapConcat stage | |
| .map { n => println("expected default (map): " + Thread.currentThread().getName); n } | |
| .map { n => println("expected another: " + Thread.currentThread().getName); n } | |
| .addAttributes(ActorAttributes.dispatcher("another")) | |
| .map { n => println("expected yet-another: " + Thread.currentThread().getName); n } | |
| .addAttributes(ActorAttributes.dispatcher("yet-another")) | |
| .runForeach(_ => println("expected default (runForeach): " + Thread.currentThread().getName)) | |
| // Terminate the actor system once the stream has finished (successfully or not); | |
| // without this the program never exits after the last element is printed. | |
| source.onComplete(_ => system.terminate())(system.dispatcher) | |
| } | |
| /** | |
| * Source stage that emits 1, 2, ..., n - 1 and then completes, printing the name of the | |
| * current thread on every pull. | |
| * | |
| * @param n exclusive upper bound of the emitted values; for n <= 1 the stage completes on | |
| * the first pull without emitting anything | |
| */ | |
| class TestStage(n: Int) extends GraphStage[SourceShape[Int]] { | |
| val out: Outlet[Int] = Outlet("TestStage") | |
| override def shape: SourceShape[Int] = SourceShape(out) | |
| override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
| // Number of pulls seen so far; kept inside the logic so every createLogic call starts from 0. | |
| var i = 0 | |
| setHandler(out, new OutHandler { | |
| override def onPull(): Unit = { | |
| i += 1 | |
| println("expected mine: " + Thread.currentThread().getName) | |
| // ">=" rather than "==": with n <= 0 an equality check never matches, so the stage | |
| // would keep emitting instead of completing. Identical behavior for n >= 1. | |
| if (i >= n) | |
| complete(out) | |
| else | |
| push(out, i) | |
| } | |
| }) | |
| } | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
// output from StreamDispatchersWithFlatMapConcatWithDispathcer.scala
expected mine: dispatchers-mine-5
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4