Skip to content

Commit

Permalink
Support multiple keys in bigdiffy
Browse files Browse the repository at this point in the history
  • Loading branch information
martinbomio committed Nov 6, 2018
1 parent 1937910 commit 5669ac1
Showing 1 changed file with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,28 +414,27 @@ object BigDiffy extends Command {
sys.exit(1)
}

/**
 * Builds a function that extracts a composite string key from an Avro record.
 *
 * Each element of `keys` is a dot-separated field path (e.g. `"a.b.c"`). For every path,
 * each segment except the last is looked up on the current record and cast to a nested
 * `GenericRecord`; the value of the last segment is converted with `toString`. The values
 * obtained for all paths are joined with `"_"`, in the order the paths were given.
 *
 * NOTE(review): because the separator is a plain `"_"`, field values that themselves contain
 * `"_"` could produce the same composite key for different records -- confirm this is acceptable.
 * NOTE(review): presumably a missing or null field makes `r.get(...)` return null, which would
 * fail on `toString` or on the next lookup -- verify against the Avro version in use.
 *
 * @param keys dot-separated field paths that together identify a record
 * @return a function mapping a record to its `"_"`-joined key string
 */
private def avroKeyFn(keys: Seq[String]): GenericRecord => String = {
  @tailrec
  def get(xs: Array[String], i: Int, r: GenericRecord): String =
    if (i == xs.length - 1) {
      r.get(xs(i)).toString
    } else {
      get(xs, i + 1, r.get(xs(i)).asInstanceOf[GenericRecord])
    }

  // Split every key path once, up front: the returned function is invoked per record,
  // so splitting inside it would repeat the same work for each one.
  val paths = keys.map(_.split('.'))
  (r: GenericRecord) => paths.map(path => get(path, 0, r)).mkString("_")
}

/**
 * Builds a function that extracts a composite string key from a BigQuery `TableRow`.
 *
 * Each element of `keys` is a dot-separated field path (e.g. `"a.b.c"`). For every path,
 * each segment except the last is looked up on the current map and cast to a nested
 * `java.util.Map[String, AnyRef]`; the value of the last segment is converted with `toString`.
 * The values obtained for all paths are joined with `"_"`, in the order the paths were given.
 *
 * NOTE(review): because the separator is a plain `"_"`, field values that themselves contain
 * `"_"` could produce the same composite key for different rows -- confirm this is acceptable.
 * NOTE(review): a key absent from the map makes `r.get(...)` return null, so `toString` or the
 * next lookup would presumably throw -- confirm callers only pass keys present in every row.
 *
 * @param keys dot-separated field paths that together identify a row
 * @return a function mapping a row to its `"_"`-joined key string
 */
private def tableRowKeyFn(keys: Seq[String]): TableRow => String = {
  @tailrec
  def get(xs: Array[String], i: Int, r: java.util.Map[String, AnyRef]): String =
    if (i == xs.length - 1) {
      r.get(xs(i)).toString
    } else {
      get(xs, i + 1, r.get(xs(i)).asInstanceOf[java.util.Map[String, AnyRef]])
    }

  // Split every key path once, up front: the returned function is invoked per row,
  // so splitting inside it would repeat the same work for each one.
  val paths = keys.map(_.split('.'))
  (r: TableRow) => paths.map(path => get(path, 0, r)).mkString("_")
}

/**
 * Returns `path` with any trailing `/` characters removed and `"/part"` appended.
 *
 * @param path the base path, with or without trailing slashes
 * @return the path followed by `/part`
 */
def pathWithShards(path: String): String = {
  val withoutTrailingSlashes = path.replaceAll("\\/+$", "")
  withoutTrailingSlashes + "/part"
}
Expand All @@ -459,9 +458,9 @@ object BigDiffy extends Command {
def run(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)

val (inputMode, key, lhs, rhs, output, header, ignore, unordered, outputMode) = {
val (inputMode, keys, lhs, rhs, output, header, ignore, unordered, outputMode) = {
try {
(args("input-mode"), args("key"), args("lhs"), args("rhs"), args("output"),
(args("input-mode"), args.list("key"), args("lhs"), args("rhs"), args("output"),
args.boolean("with-header", false), args.list("ignore").toSet,
args.list("unordered").toSet, args.optional("output-mode"))
} catch {
Expand All @@ -485,15 +484,15 @@ object BigDiffy extends Command {
val path = fs.globStatus(new Path(rhs)).head.getPath
val schema = new AvroSampler(path).sample(1, true).head.getSchema
val diffy = new AvroDiffy[GenericRecord](ignore, unordered)
BigDiffy.diffAvro[GenericRecord](sc, lhs, rhs, avroKeyFn(key), diffy, schema)
BigDiffy.diffAvro[GenericRecord](sc, lhs, rhs, avroKeyFn(keys), diffy, schema)
case "bigquery" =>
// TODO: handle schema evolution
val bq = BigQueryClient.defaultInstance()
val lSchema = bq.getTableSchema(lhs)
val rSchema = bq.getTableSchema(rhs)
val schema = mergeTableSchema(lSchema, rSchema)
val diffy = new TableRowDiffy(schema, ignore, unordered)
BigDiffy.diffTableRow(sc, lhs, rhs, tableRowKeyFn(key), diffy)
BigDiffy.diffTableRow(sc, lhs, rhs, tableRowKeyFn(keys), diffy)
case m =>
throw new IllegalArgumentException(s"input mode $m not supported")
}
Expand Down

0 comments on commit 5669ac1

Please sign in to comment.