Skip to content

Instantly share code, notes, and snippets.

@sirmes
Created November 13, 2013 18:18
Show Gist options
  • Select an option

  • Save sirmes/7453751 to your computer and use it in GitHub Desktop.

Select an option

Save sirmes/7453751 to your computer and use it in GitHub Desktop.
CMP - Proposed changes for ResponseTopicConsumer and ESBBootstrap
package com.s5a.cmp.scala.esb
import com.s5a.service.core.akka.LifeCycle
import com.s5a.config.{Config, S5AConfigFactory}
import akka.actor.{ActorRef, ActorSystem}
import org.hornetq.jms.server.embedded.EmbeddedJMS
import javax.jms.ConnectionFactory
import org.apache.camel.component.jms.JmsComponent
import org.slf4j.{LoggerFactory, Logger}
import akka.camel.CamelExtension
import org.apache.camel.impl.DefaultCamelContext
import scala.concurrent.duration._
import com.s5a.service.messages._
/**
*
* User: rrelyea
* Date: 11/11/13
* Time: 12:00 PM
*/
//TODO this is not thread safe and has several race conditions...
/**
 * Process-wide entry point to the single [[ESBBootstrap]] instance.
 *
 * Forwards lifecycle calls to that instance and exposes its configuration,
 * actor system, Camel context and the per-endpoint actor pairs.
 */
object ESBBootstrap {
  private val logger: Logger = LoggerFactory.getLogger(ESBBootstrap.getClass)

  /** The one bootstrap instance; its constructor is only accessible from here. */
  val bootstrap: ESBBootstrap = new ESBBootstrap

  /** Delegates to the bootstrap instance's `startup()`. */
  def startup(): Unit = bootstrap.startup()

  /** Delegates to the bootstrap instance's `shutdown()`. */
  def shutdown(): Unit = bootstrap.shutdown()

  /** The configuration held by the bootstrap instance. */
  def config: Config = bootstrap.saksConfig

  /** The bootstrap's actor system; first access forces its lazy creation. */
  lazy val actorSystem: ActorSystem = bootstrap.actorSystem

  /**
   * The bootstrap's Camel context.
   * The name is misspelled but kept as-is so existing callers keep compiling;
   * new code should prefer [[camelContext]].
   */
  lazy val camelConext: DefaultCamelContext = bootstrap.camelContext

  /** Correctly spelled alias for [[camelConext]]. */
  def camelContext: DefaultCamelContext = camelConext

  /**
   * Looks up the actor pair registered for the query's endpoint name.
   *
   * @param query the query whose `endpointName` selects the pair
   * @return the `(request actor, response actor)` tuple stored for that endpoint
   * @throws IllegalArgumentException if nothing is registered under that name
   *                                  (the failure is also logged at error level)
   */
  def endpointsForQuery(query: Query): (ActorRef, ActorRef) =
    // getOrElse replaces the previous match on the erased type
    // Some[(ActorRef, ActorRef)] followed by Option.get.
    bootstrap.endpointsToActors.getOrElse(query.endpointName, {
      val message = "No endpoints for " + query.endpointName
      logger.error(message)
      throw new IllegalArgumentException(message)
    })
}
/**
 * Starts and stops the CMP messaging stack: an embedded HornetQ JMS server,
 * the Camel context of the "cmp-actors" actor system, and one
 * request-producer / response-consumer actor pair per configured endpoint.
 *
 * Instances can only be created by the companion object.
 * NOTE: `endpointsToActors` is a plain mutable map and `serviceState` a plain
 * var; nothing in this class synchronizes access to them.
 */
class ESBBootstrap private[ESBBootstrap]() extends LifeCycle{
  private val log: Logger = LoggerFactory.getLogger(classOf[ESBBootstrap])

  /** Lifecycle marker; takes the values "Stopped", "Starting", "Running", "Stopping". */
  override var serviceState: Any = "Stopped"

  private val saksConfig: Config = S5AConfigFactory.makeSaksConfig("cmp")
  val config = saksConfig

  // The resources below are lazy: each is created on first access, not at
  // construction time.
  private lazy val actorSystem: ActorSystem = ActorSystem.create("cmp-actors")
  private lazy val jmsServer: EmbeddedJMS = {
    val jms = new EmbeddedJMS
    jms.start()
    jms
  }
  private lazy val camelContext: DefaultCamelContext = CamelExtension(actorSystem).context

  //TODO: Use a case class instead of tuple
  /** Endpoint name -> (request producer actor, response consumer actor). */
  private val endpointsToActors: scala.collection.mutable.Map[String, (ActorRef, ActorRef)] = scala.collection.mutable.Map()

  /**
   * Brings the stack up: registers a "jms" Camel component backed by the
   * embedded server's connection factory (unless one is already registered),
   * starts the Camel context, then creates and records the actor pair for
   * every name listed under `cmp.endpointNames`.
   *
   * `serviceState` becomes "Running" only after all endpoint actors have been
   * registered, so an observer of the state never sees "Running" while
   * endpoint lookups could still fail.
   */
  override def startup(): Unit = {
    import scala.collection.JavaConverters._

    serviceState = "Starting"
    log.info("CMP starting")

    val cf = jmsServer.lookup("ConnectionFactory").asInstanceOf[ConnectionFactory]
    val jmsComponent = JmsComponent.jmsComponentAutoAcknowledge(cf)
    // hasComponent returns null when no component is registered under the name.
    if (camelContext.hasComponent("jms") == null)
      camelContext.addComponent("jms", jmsComponent)
    // Named jmsConfig so it no longer shadows the class-level `config`.
    val jmsConfig = jmsComponent.getConfiguration
    jmsConfig.setReceiveTimeout(25L)
    jmsComponent.setConfiguration(jmsConfig)
    camelContext.start()
    log.info("started hornetq, now starting actors")

    val endpointNames: java.util.List[String] = saksConfig.getStringList("cmp.endpointNames")
    endpointNames.asScala.foreach { endpoint =>
      val request = saksConfig.getString("cmp.endpoints." + endpoint + ".request")
      val response = saksConfig.getString("cmp.endpoints." + endpoint + ".response")
      log.debug("starting actors for: " + endpoint + "|" + request + "|" + response)
      val requestActor = actorSystem.actorOf(RequestQueueProducer.props(request))
      val responseActor = actorSystem.actorOf(ResponseTopicConsumer.props(response))
      endpointsToActors.put(endpoint, (requestActor, responseActor))
    }

    serviceState = "Running"
    log.info("CMP started")
  }

