Last active
December 28, 2015 05:59
-
-
Save sirmes/7453768 to your computer and use it in GitHub Desktop.
PRS - Proposed changes for ResponseAggregator and ProductDataRepository
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.s5a.service.product_repo | |
| import com.s5a.service.core.akka.EndpointUriSet | |
| import akka.camel.CamelMessage | |
| import com.s5a.service.core.akka.SimpleProducer | |
| import play.api.libs.json._ | |
| import play.api.libs.json.Json._ | |
| import play.api.libs.json.JsArray | |
| import play.api.libs.json.JsSuccess | |
| import play.api.libs.json.JsString | |
| import play.api.libs.json.JsBoolean | |
| import play.api.libs.json.JsNumber | |
| import com.s5a.product_service.repo.ProductRepoResponseAggregator | |
| import akka.util.Timeout | |
| import akka.actor._ | |
| import scala.concurrent.duration._ | |
| import com.s5a.service.messages.{ProductByUpc, ProductByProductCode, Request} | |
| import com.s5a.product_service.{ProductRepoMongo, ProductRepo} | |
/**
 * Factories for the [[ProductDataRepository]] actor and for the producer it
 * uses to write responses back to the requester.
 */
object ProductDataRepository {

  /** Name under which the response producer is registered as a child actor. */
  private val ResponseProducerName = "responseProducer"

  /**
   * Default producer factory: returns the child producer bound to
   * `endpoints.response`, creating it on first use.
   *
   * Child actor names must be unique within a parent, so the producer is
   * looked up first and only created when no child with that name exists.
   * Creating a new child with the same fixed name for every message would
   * fail on the second message while the first producer is still alive.
   *
   * @param msg       the message being answered (not used by this default factory)
   * @param endpoints endpoint set whose `response` URI the producer targets
   * @param context   context of the actor that will own the producer
   * @return the (possibly pre-existing) response producer
   */
  def producerFor(msg: CamelMessage, endpoints: EndpointUriSet, context: ActorContext): ActorRef =
    context.child(ResponseProducerName).getOrElse(
      context.actorOf(SimpleProducer.props(endpoints.response), ResponseProducerName))

  /**
   * Builds the `Props` for a [[ProductDataRepository]].
   *
   * @param endpoints   endpoint set handed to the actor and to `producerFor`
   * @param producerFor factory for the response producer; defaults to
   *                    [[ProductDataRepository.producerFor]]
   * @param productRepo product store to query; defaults to a new `ProductRepoMongo`
   */
  def props(endpoints: EndpointUriSet,
            producerFor: (CamelMessage, EndpointUriSet, ActorContext) => ActorRef =
              ProductDataRepository.producerFor,
            productRepo: ProductRepo = new ProductRepoMongo): Props =
    Props(classOf[ProductDataRepository], endpoints, producerFor, productRepo)
}
/**
 * Actor that answers product lookups arriving as Camel messages.
 *
 * For every `CamelMessage` it obtains a response producer through
 * `producerFor`, parses the message body into a `Request` and starts a
 * `ProductRepoResponseAggregator` child that sends its result to that
 * producer. Bodies that are missing or cannot be turned into a request are
 * answered with an error reply sent straight to the producer.
 *
 * @param endpoints   endpoint set forwarded to `producerFor`
 * @param producerFor factory for the actor that writes the answer back to the requester
 * @param productRepo product store handed to each aggregator
 */
class ProductDataRepository(endpoints: EndpointUriSet,
                            producerFor: (CamelMessage, EndpointUriSet, ActorContext) => ActorRef,
                            productRepo: ProductRepo) extends Actor with ActorLogging {

  /** Time budget; also used as the aggregation timeout of each request. */
  implicit val timeout: Timeout = Timeout(1.second)

  override def receive: Receive = {
    // The producer is responsible for writing the answer back to the requester.
    case msg: CamelMessage =>
      responseTo(msg, producerFor(msg, endpoints, context))
    case _ =>
      // TODO: When we isolate the CamelMessage case class in ServiceCore and we have a body (string)
      // we will not need this error handling anymore.
      throw new IllegalArgumentException("ProductDataRepository received bad message")
  }

  /**
   * Starts the lookup for `msg`, or reports an error to `producer`.
   *
   * Exactly one action is taken per message: either a single aggregator is
   * spawned, or a single error reply is sent.
   *
   * @param msg      incoming message whose body carries the JSON request
   * @param producer actor that receives the aggregated response or the error reply
   */
  def responseTo(msg: CamelMessage, producer: ActorRef): Unit = {
    // Option(...) turns a null body into None instead of failing on toString.
    val request = Option(msg.body).map(_.toString).flatMap(buildRequest)
    request match {
      case Some(r) =>
        context.actorOf(ProductRepoResponseAggregator.props(producer, r, timeout.duration, productRepo))
      case None =>
        log.warning("Could not build a product request from the message body")
        producer ! "error message"
    }
  }

  // Ideally, this would be in a trait we could apply to CamelMessages...
  /**
   * Copies the exchange id of `request` onto `response`.
   *
   * @param response message to be sent back
   * @param request  message being answered; source of the correlation id
   * @return `response` carrying the request's `MessageExchangeId` header, or
   *         `response` unchanged when the request has no such header
   */
  def correlatedResponse(response: CamelMessage, request: CamelMessage): CamelMessage =
    request.headers.get(CamelMessage.MessageExchangeId).fold(response) { correlationId =>
      CamelMessage(response.body, response.headers + (CamelMessage.MessageExchangeId -> correlationId))
    }

  // TODO: All the code below will be removed as soon as we have a clean way to cast from string to case classes.
  /**
   * Parses a JSON request body into a `Request`.
   *
   * A top-level `product_code` field takes precedence over `upc_code`.
   *
   * @param s raw message body
   * @return the request, or `None` when `s` is null, empty, not valid JSON,
   *         not a decodable JSON object, or contains neither key
   */
  private def buildRequest(s: String): Option[Request] =
    for {
      text    <- Option(s).filter(_.nonEmpty)
      // Json.parse throws on malformed input; treat that as "no request".
      json    <- scala.util.Try(Json.parse(text)).toOption
      fields  <- Json.fromJson[Map[String, Any]](json)(objectMapFormat).asOpt
      request <- fields.get("product_code")
                   .map(code => Request(Seq(ProductByProductCode(code.toString))))
                   .orElse(fields.get("upc_code")
                     .map(code => Request(Seq(ProductByUpc(code.toString)))))
    } yield request

  /**
   * JSON format for flat maps whose values are strings, numbers, booleans or
   * lists of strings.
   *
   * The type is spelled out because the format is used by `buildRequest`,
   * which appears above this definition.
   */
  implicit val objectMapFormat: Format[Map[String, Any]] = new Format[Map[String, Any]] {

    /** Converts one supported value to its JSON representation. */
    private def encodeValue(value: Any): JsValue = value match {
      case s: String     => JsString(s)
      case i: Int        => JsNumber(BigDecimal(i))
      case l: Long       => JsNumber(BigDecimal(l))
      case d: Double     => JsNumber(BigDecimal(d))
      // `reads` yields BigDecimal for numbers, so it must be writable too.
      case n: BigDecimal => JsNumber(n)
      case b: Boolean    => JsBoolean(b)
      case items: Seq[_] => JsArray(items.map(item => JsString(item.toString)))
      case other =>
        throw new IllegalArgumentException(
          "Unsupported value type for JSON serialization: " + other.getClass.getName)
    }

    /** Converts one JSON value to a plain Scala value, or reports why it cannot. */
    private def decodeValue(value: JsValue): JsResult[Any] = value match {
      case JsString(s)  => JsSuccess(s)
      case JsNumber(n)  => JsSuccess(n)
      case JsBoolean(b) => JsSuccess(b)
      case other        => other.validate[List[String]]
    }

    def writes(map: Map[String, Any]): JsValue =
      JsObject(map.toSeq.map { case (key, value) => key -> encodeValue(value) })

    def reads(jv: JsValue): JsResult[Map[String, Any]] =
      jv.validate[Map[String, JsValue]].flatMap { fields =>
        val empty: JsResult[Map[String, Any]] = JsSuccess(Map.empty[String, Any])
        // Stops contributing entries at the first field that fails to decode.
        fields.foldLeft(empty) { case (acc, (key, value)) =>
          for {
            soFar   <- acc
            decoded <- decodeValue(value)
          } yield soFar + (key -> decoded)
        }
      }
  }
}
| package com.s5a.product_service.repo | |
| import akka.actor._ | |
| import scala.concurrent.duration._ | |
| import play.api.libs.json._ | |
| import org.joda.time.Instant | |
| import com.s5a.product_service._ | |
| import play.libs.Json | |
| import com.s5a.api.models.Product | |
| import scala.concurrent._ | |
| import ExecutionContext.Implicits.global | |
| import akka.camel.CamelMessage | |
| import com.s5a.service.messages.{ResponseState, ResponseStates, Request, JSONResponse, ProductByProductCode, ProductByUpc} | |
/**
 * Skeleton of an actor that collects `JSONResponse` messages for a single
 * `Request` and hands the combined result to `deliverFunc`.
 *
 * Implementations supply the request, how two responses are combined, how the
 * queries are started, how the result is delivered, and the receive timeout.
 */
trait ResponseAggregatorBase extends Actor with ActorLogging {

  /** The request whose responses are aggregated. */
  def request: Request

  /** Combines the response accumulated so far with a newly received one. */
  def aggregatorFunc: (JSONResponse, JSONResponse) => JSONResponse

  /** Starts the queries; called with this actor's `self` and `request`. */
  def runQueriesFunc: (ActorRef, Request) => Unit

  /** Receives the final (or timed-out) aggregated response. */
  def deliverFunc: JSONResponse => Unit

  /** Receive timeout installed before the queries are started. */
  def timeout: Duration

  /**
   * Behavior while `current` is the response accumulated so far.
   *
   * On `ReceiveTimeout` the current response is delivered as-is and the actor
   * stops. On a `JSONResponse` the two are combined; a complete result is
   * delivered and the actor stops, otherwise accumulation continues.
   */
  def accumulate(current: JSONResponse): Receive = {
    case ReceiveTimeout =>
      deliverFunc(current)
      context.stop(self)

    case incoming: JSONResponse =>
      val merged = aggregatorFunc(current, incoming)
      if (!merged.isComplete) {
        context.become(accumulate(merged))
      } else {
        deliverFunc(merged)
        context.setReceiveTimeout(Duration.Undefined)
        context.stop(self)
      }
  }

  /**
   * Initial behavior. Evaluating it installs the receive timeout and starts
   * the queries before accumulation begins from an empty response for `request`.
   */
  def receive: Receive = {
    context.setReceiveTimeout(timeout)
    runQueriesFunc(self, request)
    val initial = JSONResponse(request)
    accumulate(initial)
  }
}
/**
 * [[ResponseAggregatorBase]] whose collaborators are all passed in through the
 * constructor.
 *
 * @param request        request whose responses are aggregated
 * @param aggregatorFunc combines the accumulated response with a new one
 * @param runQueriesFunc starts the queries for the request
 * @param deliverFunc    receives the aggregated response
 * @param timeout        receive timeout for the aggregation
 */
class ResponseAggregator(
    val request: Request,
    val aggregatorFunc: (JSONResponse, JSONResponse) => JSONResponse,
    val runQueriesFunc: (ActorRef, Request) => Unit,
    val deliverFunc: JSONResponse => Unit,
    val timeout: Duration
) extends ResponseAggregatorBase
/** `Props` factory for [[ResponseAggregator]]. */
object ResponseAggregator {

  /**
   * Creates the `Props` for a [[ResponseAggregator]]; the arguments are passed
   * to its constructor in the same order.
   */
  def props(request: Request,
            aggregatorFunc: (JSONResponse, JSONResponse) => JSONResponse,
            runQueriesFunc: (ActorRef, Request) => Unit,
            deliverFunc: JSONResponse => Unit,
            timeout: Duration): Props = {
    val actorClass = classOf[ResponseAggregator]
    Props(actorClass, request, aggregatorFunc, runQueriesFunc, deliverFunc, timeout)
  }
}
/** Building blocks for aggregator functions based on the keys of a JSON body. */
object ResponseAggregatorHelpers {

  /**
   * Classifies a set of keys.
   *
   * @param completeKeys   keys that must all be present for `Complete`
   * @param sufficientKeys keys that must all be present for `Sufficient`
   * @param testKeys       keys actually present
   * @return `Empty` for no keys; otherwise `Complete`, `Sufficient` or
   *         `Incomplete`, checked in that order
   */
  def keysChecker(completeKeys: Set[String], sufficientKeys: Set[String])(testKeys: Set[String]): ResponseState =
    testKeys match {
      case none if none.isEmpty                  => ResponseStates.Empty
      case keys if completeKeys.subsetOf(keys)   => ResponseStates.Complete
      case keys if sufficientKeys.subsetOf(keys) => ResponseStates.Sufficient
      case _                                     => ResponseStates.Incomplete
    }

  /** A key checker whose sufficient set is the same as its complete set. */
  def keysCheckerCompleteOnly(completeKeys: Set[String]): Set[String] => ResponseState =
    testKeys => keysChecker(completeKeys, completeKeys)(testKeys)

