Add per partition read metrics

EnricoMi committed Jul 14, 2020
1 parent 06eb9a6 commit 48b7fb4
Showing 12 changed files with 138 additions and 17 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -6,17 +6,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [Unreleased] - YYYY-MM-DD

### Added
- Added Spark filter pushdown to improve efficiency when loading only a subgraph. Filters like
- Add Spark filter pushdown to improve efficiency when loading only a subgraph. Filters like
`.where($"predicate" === "name")` will be pushed to Dgraph and only the relevant graph data will
be read ([issue #7](https://github.com/G-Research/spark-dgraph-connector/issues/7)).
- Improve performance of `PredicatePartitioner` for multiple predicates per partition. This restores the
pre-0.3.0 default of `1000` predicates per partition ([issue #22](https://github.com/G-Research/spark-dgraph-connector/issues/22)).
- The `PredicatePartitioner` combined with `UidRangePartitioner` is the default partitioner now.
- Add stream-like reading of partitions from Dgraph. Partitions are split into smaller chunks.
Works with predicate partitioning only, so it cannot be combined with uid partitioning.
- Add Dgraph metrics to measure throughput, visible on the Spark UI Stages page and through a `SparkListener`.

### Security
- Moved Google Guava dependency version fix to 24.1.1-jre due to [known security vulnerability
- Move Google Guava dependency version fix to 24.1.1-jre due to [known security vulnerability
fixed in 24.1.1](https://github.com/advisories/GHSA-mvr2-9pj6-7w5j)

## [0.3.0] - 2020-06-22
29 changes: 29 additions & 0 deletions README.md
@@ -342,6 +342,35 @@ The following table lists all supported Spark filters:
|`In` |<ul><li>predicate column</li><li>predicate value column</li><li>object value columns (not for [String Triples source](#string-triples))</li><li>object type column</li></ul>|<ul><li>`.where($"predicate".isin("release_date", "revenue"))`</li><li>`.where($"dgraph.type".isin("Person","Film"))`</li><li>`.where($"objectLong".isin(123,456))`</li><li>`.where($"objectType".isin("string","long"))`</li></ul>|
|`IsNotNull` |<ul><li>object value columns (not for [String Triples source](#string-triples))</li></ul>|<ul><li>`.where($"dgraph.type".isNotNull)`</li><li>`.where($"objectLong".isNotNull)`</li></ul>|

## Metrics

The connector collects metrics per partition that provide insight into the throughput and timing of the communication
with the Dgraph cluster. For each request sent to Dgraph (a chunk), the number of received bytes, the number of received
uids and the retrieval time are recorded and summed per partition. The values can be seen in the Spark UI on the stages that perform the read:

![Dgraph metrics as shown on Spark UI Stages page](static/accumulators.png "Dgraph metrics as shown on Spark UI Stages page")

The connector uses [Spark Accumulators](http://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/Accumulator.html)
to collect these metrics. They can be accessed by the Spark driver via a `SparkListener`:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

val handler = new SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    stageCompleted.stageInfo.accumulables.values.foreach(println)
}

spark.sparkContext.addSparkListener(handler)
spark.read.dgraphTriples("localhost:9080").count()


The following metrics are available:

|Metric|Description|
|------|-----------|
|`Dgraph Bytes`|Size of the JSON responses received from the Dgraph cluster, in bytes.|
|`Dgraph Chunks`|Number of requests (chunks) sent to the Dgraph cluster.|
|`Dgraph Time`|Time spent waiting for Dgraph responses, in seconds.|
|`Dgraph Uids`|Number of uids read.|
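
To pick out individual metrics rather than print all accumulables, filter them by the names above. The following is a
minimal sketch, not part of the connector: the metric name and the output formatting are illustrative, everything else
follows the listener example above.

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Print only the "Dgraph Time" accumulator of each completed stage.
// AccumulableInfo.name and .value are Options, hence contains/getOrElse.
val timeListener = new SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    stageCompleted.stageInfo.accumulables.values
      .filter(_.name.contains("Dgraph Time"))
      .foreach(acc => println(s"stage ${stageCompleted.stageInfo.stageId}: ${acc.value.getOrElse(0.0)} s"))
}

spark.sparkContext.addSparkListener(timeListener)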


## Special Use Cases

@@ -0,0 +1,60 @@
package uk.co.gresearch.spark.dgraph.connector

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.{DoubleAccumulator, LongAccumulator}

// Per-partition metrics recorded while reading chunks from Dgraph.
trait PartitionMetrics {

  def incReadBytes(bytes: Long): Unit

  def incReadUids(uids: Long): Unit

  def incReadChunks(chunks: Long): Unit

  def incChunkTime(time: Double): Unit

}

// Backs the metrics with Spark accumulators so they surface in the Spark UI and via SparkListener.
case class AccumulatorPartitionMetrics(readBytes: LongAccumulator,
                                       readUids: LongAccumulator,
                                       readChunks: LongAccumulator,
                                       chunkTime: DoubleAccumulator)
  extends PartitionMetrics with Serializable {

  override def incReadBytes(bytes: Long): Unit = readBytes.add(bytes)

  override def incReadUids(uids: Long): Unit = readUids.add(uids)

  override def incReadChunks(chunks: Long): Unit = readChunks.add(chunks)

  override def incChunkTime(time: Double): Unit = chunkTime.add(time)

}

object AccumulatorPartitionMetrics {
  def apply(): AccumulatorPartitionMetrics = AccumulatorPartitionMetrics(SparkSession.builder().getOrCreate())

  def apply(spark: SparkSession): AccumulatorPartitionMetrics = AccumulatorPartitionMetrics(spark.sparkContext)

  def apply(context: SparkContext): AccumulatorPartitionMetrics =
    AccumulatorPartitionMetrics(
      context.longAccumulator("Dgraph Bytes"),
      context.longAccumulator("Dgraph Uids"),
      context.longAccumulator("Dgraph Chunks"),
      context.doubleAccumulator("Dgraph Time")
    )
}

// No-op implementation used when metrics are not collected.
case class NoPartitionMetrics() extends PartitionMetrics with Serializable {
  override def incReadBytes(bytes: Long): Unit = { }

  override def incReadUids(uids: Long): Unit = { }

  override def incReadChunks(chunks: Long): Unit = { }

  override def incChunkTime(time: Double): Unit = { }
}
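
For orientation, a minimal driver-side sketch of how this API is used (the local SparkSession and the example values
are illustrative, not part of the commit):

import org.apache.spark.sql.SparkSession
import uk.co.gresearch.spark.dgraph.connector.AccumulatorPartitionMetrics

val spark = SparkSession.builder().appName("metrics-example").master("local[*]").getOrCreate()

// Registers the four named accumulators ("Dgraph Bytes", "Dgraph Uids", "Dgraph Chunks", "Dgraph Time").
val metrics = AccumulatorPartitionMetrics(spark)

// Record one chunk worth of data, as the table models do per request.
metrics.incReadBytes(2048)
metrics.incReadUids(100)
metrics.incReadChunks(1)
metrics.incChunkTime(0.25)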
@@ -24,7 +24,7 @@ import uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel
case class TriplePartitionReaderFactory(model: GraphTableModel) extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
partition match {
case p: Partition => new TriplePartitionReader(p, model)
case p: Partition => TriplePartitionReader(p, model)
case _ => throw new IllegalArgumentException(
s"Expected ${Partition.getClass.getSimpleName}, not ${partition.getClass.getSimpleName}"
)
@@ -30,6 +30,7 @@ case class TripleScan(partitioner: Partitioner, model: GraphTableModel) extends

override def planInputPartitions(): Array[InputPartition] = partitioner.getPartitions.toArray

override def createReaderFactory(): PartitionReaderFactory = new TriplePartitionReaderFactory(model)
override def createReaderFactory(): PartitionReaderFactory =
TriplePartitionReaderFactory(model.withMetrics(AccumulatorPartitionMetrics()))

}
@@ -2,12 +2,16 @@ package uk.co.gresearch.spark.dgraph.connector.model

import uk.co.gresearch.spark.dgraph.connector.encoder.TripleEncoder
import uk.co.gresearch.spark.dgraph.connector.executor.ExecutorProvider
import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, PartitionQuery}
import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, NoPartitionMetrics, PartitionMetrics, PartitionQuery}

/**
* Models only the edges of a graph as a table.
*/
case class EdgeTableModel(execution: ExecutorProvider, encoder: TripleEncoder, chunkSize: Int) extends GraphTableModel {
case class EdgeTableModel(execution: ExecutorProvider,
                          encoder: TripleEncoder,
                          chunkSize: Int,
                          metrics: PartitionMetrics = NoPartitionMetrics())
  extends GraphTableModel {

/**
* Turn a partition query into a GraphQl query.
@@ -20,4 +24,6 @@ case class EdgeTableModel(execution: ExecutorProvider, encoder: TripleEncoder, c
// TODO: query for edges-only when supported
query.forPropertiesAndEdges(chunk)

override def withMetrics(metrics: PartitionMetrics): EdgeTableModel = copy(metrics = metrics)

}
@@ -9,7 +9,7 @@ import org.apache.spark.sql.types.StructType
import uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder
import uk.co.gresearch.spark.dgraph.connector.executor.{ExecutorProvider, JsonGraphQlExecutor}
import uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel.filter
import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, Partition, PartitionQuery, Uid}
import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, Partition, PartitionMetrics, PartitionQuery, Uid}

import scala.collection.JavaConverters._

@@ -21,6 +21,9 @@ trait GraphTableModel {
val execution: ExecutorProvider
val encoder: JsonNodeInternalRowEncoder
val chunkSize: Int
val metrics: PartitionMetrics

def withMetrics(metrics: PartitionMetrics): GraphTableModel

/**
* Returns the schema of this table. If the table is not readable and doesn't have a schema, an
@@ -65,6 +68,12 @@ trait GraphTableModel {
s"read ${json.string.length} bytes with ${partition.predicates.map(p => s"${p.size} predicates for ").getOrElse("")}" +
s"${chunk.length} uids after ${chunk.after.toHexString} ${until.map(e => s"until ${e.toHexString} ").getOrElse("")}" +
s"with ${array.size()} nodes in ${(endTs - startTs)/1000.0}s")

metrics.incReadBytes(json.string.getBytes.length)
metrics.incReadUids(array.size())
metrics.incReadChunks(1)
metrics.incChunkTime((endTs - startTs)/1000.0)

array
}

@@ -2,12 +2,16 @@ package uk.co.gresearch.spark.dgraph.connector.model

import uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder
import uk.co.gresearch.spark.dgraph.connector.executor.ExecutorProvider
import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, PartitionQuery}
import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, NoPartitionMetrics, PartitionMetrics, PartitionQuery}

/**
* Models only the nodes of a graph as a table.
*/
case class NodeTableModel(execution: ExecutorProvider, encoder: JsonNodeInternalRowEncoder, chunkSize: Int) extends GraphTableModel {
case class NodeTableModel(execution: ExecutorProvider,
                          encoder: JsonNodeInternalRowEncoder,
                          chunkSize: Int,
                          metrics: PartitionMetrics = NoPartitionMetrics())
  extends GraphTableModel {

/**
* Turn a partition query into a GraphQl query.
@@ -19,4 +23,6 @@ case class NodeTableModel(execution: ExecutorProvider, encoder: JsonNodeInternal
override def toGraphQl(query: PartitionQuery, chunk: Option[Chunk]): GraphQl =
query.forProperties(chunk)

override def withMetrics(metrics: PartitionMetrics): NodeTableModel = copy(metrics = metrics)

}
@@ -2,12 +2,16 @@ package uk.co.gresearch.spark.dgraph.connector.model

import uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder
import uk.co.gresearch.spark.dgraph.connector.executor.ExecutorProvider
import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, PartitionQuery}
import uk.co.gresearch.spark.dgraph.connector.{Chunk, GraphQl, NoPartitionMetrics, PartitionMetrics, PartitionQuery}

/**
* Models all triples of a graph as a table, nodes with properties and edges.
*/
case class TripleTableModel(execution: ExecutorProvider, encoder: JsonNodeInternalRowEncoder, chunkSize: Int) extends GraphTableModel {
case class TripleTableModel(execution: ExecutorProvider,
                            encoder: JsonNodeInternalRowEncoder,
                            chunkSize: Int,
                            metrics: PartitionMetrics = NoPartitionMetrics())
  extends GraphTableModel {

/**
* Turn a partition query into a GraphQl query.
@@ -19,4 +23,6 @@ case class TripleTableModel(execution: ExecutorProvider, encoder: JsonNodeIntern
override def toGraphQl(query: PartitionQuery, chunk: Option[Chunk]): GraphQl =
query.forPropertiesAndEdges(chunk)

override def withMetrics(metrics: PartitionMetrics): TripleTableModel = copy(metrics = metrics)

}
@@ -18,7 +18,7 @@
package uk.co.gresearch.spark.dgraph.connector.partitioner

import uk.co.gresearch.spark.dgraph.connector
import uk.co.gresearch.spark.dgraph.connector.{Filters, Partition}
import uk.co.gresearch.spark.dgraph.connector.{Filters, Partition, PartitionMetrics}

trait Partitioner {

@@ -5,11 +5,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String
import org.scalatest.FunSpec
import uk.co.gresearch.spark.dgraph.connector
import uk.co.gresearch.spark.dgraph.connector._
import uk.co.gresearch.spark.dgraph.connector.encoder.{JsonNodeInternalRowEncoder, StringTripleEncoder}
import uk.co.gresearch.spark.dgraph.connector.executor.{ExecutorProvider, JsonGraphQlExecutor}
import uk.co.gresearch.spark.dgraph.connector.model.TestChunkIterator.getChunk
import uk.co.gresearch.spark.dgraph.connector.{GraphQl, Json, Partition, PartitionQuery, Predicate, Target, Uid, UidRange}


class TestGraphTableModel extends FunSpec {

@@ -209,7 +208,12 @@ class TestGraphTableModel extends FunSpec {

}

case class TestModel(execution: ExecutorProvider, encoder: JsonNodeInternalRowEncoder, chunkSize: Int) extends GraphTableModel {
case class TestModel(execution: ExecutorProvider,
                     encoder: JsonNodeInternalRowEncoder,
                     chunkSize: Int,
                     metrics: PartitionMetrics = NoPartitionMetrics())
  extends GraphTableModel {
override def toGraphQl(query: PartitionQuery, chunk: Option[connector.Chunk]): GraphQl =
query.forPropertiesAndEdges(chunk)
override def withMetrics(metrics: PartitionMetrics): TestModel = copy(metrics = metrics)
}
@@ -19,9 +19,8 @@ package uk.co.gresearch.spark.dgraph.connector.sources

import java.sql.Timestamp

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, In, IsNotNull, Literal}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
import org.scalatest.FunSpec
import uk.co.gresearch.spark.SparkTestSession