Skip to content

Instantly share code, notes, and snippets.

@sirmes
Last active December 28, 2015 05:59
Show Gist options
  • Select an option

  • Save sirmes/7453768 to your computer and use it in GitHub Desktop.

Select an option

Save sirmes/7453768 to your computer and use it in GitHub Desktop.
PRS - Proposed changes for ResponseAggregator and ProductDataRepository
package com.s5a.service.product_repo
import com.s5a.service.core.akka.EndpointUriSet
import akka.camel.CamelMessage
import com.s5a.service.core.akka.SimpleProducer
import play.api.libs.json._
import play.api.libs.json.Json._
import play.api.libs.json.JsArray
import play.api.libs.json.JsSuccess
import play.api.libs.json.JsString
import play.api.libs.json.JsBoolean
import play.api.libs.json.JsNumber
import com.s5a.product_service.repo.ProductRepoResponseAggregator
import akka.util.Timeout
import akka.actor._
import scala.concurrent.duration._
import com.s5a.service.messages.{ProductByUpc, ProductByProductCode, Request}
import com.s5a.product_service.{ProductRepoMongo, ProductRepo}
object ProductDataRepository {
def producerFor(msg: CamelMessage, endpoints: EndpointUriSet, context: ActorContext):ActorRef = {
context.actorOf(SimpleProducer.props(endpoints.response), "responseProducer")
}
def props(endpoints: EndpointUriSet,
producerFor:
(CamelMessage, EndpointUriSet, ActorContext) => ActorRef = ProductDataRepository.producerFor, productRepo: ProductRepo = new ProductRepoMongo) : Props = {
Props(classOf[ProductDataRepository], endpoints, producerFor, productRepo)
}
}
class ProductDataRepository(endpoints: EndpointUriSet,
producerFor: (CamelMessage, EndpointUriSet, ActorContext) => ActorRef, productRepo : ProductRepo) extends Actor with ActorLogging {
implicit val timeout = Timeout(1 seconds)
override def receive: Receive = {
//producer is responsible for writing answer back to requester
case msg: CamelMessage =>
val producer = producerFor(msg, endpoints, context)
val response = responseTo(msg,producer)
case _ =>
// TODO: When we isolate the CamelMessage case class in ServiceCore and we have a body (string)
// We will not need this error handling anymore
throw new IllegalArgumentException("ProductDataRepository received bad message")
}
def responseTo(msg: CamelMessage, producer:ActorRef): Unit = {
//TODO: Pull request from Jim / CMP how to handle this
// val request = buildRequest(Option(msg.body.asInstanceOf[AnyRef]).map(_.toString).get)//crash when it null
//1st cycle
Option(msg.body.asInstanceOf[AnyRef]).map(_.toString).map { s =>
val request = buildRequest(s)
context.actorOf(ProductRepoResponseAggregator.props(producer,request, 1.second, productRepo))
} getOrElse {
producer ! "error message"
}
//alternative
Option(msg.body.asInstanceOf[AnyRef]).map(_.toString) match {
case Some(s) =>
val request = buildRequest(s)
context.actorOf(ProductRepoResponseAggregator.props(producer,request, 1.second, productRepo))
case None =>
producer ! "error message"
}
}
// Ideally, this would be a in a trait we could apply to CamelMessages...
def correlatedResponse(response: CamelMessage, request: CamelMessage): CamelMessage = {
val correlationId = response.headers.get(CamelMessage.MessageExchangeId)
val correlatedHeaders = response.headers + (CamelMessage.MessageExchangeId -> correlationId)
CamelMessage(response.body, correlatedHeaders)
}
//TODO: All the code below will be remove as soon as we have a clean way to cast from string to case classes
private def buildRequest(s : String) : Request = {
s.isEmpty || s == null match {
case false => {
val jsValue = Json.parse(s)
val map = Json.fromJson(jsValue)
s match {
case s: String if s.contains("product_code") => Request(Seq(ProductByProductCode(map.get("product_code").toString)))
case s: String if s.contains("upc_code") => Request(Seq(ProductByUpc(map.get("upc_code").toString)))
}
}
}
}
implicit val objectMapFormat = new Format[Map[String, Any]] {
def writes(map: Map[String, Any]): JsValue =
Json.obj(map.map
{ case (string, any) =>
val ret:(String, JsValueWrapper) = any match {
case _:String => string -> JsString(any.asInstanceOf[String])
case _:Int => string -> JsNumber( BigDecimal(any.asInstanceOf[Int]))
case _:Boolean => string -> JsBoolean(any.asInstanceOf[Boolean])
case _ => string -> JsArray(any.asInstanceOf[List[String]].map(JsString(_)))
}
ret
}.toSeq:_*)
def reads(jv: JsValue): JsResult[Map[String, Any]] =
JsSuccess(jv.as[Map[String, JsValue]].map{case (k, v) =>
k -> (v match {
case s:JsString => s.as[String]
case i:JsNumber => i.as[BigDecimal]
case b:JsBoolean => b.as[Boolean]
case l => l.as[List[String]]
})
})
}
}
package com.s5a.product_service.repo
import akka.actor._
import scala.concurrent.duration._
import play.api.libs.json._
import org.joda.time.Instant
import com.s5a.product_service._
import play.libs.Json
import com.s5a.api.models.Product
import scala.concurrent._
import ExecutionContext.Implicits.global
import akka.camel.CamelMessage
import com.s5a.service.messages.{ResponseState, ResponseStates, Request, JSONResponse, ProductByProductCode, ProductByUpc}
trait ResponseAggregatorBase extends Actor with ActorLogging {
def request: Request
def aggregatorFunc: (JSONResponse, JSONResponse) => JSONResponse
def runQueriesFunc: (ActorRef, Request) => Unit
def deliverFunc: JSONResponse => Unit
def timeout: Duration
def accumulate(current: JSONResponse): Receive = {
case ReceiveTimeout =>
deliverFunc(current)
context.stop(self)
case response: JSONResponse =>
val newResponse = aggregatorFunc(current, response)
if (newResponse.isComplete) {
deliverFunc(newResponse)
context.setReceiveTimeout(Duration.Undefined)
context.stop(self)
} else context.become(accumulate(newResponse))
}
def receive: Receive = {
context.setReceiveTimeout(timeout)
runQueriesFunc(self, request)
accumulate(JSONResponse(request))
}
}
class ResponseAggregator(val request: Request,
val aggregatorFunc: (JSONResponse, JSONResponse) => JSONResponse,
val runQueriesFunc: (ActorRef, Request) => Unit,
val deliverFunc: JSONResponse => Unit,
val timeout: Duration) extends ResponseAggregatorBase
object ResponseAggregator {
def props(request: Request,
aggregatorFunc: (JSONResponse, JSONResponse) => JSONResponse,
runQueriesFunc: (ActorRef, Request) => Unit,
deliverFunc: JSONResponse => Unit,
timeout: Duration): Props =
Props(classOf[ResponseAggregator], request, aggregatorFunc, runQueriesFunc, deliverFunc, timeout)
}
object ResponseAggregatorHelpers {
def keysChecker(completeKeys: Set[String], sufficientKeys: Set[String])(testKeys: Set[String]): ResponseState =
if (testKeys.isEmpty) ResponseStates.Empty
else if (completeKeys.subsetOf(testKeys)) ResponseStates.Complete
else if (sufficientKeys.subsetOf(testKeys)) ResponseStates.Sufficient
else ResponseStates.Incomplete
def keysCheckerCompleteOnly(completeKeys: Set[String]): Set[String] => ResponseState =
keysChecker(completeKeys, completeKeys) _
def simpleAggregator(keysStateChecker: Set[String] => ResponseState)(sum: JSONResponse, next: JSONResponse): JSONResponse = {
val nr = (sum.body, next.body) match {
case (JsNull, o: JsObject) => sum.copy(body = o,
timestamp = Instant.now(),
errors = sum.errors ++ next.errors)
case (o1: JsObject, o2: JsObject) => sum.copy(body = o1 ++ o2,
timestamp = Instant.now(),
errors = sum.errors ++ next.errors)
case _ => sum // Can't aggregate otherwise
}
nr.body match {
case JsNull => nr.copy(state = ResponseStates.Empty)
case o: JsObject => nr.copy(state = keysStateChecker(o.keys.toSet))
case _ => nr // probably should empty out this sucker
}
}
}
trait ProductRepoResponseAggregatorBase extends ResponseAggregatorBase {
import ResponseAggregatorHelpers._
def getProductByProductCode(x:String):Option[Product]
def getProductByUpc(x:String):Option[Product]
def aggregatorFunc: (JSONResponse, JSONResponse) => JSONResponse =
simpleAggregator(keysCheckerCompleteOnly(Set("product_code")))
def runQueriesMethod(target: ActorRef, req: Request):Unit = {
// Not super ideal, but we cannot block the Actor main-thread
future {
val response = req.toResponse
req.queries.filter(_.isInstanceOf[ProductQuery]).map(_.asInstanceOf[ProductQuery]) map {
x =>
@unchecked //This removed the warnings about the case classes not tested(covered) in the pattern match below
val product:Option[Product] = x match {
case ProductByProductCode(x, _) => getProductByProductCode(x)
case ProductByUpc(x, _) => getProductByUpc(x)
}
product match {
case Some(p) =>
// Not ideal, but the best we have for the moment
val productJSON = Json.toJson(p)
val productStr = productJSON.toString
val productJsValue = play.api.libs.json.Json.parse(productStr)
log.info("product string: " + productStr)
target ! response.withBody(productJsValue, ResponseStates.Complete)
case None =>
target ! response // An empty result - nothing to do here
}
}
}
}
val runQueriesFunc = runQueriesMethod _
}
class ProductRepoResponseAggregator(val target: ActorRef,
val request: Request,
val timeout: Duration,
val productRepo : ProductRepo) extends ProductRepoResponseAggregatorBase {
def getProductByProductCode(x:String):Option[Product] = productRepo.getProductByProductCode(x)
def getProductByUpc(x:String):Option[Product] = productRepo.getProductByUpc(x)
val deliverFunc: JSONResponse => Unit =
(r => { target ! CamelMessage(r.toString, Map()) })
}
object ProductRepoResponseAggregator {
def props(producer:ActorRef,
request: Request,
timeout: Duration,
productRepo: ProductRepo): Props =
Props(classOf[ProductRepoResponseAggregator], producer, request, timeout, productRepo)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment