Created
November 13, 2013 18:18
-
-
Save sirmes/7453751 to your computer and use it in GitHub Desktop.
CMP - Proposed changes for ResponseTopicConsumer and ESBBootstrap
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.s5a.cmp.scala.esb | |
| import com.s5a.service.core.akka.LifeCycle | |
| import com.s5a.config.{Config, S5AConfigFactory} | |
| import akka.actor.{ActorRef, ActorSystem} | |
| import org.hornetq.jms.server.embedded.EmbeddedJMS | |
| import javax.jms.ConnectionFactory | |
| import org.apache.camel.component.jms.JmsComponent | |
| import org.slf4j.{LoggerFactory, Logger} | |
| import akka.camel.CamelExtension | |
| import org.apache.camel.impl.DefaultCamelContext | |
| import scala.concurrent.duration._ | |
| import com.s5a.service.messages._ | |
| /** | |
| * | |
| * User: rrelyea | |
| * Date: 11/11/13 | |
| * Time: 12:00 PM | |
| */ | |
| //TODO this is not thread safe and has several race conditions... | |
| object ESBBootstrap { | |
| private val logger: Logger = LoggerFactory.getLogger(ESBBootstrap.getClass) | |
| val bootstrap = new ESBBootstrap | |
| def startup() = bootstrap.startup() | |
| def shutdown() = bootstrap.shutdown() | |
| def config: Config = bootstrap.saksConfig | |
| lazy val actorSystem = bootstrap.actorSystem | |
| lazy val camelConext = bootstrap.camelContext | |
| def endpointsForQuery(query: Query): (ActorRef, ActorRef) = { | |
| val actors: Option[(ActorRef, ActorRef)] = bootstrap.endpointsToActors.get(query.endpointName) | |
| actors match { | |
| case endpoints: Some[(ActorRef, ActorRef)] => endpoints.get | |
| case None => { | |
| logger.error("No endpoints for " + query.endpointName) | |
| throw new IllegalArgumentException("No endpoints for " + query.endpointName) | |
| } | |
| } | |
| } | |
| } | |
| class ESBBootstrap private[ESBBootstrap]() extends LifeCycle{ | |
| private val log: Logger = LoggerFactory.getLogger(classOf[ESBBootstrap]) | |
| override var serviceState: Any = "Stopped" | |
| private val saksConfig: Config = S5AConfigFactory.makeSaksConfig("cmp") | |
| val config = saksConfig | |
| private lazy val actorSystem: ActorSystem = ActorSystem.create("cmp-actors") | |
| private lazy val jmsServer: EmbeddedJMS = { | |
| val jms = new EmbeddedJMS | |
| jms.start() | |
| jms | |
| } | |
| private lazy val camelContext: DefaultCamelContext = CamelExtension(actorSystem).context | |
| //TODO: Use a case class instead of tuple | |
| private val endpointsToActors: scala.collection.mutable.Map[String, (ActorRef, ActorRef)] = scala.collection.mutable.Map() | |
| override def startup() { | |
| serviceState = "Starting" | |
| log.info("CMP starting") | |
| val cf = jmsServer.lookup("ConnectionFactory").asInstanceOf[ConnectionFactory] | |
| val jmsComponent = JmsComponent.jmsComponentAutoAcknowledge(cf) | |
| if(camelContext.hasComponent("jms") == null) | |
| camelContext.addComponent("jms", jmsComponent) | |
| val config = jmsComponent.getConfiguration | |
| config.setReceiveTimeout(25L) | |
| jmsComponent.setConfiguration(config) | |
| camelContext.start | |
| serviceState = "Running" | |
| log.info("started hornetq, now starting actors") | |
| val endpointNames:java.util.List[String] = saksConfig.getStringList("cmp.endpointNames") | |
| var i = 0 | |
| while(i < endpointNames.size){ | |
| val endpoint = endpointNames.get(i) | |
| val request = saksConfig.getString("cmp.endpoints." + endpointNames.get(i) + ".request") | |
| val response = saksConfig.getString("cmp.endpoints." + endpointNames.get(i) + ".response") | |
| log.debug("starting actors for: " + endpoint + "|" + request + "|" + response) | |
| val requestActor = actorSystem.actorOf(RequestQueueProducer.props(request)) | |
| val responseActor = actorSystem.actorOf(ResponseTopicConsumer.props(response)) | |
| val actorTuple = (requestActor, responseActor) | |
| endpointsToActors.put(endpoint, actorTuple) | |
| i = i + 1 | |
| } | |
| log.info("CMP started") | |
| } | |
| override def activate() {} | |
| override def deactivate() {} | |
| override def shutdown() { | |
| serviceState = "Stopping" | |
| camelContext.stop | |
| actorSystem.shutdown | |
| actorSystem.awaitTermination(5.second)//intellij highlights this as an error, but it isn't | |
| jmsServer.stop | |
| serviceState = "Stopped" | |
| } | |
| } | |
| package com.s5a.cmp.scala.esb | |
| import akka.actor.{ActorLogging, ActorRef, Props} | |
| import akka.camel.{CamelMessage, Consumer} | |
| import scala.collection.mutable.Map | |
| import play.api.libs.json._ | |
| import com.s5a.service.messages.Query | |
| object ResponseTopicConsumer { | |
| def props(endpointUri: String): Props = | |
| Props(classOf[ResponseTopicConsumer], endpointUri) | |
| } | |
| class ResponseTopicConsumer(override val endpointUri: String) extends Consumer with ActorLogging { | |
| //todo -- we need a ttl on the entries here | |
| //map of query uuid to aggregator actor | |
| val waitlist: Map[String, ActorRef] = Map() | |
| /** | |
| * This function will be invoked when we're using this actor as a | |
| * router. We need it so that we avoid a race condition, of | |
| * sending requests to a queue, before we've started listening to the | |
| * response topic. | |
| * @return | |
| */ | |
| def handleListenReq():Receive = { | |
| case ListenFor(aggregator,query,producer) => | |
| waitlist.put(query.id.toString, aggregator) | |
| producer ! CamelMessage( | |
| Json.toJson(query).toString(), | |
| scala.collection.immutable.Map(CamelMessage.MessageExchangeId -> query.id.toString)) | |
| } | |
| /** | |
| * This function will be invoked with this actor is behaving | |
| * as a real topic consumer. | |
| * @return | |
| */ | |
| override def receive: Receive = handleListenReq() orElse { | |
| case msg: CamelMessage => | |
| log.debug("ResponseTopicConsumer Consumed: " + msg) | |
| notifyTarget(msg) | |
| } | |
| def notifyTarget(response: CamelMessage){ | |
| val correlationId = response.getHeaderAs(CamelMessage.MessageExchangeId, classOf[String], camelContext) | |
| log.debug("marshalling: " + response.body.toString) | |
| val queryResponse: JsValue = Json.parse(response.body.toString) | |
| val aggregatorOption = waitlist.get(correlationId) | |
| aggregatorOption match { | |
| case Some(aggregator) => aggregator ! queryResponse | |
| case None => log.warning("message with correlation id " + correlationId | |
| + " did not have a corresponding aggregator\n" + response.body) | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment