Skip to content

Commit

Permalink
Merge pull request #5 from JasonMWhite/statsd_plus_tags
Browse files Browse the repository at this point in the history
Statsd plus tags
  • Loading branch information
JasonMWhite committed Oct 6, 2015
2 parents 9f7398f + fe52a9e commit 0f02931
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/main/scala/org/apache/spark/DatadogRelay.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ import org.apache.spark.scheduler._
import org.apache.spark.executor._
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.tagUtils._

class DatadogRelay(conf: SparkConf) extends SparkFirehoseListener {

implicit val tags: List[String] = {
val datadogTags = conf.get("spark.datadog.tags", "")
if (datadogTags == "") List() else datadogTags.split(",").toList
}

val statsdOption: Option[NonBlockingStatsDClient] = {
try {
Some(new NonBlockingStatsDClient("spark", "localhost", 8125))
Expand Down
32 changes: 32 additions & 0 deletions src/main/scala/org/apache/spark/StatsDClientWithTag.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.apache.spark

import java.nio.charset.Charset
import com.timgroup.statsd.{NonBlockingStatsDClient, StatsDClientException, StatsDClientErrorHandler}
import scala.reflect.runtime.{universe => ru}
import scala.reflect.runtime.universe._

package object tagUtils {
implicit class StatsDClientWithTag(val client: NonBlockingStatsDClient)(implicit val tags: List[String]) {
val originalMessageFor = {
val m = ru.runtimeMirror(client.getClass.getClassLoader)
val im = m.reflect(client)
val symbol = ru.typeOf[NonBlockingStatsDClient].member(newTermName("messageFor"))

// Method of interest is overridden: pick the one with 4 arguments
val method = symbol.asTerm.alternatives.filter(m => m.asMethod.paramss.head.length == 4).head.asMethod
im.reflectMethod(method)
}

def tagSuffix(): String = {
if (tags.length == 0) "" else "|#" + tags.mkString(",")
}

def messageFor(aspect: String, value: String, `type`: String, sampleRate: Double): String = {
originalMessageFor(aspect, value, `type`, sampleRate) + tagSuffix
}

def messageFor(aspect: String, value: String, `type`: String): String = {
originalMessageFor(aspect, value, `type`, 1.0) + tagSuffix
}
}
}

0 comments on commit 0f02931

Please sign in to comment.