  /** No-op. */
  override def activate(): Unit = {}

  /** No-op. */
  override def deactivate(): Unit = {}

  /**
   * Tears the stack down in order: Camel context, actor system (waiting up to
   * five seconds for termination), then the embedded JMS server.
   *
   * Each later step runs even if an earlier one throws, so a failure while
   * stopping Camel or a termination timeout no longer leaves the actor system
   * or the JMS server running. The first exception still propagates, and in
   * that case `serviceState` stays "Stopping".
   */
  override def shutdown(): Unit = {
    serviceState = "Stopping"
    try {
      camelContext.stop()
    } finally {
      try {
        actorSystem.shutdown()
        actorSystem.awaitTermination(5.second)//intellij highlights this as an error, but it isn't
      } finally {
        jmsServer.stop()
      }
    }
    serviceState = "Stopped"
  }
}
package com.s5a.cmp.scala.esb
import akka.actor.{ActorLogging, ActorRef, Props}
import akka.camel.{CamelMessage, Consumer}
import scala.collection.mutable.Map
import play.api.libs.json._
import com.s5a.service.messages.Query
/** Factory for the actor configuration of [[ResponseTopicConsumer]]. */
object ResponseTopicConsumer {

  /**
   * Builds the `Props` used to create a [[ResponseTopicConsumer]].
   *
   * @param endpointUri passed through as the consumer's constructor argument
   * @return `Props` for the consumer class with that single argument
   */
  def props(endpointUri: String): Props = {
    val consumerClass = classOf[ResponseTopicConsumer]
    Props(consumerClass, endpointUri)
  }
}
/**
 * Camel consumer actor for `endpointUri` that also keeps a registry of which
 * aggregator actor is waiting for which query.
 *
 * It handles two kinds of messages: `ListenFor` requests, which register an
 * aggregator and then forward the query to a producer, and `CamelMessage`s,
 * which are routed to the aggregator registered under the message's
 * exchange-id header.
 *
 * @param endpointUri the Camel endpoint this actor consumes from
 */
class ResponseTopicConsumer(override val endpointUri: String) extends Consumer with ActorLogging {
  import scala.util.{Failure, Success, Try}

  //todo -- we need a ttl on the entries here
  //map of query uuid to aggregator actor
  val waitlist: Map[String, ActorRef] = Map()

  /**
   * Behaviour used when this actor acts as a router in front of a producer.
   *
   * The aggregator is stored in the waitlist under the query id before the
   * query is sent to the producer, which avoids the race of a request going
   * out before anything is registered to receive its response. The outgoing
   * message body is the query serialized to JSON, and its
   * `CamelMessage.MessageExchangeId` header carries the query id.
   *
   * @return a partial function handling `ListenFor` messages
   */
  def handleListenReq(): Receive = {
    case ListenFor(aggregator, query, producer) =>
      val queryId = query.id.toString
      waitlist.put(queryId, aggregator)
      producer ! CamelMessage(
        Json.toJson(query).toString(),
        scala.collection.immutable.Map(CamelMessage.MessageExchangeId -> queryId))
  }

  /**
   * Combined behaviour: `ListenFor` requests are tried first; any
   * `CamelMessage` is then treated as a response consumed from the topic
   * and passed to [[notifyTarget]].
   *
   * @return the actor's message handler
   */
  override def receive: Receive = handleListenReq() orElse {
    case msg: CamelMessage =>
      log.debug("ResponseTopicConsumer Consumed: " + msg)
      notifyTarget(msg)
  }

  /**
   * Parses the response body as JSON and sends it to the aggregator registered
   * under the message's `CamelMessage.MessageExchangeId` header.
   *
   * A missing header or a body that is not valid JSON is logged and the
   * message is dropped, instead of letting the exception escape `receive`.
   * NOTE(review): under Akka's default supervision an escaping exception would
   * restart this actor and recreate `waitlist` empty, losing every pending
   * registration — confirm against the supervisor actually in use.
   *
   * @param response the message consumed from the endpoint
   */
  def notifyTarget(response: CamelMessage): Unit = {
    val extracted = Try {
      val correlationId =
        response.getHeaderAs(CamelMessage.MessageExchangeId, classOf[String], camelContext)
      val rawBody = response.body.toString
      log.debug("marshalling: " + rawBody)
      val queryResponse: JsValue = Json.parse(rawBody)
      (correlationId, queryResponse)
    }

    extracted match {
      case Success((correlationId, queryResponse)) =>
        waitlist.get(correlationId) match {
          case Some(aggregator) => aggregator ! queryResponse
          case None => log.warning("message with correlation id " + correlationId
            + " did not have a corresponding aggregator\n" + response.body)
        }
      case Failure(cause) =>
        log.error(cause, "could not process response message: {}", response)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment