Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple keys in bigdiffy #138

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import com.spotify.scio.values.SCollection
import com.twitter.algebird._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.io.{FileSystems, TextIO}
import org.apache.beam.sdk.io.TextIO

import scala.annotation.tailrec
import scala.collection.JavaConverters._
Expand All @@ -49,21 +49,29 @@ object DiffType extends Enumeration {
val SAME, DIFFERENT, MISSING_LHS, MISSING_RHS = Value
}

case class MultiKey(keys: Seq[String]) extends AnyVal {
override def toString: String = keys.mkString("_")
}

object MultiKey {
def apply(key: String): MultiKey = MultiKey(Seq(key))
}

/**
* Key-field level [[DiffType]] and delta.
*
* If DiffType are SAME, MISSING_LHS, or MISSING_RHS they will appear once with no Delta
* If DiffType is DIFFERENT, there is one KeyStats for every field that is different for that key
* with that field's Delta
*
* key - primary being compared.
* keys - primary being compared.
* diffType - how the two records of the given key compares.
* delta - a single field's difference including field name, values, and distance
*/
case class KeyStats(key: String, diffType: DiffType.Value, delta: Option[Delta]) {
case class KeyStats(keys: MultiKey, diffType: DiffType.Value, delta: Option[Delta]) {
override def toString: String = {
val deltaStr = delta.map(_.toString).getOrElse("")
s"$key\t$diffType\t$deltaStr"
s"$keys\t$diffType\t$deltaStr"
}
}

Expand Down Expand Up @@ -120,9 +128,9 @@ case class FieldStats(field: String,

/** Big diff between two data sets given a primary key. */
class BigDiffy[T](lhs: SCollection[T], rhs: SCollection[T],
diffy: Diffy[T], keyFn: T => String) {
diffy: Diffy[T], keyFn: T => MultiKey) {

private lazy val _deltas: SCollection[(String, (Seq[Delta], DiffType.Value))] =
private lazy val _deltas: BigDiffy.DeltaSCollection =
BigDiffy.computeDeltas(lhs, rhs, diffy, keyFn)

private lazy val globalAndFieldStats: SCollection[(GlobalStats, Iterable[FieldStats])] =
Expand All @@ -133,7 +141,7 @@ class BigDiffy[T](lhs: SCollection[T], rhs: SCollection[T],
*
* Output tuples are (key, field, LHS, RHS). Note that LHS and RHS may not be serializable.
*/
lazy val deltas: SCollection[(String, String, Any, Any)] =
lazy val deltas: SCollection[(MultiKey, String, Any, Any)] =
_deltas.flatMap { case (k, (ds, dt)) =>
ds.map(d => (k, d.field, d.left, d.right))
}
Expand Down Expand Up @@ -164,10 +172,10 @@ object BigDiffy extends Command {
val command: String = "bigDiffy"

// (field, deltas, diff type)
type DeltaSCollection = SCollection[(String, (Seq[Delta], DiffType.Value))]
type DeltaSCollection = SCollection[(MultiKey, (Seq[Delta], DiffType.Value))]

private def computeDeltas[T](lhs: SCollection[T], rhs: SCollection[T],
d: Diffy[T], keyFn: T => String): DeltaSCollection = {
d: Diffy[T], keyFn: T => MultiKey): DeltaSCollection = {
// extract keys and prefix records with L/R sub-key
val lKeyed = lhs.map(t => (keyFn(t), ("l", t)))
val rKeyed = rhs.map(t => (keyFn(t), ("r", t)))
Expand Down Expand Up @@ -253,28 +261,28 @@ object BigDiffy extends Command {

/** Diff two data sets. */
def diff[T: ClassTag](lhs: SCollection[T], rhs: SCollection[T],
d: Diffy[T], keyFn: T => String): BigDiffy[T] =
d: Diffy[T], keyFn: T => MultiKey): BigDiffy[T] =
new BigDiffy[T](lhs, rhs, d, keyFn)

/** Diff two Avro data sets. */
def diffAvro[T <: GenericRecord : ClassTag](sc: ScioContext,
lhs: String, rhs: String,
keyFn: T => String,
keyFn: T => MultiKey,
diffy: AvroDiffy[T],
schema: Schema = null): BigDiffy[T] =
diff(sc.avroFile[T](lhs, schema), sc.avroFile[T](rhs, schema), diffy, keyFn)

/** Diff two ProtoBuf data sets. */
def diffProtoBuf[T <: AbstractMessage : ClassTag](sc: ScioContext,
lhs: String, rhs: String,
keyFn: T => String,
keyFn: T => MultiKey,
diffy: ProtoBufDiffy[T]): BigDiffy[T] =
diff(sc.protobufFile(lhs), sc.protobufFile(rhs), diffy, keyFn)

/** Diff two TableRow data sets. */
def diffTableRow(sc: ScioContext,
lhs: String, rhs: String,
keyFn: TableRow => String,
keyFn: TableRow => MultiKey,
diffy: TableRowDiffy): BigDiffy[TableRow] =
diff(sc.bigQueryTable(lhs), sc.bigQueryTable(rhs), diffy, keyFn)

Expand Down Expand Up @@ -327,7 +335,7 @@ object BigDiffy extends Command {
case BQ =>
// Saving to BQ, header irrelevant
bigDiffy.keyStats.map(stat =>
KeyStatsBigQuery(stat.key, stat.diffType.toString, stat.delta.map(d => {
KeyStatsBigQuery(stat.keys.toString, stat.diffType.toString, stat.delta.map(d => {
val dv = d.delta match {
case TypedDelta(dt, v) =>
DeltaValueBigQuery(dt.toString, Option(v))
Expand Down Expand Up @@ -383,7 +391,7 @@ object BigDiffy extends Command {
|
| --input-mode=(avro|bigquery) Diff-ing Avro or BQ records
| [--output-mode=(gcs|bigquery)] Saves to a text file in GCS or a BigQuery dataset. Defaults to GCS
| --key=<key> '.' separated key field
| --key=<key> '.' separated key field. Specify multiple --key params for multi key usage.
| --lhs=<path> LHS File path or BigQuery table
| --rhs=<path> RHS File path or BigQuery table
| --output=<output> File path prefix for output
Expand Down Expand Up @@ -411,28 +419,30 @@ object BigDiffy extends Command {
sys.exit(1)
}

private def avroKeyFn(key: String): GenericRecord => String = {
private[diffy] def avroKeyFn(keys: Seq[String]): GenericRecord => MultiKey = {
@tailrec
def get(xs: Array[String], i: Int, r: GenericRecord): String =
if (i == xs.length - 1) {
r.get(xs(i)).toString
} else {
get(xs, i + 1, r.get(xs(i)).asInstanceOf[GenericRecord])
}
val xs = key.split('.')
(r: GenericRecord) => get(xs, 0, r)

val xs = keys.map(_.split('.'))
(r: GenericRecord) => MultiKey(xs.map(x => get(x, 0, r)))
}

private def tableRowKeyFn(key: String): TableRow => String = {
private[diffy] def tableRowKeyFn(keys: Seq[String]): TableRow => MultiKey = {
@tailrec
def get(xs: Array[String], i: Int, r: java.util.Map[String, AnyRef]): String =
if (i == xs.length - 1) {
r.get(xs(i)).toString
} else {
get(xs, i + 1, r.get(xs(i)).asInstanceOf[java.util.Map[String, AnyRef]])
}
val xs = key.split('.')
(r: TableRow) => get(xs, 0, r)

val xs = keys.map(_.split('.'))
(r: TableRow) => MultiKey(xs.map(x => get(x, 0, r)))
}

def pathWithShards(path: String): String = path.replaceAll("\\/+$", "") + "/part"
Expand All @@ -456,9 +466,9 @@ object BigDiffy extends Command {
def run(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)

val (inputMode, key, lhs, rhs, output, header, ignore, unordered, outputMode) = {
val (inputMode, keys, lhs, rhs, output, header, ignore, unordered, outputMode) = {
try {
(args("input-mode"), args("key"), args("lhs"), args("rhs"), args("output"),
(args("input-mode"), args.list("key"), args("lhs"), args("rhs"), args("output"),
args.boolean("with-header", false), args.list("ignore").toSet,
args.list("unordered").toSet, args.optional("output-mode"))
} catch {
Expand All @@ -481,15 +491,15 @@ object BigDiffy extends Command {
val schema = new AvroSampler(rhs, conf = Some(sc.options))
.sample(1, head = true).head.getSchema
val diffy = new AvroDiffy[GenericRecord](ignore, unordered)
BigDiffy.diffAvro[GenericRecord](sc, lhs, rhs, avroKeyFn(key), diffy, schema)
BigDiffy.diffAvro[GenericRecord](sc, lhs, rhs, avroKeyFn(keys), diffy, schema)
case "bigquery" =>
// TODO: handle schema evolution
val bq = BigQueryClient.defaultInstance()
val lSchema = bq.getTableSchema(lhs)
val rSchema = bq.getTableSchema(rhs)
val schema = mergeTableSchema(lSchema, rSchema)
val diffy = new TableRowDiffy(schema, ignore, unordered)
BigDiffy.diffTableRow(sc, lhs, rhs, tableRowKeyFn(key), diffy)
BigDiffy.diffTableRow(sc, lhs, rhs, tableRowKeyFn(keys), diffy)
case m =>
throw new IllegalArgumentException(s"input mode $m not supported")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@

package com.spotify.ratatool.diffy

import com.spotify.ratatool.avro.specific.{RequiredNestedRecord, TestRecord}
import com.spotify.ratatool.scalacheck._
import com.spotify.scio.testing.PipelineSpec
import org.apache.avro.util.Utf8
import org.apache.beam.sdk.coders.AvroCoder
import org.apache.beam.sdk.util.CoderUtils
import org.scalacheck.Gen
import org.scalacheck.rng.Seed

import com.spotify.ratatool.avro.specific.{RequiredNestedRecord, TestRecord}
import com.spotify.ratatool.scalacheck._
import com.spotify.scio.testing.PipelineSpec

import com.google.api.services.bigquery.model.TableRow

class BigDiffyTest extends PipelineSpec {

val keys = (1 to 1000).map("key" + _)
val keys = (1 to 1000).map(k => MultiKey("key" + k))
val coder = AvroCoder.of(classOf[TestRecord])

/** Fixed to a small range so that Std. Dev. & Variance calculations are easier to predict */
Expand All @@ -50,7 +53,7 @@ class BigDiffyTest extends PipelineSpec {
val rhs = lhs.map(CoderUtils.clone(coder, _))
val result = BigDiffy.diff[TestRecord](
sc.parallelize(lhs), sc.parallelize(rhs),
new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString)
new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString))
result.globalStats should containSingleValue (GlobalStats(1000L, 1000L, 0L, 0L, 0L))
result.deltas should beEmpty
result.keyStats should containInAnyOrder (keys.map(KeyStats(_, DiffType.SAME, None)))
Expand All @@ -60,19 +63,20 @@ class BigDiffyTest extends PipelineSpec {

it should "work with deltas" in {
runWithContext { sc =>
val keyedDoubles = lhs.map(i =>
(i.getRequiredFields.getStringField.toString , i.getRequiredFields.getDoubleField))
val keyedDoubles = lhs.map { i =>
(MultiKey(i.getRequiredFields.getStringField.toString), i.getRequiredFields.getDoubleField)
}
val rhs = lhs.map(CoderUtils.clone(coder, _)).map { r =>
r.getRequiredFields.setDoubleField(r.getRequiredFields.getDoubleField + 10.0)
r
}
val result = BigDiffy.diff[TestRecord](
sc.parallelize(lhs), sc.parallelize(rhs),
new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString)
new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString))
result.globalStats should containSingleValue (GlobalStats(1000L, 0L, 1000L, 0L, 0L))
result.deltas.map(d => (d._1, d._2)) should containInAnyOrder (
keys.map((_, field)))
result.keyStats should containInAnyOrder (keyedDoubles.map{case (k, d) =>
result.keyStats should containInAnyOrder (keyedDoubles.map { case (k, d) =>
KeyStats(k, DiffType.DIFFERENT, Option(Delta("required_fields.double_field", d, d + 10.0,
TypedDelta(DeltaType.NUMERIC, 10.0))))})
result.fieldStats.map(f => (f.field, f.count, f.fraction)) should containSingleValue (
Expand All @@ -91,12 +95,12 @@ class BigDiffyTest extends PipelineSpec {
val rhs = lhs.map(CoderUtils.clone(coder, _))
val result = BigDiffy.diff[TestRecord](
sc.parallelize(lhs2), sc.parallelize(rhs),
new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString)
new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString))
result.globalStats should containSingleValue (GlobalStats(1000L, 500L, 0L, 500L, 0L))
result.deltas should beEmpty
result.keyStats should containInAnyOrder (
(1 to 500).map(i => KeyStats("key" + i, DiffType.SAME, None)) ++
(501 to 1000).map(i => KeyStats("key" + i, DiffType.MISSING_LHS, None)))
(1 to 500).map(i => KeyStats(MultiKey("key" + i), DiffType.SAME, None)) ++
(501 to 1000).map(i => KeyStats(MultiKey("key" + i), DiffType.MISSING_LHS, None)))
result.fieldStats should beEmpty
}
}
Expand All @@ -106,14 +110,51 @@ class BigDiffyTest extends PipelineSpec {
val rhs = lhs.filter(_.getRequiredFields.getIntField <= 500).map(CoderUtils.clone(coder, _))
val result = BigDiffy.diff[TestRecord](
sc.parallelize(lhs), sc.parallelize(rhs),
new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString)
new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString))
result.globalStats should containSingleValue (GlobalStats(1000L, 500L, 0L, 0L, 500L))
result.deltas should beEmpty
result.keyStats should containInAnyOrder (
(1 to 500).map(i => KeyStats("key" + i, DiffType.SAME, None)) ++
(501 to 1000).map(i => KeyStats("key" + i, DiffType.MISSING_RHS, None)))
(1 to 500).map(i => KeyStats(MultiKey("key" + i), DiffType.SAME, None)) ++
(501 to 1000).map(i => KeyStats(MultiKey("key" + i), DiffType.MISSING_RHS, None)))
result.fieldStats should beEmpty
}
}

"BigDiffy avroKeyFn" should "work with single key" in {
val record = specificRecordOf[TestRecord].sample.get
val keyValue = BigDiffy.avroKeyFn(Seq("required_fields.int_field"))(record)

keyValue.toString shouldBe record.getRequiredFields.getIntField.toString
}

"BigDiffy avroKeyFn" should "work with multiple key" in {
val record = specificRecordOf[TestRecord].sample.get
val keys = Seq("required_fields.int_field", "required_fields.double_field")
val keyValues = BigDiffy.avroKeyFn(keys)(record)
val expectedKey =
s"${record.getRequiredFields.getIntField}_${record.getRequiredFields.getDoubleField}"

keyValues.toString shouldBe expectedKey
}

"BigDiffy tableRowKeyFn" should "work with single key" in {
val record = new TableRow()
record.set("key", "foo")
val keyValue = BigDiffy.tableRowKeyFn(Seq("key"))(record)

keyValue.toString shouldBe "foo"
}

"BigDiffy tableRowKeyFn" should "work with multiple key" in {
val subRecord = new TableRow()
subRecord.set("key", "foo")
subRecord.set("other_key", "bar")
val record = new TableRow()
record.set("record", subRecord)

val keys = Seq("record.key", "record.other_key")
val keyValues = BigDiffy.tableRowKeyFn(keys)(record.asInstanceOf[TableRow])

keyValues.toString shouldBe "foo_bar"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
package com.spotify.ratatool.examples.diffy

import com.spotify.ratatool.avro.specific.ExampleRecord
import com.spotify.ratatool.diffy.{AvroDiffy, BigDiffy}
import com.spotify.ratatool.diffy.{AvroDiffy, BigDiffy, MultiKey}
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.coders.AvroCoder
import org.apache.beam.sdk.util.CoderUtils

object PreProcessBigDiffy {
def recordKeyFn(r: ExampleRecord): String = {
r.getRecordId.toString
def recordKeyFn(r: ExampleRecord): MultiKey = {
MultiKey(r.getRecordId.toString)
}

def mapFn(coder: => AvroCoder[ExampleRecord])(r: ExampleRecord): ExampleRecord = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ package com.spotify.ratatool.examples.diffy
import java.net.URI

import com.spotify.ratatool.GcsConfiguration
import com.spotify.ratatool.diffy.{BigDiffy, ProtoBufDiffy}
import com.spotify.ratatool.diffy.{BigDiffy, MultiKey, ProtoBufDiffy}
import com.spotify.ratatool.examples.proto.Schemas.ExampleRecord
import org.apache.hadoop.fs.{FileSystem, Path}

import com.spotify.scio._

object ProtobufBigDiffyExample {
def recordKeyFn(t: ExampleRecord): String = {
t.getStringField
def recordKeyFn(t: ExampleRecord): MultiKey = {
MultiKey(t.getStringField)
}

def main(cmdlineArgs: Array[String]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
package com.spotify.ratatool.shapeless

import com.spotify.ratatool.Command
import com.spotify.ratatool.diffy.{BigDiffy, Delta, Diffy}
import com.spotify.ratatool.diffy.{BigDiffy, Delta, Diffy, MultiKey}
import com.spotify.ratatool.diffy.BigDiffy.diff
import com.spotify.scio.values.SCollection
import shapeless._
import shapeless.labelled.FieldType

import scala.reflect.ClassTag

@SerialVersionUID(42L)
Expand Down Expand Up @@ -173,7 +172,7 @@ object CaseClassDiffy {
/** Diff two SCollection[T] **/
def diffCaseClass[T : ClassTag : MapEncoder](lhs: SCollection[T],
rhs: SCollection[T],
keyFn: T => String,
keyFn: T => MultiKey,
diffy: CaseClassDiffy[T]) : BigDiffy[T] =
diff(lhs, rhs, diffy, keyFn)
}