  /**
   * Folds `next` into `sum` and recomputes the state from the resulting body.
   *
   * Object bodies are merged with `++`; an object replaces a `JsNull` body.
   * In both cases the timestamp is refreshed and the errors are concatenated.
   * Any other combination leaves `sum` as it is.
   *
   * @param keysStateChecker maps the keys of the merged body to a state
   */
  def simpleAggregator(keysStateChecker: Set[String] => ResponseState)(sum: JSONResponse, next: JSONResponse): JSONResponse = {
    // Shared copy step for the two mergeable cases.
    def withBody(body: JsObject): JSONResponse =
      sum.copy(body = body, timestamp = Instant.now(), errors = sum.errors ++ next.errors)

    val combined = (sum.body, next.body) match {
      case (JsNull, incoming: JsObject)             => withBody(incoming)
      case (existing: JsObject, incoming: JsObject) => withBody(existing ++ incoming)
      case _                                        => sum // Can't aggregate otherwise
    }

    combined.body match {
      case JsNull        => combined.copy(state = ResponseStates.Empty)
      case obj: JsObject => combined.copy(state = keysStateChecker(obj.keys.toSet))
      case _             => combined // probably should empty out this one
    }
  }
}
/**
 * Aggregator that answers product queries by looking products up through the
 * two abstract accessors and sending the results back to itself as
 * `JSONResponse` messages.
 */
trait ProductRepoResponseAggregatorBase extends ResponseAggregatorBase {
  import ResponseAggregatorHelpers._

  /** Looks a product up by its product code. */
  def getProductByProductCode(x: String): Option[Product]

  /** Looks a product up by its UPC. */
  def getProductByUpc(x: String): Option[Product]

  /** A response counts as complete once its body contains `product_code`. */
  def aggregatorFunc: (JSONResponse, JSONResponse) => JSONResponse =
    simpleAggregator(keysCheckerCompleteOnly(Set("product_code")))

  /**
   * Runs every supported query of `req` off the actor thread and sends one
   * response per query to `target`.
   *
   * Queries other than `ProductByProductCode` / `ProductByUpc` are skipped
   * instead of aborting the remaining lookups. A failure of the lookup itself
   * is logged; no response is sent for it.
   *
   * @param target actor that receives the per-query responses
   * @param req    request whose queries are executed
   */
  def runQueriesMethod(target: ActorRef, req: Request): Unit = {
    // Resolve the logger on the actor thread; it is then used from the future's thread.
    val logger = log

    // Not super ideal, but we cannot block the Actor main-thread
    val lookups = Future {
      val response = req.toResponse

      def reply(product: Option[Product]): Unit = product match {
        case Some(p) =>
          // Not ideal, but the best we have for the moment:
          // serialize with the Java API, then re-parse with the Scala API.
          val productStr = Json.toJson(p).toString
          val productJsValue = play.api.libs.json.Json.parse(productStr)
          logger.info("product string: " + productStr)
          target ! response.withBody(productJsValue, ResponseStates.Complete)
        case None =>
          target ! response // An empty result - nothing to do here
      }

      req.queries.foreach {
        case ProductByProductCode(code, _) => reply(getProductByProductCode(code))
        case ProductByUpc(code, _)         => reply(getProductByUpc(code))
        case _                             => // not a lookup this aggregator serves
      }
    }

    // Without this, an exception thrown by a lookup would disappear silently.
    lookups.failed.foreach(e => logger.error(e, "Product lookup failed"))
  }

  val runQueriesFunc: (ActorRef, Request) => Unit = runQueriesMethod _
}
/**
 * Product aggregator backed by a `ProductRepo`; the aggregated response is
 * sent to `target` as the string body of a `CamelMessage` with no headers.
 *
 * @param target      actor that receives the final `CamelMessage`
 * @param request     request being answered
 * @param timeout     receive timeout for the aggregation
 * @param productRepo store that performs the product lookups
 */
class ProductRepoResponseAggregator(
    val target: ActorRef,
    val request: Request,
    val timeout: Duration,
    val productRepo: ProductRepo
) extends ProductRepoResponseAggregatorBase {

  def getProductByProductCode(x: String): Option[Product] =
    productRepo.getProductByProductCode(x)

  def getProductByUpc(x: String): Option[Product] =
    productRepo.getProductByUpc(x)

  val deliverFunc: JSONResponse => Unit = { aggregated =>
    val outgoing = CamelMessage(aggregated.toString, Map.empty[String, Any])
    target ! outgoing
  }
}
/** `Props` factory for [[ProductRepoResponseAggregator]]. */
object ProductRepoResponseAggregator {

  /**
   * Creates the `Props` for a [[ProductRepoResponseAggregator]].
   *
   * @param producer    passed as the aggregator's `target`
   * @param request     request to answer
   * @param timeout     receive timeout for the aggregation
   * @param productRepo store that performs the product lookups
   */
  def props(producer: ActorRef,
            request: Request,
            timeout: Duration,
            productRepo: ProductRepo): Props = {
    val actorClass = classOf[ProductRepoResponseAggregator]
    Props(actorClass, producer, request, timeout, productRepo)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment