//==================================================================
// SPARK INSTRUMENTATION
//==================================================================
import com.codahale.metrics.{MetricRegistry, Meter, Gauge}
import org.apache.spark.{SparkEnv, Accumulator}
import org.apache.spark.metrics.source.Source
import org.joda.time.DateTime
import scala.collection.mutable

/** Instrumentation for Spark based on accumulators.
 *
 * Usage:
 *   val instrumentation = new SparkInstrumentation("example.metrics")
 *   val numReqs = sc.accumulator(0L)
 *   instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
 *   instrumentation.register()
 *
 * Will create and report the following metrics:
 * - Gauge with total number of requests (daily)
 * - Meter with rate of requests
 *
 * @param prefix prefix for all metrics that will be reported by this Instrumentation
 */
class SparkInstrumentation(prefix: String) extends Serializable {
  val accumulators = mutable.Set[Accumulator[Long]]()

  // Not `private`: the class is exposed through the public `source` val so that
  // callers can register accumulators, as shown in the usage example above.
  class InstrumentationSource(prefix: String) extends Source {
    val metricRegistry = new MetricRegistry
    val sourceName = prefix
    val oldgauges = mutable.Map[String, Long]()
    val oldtimes = mutable.Map[String, DateTime]()
    val meters = mutable.Map[String, Meter]()

    /** Computes metrics based on an accumulator. The gauge never resets.
     *
     * @param a Metrics will be derived from this accumulator
     * @param name Name of the metrics
     */
    def registerAccumulator(a: Accumulator[Long], name: String) {
      oldgauges += (name -> 0L)
      meters += (name -> metricRegistry.meter(name + "-rate"))
      metricRegistry.register(MetricRegistry.name(name),
        new Gauge[Long] {
          override def getValue: Long = {
            meters(name).mark(a.value - oldgauges(name))
            oldgauges(name) = a.value
            a.value
          }
        })
    }

    /** Computes metrics based on an accumulator. The gauge resets at the end of the day.
     *
     * @param a Metrics will be derived from this accumulator
     * @param name Name of the metrics
     */
    def registerDailyAccumulator(a: Accumulator[Long], name: String) {
      oldgauges += (name -> 0L)
      meters += (name -> metricRegistry.meter(name + "-rate"))
      oldtimes += (name -> DateTime.now)
      metricRegistry.register(MetricRegistry.name(name),
        new Gauge[Long] {
          override def getValue: Long = {
            meters(name).mark(a.value - oldgauges(name))
            val now = DateTime.now
            if (now.getDayOfMonth != oldtimes(name).getDayOfMonth) {
              a.setValue(0L)
            }
            oldtimes(name) = now
            oldgauges(name) = a.value
            a.value
          }
        })
    }
  }

  val source = new InstrumentationSource(prefix)

  /** Register the Instrumentation with Spark so the metrics are reported to any provided Sink. */
  def register() {
    SparkEnv.get.metricsSystem.registerSource(source)
  }
}
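For context, here is a minimal sketch of how the class above might be wired into a driver program. It only restates the documented usage; the application name, input path, and the "one line equals one request" workload are hypothetical, and the metrics only reach a backend if a sink is enabled in metrics.properties.

// Hedged sketch: assumes SparkInstrumentation (above) is on the classpath and
// that some sink (console, Graphite, Datadog, ...) is configured in metrics.properties.
import org.apache.spark.{SparkConf, SparkContext}

object InstrumentationExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("instrumentation-example"))

    // Accumulator that the job increments and the gauge/meter pair reports.
    val numReqs = sc.accumulator(0L)
    val instrumentation = new SparkInstrumentation("example.metrics")
    instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
    instrumentation.register() // registers the Source with the driver's MetricsSystem

    // Hypothetical workload: every processed line counts as one "request".
    sc.textFile("/path/to/input").foreach { _ => numReqs += 1 }

    sc.stop()
  }
}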
//============================================
// STREAMING LAUNCHER / SERVER
//============================================
import scalax.io.Resource
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{Logging, SparkConf}
import org.apache.hadoop.yarn.api.records.{ApplicationReport, YarnApplicationState, ApplicationId}
import org.apache.hadoop.yarn.client.ClientRMProxy
import org.apache.hadoop.yarn.api.ApplicationClientProtocol
import org.apache.hadoop.yarn.api.protocolrecords.{GetApplicationsResponse, GetApplicationsRequest}
import org.eclipse.jetty.server.{Request, Handler, Server}
import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler, AbstractHandler}
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import com.lambdaworks.jacks.JacksMapper
import scala.annotation.tailrec
import org.eclipse.jetty.util.thread.QueuedThreadPool
import scala.util.{Failure, Success, Try}

/** Local launcher client for streaming applications in YARN.
 *
 * Extends the usual Spark YARN client for streaming applications. This class should not be called by the user;
 * instead, the StreamingClient object is the entry point for launching applications.
 *
 * @param args User supplied arguments
 * @param sparkConf Spark Configuration
 */
class StreamingClient(args: ClientArguments, sparkConf: SparkConf) extends Client(args, sparkConf) {
  import scala.collection.JavaConversions._
  import scala.collection.JavaConverters._

  var appIdOption: Option[ApplicationId] = None
  val clientHttp = new ClientHttp(this)
  val launcherArgs = args

  /** Connects to or launches an application in the YARN cluster.
   *
   * 1. Search for an existing application with the same name.
   * 2. If found, monitor the existing application.
   * 3. If not found, launch a new application with this name, and monitor it.
   */
  override def run() {
    sparkConf.set("spark.yarn.report.interval", "10000")
    clientHttp.bind()
    println("Using yarn at " + yarnConf.getRaw("fs.defaultFS"))
    val pidFile = System.getenv("SPARK_LAUNCHER_PID_DIR") match {
      case "" => None
      case x => Some(Resource.fromFile("%s/%s.appInfo".format(x, args.appName)))
    }
    val instances = getSparkApplications(
      setAsJavaSet(Set("SPARK")),
      java.util.EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED,
        YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED)
    ).flatMap { report =>
      if (report.getName == args.appName) {
        Some(report.getApplicationId)
      } else {
        None
      }
    }.toList
    if (instances.size > 0) {
      println("Application already running. Monitoring old application.")
      init(yarnConf)
      start()
      appIdOption = Some(instances.head)
      monitorApplication(appIdOption.get)
      System.exit(0)
    } else {
      //super.stop()
      println("Application not found.")
      appIdOption = Some(runApp())
      if (!appIdOption.isDefined) {
        println("Application didn't start correctly")
        System.exit(1)
      }
      if (pidFile.isDefined) {
        pidFile.get.write("%s %s".format(appIdOption.get.toString, args.userArgs.mkString(" ")))
        System.exit(0)
      }
      monitorApplication(appIdOption.get)
      System.exit(0)
    }
  }

  /** Gets the list of Spark applications in the YARN cluster. */
  def getSparkApplications(applicationTypes: java.util.Set[String],
                           applicationStates: java.util.EnumSet[YarnApplicationState]): java.util.List[ApplicationReport] = {
    setConfig(yarnConf)
    val rmClient = ClientRMProxy.createRMProxy(getConfig, classOf[ApplicationClientProtocol])
    val request: GetApplicationsRequest = GetApplicationsRequest.newInstance(applicationTypes, applicationStates)
    val response: GetApplicationsResponse = rmClient.getApplications(request)
    response.getApplicationList
  }
}
/** Local launcher client for streaming applications in YARN.
 *
 * Usage:
 *   java -cp /etc/hadoop/conf:AppJar.jar:spark-assembly.jar org.apache.spark.yarn.StreamingClient --jar AppJar.jar
 *     --addJars /jars/config.jar --class ooyala.app.MainClass --arg arg1 --arg arg2 --name MyApp
 */
object StreamingClient {
  def main(argStrings: Array[String]) {
    // Set an env variable indicating we are running in YARN mode.
    // Note: any env variable with the SPARK_ prefix gets propagated to all (remote) processes -
    // see Client#setupLaunchEnv().
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf()
    val args = new ClientArguments(argStrings, sparkConf)
    new StreamingClient(args, sparkConf).run()
  }
}

/** Starts an HTTP server for the launcher client.
 *
 * Allows checking the health of the application launcher by querying the /healthz route.
 *
 * @param launcher Server will track the status of this launcher client.
 */
class ClientHttp(val launcher: StreamingClient) extends Logging {
  val port = 8081
  var boundPort: Option[Int] = None
  var server: Option[Server] = None

  val handlers = Seq[(String, Handler)](
    ("/healthz", healthHandler)
  )

  /** /healthz route handler.
   *
   * Reports the health of the launcher and publishes information about the application.
   */
  def healthHandler: Handler = {
    new AbstractHandler {
      override def handle(target: String, baseRequest: Request, request: HttpServletRequest,
                          response: HttpServletResponse): Unit = {
        response.setContentType("application/json")
        if (!launcher.appIdOption.isDefined) {
          response.setStatus(HttpServletResponse.SC_NOT_FOUND)
          val res = Map(
            "LauncherStatus" -> "Application Not Found"
          )
          baseRequest.setHandled(true)
          response.getWriter.println(JacksMapper.writeValueAsString(res))
          return
        }
        val report = launcher.getApplicationReport(launcher.appIdOption.get)
        response.setStatus(HttpServletResponse.SC_OK)
        baseRequest.setHandled(true)
        val res = Map(
          "LauncherStatus" -> "Online",
          "YarnCluster" -> launcher.yarnConf.getRaw("fs.defaultFS"),
          "ApplicationId" -> launcher.appIdOption.get.toString,
          "ApplicationStatus" -> report.getYarnApplicationState,
          "StartedAt" -> report.getStartTime.toString,
          "TrackingURL" -> report.getTrackingUrl.toString,
          "ApplicationName" -> launcher.launcherArgs.appName
        )
        response.getWriter.println(JacksMapper.writeValueAsString(res))
      }
    }
  }

  /**
   * Attempts to start a Jetty server at the supplied ip:port which uses the supplied handlers.
   *
   * If the desired port number is contended, continues incrementing ports until a free port is
   * found. Returns the chosen port and the Jetty Server object.
   */
  def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
    val handlersToRegister = handlers.map { case (path, handler) =>
      val contextHandler = new ContextHandler(path)
      contextHandler.setAllowNullPathInfo(true)
      contextHandler.setHandler(handler)
      contextHandler
    }
    val handlerList = new HandlerList
    handlerList.setHandlers(handlersToRegister.toArray)

    @tailrec
    def connect(currentPort: Int): (Server, Int) = {
      val server = new Server(currentPort)
      val pool = new QueuedThreadPool
      pool.setDaemon(true)
      server.setThreadPool(pool)
      server.setHandler(handlerList)
      Try {
        server.start()
      } match {
        case s: Success[_] =>
          (server, server.getConnectors.head.getLocalPort)
        case f: Failure[_] =>
          server.stop()
          logInfo("Failed to create UI at port %s. Trying again.".format(currentPort))
          logInfo("Error was: " + f.toString)
          connect((currentPort + 1) % 65536)
      }
    }
    connect(port)
  }

  def bind() {
    try {
      val (srv, usedPort) = startJettyServer("0.0.0.0", port, handlers)
      logInfo("Started Streaming Launcher UI at port %d".format(usedPort))
      server = Some(srv)
      boundPort = Some(usedPort)
    } catch {
      case e: Exception =>
        logError("Failed to create Spark JettyUtils", e)
        System.exit(1)
    }
  }
}
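A quick way to exercise the /healthz route once bind() has succeeded is sketched below. The host and port are assumptions (the handler binds 0.0.0.0:8081 by default, or the next free port if 8081 is taken; check the "Started Streaming Launcher UI" log line), and the expected keys simply mirror the map built in healthHandler.

// Hedged sketch: fetches the launcher's health endpoint and prints the JSON payload.
object HealthCheckExample {
  def main(args: Array[String]): Unit = {
    // On success the body contains: LauncherStatus, YarnCluster, ApplicationId,
    // ApplicationStatus, StartedAt, TrackingURL, ApplicationName.
    // If no application is tracked yet, the launcher answers 404 and this call throws.
    val body = scala.io.Source.fromURL("http://localhost:8081/healthz").mkString
    println(body)
  }
}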
//===============================================
// DATADOG SINK
//===============================================
import com.codahale.metrics.MetricRegistry
// Requires the com.clipperz.metrics-datadog artifact (not in Maven)
// Compile from https://github.com/clipperz/metrics-datadog
import com.codahale.metrics.reporting.{DatadogReporter, HttpTransport}
import java.util.concurrent.TimeUnit
import java.util.Properties
// Spark's MetricsSystem instantiates sinks reflectively with
// (Properties, MetricRegistry, org.apache.spark.SecurityManager).
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.sink.Sink

/** Sink to report metrics to Datadog. */
class DatadogSink(val property: Properties, val registry: MetricRegistry,
                  securityMgr: SecurityManager) extends Sink {
  val DD_KEY_PERIOD = "period"
  val DD_DEFAULT_PERIOD = 10L
  val DD_KEY_UNIT = "unit"
  val DD_DEFAULT_UNIT = TimeUnit.SECONDS
  val DD_API_KEY = "apikey"
  val DD_KEY_HOST = "host"
  val DD_DEFAULT_HOST = ""

  def propertyToOption(prop: String) = Option(property.getProperty(prop))

  if (!propertyToOption(DD_API_KEY).isDefined) {
    throw new Exception("Datadog sink requires 'apikey' property.")
  }

  val pollPeriod = propertyToOption(DD_KEY_PERIOD).map(_.toLong)
    .getOrElse(DD_DEFAULT_PERIOD)
  val pollUnit = propertyToOption(DD_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
    .getOrElse(DD_DEFAULT_UNIT)
  val host = propertyToOption(DD_KEY_HOST).getOrElse(DD_DEFAULT_HOST)
  val apikey = propertyToOption(DD_API_KEY).get

  val transport = new HttpTransport("app.datadoghq.com", apikey)
  val reporter = DatadogReporter.forRegistry(registry)
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build(transport, host)

  override def start {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop {
    reporter.stop()
  }
}
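A sink like this is normally enabled through conf/metrics.properties. The sketch below shows how the properties consumed in the constructor above (apikey, host, period, unit) would be supplied; the package name com.example.metrics is hypothetical (use whatever package the class is actually compiled into), and the apikey and host values are placeholders.

# Hedged sketch of conf/metrics.properties for the sink above.
*.sink.datadog.class=com.example.metrics.DatadogSink
*.sink.datadog.apikey=YOUR_DATADOG_API_KEY
*.sink.datadog.host=myhost.example.com
*.sink.datadog.period=10
*.sink.datadog.unit=seconds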
I tried mimicking the custom metrics class above and registering it, but sadly the metric never gets written to the Graphite sink. This is Spark Structured Streaming, so I'm not sure whether there is a limitation there, but I don't believe there should be. Attaching code snippets.
package com.hari.spark.kafka.struct.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.Dataset
import org.apache.spark.Accumulator
import com.codahale.metrics._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.types.StructType

object ReadWriteKafka {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.appName("TestGrafana").getOrCreate
    import sparkSession.implicits._

    val rowsPassed = sparkSession.sparkContext.accumulator[Long](0, "rowsPassed")
    CustomRowsProcessedMetrics.registerCustomMetrics("rowsPassed", rowsPassed)

    val readFromKafka = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.65.137.104:9092")
      .option("subscribe", "GrafanaSource")
      .option("enable.auto.commit", "false")
      .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("failOnDataLoss", "false")
      .option("auto.offset.reset", "latest")
      .load

    // perform some transformation
    //implicit val employeeEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
    val keyValueTuples = readFromKafka.selectExpr("CAST(key as STRING)", "CAST(value as STRING)").as[(String, String)]
    val rowProcessedStore = countRowsProcessed(
      rowsPassed,
      keyValueTuples.filter(tup => tup._2.contains("ichigo")).map(tup => (tup._1, tup._2.toUpperCase)))
    rowProcessedStore.printSchema

    val writeToKafka = rowProcessedStore.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.65.137.104:9092")
      .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      .option("checkpointLocation", "/Hari/hdfs/staging1/")
      .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      .option("topic", "GrafanaTarget")
      .start

    writeToKafka.awaitTermination
  }

  def countRowsProcessed(acc: Accumulator[Long], ds: Dataset[(String, String)]): Dataset[(String, String)] = {
    implicit val employeeEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
    ds.map { x =>
      acc += 1
      x
    }.toDF("key", "value").as[(String, String)]
  }
}
import com.codahale.metrics.{ MetricRegistry, Meter, Gauge }
import org.apache.spark.Accumulator
import scala.collection.mutable.Map
import org.apache.spark.SparkEnv
import org.apache.spark.metrics.source.CustomMetrics

package com.hari.spark.kafka.struct.streaming {
  object CustomRowsProcessedMetrics extends Serializable {
    def registerCustomMetrics(metricsName: String, acc: Accumulator[Long]) {
      val customMets = new CustomMetrics(metricsName, Map[String, Long](), Map[String, Meter]())
      customMets.regGaugeAndMeter(acc, metricsName)
      SparkEnv.get.metricsSystem.registerSource(customMets)
      val test = SparkEnv.get.metricsSystem.getSourcesByName(metricsName)
      println(test)
    }
  }
}

package org.apache.spark.metrics.source {
  class CustomMetrics(metricsName: String, gauge1: Map[String, Long], metrics: Map[String, Meter]) extends Source {
    // Must be a single val: a `def` would return a fresh, empty MetricRegistry on every
    // access, so the gauge registered below would live in a throwaway registry and the
    // registry exposed to Spark's MetricsSystem (and its sinks) would stay empty.
    val metricRegistry = new MetricRegistry()
    val sourceName = metricsName

    // update the metrics with time series
    def regGaugeAndMeter(acc: Accumulator[Long], metricsName: String): Unit = {
      gauge1 += (metricsName -> 0L)
      metrics += (metricsName -> metricRegistry.meter(metricsName + "-rate"))
      metricRegistry.register(
        MetricRegistry.name(metricsName),
        new Gauge[Long] {
          override def getValue: Long = {
            metrics(metricsName).mark(acc.value - gauge1(metricsName))
            gauge1(metricsName) = acc.value
            println("The incremented values are ---> " + acc.value)
            acc.value
          }
        })
    }
  }
}
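For reference, a Source registered via SparkEnv.get.metricsSystem only reaches Graphite if the Graphite sink is enabled for that instance in the metrics configuration visible to the driver (conf/metrics.properties, or a file pointed to by spark.metrics.conf). A hedged sketch, with host, port, and prefix as placeholder assumptions:

# Hedged sketch of conf/metrics.properties for the built-in Graphite sink.
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
*.sink.graphite.prefix=structured-streaming

Since registerCustomMetrics runs in the driver JVM, the gauge is registered only in the driver's metrics system, so the sink configuration has to be present where the driver runs.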
Sadly, in 2016 this is still the best solution for user-defined accumulator metrics.