From 48b7fb48d0c0263afeb6baa150da893e717ae3bb Mon Sep 17 00:00:00 2001
From: Enrico Minack
Date: Mon, 13 Jul 2020 17:57:09 +0200
Subject: [PATCH] Add per partition read metrics

---
 CHANGELOG.md                                  |  5 +-
 README.md                                     | 29 +++++++++
 .../dgraph/connector/PartitionMetrics.scala   | 60 +++++++++++++++++++
 .../TriplePartitionReaderFactory.scala        |  2 +-
 .../spark/dgraph/connector/TripleScan.scala   |  3 +-
 .../connector/model/EdgeTableModel.scala      | 10 +++-
 .../connector/model/GraphTableModel.scala     | 11 +++-
 .../connector/model/NodeTableModel.scala      | 10 +++-
 .../connector/model/TripleTableModel.scala    | 10 +++-
 .../connector/partitioner/Partitioner.scala   |  2 +-
 .../connector/model/TestGraphTableModel.scala | 10 +++-
 .../connector/sources/TestNodeSource.scala    |  3 +-
 12 files changed, 138 insertions(+), 17 deletions(-)
 create mode 100644 src/main/scala/uk/co/gresearch/spark/dgraph/connector/PartitionMetrics.scala

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b6d51937..e06352e4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,7 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
 ## [Unreleased] - YYYY-MM-DD
 ### Added
-- Added Spark filter pushdown to improve efficiency when loading only a subgraph. Filters like
+- Add Spark filter pushdown to improve efficiency when loading only a subgraph. Filters like
   `.where($"predicate" === "name")` will be pushed to Dgraph and only the relevant graph data
   will be read ([issue #7](https://github.com/G-Research/spark-dgraph-connector/issues/7)).
 - Improve performance of `PredicatePartitioner` for multiple predicates per partition. Restoring
@@ -14,9 +14,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 - The `PredicatePartitioner` combined with `UidRangePartitioner` is the default partitioner now.
 - Add stream-like reading of partitions from Dgraph. Partitions are split into smaller chunks.
   Works with predicate partitioning only, so it cannot be combined with uid partitioning.
+- Add Dgraph metrics to measure throughput, visible on the Spark UI Stages page and via a `SparkListener`.
 
 ### Security
-- Moved Google Guava dependency version fix to 24.1.1-jre due to [known security vulnerability
+- Move Google Guava dependency version fix to 24.1.1-jre due to [known security vulnerability
   fixed in 24.1.1](https://github.com/advisories/GHSA-mvr2-9pj6-7w5j)
 
 ## [0.3.0] - 2020-06-22
diff --git a/README.md b/README.md
index 84d0d863..cf3d3c4a 100644
--- a/README.md
+++ b/README.md
@@ -342,6 +342,35 @@ The following table lists all supported Spark filters:
 |`In` |||
 |`IsNotNull` |||
 
+## Metrics
+
+The connector collects metrics per partition that provide insight into the throughput and timing of the communication
+with the Dgraph cluster. For each request to Dgraph (a chunk), the number of bytes and uids received and the retrieval time are recorded and
+summed per partition. The values can be seen in the Spark UI for the respective stages that perform the read:
+
+![Dgraph metrics as shown on Spark UI Stages page](static/accumulators.png "Dgraph metrics as shown on Spark UI Stages page")
+
+The connector uses [Spark Accumulators](http://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/Accumulator.html)
+to collect these metrics. They can be accessed by the Spark driver via a `SparkListener`:
+
+    val handler = new SparkListener {
+      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
+        stageCompleted.stageInfo.accumulables.values.foreach(println)
+    }
+
+    spark.sparkContext.addSparkListener(handler)
+    spark.read.dgraphTriples("localhost:9080").count()
+
+
+The following metrics are available:
+
+|Metric|Description|
+|------|-----------|
+|`Dgraph Bytes`|Size of the JSON responses received from the Dgraph cluster, in bytes.|
+|`Dgraph Chunks`|Number of requests sent to the Dgraph cluster.|
+|`Dgraph Time`|Time spent waiting for Dgraph to respond, in seconds.|
+|`Dgraph Uids`|Number of uids read.|
+
 
 ## Special Use Cases
 
diff --git a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/PartitionMetrics.scala b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/PartitionMetrics.scala
new file mode 100644
index 00000000..170da42c
--- /dev/null
+++ b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/PartitionMetrics.scala
@@ -0,0 +1,60 @@
+package uk.co.gresearch.spark.dgraph.connector
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{DoubleAccumulator, LongAccumulator}
+
+trait PartitionMetrics {
+
+  def incReadBytes(bytes: Long): Unit
+
+  def incReadUids(uids: Long): Unit
+
+  def incReadChunks(chunks: Long): Unit
+
+  def incChunkTime(time: Double): Unit
+
+}
+
+case class AccumulatorPartitionMetrics(readBytes: LongAccumulator,
+                                       readUids: LongAccumulator,
+                                       readChunks: LongAccumulator,
+                                       chunkTime: DoubleAccumulator)
+  extends PartitionMetrics with Serializable {
+
+  override def incReadBytes(bytes: Long): Unit = {
+    println(s"increment accum $readBytes")
+    readBytes.add(bytes)
+  }
+
+  override def incReadUids(uids: Long): Unit = readUids.add(uids)
+
+  override def incReadChunks(chunks: Long): Unit = readChunks.add(chunks)
+
+  override def incChunkTime(time: Double): Unit = chunkTime.add(time)
+
+}
+
+object AccumulatorPartitionMetrics {
+  def apply(): AccumulatorPartitionMetrics = AccumulatorPartitionMetrics(SparkSession.builder().getOrCreate())
+
+  def apply(spark: SparkSession): AccumulatorPartitionMetrics = AccumulatorPartitionMetrics(spark.sparkContext)
+
+  def apply(context: SparkContext): AccumulatorPartitionMetrics =
+    AccumulatorPartitionMetrics(
+      context.longAccumulator("Dgraph Bytes"),
+      context.longAccumulator("Dgraph Uids"),
+      context.longAccumulator("Dgraph Chunks"),
+      context.doubleAccumulator("Dgraph Time")
+    )
+}
+
+case class NoPartitionMetrics() extends PartitionMetrics with Serializable {
+  override def incReadBytes(bytes: Long): Unit = { }
+
+  override def incReadUids(uids: Long): Unit = { }
+
+  override def incReadChunks(chunks: Long): Unit = { }
+
+  override def incChunkTime(time: Double): Unit = { }
+}
diff --git a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/TriplePartitionReaderFactory.scala b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/TriplePartitionReaderFactory.scala
index 6c6491df..1fd0b02a 100644
--- a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/TriplePartitionReaderFactory.scala
+++ b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/TriplePartitionReaderFactory.scala
@@ -24,7 +24,7 @@ import uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel
 case class TriplePartitionReaderFactory(model: GraphTableModel) extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = partition match {
-    case p: Partition => new TriplePartitionReader(p, model)
+    case p: Partition => TriplePartitionReader(p, model)
     case _ => throw new IllegalArgumentException(
       s"Expected ${Partition.getClass.getSimpleName}, not ${partition.getClass.getSimpleName}"
     )
diff --git a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/TripleScan.scala b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/TripleScan.scala
index ced2cf3d..69031154 100644
--- a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/TripleScan.scala
+++ b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/TripleScan.scala
@@ -30,6 +30,7 @@ case class TripleScan(partitioner: Partitioner, model: GraphTableModel) extends
 
   override def planInputPartitions(): Array[InputPartition] = partitioner.getPartitions.toArray
 
-  override def createReaderFactory(): PartitionReaderFactory = new TriplePartitionReaderFactory(model)
+  override def createReaderFactory(): PartitionReaderFactory =
+    TriplePartitionReaderFactory(model.withMetrics(AccumulatorPartitionMetrics()))
 
 }
diff --git a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/EdgeTableModel.scala b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/EdgeTableModel.scala
index 524e93be..f014bb69 100644
--- a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/EdgeTableModel.scala
+++ b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/EdgeTableModel.scala
@@ -2,12 +2,16 @@ package uk.co.gresearch.spark.dgraph.connector.model
 
 import uk.co.gresearch.spark.dgraph.connector.encoder.TripleEncoder
 import uk.co.gresearch.spark.dgraph.connector.executor.ExecutorProvider
-import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, PartitionQuery}
+import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, NoPartitionMetrics, PartitionMetrics, PartitionQuery}
 
 /**
  * Models only the edges of a graph as a table.
  */
-case class EdgeTableModel(execution: ExecutorProvider, encoder: TripleEncoder, chunkSize: Int) extends GraphTableModel {
+case class EdgeTableModel(execution: ExecutorProvider,
+                          encoder: TripleEncoder,
+                          chunkSize: Int,
+                          metrics: PartitionMetrics = NoPartitionMetrics())
+  extends GraphTableModel {
 
   /**
    * Turn a partition query into a GraphQl query.
@@ -20,4 +24,6 @@ case class EdgeTableModel(execution: ExecutorProvider, encoder: TripleEncoder, c // TODO: query for edges-only when supported query.forPropertiesAndEdges(chunk) + override def withMetrics(metrics: PartitionMetrics): EdgeTableModel = copy(metrics = metrics) + } diff --git a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/GraphTableModel.scala b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/GraphTableModel.scala index 4f9022b4..06fcdf67 100644 --- a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/GraphTableModel.scala +++ b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/GraphTableModel.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.types.StructType import uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder import uk.co.gresearch.spark.dgraph.connector.executor.{ExecutorProvider, JsonGraphQlExecutor} import uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel.filter -import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, Partition, PartitionQuery, Uid} +import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, Partition, PartitionMetrics, PartitionQuery, Uid} import scala.collection.JavaConverters._ @@ -21,6 +21,9 @@ trait GraphTableModel { val execution: ExecutorProvider val encoder: JsonNodeInternalRowEncoder val chunkSize: Int + val metrics: PartitionMetrics + + def withMetrics(metrics: PartitionMetrics): GraphTableModel /** * Returns the schema of this table. If the table is not readable and doesn't have a schema, an @@ -65,6 +68,12 @@ trait GraphTableModel { s"read ${json.string.length} bytes with ${partition.predicates.map(p => s"${p.size} predicates for ").getOrElse("")}" + s"${chunk.length} uids after ${chunk.after.toHexString} ${until.map(e => s"until ${e.toHexString} ").getOrElse("")}" + s"with ${array.size()} nodes in ${(endTs - startTs)/1000.0}s") + + metrics.incReadBytes(json.string.getBytes.length) + metrics.incReadUids(array.size()) + metrics.incReadChunks(1) + metrics.incChunkTime((endTs - startTs)/1000.0) + array } diff --git a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/NodeTableModel.scala b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/NodeTableModel.scala index 31d09a2d..f43789e6 100644 --- a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/NodeTableModel.scala +++ b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/NodeTableModel.scala @@ -2,12 +2,16 @@ package uk.co.gresearch.spark.dgraph.connector.model import uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder import uk.co.gresearch.spark.dgraph.connector.executor.ExecutorProvider -import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, PartitionQuery} +import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, NoPartitionMetrics, PartitionMetrics, PartitionQuery} /** * Models only the nodes of a graph as a table. */ -case class NodeTableModel(execution: ExecutorProvider, encoder: JsonNodeInternalRowEncoder, chunkSize: Int) extends GraphTableModel { +case class NodeTableModel(execution: ExecutorProvider, + encoder: JsonNodeInternalRowEncoder, + chunkSize: Int, + metrics: PartitionMetrics = NoPartitionMetrics()) + extends GraphTableModel { /** * Turn a partition query into a GraphQl query. 
@@ -19,4 +23,6 @@ case class NodeTableModel(execution: ExecutorProvider, encoder: JsonNodeInternal override def toGraphQl(query: PartitionQuery, chunk: Option[Chunk]): GraphQl = query.forProperties(chunk) + override def withMetrics(metrics: PartitionMetrics): NodeTableModel = copy(metrics = metrics) + } diff --git a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/TripleTableModel.scala b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/TripleTableModel.scala index 3949d9aa..4708bc69 100644 --- a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/TripleTableModel.scala +++ b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/model/TripleTableModel.scala @@ -2,12 +2,16 @@ package uk.co.gresearch.spark.dgraph.connector.model import uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder import uk.co.gresearch.spark.dgraph.connector.executor.ExecutorProvider -import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, PartitionQuery} +import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, NoPartitionMetrics, PartitionMetrics, PartitionQuery} /** * Models all triples of a graph as a table, nodes with properties and edges. */ -case class TripleTableModel(execution: ExecutorProvider, encoder: JsonNodeInternalRowEncoder, chunkSize: Int) extends GraphTableModel { +case class TripleTableModel(execution: ExecutorProvider, + encoder: JsonNodeInternalRowEncoder, + chunkSize: Int, + metrics: PartitionMetrics = NoPartitionMetrics()) + extends GraphTableModel { /** * Turn a partition query into a GraphQl query. @@ -19,4 +23,6 @@ case class TripleTableModel(execution: ExecutorProvider, encoder: JsonNodeIntern override def toGraphQl(query: PartitionQuery, chunk: Option[Chunk]): GraphQl = query.forPropertiesAndEdges(chunk) + override def withMetrics(metrics: PartitionMetrics): TripleTableModel = copy(metrics = metrics) + } diff --git a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/partitioner/Partitioner.scala b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/partitioner/Partitioner.scala index 5e94eae0..38de2ce5 100644 --- a/src/main/scala/uk/co/gresearch/spark/dgraph/connector/partitioner/Partitioner.scala +++ b/src/main/scala/uk/co/gresearch/spark/dgraph/connector/partitioner/Partitioner.scala @@ -18,7 +18,7 @@ package uk.co.gresearch.spark.dgraph.connector.partitioner import uk.co.gresearch.spark.dgraph.connector -import uk.co.gresearch.spark.dgraph.connector.{Filters, Partition} +import uk.co.gresearch.spark.dgraph.connector.{Filters, Partition, PartitionMetrics} trait Partitioner { diff --git a/src/test/scala/uk/co/gresearch/spark/dgraph/connector/model/TestGraphTableModel.scala b/src/test/scala/uk/co/gresearch/spark/dgraph/connector/model/TestGraphTableModel.scala index 147ad835..7f039cd9 100644 --- a/src/test/scala/uk/co/gresearch/spark/dgraph/connector/model/TestGraphTableModel.scala +++ b/src/test/scala/uk/co/gresearch/spark/dgraph/connector/model/TestGraphTableModel.scala @@ -5,11 +5,10 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.unsafe.types.UTF8String import org.scalatest.FunSpec import uk.co.gresearch.spark.dgraph.connector +import uk.co.gresearch.spark.dgraph.connector._ import uk.co.gresearch.spark.dgraph.connector.encoder.{JsonNodeInternalRowEncoder, StringTripleEncoder} import uk.co.gresearch.spark.dgraph.connector.executor.{ExecutorProvider, JsonGraphQlExecutor} import uk.co.gresearch.spark.dgraph.connector.model.TestChunkIterator.getChunk -import 
uk.co.gresearch.spark.dgraph.connector.{GraphQl, Json, Partition, PartitionQuery, Predicate, Target, Uid, UidRange} - class TestGraphTableModel extends FunSpec { @@ -209,7 +208,12 @@ class TestGraphTableModel extends FunSpec { } -case class TestModel(execution: ExecutorProvider, encoder: JsonNodeInternalRowEncoder, chunkSize: Int) extends GraphTableModel { +case class TestModel(execution: ExecutorProvider, + encoder: JsonNodeInternalRowEncoder, + chunkSize: Int, + metrics: PartitionMetrics = NoPartitionMetrics()) + extends GraphTableModel { override def toGraphQl(query: PartitionQuery, chunk: Option[connector.Chunk]): GraphQl = query.forPropertiesAndEdges(chunk) + override def withMetrics(metrics: PartitionMetrics): TestModel = copy(metrics = metrics) } diff --git a/src/test/scala/uk/co/gresearch/spark/dgraph/connector/sources/TestNodeSource.scala b/src/test/scala/uk/co/gresearch/spark/dgraph/connector/sources/TestNodeSource.scala index 0fefde88..ad9e7920 100644 --- a/src/test/scala/uk/co/gresearch/spark/dgraph/connector/sources/TestNodeSource.scala +++ b/src/test/scala/uk/co/gresearch/spark/dgraph/connector/sources/TestNodeSource.scala @@ -19,9 +19,8 @@ package uk.co.gresearch.spark.dgraph.connector.sources import java.sql.Timestamp -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, In, IsNotNull, Literal} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition -import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} import org.scalatest.FunSpec import uk.co.gresearch.spark.SparkTestSession
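
For reference, a minimal sketch of how the accumulators introduced by this patch could be aggregated into a per-stage throughput figure on the driver. The accumulator names ("Dgraph Bytes", "Dgraph Time") are the ones registered by `AccumulatorPartitionMetrics`; the listener class name and the MB/s arithmetic are illustrative assumptions, not part of this patch:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // Sums the connector's Dgraph accumulators for each completed stage and derives
    // a throughput figure. Names must match those used by AccumulatorPartitionMetrics.
    class DgraphThroughputListener extends SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        val accums = stageCompleted.stageInfo.accumulables.values
        def metric(name: String): Option[Double] =
          accums.find(_.name.contains(name)).flatMap(_.value).map(_.toString.toDouble)

        for {
          bytes <- metric("Dgraph Bytes")
          seconds <- metric("Dgraph Time") if seconds > 0
        } println(f"Dgraph read throughput: ${bytes / seconds / 1024 / 1024}%.2f MB/s")
      }
    }

    spark.sparkContext.addSparkListener(new DgraphThroughputListener)
    spark.read.dgraphTriples("localhost:9080").count()

Because the for-comprehension only yields when both accumulators are present, stages that did not read from Dgraph produce no output.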