From eaabe81e9017d041132415cf13f57b008b6348be Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Mon, 30 Sep 2019 12:41:36 +0530 Subject: [PATCH 01/11] Fix bug for edge case of initial kafka offset --- .../kafka/KafkaWorkUnitCalculator.java | 156 +++++++++--------- 1 file changed, 78 insertions(+), 78 deletions(-) diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaWorkUnitCalculator.java b/marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaWorkUnitCalculator.java index 3b39dfe..901e6e0 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaWorkUnitCalculator.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sources/kafka/KafkaWorkUnitCalculator.java @@ -68,7 +68,7 @@ @Slf4j @RequiredArgsConstructor public class KafkaWorkUnitCalculator implements IWorkUnitCalculator { + KafkaWorkUnitCalculatorResult, StringValue> { public static final String KAFKA_METADATA_PREFIX = "kafka_metadata"; public static final String KAFKA_METADATA_WITH_SEPARATOR = KAFKA_METADATA_PREFIX + StringTypes.COLON; @@ -116,27 +116,28 @@ public void initPreviousRunState(@NonNull final IMetadataManager me final String topicSpecificName = getTopicSpecificMetadataKey(topicName); final List toDelete = new LinkedList<>(); metadataManager.getAllKeys().forEach(key -> { - if (key.startsWith(topicSpecificName)) { - // this is my specific topic - metadata.put(Integer.parseInt(key.substring(topicSpecificName.length())), + if (key.startsWith(topicSpecificName)) { + // this is my specific topic + metadata.put(Integer.parseInt(key.substring(topicSpecificName.length())), Long.parseLong(metadataManager.get(key).get().getValue())); - } else if (key.startsWith(KAFKA_METADATA_WITH_SEPARATOR)) { - // this is a specific topic, but not mine. ignore. - assert true; - } else if (key.startsWith(KAFKA_METADATA_PREFIX)) { - // this is unspecified topic - metadata.put(Integer.parseInt(key.substring(KAFKA_METADATA_PREFIX.length())), + } else if (key.startsWith(KAFKA_METADATA_WITH_SEPARATOR)) { + // this is a specific topic, but not mine. ignore. + assert true; + } else if (key.startsWith(KAFKA_METADATA_PREFIX)) { + // this is unspecified topic + metadata.put(Integer.parseInt(key.substring(KAFKA_METADATA_PREFIX.length())), Long.parseLong(metadataManager.get(key).get().getValue())); - // delete the old, unspecified metadata - toDelete.add(key); - } - }); + // delete the old, unspecified metadata + toDelete.add(key); + } + }); toDelete.forEach(metadataManager::remove); this.previousRunState = Optional.of(new KafkaRunState(metadata)); } /** * Get the specific metadata name for kafka that we'll be using + * * @param topicName the name of the topic to get the metadata for * @return the processed name */ @@ -150,9 +151,9 @@ public void saveNextRunState(@NonNull final IMetadataManager metada final String topicName = this.conf.getTopicName(); final String topicSpecificName = getTopicSpecificMetadataKey(topicName); nextRunState.getPartitionOffsets().entrySet().forEach( - entry -> { - metadataManager.set(topicSpecificName + entry.getKey(), new StringValue(entry.getValue().toString())); - }); + entry -> { + metadataManager.set(topicSpecificName + entry.getKey(), new StringValue(entry.getValue().toString())); + }); } @Override @@ -174,15 +175,15 @@ public KafkaWorkUnitCalculatorResult computeWorkUnits() { // Read checkpointed topic partition offsets and update it with newly added partitions. 
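The checkpointed offsets read back just below come from the per-topic metadata keys parsed in initPreviousRunState above. A small standalone illustration of that key layout (the exact prefix built by getTopicSpecificMetadataKey is not shown in this hunk, so the "kafka_metadata:orders:" form is only an assumed example):

import java.util.HashMap;
import java.util.Map;

final class CheckpointKeySketch {
    public static void main(final String[] args) {
        // Assumed topic-specific prefix for topic "orders"; the real string comes from
        // getTopicSpecificMetadataKey and is not visible in this hunk.
        final String topicSpecificName = "kafka_metadata:orders:";
        final String key = "kafka_metadata:orders:3"; // checkpoint entry for partition 3
        final String value = "1200";                  // last persisted offset, stored as a string
        final Map<Integer, Long> metadata = new HashMap<>();
        // Same parsing as initPreviousRunState: the partition number is the suffix after the prefix.
        metadata.put(Integer.parseInt(key.substring(topicSpecificName.length())), Long.parseLong(value));
        System.out.println(metadata); // {3=1200}
    }
}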
final Map oldPartitionOffsets = readExistingPartitionOffsets(); - if (oldPartitionOffsets.isEmpty()) { - // If it's a first run then initialize new partitions with latest partition offsets. - return new KafkaWorkUnitCalculatorResult(Collections.emptyList(), - new KafkaRunState(this.offsetSelector.getPartitionOffsets( - this.conf, - latestLeaderOffsets.keySet(), - earliestLeaderOffsets, - latestLeaderOffsets))); - } +// if (oldPartitionOffsets.isEmpty()) { +// // If it's a first run then initialize new partitions with latest partition offsets. +// return new KafkaWorkUnitCalculatorResult(Collections.emptyList(), +// new KafkaRunState(this.offsetSelector.getPartitionOffsets( +// this.conf, +// latestLeaderOffsets.keySet(), +// earliestLeaderOffsets, +// latestLeaderOffsets))); +// } final Map newPartitionOffsets = updatePartitionStartOffsets(oldPartitionOffsets, earliestLeaderOffsets, latestLeaderOffsets); @@ -190,12 +191,12 @@ public KafkaWorkUnitCalculatorResult computeWorkUnits() { long totalNewMessages = 0; final List partitionMessages = new ArrayList<>(latestLeaderOffsets.size()); for (Entry entry : latestLeaderOffsets.entrySet()) { - if (!newPartitionOffsets.containsKey(entry.getKey().partition())) { - log.error("Unable to find offsets for topic {} partition {}", - entry.getKey().topic(), entry.getKey().partition()); - continue; - } - final long messages = entry.getValue() - newPartitionOffsets.get(entry.getKey().partition()); +// if (!newPartitionOffsets.containsKey(entry.getKey().partition())) { +// log.error("Unable to find offsets for topic {} partition {}", +// entry.getKey().topic(), entry.getKey().partition()); +// continue; +// } + final long messages = entry.getValue(); log.debug("topicPartition:{}:messages:{}:latestOffset:{}", entry.getKey(), messages, entry.getValue()); if (messages == 0) { continue; @@ -209,12 +210,12 @@ public KafkaWorkUnitCalculatorResult computeWorkUnits() { return new KafkaWorkUnitCalculatorResult(Collections.emptyList(), new KafkaRunState(newPartitionOffsets)); } final List workUnits = - calculatePartitionOffsetRangesToRead(partitionMessages, newPartitionOffsets, - totalNewMessages); + calculatePartitionOffsetRangesToRead(partitionMessages, newPartitionOffsets, + totalNewMessages); // compute run state for the next run. final KafkaRunState nextRunState = createNextRunState(workUnits); final KafkaWorkUnitCalculatorResult kafkaWorkUnitCalculatorResult = - new KafkaWorkUnitCalculatorResult(workUnits, nextRunState); + new KafkaWorkUnitCalculatorResult(workUnits, nextRunState); computeRunMetrics(latestLeaderOffsets, nextRunState, workUnits); log.info("workunits: {}", kafkaWorkUnitCalculatorResult); @@ -222,14 +223,14 @@ public KafkaWorkUnitCalculatorResult computeWorkUnits() { } private List calculatePartitionOffsetRangesToRead( - @NonNull final List partitionMessages, - @NonNull final Map partitionStartOffsets, final long numMessages) { + @NonNull final List partitionMessages, + @NonNull final Map partitionStartOffsets, final long numMessages) { // This will make sure that we can read more messages from partition with more than average messages per // partition at the same time we will read all the messages from partition with less than avg messags. 
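The comment above describes a fair-share strategy: partitions are visited in ascending order of available messages, small partitions are drained completely, and larger partitions are capped so the total stays within the configured maximum. A simplified, self-contained sketch of that idea (illustrative only; it does not reproduce the exact arithmetic of calculatePartitionOffsetRangesToRead):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FairShareSketch {
    // Read everything from partitions below the running average and cap the rest so the
    // grand total never exceeds maxMessagesToRead.
    static List<Long> allocate(final List<Long> availablePerPartitionAsc, final long maxMessagesToRead) {
        final long totalAvailable = availablePerPartitionAsc.stream().mapToLong(Long::longValue).sum();
        long remainingBudget = Math.min(totalAvailable, maxMessagesToRead);
        int remainingPartitions = availablePerPartitionAsc.size();
        final List<Long> selected = new ArrayList<>();
        for (final long available : availablePerPartitionAsc) { // ascending, as after Collections.sort below
            final long evenShare = remainingBudget / remainingPartitions;
            final long take = Math.min(available, evenShare);
            selected.add(take);
            remainingBudget -= take;
            remainingPartitions--;
        }
        return selected;
    }

    public static void main(final String[] args) {
        // Three partitions with 10, 40 and 1000 new messages but a budget of 120:
        // the small partitions are read fully, the large one gets the leftover capacity.
        System.out.println(allocate(Arrays.asList(10L, 40L, 1000L), 120L)); // [10, 40, 70]
    }
}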
Collections.sort(partitionMessages); final long maxMessagesToRead = this.conf.getMaxMessagesToRead(); log.info("topicName:{}:newMessages:{}:maxMessagesToRead:{}", this.conf.getTopicName(), numMessages, - maxMessagesToRead); + maxMessagesToRead); final boolean hasExtraMessages = numMessages > maxMessagesToRead; final long numMessagesToRead = Math.min(numMessages, maxMessagesToRead); @@ -247,8 +248,8 @@ private List calculatePartitionOffsetRangesToRead( } if (numMsgsToBeSelected > 0) { offsetRanges.add(OffsetRange.create(m.getTopicPartition(), - partitionStartOffsets.get(m.getTopicPartition().partition()), - partitionStartOffsets.get(m.getTopicPartition().partition()) + numMsgsToBeSelected)); + partitionStartOffsets.get(m.getTopicPartition().partition()), + partitionStartOffsets.get(m.getTopicPartition().partition()) + numMsgsToBeSelected)); } } return offsetRanges; @@ -267,21 +268,21 @@ private Map readExistingPartitionOffsets() { private KafkaRunState createNextRunState(@NonNull final List workUnits) { final Map partitionOffsets = new HashMap<>(); workUnits.forEach( - offsetRange -> { - final int partition = offsetRange.partition(); - if (partitionOffsets.containsKey(partition)) { - partitionOffsets - .put(partition, Math.max(partitionOffsets.get(partition), offsetRange.untilOffset())); - } else { - partitionOffsets.put(partition, offsetRange.untilOffset()); + offsetRange -> { + final int partition = offsetRange.partition(); + if (partitionOffsets.containsKey(partition)) { + partitionOffsets + .put(partition, Math.max(partitionOffsets.get(partition), offsetRange.untilOffset())); + } else { + partitionOffsets.put(partition, offsetRange.untilOffset()); + } } - } ); return new KafkaRunState(partitionOffsets); } - private Map buildResetPartitionOffsetMap(@NonNull final Map earliestTPOffsets, - @NonNull final Map latestTPOffsets) { + private Map buildResetPartitionOffsetMap(@NonNull final Map earliestTPOffsets, + @NonNull final Map latestTPOffsets) { Preconditions.checkState(kafkaOffsetResetter.isPresent(), "KafkaOffsetResetter should be present " + "when this method is called"); @@ -324,7 +325,7 @@ private Map handleDataLossAndMaybeResetOffsets(@NonNull final Map final long lossStartOffset, final long lossEndOffset) { final String errMsg = String.format("DATA_LOSS:MISSED_KAFKA_MESSAGES:topic:%s:partition:%d:" - + "startOffset:%d:endOffset:%d", topicPartition.topic(), + + "startOffset:%d:endOffset:%d", topicPartition.topic(), topicPartition.partition(), lossStartOffset, lossEndOffset); log.error(errMsg); if (kafkaOffsetResetter.isPresent()) { @@ -342,24 +343,23 @@ private Map handleDataLossAndMaybeResetOffsets(@NonNull final Map private Map updatePartitionStartOffsets(@NonNull final Map partitionOffsetMap, @NonNull final Map earliestTPOffsets, @NonNull final Map latestTPOffsets) { - if (!partitionOffsetMap.isEmpty()) { - for (Entry entry : earliestTPOffsets.entrySet()) { - final TopicPartition topicPartition = entry.getKey(); - if (!partitionOffsetMap.containsKey(topicPartition.partition())) { - // New partition is found. 
- log.info("Found new partition for topic:{}:partition:{}", topicPartition.topic(), - topicPartition.partition()); - partitionOffsetMap.put(topicPartition.partition(), entry.getValue()); - } else if (entry.getValue() > partitionOffsetMap.get(topicPartition.partition())) { - // data loss detected - return handleDataLossAndMaybeResetOffsets(earliestTPOffsets, latestTPOffsets, - topicPartition, partitionOffsetMap, - partitionOffsetMap.get(topicPartition.partition()), entry.getValue()); - } + for (Entry entry : earliestTPOffsets.entrySet()) { + final TopicPartition topicPartition = entry.getKey(); + if (!partitionOffsetMap.containsKey(topicPartition.partition())) { + // New partition is found. + log.info("Found new partition for topic:{}:partition:{}", topicPartition.topic(), + topicPartition.partition()); + partitionOffsetMap.put(topicPartition.partition(), entry.getValue()); + } else if (entry.getValue() > partitionOffsetMap.get(topicPartition.partition())) { + // data loss detected + return handleDataLossAndMaybeResetOffsets(earliestTPOffsets, latestTPOffsets, + topicPartition, partitionOffsetMap, + partitionOffsetMap.get(topicPartition.partition()), entry.getValue()); } } return partitionOffsetMap; } + /* Creates metrics for the current execution based on the source. */ @@ -376,22 +376,22 @@ private void computeRunMetrics(@NonNull final Map latestLe totalTags.put(PARTITION_TAG, TOTAL_PARTITION); final MessageCounters counter = new MessageCounters(); offsetRanges.forEach(offsetRange -> { - final Long oldCount = offsetMap.getOrDefault(offsetRange.topicPartition(), 0L); - offsetMap.put(offsetRange.topicPartition(), oldCount + offsetRange.count()); - }); + final Long oldCount = offsetMap.getOrDefault(offsetRange.topicPartition(), 0L); + offsetMap.put(offsetRange.topicPartition(), oldCount + offsetRange.count()); + }); latestLeaderOffsets.forEach( - (topicPartition, leaderOffset) -> - computePartitionMetrics( - topicPartition, leaderOffset, nextRunState, - topicMetrics, offsetMap.getOrDefault(topicPartition, 0L), counter) + (topicPartition, leaderOffset) -> + computePartitionMetrics( + topicPartition, leaderOffset, nextRunState, + topicMetrics, offsetMap.getOrDefault(topicPartition, 0L), counter) ); topicMetrics.createLongMetric(DataFeedMetricNames.ROWCOUNT_BEHIND, - counter.getTotalAvailable() - counter.getTotalCurrent(), totalTags); + counter.getTotalAvailable() - counter.getTotalCurrent(), totalTags); topicMetrics.createLongMetric(DataFeedMetricNames.INPUT_ROWCOUNT, counter.getTotalInput(), totalTags); if (this.chargebackCalculator.isPresent()) { this.chargebackCalculator.get().addCost( - this.topicMetrics.get().getBaseTags().get(DataFeedMetrics.DATA_FEED_NAME), + this.topicMetrics.get().getBaseTags().get(DataFeedMetrics.DATA_FEED_NAME), ChargebackMetricType.ROW_COUNT, counter.getTotalInput()); } } @@ -426,7 +426,7 @@ private void computePartitionMetrics(@NonNull final TopicPartition topicPartitio * It holds current set of work units and also {@link KafkaRunState} for the next run. 
*/ public final class KafkaWorkUnitCalculatorResult implements - IWorkUnitCalculator.IWorkUnitCalculatorResult { + IWorkUnitCalculator.IWorkUnitCalculatorResult { @Getter private final KafkaRunState nextRunState; @@ -455,9 +455,9 @@ public String toString() { final StringBuilder sb = new StringBuilder(); sb.append("offsetRanges="); this.workUnits.forEach( - workUnit -> sb.append( - workUnit.partition()).append(":").append(workUnit.fromOffset()).append("->") - .append(workUnit.untilOffset()).append(";")); + workUnit -> sb.append( + workUnit.partition()).append(":").append(workUnit.fromOffset()).append("->") + .append(workUnit.untilOffset()).append(";")); return sb.toString(); } From ec8fd43c8178b4ab2d9946331dba4aa31a5816fb Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Mon, 30 Sep 2019 12:41:58 +0530 Subject: [PATCH 02/11] Add KafkaToHoodie Job --- .../examples/job/KafkaToHoodieJob.java | 378 ++++++++++++++++++ 1 file changed, 378 insertions(+) create mode 100644 marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java new file mode 100644 index 0000000..ccd79df --- /dev/null +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java @@ -0,0 +1,378 @@ +package com.uber.marmaray.examples.job; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.google.common.base.Optional; +import com.uber.marmaray.common.configuration.*; +import com.uber.marmaray.common.converters.data.*; +import com.uber.marmaray.common.exceptions.JobRuntimeException; +import com.uber.marmaray.common.job.JobDag; +import com.uber.marmaray.common.job.JobManager; +import com.uber.marmaray.common.metadata.HoodieBasedMetadataManager; +import com.uber.marmaray.common.metadata.IMetadataManager; +import com.uber.marmaray.common.metrics.DataFeedMetricNames; +import com.uber.marmaray.common.metrics.DataFeedMetrics; +import com.uber.marmaray.common.metrics.ErrorCauseTagNames; +import com.uber.marmaray.common.metrics.JobMetricNames; +import com.uber.marmaray.common.metrics.JobMetrics; +import com.uber.marmaray.common.metrics.LongMetric; +import com.uber.marmaray.common.metrics.ModuleTagNames; +import com.uber.marmaray.common.metrics.TimerMetric; +import com.uber.marmaray.common.reporters.ConsoleReporter; +import com.uber.marmaray.common.reporters.Reporters; +import com.uber.marmaray.common.sinks.hoodie.HoodieSink; +import com.uber.marmaray.common.sources.ISource; +import com.uber.marmaray.common.sources.IWorkUnitCalculator; +import com.uber.marmaray.common.sources.kafka.KafkaSource; +import com.uber.marmaray.common.sources.kafka.KafkaWorkUnitCalculator; +import com.uber.marmaray.common.spark.SparkArgs; +import com.uber.marmaray.common.spark.SparkFactory; +import com.uber.marmaray.utilities.SparkUtil; +import com.uber.marmaray.utilities.ErrorExtractor; +import com.uber.marmaray.utilities.FSUtils; +import com.uber.marmaray.utilities.JobUtil; +import com.uber.marmaray.utilities.listener.TimeoutManager; +import lombok.Getter; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +import 
org.hibernate.validator.constraints.NotEmpty; +import parquet.Preconditions; + +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.uber.marmaray.common.AvroPayload; +import com.uber.marmaray.common.configuration.Configuration; +import com.uber.marmaray.common.converters.data.HoodieSinkDataConverter; +import com.uber.marmaray.common.exceptions.InvalidDataException; +import com.uber.marmaray.common.schema.ISchemaService; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.*; +import java.io.Serializable; + + +class KafkaSchemaServiceReader implements ISchemaService.ISchemaServiceReader, Serializable { + + private final String schemaString; + private transient Schema schema; + + KafkaSchemaServiceReader(@NotEmpty final Schema schema) { + this.schemaString = schema.toString(); + this.schema = schema; + } + + private Schema getSchema() { + if (this.schema == null) { + this.schema = new Schema.Parser().parse(this.schemaString); + } + return this.schema; + } + + @Override + public GenericRecord read(final byte[] buffer) throws InvalidDataException { + final DatumReader datumReader = new GenericDatumReader<>(getSchema()); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(buffer, null); + try { + return datumReader.read(null, decoder); + } catch (IOException e) { + throw new InvalidDataException("Error decoding data", e); + } + + + // JSON reader +// DatumReader reader = new GenericDatumReader<>(this.getSchema()); +// +// try { +// JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(this.getSchema(), new String(buffer)); +// return reader.read(null, jsonDecoder); +// } catch (IOException e) { +// throw new InvalidDataException("Error decoding data", e); +// } + } +} + +class CustomHoodieSinkDataConverter extends HoodieSinkDataConverter { + CustomHoodieSinkDataConverter(Configuration conf, ErrorExtractor errorExtractor) { + super(conf, errorExtractor); + } + + @Override + protected String getRecordKey(AvroPayload avroPayload) { + return "Region"; + } + + @Override + protected String getPartitionPath(AvroPayload avroPayload) { + return "test"; + } +} + + + +/** + * Job to load data from kafka to hoodie + */ +@Slf4j +public class KafkaToHoodieJob { + + /** + * Generic entry point + * + * @param args arguments for the job, from the command line + * @throws IOException + */ + public static void main(final String[] args) throws IOException { + new KafkaToHoodieJob().run(args); + } + + /** + * Main execution method for the job. 
+ * + * @param args command line arguments + * @throws IOException + */ + private void run(final String[] args) throws IOException { +// final String schema = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"Record\", \"fields\": [{\"name\": \"Region\", \"type\": \"string\"}, {\"name\": \"Country\", \"type\": \"string\"}] }"; +// final Schema schemaObj = new org.apache.avro.Schema.Parser().parse(schema); +// final GenericData.Record record = new GenericData.Record(schemaObj); +// record.put("Region", "Sub-Saharan Africa"); +// record.put("Country", "Chad"); +// +// final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); +// final GenericDatumWriter datumWriter = new GenericDatumWriter(schemaObj); +// final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); +// +// datumWriter.write(record, encoder); +// encoder.flush(); +// +// final String recordString = new String(outputStream.toByteArray()); + + final Instant jobStartTime = Instant.now(); + + final Configuration conf = getConfiguration(args); + + final Reporters reporters = new Reporters(); + reporters.addReporter(new ConsoleReporter()); + + final Map metricTags = Collections.emptyMap(); + final DataFeedMetrics dataFeedMetrics = new DataFeedMetrics("kafka to hoodie ingestion", metricTags); + + log.info("Initializing configurations for job"); + final TimerMetric confInitMetric = new TimerMetric(DataFeedMetricNames.INIT_CONFIG_LATENCY_MS, + metricTags); + + final KafkaSourceConfiguration kafkaSourceConf; + final HoodieConfiguration hoodieConf; + try { + kafkaSourceConf = new KafkaSourceConfiguration(conf); + hoodieConf = new HoodieConfiguration(conf, "test_hoodie"); + } catch (final Exception e) { + final LongMetric configError = new LongMetric(DataFeedMetricNames.DISPERSAL_CONFIGURATION_INIT_ERRORS, 1); + configError.addTags(metricTags); + configError.addTags(DataFeedMetricNames + .getErrorModuleCauseTags(ModuleTagNames.CONFIGURATION, ErrorCauseTagNames.CONFIG_ERROR)); + reporters.report(configError); + reporters.getReporters().forEach(dataFeedMetrics::gauageFailureMetric); + throw e; + } + confInitMetric.stop(); + reporters.report(confInitMetric); + + log.info("Reading schema"); + final TimerMetric convertSchemaLatencyMs = + new TimerMetric(DataFeedMetricNames.CONVERT_SCHEMA_LATENCY_MS, metricTags); + +// final StructType inputSchema = DataTypes.createStructType(new StructField[]{ +// DataTypes.createStructField("Region", DataTypes.StringType, true), +// DataTypes.createStructField("Country", DataTypes.StringType, true) +// }); +// +// final DataFrameSchemaConverter schemaConverter = new DataFrameSchemaConverter(); +// final Schema outputSchema = schemaConverter.convertToCommonSchema(inputSchema); + + final String schema = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"Record\", \"fields\": [{\"name\": \"Region\", \"type\": \"string\"}, {\"name\": \"Country\", \"type\": \"string\"}] }"; + final Schema outputSchema = new org.apache.avro.Schema.Parser().parse(schema); + convertSchemaLatencyMs.stop(); + reporters.report(convertSchemaLatencyMs); + + final SparkArgs sparkArgs = new SparkArgs( + Arrays.asList(outputSchema), + SparkUtil.getSerializationClasses(), + conf); + final SparkFactory sparkFactory = new SparkFactory(sparkArgs); + final JobManager jobManager = JobManager.createJobManager(conf, "marmaray", + "frequency", sparkFactory, reporters); + + final JavaSparkContext jsc = sparkFactory.getSparkContext(); + + log.info("Initializing metadata 
manager for job"); + final TimerMetric metadataManagerInitMetric = + new TimerMetric(DataFeedMetricNames.INIT_METADATAMANAGER_LATENCY_MS, metricTags); + final IMetadataManager metadataManager; + try { + metadataManager = initMetadataManager(hoodieConf, jsc); + } catch (final JobRuntimeException e) { + final LongMetric configError = new LongMetric(DataFeedMetricNames.DISPERSAL_CONFIGURATION_INIT_ERRORS, 1); + configError.addTags(metricTags); + configError.addTags(DataFeedMetricNames + .getErrorModuleCauseTags(ModuleTagNames.METADATA_MANAGER, ErrorCauseTagNames.CONFIG_ERROR)); + reporters.report(configError); + reporters.getReporters().forEach(dataFeedMetrics::gauageFailureMetric); + throw e; + } + metadataManagerInitMetric.stop(); + reporters.report(metadataManagerInitMetric); + + try { + log.info("Initializing converters & schemas for job"); + final SQLContext sqlContext = SQLContext.getOrCreate(jsc.sc()); + + log.info("Common schema is: {}", outputSchema.toString()); + + // Schema + log.info("Initializing source data converter"); + KafkaSchemaServiceReader serviceReader = new KafkaSchemaServiceReader(outputSchema); + final KafkaSourceDataConverter dataConverter = new KafkaSourceDataConverter(serviceReader, conf, new ErrorExtractor()); + + log.info("Initializing source & sink for job"); + final ISource kafkaSource = new KafkaSource(kafkaSourceConf, Optional.of(jsc), dataConverter, Optional.absent(), Optional.absent()); + + // Sink + HoodieSinkDataConverter hoodieSinkDataConverter = new CustomHoodieSinkDataConverter(conf, new ErrorExtractor()); + HoodieSink hoodieSink = new HoodieSink(hoodieConf, hoodieSinkDataConverter, jsc, HoodieSink.HoodieSinkOp.INSERT, metadataManager, Optional.absent()); + + log.info("Initializing work unit calculator for job"); + final IWorkUnitCalculator workUnitCalculator = new KafkaWorkUnitCalculator(kafkaSourceConf); + + log.info("Initializing job dag"); + final JobDag jobDag = new JobDag(kafkaSource, hoodieSink, metadataManager, workUnitCalculator, + "test", "test", new JobMetrics("marmaray"), dataFeedMetrics, + reporters); + + jobManager.addJobDag(jobDag); + + log.info("Running dispersal job"); + try { + jobManager.run(); + JobUtil.raiseExceptionIfStatusFailed(jobManager.getJobManagerStatus()); + } catch (final Throwable t) { + if (TimeoutManager.getTimedOut()) { + final LongMetric runTimeError = new LongMetric(DataFeedMetricNames.MARMARAY_JOB_ERROR, 1); + runTimeError.addTags(metricTags); + runTimeError.addTags(DataFeedMetricNames.getErrorModuleCauseTags( + ModuleTagNames.JOB_MANAGER, ErrorCauseTagNames.TIME_OUT)); + reporters.report(runTimeError); + } + final LongMetric configError = new LongMetric(JobMetricNames.RUN_JOB_ERROR_COUNT, 1); + configError.addTags(metricTags); + reporters.report(configError); + throw t; + } + log.info("Dispersal job has been completed"); + + final TimerMetric jobLatencyMetric = + new TimerMetric(JobMetricNames.RUN_JOB_DAG_LATENCY_MS, metricTags, jobStartTime); + jobLatencyMetric.stop(); + reporters.report(jobLatencyMetric); + reporters.finish(); + } finally { + jsc.stop(); + } + } + + /** + * Get configuration from command line + * + * @param args command line arguments passed in + * @return configuration populated from them + */ + private Configuration getConfiguration(@NotEmpty final String[] args) { + final KafkaToHoodieCommandLineOptions options = new KafkaToHoodieCommandLineOptions(args); + if (options.getConfFile() != null) { + return getFileConfiguration(options.getConfFile()); + } else if (options.getJsonConf() != null) { 
+ return getJsonConfiguration(options.getJsonConf()); + } else { + throw new JobRuntimeException("Unable to find conf; this shouldn't be possible"); + } + } + + /** + * Get configuration from JSON-based configuration + * + * @param jsonConf JSON string of configuration + * @return configuration populated from it + */ + private Configuration getJsonConfiguration(@NotEmpty final String jsonConf) { + final Configuration conf = new Configuration(); + conf.loadYamlStream(IOUtils.toInputStream(jsonConf), Optional.absent()); + return conf; + } + + /** + * Load configuration from a file on HDFS + * + * @param filePath path to the HDFS file to load + * @return configuration populated from it + */ + private Configuration getFileConfiguration(@NotEmpty final String filePath) { + final Configuration conf = new Configuration(); + try { + final FileSystem fs = FSUtils.getFs(conf, Optional.absent()); + final Path dataFeedConfFile = new Path(filePath); + log.info("Loading configuration from {}", dataFeedConfFile.toString()); + conf.loadYamlStream(fs.open(dataFeedConfFile), Optional.absent()); + } catch (IOException e) { + final String errorMessage = String.format("Unable to find configuration for %s", filePath); + log.error(errorMessage); + throw new JobRuntimeException(errorMessage, e); + } + return conf; + + } + + /** + * Initialize the metadata store system + * + * @param conf configuration to use + * @param jsc Java spark context + * @return metadata manager + */ + private static IMetadataManager initMetadataManager(@NonNull final HoodieConfiguration conf, @NonNull final JavaSparkContext jsc) { + log.info("Create metadata manager"); + try { + return new HoodieBasedMetadataManager(conf, new AtomicBoolean(true), jsc); + } catch (IOException e) { + throw new JobRuntimeException("Unable to create metadata manager", e); + } + } + + private static final class KafkaToHoodieCommandLineOptions { + @Getter + @Parameter(names = {"--configurationFile", "-c"}, description = "path to configuration file") + private String confFile; + + @Getter + @Parameter(names = {"--jsonConfiguration", "-j"}, description = "json configuration") + private String jsonConf; + + private KafkaToHoodieCommandLineOptions(@NonNull final String[] args) { + final JCommander commander = new JCommander(this); + commander.parse(args); + Preconditions.checkState(this.confFile != null || this.jsonConf != null, + "One of jsonConfiguration or configurationFile must be specified"); + } + } + +} From 793977d108dabc616bad9260ec8505ced518482d Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Thu, 10 Oct 2019 18:43:11 +0530 Subject: [PATCH 03/11] Add kafka schema service reader classes --- .../AbstractKafkaSchemaServiceReader.java | 28 +++++++++++ .../kafka/KafkaSchemaAvroServiceReader.java | 28 +++++++++++ .../examples/job/KafkaToHoodieJob.java | 49 +------------------ 3 files changed, 58 insertions(+), 47 deletions(-) create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/AbstractKafkaSchemaServiceReader.java create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaAvroServiceReader.java diff --git a/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/AbstractKafkaSchemaServiceReader.java b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/AbstractKafkaSchemaServiceReader.java new file mode 100644 index 0000000..9e30115 --- /dev/null +++ b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/AbstractKafkaSchemaServiceReader.java @@ -0,0 +1,28 @@ +package 
com.uber.marmaray.common.schema.kafka; + + +import com.uber.marmaray.common.schema.ISchemaService; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; +import org.hibernate.validator.constraints.NotEmpty; + +import java.io.Serializable; + +@Slf4j +public abstract class AbstractKafkaSchemaServiceReader implements ISchemaService.ISchemaServiceReader, Serializable { + private final String schemaString; + private transient Schema schema; + + AbstractKafkaSchemaServiceReader(@NotEmpty final Schema schema) { + this.schemaString = schema.toString(); + this.schema = schema; + log.info("Kafka Schema service reader initialised with schema {}", schemaString); + } + + Schema getSchema() { + if (this.schema == null) { + this.schema = new Schema.Parser().parse(this.schemaString); + } + return this.schema; + } +} \ No newline at end of file diff --git a/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaAvroServiceReader.java b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaAvroServiceReader.java new file mode 100644 index 0000000..dd94e9a --- /dev/null +++ b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaAvroServiceReader.java @@ -0,0 +1,28 @@ +package com.uber.marmaray.common.schema.kafka; + +import com.uber.marmaray.common.exceptions.InvalidDataException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; + +import java.io.IOException; + +public class KafkaSchemaAvroServiceReader extends AbstractKafkaSchemaServiceReader { + public KafkaSchemaAvroServiceReader(Schema schema) { + super(schema); + } + + @Override + public GenericRecord read(byte[] buffer) throws InvalidDataException { + final DatumReader datumReader = new GenericDatumReader<>(getSchema()); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(buffer, null); + try { + return datumReader.read(null, decoder); + } catch (IOException e) { + throw new InvalidDataException("Error decoding data", e); + } + } +} diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java index ccd79df..7ffb5ed 100644 --- a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java @@ -20,6 +20,7 @@ import com.uber.marmaray.common.metrics.TimerMetric; import com.uber.marmaray.common.reporters.ConsoleReporter; import com.uber.marmaray.common.reporters.Reporters; +import com.uber.marmaray.common.schema.kafka.KafkaSchemaAvroServiceReader; import com.uber.marmaray.common.sinks.hoodie.HoodieSink; import com.uber.marmaray.common.sources.ISource; import com.uber.marmaray.common.sources.IWorkUnitCalculator; @@ -36,7 +37,6 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -55,53 +55,8 @@ import com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.Configuration; import com.uber.marmaray.common.converters.data.HoodieSinkDataConverter; -import com.uber.marmaray.common.exceptions.InvalidDataException; -import 
com.uber.marmaray.common.schema.ISchemaService; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.*; -import java.io.Serializable; -class KafkaSchemaServiceReader implements ISchemaService.ISchemaServiceReader, Serializable { - - private final String schemaString; - private transient Schema schema; - - KafkaSchemaServiceReader(@NotEmpty final Schema schema) { - this.schemaString = schema.toString(); - this.schema = schema; - } - - private Schema getSchema() { - if (this.schema == null) { - this.schema = new Schema.Parser().parse(this.schemaString); - } - return this.schema; - } - - @Override - public GenericRecord read(final byte[] buffer) throws InvalidDataException { - final DatumReader datumReader = new GenericDatumReader<>(getSchema()); - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(buffer, null); - try { - return datumReader.read(null, decoder); - } catch (IOException e) { - throw new InvalidDataException("Error decoding data", e); - } - - - // JSON reader -// DatumReader reader = new GenericDatumReader<>(this.getSchema()); -// -// try { -// JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(this.getSchema(), new String(buffer)); -// return reader.read(null, jsonDecoder); -// } catch (IOException e) { -// throw new InvalidDataException("Error decoding data", e); -// } - } -} - class CustomHoodieSinkDataConverter extends HoodieSinkDataConverter { CustomHoodieSinkDataConverter(Configuration conf, ErrorExtractor errorExtractor) { super(conf, errorExtractor); @@ -242,7 +197,7 @@ private void run(final String[] args) throws IOException { // Schema log.info("Initializing source data converter"); - KafkaSchemaServiceReader serviceReader = new KafkaSchemaServiceReader(outputSchema); + KafkaSchemaAvroServiceReader serviceReader = new KafkaSchemaAvroServiceReader(outputSchema); final KafkaSourceDataConverter dataConverter = new KafkaSourceDataConverter(serviceReader, conf, new ErrorExtractor()); log.info("Initializing source & sink for job"); From e0dfc7bd9842c169d7aabf57c85772f38749bc7e Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Thu, 10 Oct 2019 19:14:55 +0530 Subject: [PATCH 04/11] Add Kafka JSON data reader --- .../kafka/KafkaSchemaJSONServiceReader.java | 28 +++++++++++++++++++ .../examples/job/KafkaToHoodieJob.java | 3 +- 2 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaJSONServiceReader.java diff --git a/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaJSONServiceReader.java b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaJSONServiceReader.java new file mode 100644 index 0000000..65603bb --- /dev/null +++ b/marmaray/src/main/java/com/uber/marmaray/common/schema/kafka/KafkaSchemaJSONServiceReader.java @@ -0,0 +1,28 @@ +package com.uber.marmaray.common.schema.kafka; + +import com.uber.marmaray.common.exceptions.InvalidDataException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.JsonDecoder; + +import java.io.IOException; + +public class KafkaSchemaJSONServiceReader extends AbstractKafkaSchemaServiceReader { + public KafkaSchemaJSONServiceReader(Schema schema) { + super(schema); + } + + @Override + public GenericRecord read(byte[] buffer) throws InvalidDataException { + final DatumReader datumReader = new 
GenericDatumReader<>(getSchema()); + try { + JsonDecoder decoder = DecoderFactory.get().jsonDecoder(getSchema(), new String(buffer)); + return datumReader.read(null, decoder); + } catch (IOException e) { + throw new InvalidDataException("Error decoding data", e); + } + } +} diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java index 7ffb5ed..77223b6 100644 --- a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java @@ -21,6 +21,7 @@ import com.uber.marmaray.common.reporters.ConsoleReporter; import com.uber.marmaray.common.reporters.Reporters; import com.uber.marmaray.common.schema.kafka.KafkaSchemaAvroServiceReader; +import com.uber.marmaray.common.schema.kafka.KafkaSchemaJSONServiceReader; import com.uber.marmaray.common.sinks.hoodie.HoodieSink; import com.uber.marmaray.common.sources.ISource; import com.uber.marmaray.common.sources.IWorkUnitCalculator; @@ -197,7 +198,7 @@ private void run(final String[] args) throws IOException { // Schema log.info("Initializing source data converter"); - KafkaSchemaAvroServiceReader serviceReader = new KafkaSchemaAvroServiceReader(outputSchema); + KafkaSchemaJSONServiceReader serviceReader = new KafkaSchemaJSONServiceReader(outputSchema); final KafkaSourceDataConverter dataConverter = new KafkaSourceDataConverter(serviceReader, conf, new ErrorExtractor()); log.info("Initializing source & sink for job"); From f24bee68bdd90ee312e2faba470d22e3df977099 Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Thu, 10 Oct 2019 19:26:46 +0530 Subject: [PATCH 05/11] Cleanup KafkaToHoodieJob --- .../examples/job/KafkaToHoodieJob.java | 29 ++----------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java index 77223b6..173e42b 100644 --- a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java @@ -20,7 +20,6 @@ import com.uber.marmaray.common.metrics.TimerMetric; import com.uber.marmaray.common.reporters.ConsoleReporter; import com.uber.marmaray.common.reporters.Reporters; -import com.uber.marmaray.common.schema.kafka.KafkaSchemaAvroServiceReader; import com.uber.marmaray.common.schema.kafka.KafkaSchemaJSONServiceReader; import com.uber.marmaray.common.sinks.hoodie.HoodieSink; import com.uber.marmaray.common.sources.ISource; @@ -75,7 +74,6 @@ protected String getPartitionPath(AvroPayload avroPayload) { } - /** * Job to load data from kafka to hoodie */ @@ -99,20 +97,6 @@ public static void main(final String[] args) throws IOException { * @throws IOException */ private void run(final String[] args) throws IOException { -// final String schema = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"Record\", \"fields\": [{\"name\": \"Region\", \"type\": \"string\"}, {\"name\": \"Country\", \"type\": \"string\"}] }"; -// final Schema schemaObj = new org.apache.avro.Schema.Parser().parse(schema); -// final GenericData.Record record = new GenericData.Record(schemaObj); -// record.put("Region", "Sub-Saharan Africa"); -// record.put("Country", "Chad"); -// -// final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); -// final GenericDatumWriter datumWriter = new 
GenericDatumWriter(schemaObj); -// final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); -// -// datumWriter.write(record, encoder); -// encoder.flush(); -// -// final String recordString = new String(outputStream.toByteArray()); final Instant jobStartTime = Instant.now(); @@ -122,7 +106,7 @@ private void run(final String[] args) throws IOException { reporters.addReporter(new ConsoleReporter()); final Map metricTags = Collections.emptyMap(); - final DataFeedMetrics dataFeedMetrics = new DataFeedMetrics("kafka to hoodie ingestion", metricTags); + final DataFeedMetrics dataFeedMetrics = new DataFeedMetrics("KafkaToHoodieJob", metricTags); log.info("Initializing configurations for job"); final TimerMetric confInitMetric = new TimerMetric(DataFeedMetricNames.INIT_CONFIG_LATENCY_MS, @@ -149,16 +133,7 @@ private void run(final String[] args) throws IOException { final TimerMetric convertSchemaLatencyMs = new TimerMetric(DataFeedMetricNames.CONVERT_SCHEMA_LATENCY_MS, metricTags); -// final StructType inputSchema = DataTypes.createStructType(new StructField[]{ -// DataTypes.createStructField("Region", DataTypes.StringType, true), -// DataTypes.createStructField("Country", DataTypes.StringType, true) -// }); -// -// final DataFrameSchemaConverter schemaConverter = new DataFrameSchemaConverter(); -// final Schema outputSchema = schemaConverter.convertToCommonSchema(inputSchema); - - final String schema = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"Record\", \"fields\": [{\"name\": \"Region\", \"type\": \"string\"}, {\"name\": \"Country\", \"type\": \"string\"}] }"; - final Schema outputSchema = new org.apache.avro.Schema.Parser().parse(schema); + final Schema outputSchema = new Schema.Parser().parse(hoodieConf.getHoodieWriteConfig().getSchema()); convertSchemaLatencyMs.stop(); reporters.report(convertSchemaLatencyMs); From 6ae40495518d963026ba8b6d2accff33a3b3bfff Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Thu, 10 Oct 2019 19:35:48 +0530 Subject: [PATCH 06/11] Fix warnings --- .../examples/job/KafkaToHoodieJob.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java index 173e42b..3cdab5f 100644 --- a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java @@ -84,7 +84,7 @@ public class KafkaToHoodieJob { * Generic entry point * * @param args arguments for the job, from the command line - * @throws IOException + * @throws IOException Exception */ public static void main(final String[] args) throws IOException { new KafkaToHoodieJob().run(args); @@ -94,7 +94,7 @@ public static void main(final String[] args) throws IOException { * Main execution method for the job. 
* * @param args command line arguments - * @throws IOException + * @throws IOException Exception */ private void run(final String[] args) throws IOException { @@ -138,7 +138,7 @@ private void run(final String[] args) throws IOException { reporters.report(convertSchemaLatencyMs); final SparkArgs sparkArgs = new SparkArgs( - Arrays.asList(outputSchema), + Collections.singletonList(outputSchema), SparkUtil.getSerializationClasses(), conf); final SparkFactory sparkFactory = new SparkFactory(sparkArgs); @@ -174,14 +174,18 @@ private void run(final String[] args) throws IOException { // Schema log.info("Initializing source data converter"); KafkaSchemaJSONServiceReader serviceReader = new KafkaSchemaJSONServiceReader(outputSchema); - final KafkaSourceDataConverter dataConverter = new KafkaSourceDataConverter(serviceReader, conf, new ErrorExtractor()); + final KafkaSourceDataConverter dataConverter = new KafkaSourceDataConverter(serviceReader, conf, + new ErrorExtractor()); log.info("Initializing source & sink for job"); - final ISource kafkaSource = new KafkaSource(kafkaSourceConf, Optional.of(jsc), dataConverter, Optional.absent(), Optional.absent()); + final ISource kafkaSource = new KafkaSource(kafkaSourceConf, Optional.of(jsc), dataConverter, + Optional.absent(), Optional.absent()); // Sink - HoodieSinkDataConverter hoodieSinkDataConverter = new CustomHoodieSinkDataConverter(conf, new ErrorExtractor()); - HoodieSink hoodieSink = new HoodieSink(hoodieConf, hoodieSinkDataConverter, jsc, HoodieSink.HoodieSinkOp.INSERT, metadataManager, Optional.absent()); + HoodieSinkDataConverter hoodieSinkDataConverter = new CustomHoodieSinkDataConverter(conf, + new ErrorExtractor()); + HoodieSink hoodieSink = new HoodieSink(hoodieConf, hoodieSinkDataConverter, jsc, + HoodieSink.HoodieSinkOp.INSERT, metadataManager, Optional.absent()); log.info("Initializing work unit calculator for job"); final IWorkUnitCalculator workUnitCalculator = new KafkaWorkUnitCalculator(kafkaSourceConf); @@ -280,7 +284,8 @@ private Configuration getFileConfiguration(@NotEmpty final String filePath) { * @param jsc Java spark context * @return metadata manager */ - private static IMetadataManager initMetadataManager(@NonNull final HoodieConfiguration conf, @NonNull final JavaSparkContext jsc) { + private static IMetadataManager initMetadataManager(@NonNull final HoodieConfiguration conf, + @NonNull final JavaSparkContext jsc) { log.info("Create metadata manager"); try { return new HoodieBasedMetadataManager(conf, new AtomicBoolean(true), jsc); From d0de64ecb64d429ad4e575354990b73aef1eee63 Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Thu, 10 Oct 2019 22:41:21 +0530 Subject: [PATCH 07/11] Change how row key and partition paths are being sent to Marmaray --- .../configuration/HoodieConfiguration.java | 34 +++++++ .../data/DummyHoodieSinkDataConverter.java | 5 +- .../data/HoodieSinkDataConverter.java | 38 +++++++- .../data/TSBasedHoodieSinkDataConverter.java | 35 ++----- .../common/sinks/hoodie/HoodieSink.java | 97 +++++++++++-------- .../examples/job/KafkaToHoodieJob.java | 23 +---- .../common/sinks/hoodie/TestHoodieSink.java | 69 +++++++------ 7 files changed, 179 insertions(+), 122 deletions(-) diff --git a/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java b/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java index 171f228..ed0cdde 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java +++ 
b/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java @@ -65,6 +65,16 @@ public class HoodieConfiguration implements Serializable { * Schema for Hoodie dataset */ public static final String HOODIE_AVRO_SCHEMA = HOODIE_COMMON_PROPERTY_PREFIX + "schema"; + + /** + * Record Key for Hoodie dataset + */ + public static final String HOODIE_RECORD_KEY = HOODIE_COMMON_PROPERTY_PREFIX + "record_key"; + + /** + * Partition path for Hoodie dataset + */ + public static final String HOODIE_PARTITION_PATH = HOODIE_COMMON_PROPERTY_PREFIX + "partition_path"; /** * Flag to control whether it should combine before insert */ @@ -250,6 +260,20 @@ public String getTableName() { return this.getConf().getProperty(getTablePropertyKey(HOODIE_TABLE_NAME, this.tableKey)).get(); } + /** + * @return hoodie record key. + */ + public Optional getHoodieRecordKey() { + return this.conf.getProperty(getTablePropertyKey(HOODIE_RECORD_KEY, this.tableKey)); + } + + /** + * @return hoodie partition path. + */ + public Optional getHoodiePartitionPath() { + return this.conf.getProperty(getTablePropertyKey(HOODIE_PARTITION_PATH, this.tableKey)); + } + /** * @return hoodie metrics prefix. * */ @@ -492,6 +516,16 @@ public Builder withBasePath(@NotEmpty final String basePath) { return this; } + public Builder withRecordKey(@NotEmpty final String recordKey) { + this.conf.setProperty(getTablePropertyKey(HOODIE_RECORD_KEY, this.tableKey), recordKey); + return this; + } + + public Builder withPartitionPath(@NotEmpty final String partitionPath) { + this.conf.setProperty(getTablePropertyKey(HOODIE_PARTITION_PATH, this.tableKey), partitionPath); + return this; + } + public Builder withSchema(@NotEmpty final String schema) { this.conf.setProperty(getTablePropertyKey(HOODIE_AVRO_SCHEMA, this.tableKey), schema); return this; diff --git a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/DummyHoodieSinkDataConverter.java b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/DummyHoodieSinkDataConverter.java index 54224e5..d93d36b 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/DummyHoodieSinkDataConverter.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/DummyHoodieSinkDataConverter.java @@ -19,6 +19,7 @@ import com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.Configuration; +import com.uber.marmaray.common.configuration.HoodieConfiguration; import com.uber.marmaray.utilities.ErrorExtractor; import lombok.NonNull; @@ -29,7 +30,9 @@ */ public class DummyHoodieSinkDataConverter extends HoodieSinkDataConverter { public DummyHoodieSinkDataConverter() { - super(new Configuration(), new ErrorExtractor()); + + super(new Configuration(), new ErrorExtractor(), HoodieConfiguration.newBuilder(new Configuration(), + "test").build()); } @Override diff --git a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java index 265cedd..d7fd8c0 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java @@ -22,7 +22,9 @@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.Configuration; +import 
com.uber.marmaray.common.configuration.HoodieConfiguration; import com.uber.marmaray.common.converters.converterresult.ConverterResult; +import com.uber.marmaray.common.exceptions.InvalidDataException; import com.uber.marmaray.common.metrics.DataFeedMetrics; import com.uber.marmaray.common.metrics.JobMetrics; import com.uber.marmaray.common.sinks.hoodie.HoodieSink; @@ -35,27 +37,33 @@ import java.util.Collections; import java.util.List; +import com.google.common.base.Optional; + /** * {@link HoodieSinkDataConverter} extends {@link SinkDataConverter} * This class is used by {@link HoodieSink} to generate {@link com.uber.hoodie.common.model.HoodieRecord} from * {@link com.uber.marmaray.common.AvroPayload}. */ -public abstract class HoodieSinkDataConverter extends SinkDataConverter> { +public class HoodieSinkDataConverter extends SinkDataConverter> { // store the schema as a string since Schema doesn't serialize. Used in extended classes. protected String schema; private final ErrorExtractor errorExtractor; + private final HoodieConfiguration hoodieConfiguration; - public HoodieSinkDataConverter(@NonNull final Configuration conf, @NonNull final ErrorExtractor errorExtractor) { + public HoodieSinkDataConverter(@NonNull final Configuration conf, @NonNull final ErrorExtractor errorExtractor, + @NonNull final HoodieConfiguration hoodieConfiguration) { super(conf, errorExtractor); this.errorExtractor = errorExtractor; + this.hoodieConfiguration = hoodieConfiguration; } public HoodieSinkDataConverter(@NonNull final Configuration conf, final String schema, - @NonNull final ErrorExtractor errorExtractor) { + @NonNull final ErrorExtractor errorExtractor, HoodieConfiguration hoodieConfiguration) { super(conf, errorExtractor); this.schema = schema; this.errorExtractor = errorExtractor; + this.hoodieConfiguration = hoodieConfiguration; } @Override @@ -82,7 +90,17 @@ protected final List hoodieRecordKey = hoodieConfiguration.getHoodieRecordKey(); + if (hoodieRecordKey.isPresent()) { + final Object recordKeyFieldVal = payload.getData().get(hoodieRecordKey.get()); + if (recordKeyFieldVal == null) { + throw new InvalidDataException("required field is missing:" + hoodieRecordKey.get()); + } + return recordKeyFieldVal.toString(); + } + throw new Exception("Hoodie Record Key missing"); + } /** * The implementation of it should use fields from {@link AvroPayload} to generate partition path which is needed @@ -90,7 +108,17 @@ protected final List hoodiePartitionPath = hoodieConfiguration.getHoodiePartitionPath(); + if (hoodiePartitionPath.isPresent()) { + final Object partitionFieldVal = payload.getData().get(hoodiePartitionPath.get()); + if (partitionFieldVal == null) { + throw new InvalidDataException("required field is missing:" + hoodiePartitionPath.get()); + } + return partitionFieldVal.toString(); + } + throw new Exception("Hoodie Partition Path missing"); + } protected HoodieRecordPayload getPayload(@NonNull final AvroPayload payload) { return new HoodieAvroPayload(java.util.Optional.of(payload.getData())); diff --git a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/TSBasedHoodieSinkDataConverter.java b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/TSBasedHoodieSinkDataConverter.java index 44bea41..38c0cda 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/TSBasedHoodieSinkDataConverter.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/TSBasedHoodieSinkDataConverter.java @@ -18,6 +18,7 @@ import 
com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.Configuration; +import com.uber.marmaray.common.configuration.HoodieConfiguration; import com.uber.marmaray.common.exceptions.InvalidDataException; import com.uber.hoodie.common.model.HoodieKey; import com.uber.marmaray.utilities.HoodieSinkConverterErrorExtractor; @@ -33,47 +34,29 @@ /** * {@link TSBasedHoodieSinkDataConverter} extends {@link HoodieSinkDataConverter} * - * This class generates {@link HoodieKey} from given {@link AvroPayload}. The passed in {@link AvroPayload} requires - * {@link #partitionFieldName} with timestamp in {@link #timeUnit}. + * This class generates partition path from given {@link AvroPayload}. The passed in {@link AvroPayload} requires + * {@link HoodieConfiguration} with timestamp in {@link #timeUnit}. * - * {@link AvroPayload} also requires a {@link #recordKeyFieldName} which should be the primary key for the record. */ @Slf4j public class TSBasedHoodieSinkDataConverter extends HoodieSinkDataConverter { public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat(DATE_PARTITION_FORMAT); - @NotEmpty - private final String recordKeyFieldName; - @NotEmpty - private final String partitionFieldName; + @NonNull private final TimeUnit timeUnit; public TSBasedHoodieSinkDataConverter(@NonNull final Configuration conf, - @NotEmpty final String recordKeyFieldName, @NotEmpty final String partitionFieldName, - @NonNull final TimeUnit timeUnit) { - super(conf, new HoodieSinkConverterErrorExtractor()); - this.recordKeyFieldName = recordKeyFieldName; - this.partitionFieldName = partitionFieldName; + @NonNull final HoodieConfiguration hoodieConfiguration, + @NonNull final TimeUnit timeUnit) { + super(conf, new HoodieSinkConverterErrorExtractor(), hoodieConfiguration); this.timeUnit = timeUnit; } - @Override - protected String getRecordKey(@NonNull final AvroPayload payload) throws Exception { - final Object recordKeyFieldVal = payload.getField(recordKeyFieldName); - if (recordKeyFieldVal == null) { - throw new InvalidDataException("required field is missing:" + recordKeyFieldName); - } - return recordKeyFieldVal.toString(); - } - @Override protected String getPartitionPath(final AvroPayload payload) throws Exception { - final Object partitionFieldVal = payload.getField(partitionFieldName); - if (partitionFieldVal == null) { - throw new InvalidDataException("required field is missing:" + partitionFieldName); - } - final Date date = new Date(this.timeUnit.toMillis((long) Double.parseDouble(partitionFieldVal.toString()))); + String partitionFieldVal = super.getPartitionPath(payload); + final Date date = new Date(this.timeUnit.toMillis((long) Double.parseDouble(partitionFieldVal))); return PARTITION_FORMATTER.format(date); } } diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java index 2d6316a..0b342bb 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java @@ -175,7 +175,7 @@ public void write(@NonNull final RDDWrapper> h final HoodieWriteConfig hoodieWriteConfig = this.hoodieConf.getHoodieWriteConfig(); try (final HoodieWriteClientWrapper hoodieWriteClient = getHoodieWriteClient(hoodieWriteConfig)) { final String commitTime = - this.commitTime.isPresent() ? this.commitTime.get() : hoodieWriteClient.startCommit(); + this.commitTime.isPresent() ? 
this.commitTime.get() : hoodieWriteClient.startCommit(); // Handle writes to hoodie. It can be an insert or upsert. final HoodieWriteResult result = handleWrite(hoodieWriteClient, hoodieRecords.getData(), commitTime, op); @@ -191,7 +191,7 @@ public void write(@NonNull final RDDWrapper> h protected void initDataset() { try { HoodieUtil.initHoodieDataset(FSUtils.getFs(this.hoodieConf.getConf(), - Optional.of(this.hoodieConf.getBasePath())), this.hoodieConf); + Optional.of(this.hoodieConf.getBasePath())), this.hoodieConf); } catch (IOException e) { log.error("Error initializing hoodie dataset.", e); throw new JobRuntimeException("Could not initialize hoodie dataset", e); @@ -202,6 +202,7 @@ protected void initDataset() { * If {@link HoodieConfiguration#HOODIE_AUTO_TUNE_PARALLELISM} is enabled then it will use * {@link HoodieConfiguration#HOODIE_TARGET_FILE_SIZE} and {@link SinkStatManager#getAvgRecordSize()} to figure * out what should be the optimal insert parallelism. + * * @param numRecords */ public boolean updateInsertParallelism(final long numRecords) { @@ -209,7 +210,7 @@ public boolean updateInsertParallelism(final long numRecords) { final int newParallelism = calculateNewBulkInsertParallelism(numRecords); if (0 < newParallelism) { this.hoodieConf.setTableProperty(HoodieConfiguration.HOODIE_INSERT_PARALLELISM, - Integer.toString(newParallelism)); + Integer.toString(newParallelism)); log.info("new hoodie insert parallelism is set to :{}", newParallelism); return true; } @@ -221,6 +222,7 @@ public boolean updateInsertParallelism(final long numRecords) { * If {@link HoodieConfiguration#HOODIE_AUTO_TUNE_PARALLELISM} is enabled then it will use * {@link HoodieConfiguration#HOODIE_TARGET_FILE_SIZE} and {@link SinkStatManager#getAvgRecordSize()} to figure * out what should be the optimal bulk insert parallelism. 
+ * * @param numRecords */ public boolean updateBulkInsertParallelism(final long numRecords) { @@ -228,7 +230,7 @@ public boolean updateBulkInsertParallelism(final long numRecords) { final int newParallelism = calculateNewBulkInsertParallelism(numRecords); if (0 < newParallelism) { this.hoodieConf.setTableProperty(HoodieConfiguration.HOODIE_BULKINSERT_PARALLELISM, - Integer.toString(newParallelism)); + Integer.toString(newParallelism)); log.info("new hoodie bulk insert parallelism is set to :{}", newParallelism); return true; } @@ -244,7 +246,7 @@ protected int calculateNewBulkInsertParallelism(final long numRecords) { final int currentParallelism = this.hoodieConf.getBulkInsertParallelism(); log.info( "StatsManager:targetFileSize:{}:avgRecordSize:{}:numRecords:{}:" - + "newBulkInsertParallelism:{}:currentBulkInsertParallelism:{}", + + "newBulkInsertParallelism:{}:currentBulkInsertParallelism:{}", targetFileSize, avgRecordSize, numRecords, newParallelism, currentParallelism); return newParallelism; } @@ -252,8 +254,8 @@ protected int calculateNewBulkInsertParallelism(final long numRecords) { @VisibleForTesting protected HoodieWriteClientWrapper getHoodieWriteClient(@NonNull final HoodieWriteConfig hoodieWriteConfig) { final HoodieWriteClient hoodieWriteClient = - new HoodieWriteClient(this.jsc, hoodieWriteConfig, - this.hoodieConf.shouldRollbackInFlight()); + new HoodieWriteClient(this.jsc, hoodieWriteConfig, + this.hoodieConf.shouldRollbackInFlight()); return new HoodieWriteClientWrapper(hoodieWriteClient, this.bulkInsertPartitioner); } @@ -262,15 +264,15 @@ protected HoodieWriteClientWrapper getHoodieWriteClient(@NonNull final HoodieWri * {@link HoodieBasedMetadataManager#shouldSaveChanges()} flag. */ public void commit(@NonNull final HoodieWriteClientWrapper hoodieWriteClient, - @NotEmpty final String commitTime, - @NonNull final Optional> writesStatuses) { + @NotEmpty final String commitTime, + @NonNull final Optional> writesStatuses) { this.commit(hoodieWriteClient, commitTime, writesStatuses, this.shouldSaveChangesInFuture); } public void commit(@NonNull final HoodieWriteClientWrapper hoodieWriteClient, - @NotEmpty final String commitTime, - @NonNull final Optional> writesStatuses, - final boolean shouldSaveChangesInFuture) { + @NotEmpty final String commitTime, + @NonNull final Optional> writesStatuses, + final boolean shouldSaveChangesInFuture) { updateSinkStat(writesStatuses); logWriteMetrics(writesStatuses); @@ -328,9 +330,9 @@ private void logWriteMetrics(final Optional> writesStatuses final LongAccumulator totalCount = writesStatuses.get().rdd().sparkContext().longAccumulator(); final LongAccumulator errorCount = writesStatuses.get().rdd().sparkContext().longAccumulator(); writesStatuses.get().foreach(writeStatus -> { - errorCount.add(writeStatus.getFailedRecords().size()); - totalCount.add(writeStatus.getTotalRecords()); - }); + errorCount.add(writeStatus.getFailedRecords().size()); + totalCount.add(writeStatus.getTotalRecords()); + }); this.dataFeedMetrics.get().createLongMetric(DataFeedMetricNames.ERROR_ROWCOUNT, errorCount.value(), this.dataFeedMetricsTags); this.dataFeedMetrics.get().createLongMetric(DataFeedMetricNames.OUTPUT_ROWCOUNT, @@ -341,6 +343,7 @@ private void logWriteMetrics(final Optional> writesStatuses /** * {@link #updateSinkStat(Optional)} will compute {@link SinkStat} and persist changes into {@link IMetadataManager}. * As a part of {@link SinkStat} computation; it will compute avg record size for current run. 
+ * * @param writesStatuses */ private void updateSinkStat(final Optional> writesStatuses) { @@ -349,16 +352,16 @@ private void updateSinkStat(final Optional> writesStatuses) final LongAccumulator fileCount = writesStatuses.get().rdd().sparkContext().longAccumulator(); final LongAccumulator totalSize = writesStatuses.get().rdd().sparkContext().longAccumulator(); writesStatuses.get().foreach( - writeStatus -> { - final long writeBytes = writeStatus.getStat().getTotalWriteBytes(); - final long numInserts = writeStatus.getStat().getNumWrites() - - writeStatus.getStat().getNumUpdateWrites(); - if (writeBytes > 0 && numInserts > 0) { - avgRecordSizeCounter.add(writeBytes / numInserts); + writeStatus -> { + final long writeBytes = writeStatus.getStat().getTotalWriteBytes(); + final long numInserts = writeStatus.getStat().getNumWrites() + - writeStatus.getStat().getNumUpdateWrites(); + if (writeBytes > 0 && numInserts > 0) { + avgRecordSizeCounter.add(writeBytes / numInserts); + } + fileCount.add(1); + totalSize.add(writeBytes); } - fileCount.add(1); - totalSize.add(writeBytes); - } ); final long avgRecordSize = (int) avgRecordSizeCounter.avg(); if (avgRecordSize > 0) { @@ -367,9 +370,9 @@ private void updateSinkStat(final Optional> writesStatuses) } if (this.dataFeedMetrics.isPresent()) { this.dataFeedMetrics.get().createLongMetric(DataFeedMetricNames.TOTAL_FILE_COUNT, fileCount.value(), - this.dataFeedMetricsTags); + this.dataFeedMetricsTags); this.dataFeedMetrics.get().createLongMetric(DataFeedMetricNames.TOTAL_WRITE_SIZE, totalSize.value(), - this.dataFeedMetricsTags); + this.dataFeedMetricsTags); } } this.sinkStatMgr.persist(); @@ -444,7 +447,7 @@ public HoodieWriteResult handleWrite( } private JavaRDD> dedupRecords(@NonNull final HoodieWriteClientWrapper writeClient, - @NonNull final JavaRDD> hoodieRecords) { + @NonNull final JavaRDD> hoodieRecords) { return writeClient.filterExists(hoodieRecords).persist(StorageLevel.DISK_ONLY()); } @@ -454,11 +457,11 @@ private JavaRDD> dedupRecords(@NonNull final H * see {@link UserDefinedBulkInsertPartitioner}. */ public static UserDefinedBulkInsertPartitioner getDataPartitioner(@NonNull final HoodieConfiguration hoodieConf, - @NonNull final Optional defaultDataPartitioner) { + @NonNull final Optional defaultDataPartitioner) { try { return (UserDefinedBulkInsertPartitioner) Class.forName(hoodieConf.getHoodieDataPartitioner( - defaultDataPartitioner.isPresent() ? defaultDataPartitioner.get() - : DefaultHoodieDataPartitioner.class.getName())).newInstance(); + defaultDataPartitioner.isPresent() ? 
defaultDataPartitioner.get() + : DefaultHoodieDataPartitioner.class.getName())).newInstance(); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) { throw new JobRuntimeException("exception in initializing data partitioner", e); } @@ -485,22 +488,22 @@ public void startCommitWithTime(@NotEmpty final String commitTime) { } public boolean commit(@NotEmpty final String commitTime, @NonNull final JavaRDD writeStatuses, - final java.util.Optional> extraMetadata) { + final java.util.Optional> extraMetadata) { return this.hoodieWriteClient.commit(commitTime, writeStatuses, extraMetadata); } public JavaRDD insert(@NonNull final JavaRDD> records, - @NotEmpty final String commitTime) { + @NotEmpty final String commitTime) { return this.hoodieWriteClient.insert(records, commitTime); } public JavaRDD bulkInsert(@NonNull final JavaRDD> records, - @NotEmpty final String commitTime) { + @NotEmpty final String commitTime) { return this.hoodieWriteClient.bulkInsert(records, commitTime, Option.apply(this.bulkInsertPartitioner)); } public JavaRDD upsert(@NonNull final JavaRDD> records, - @NotEmpty final String commitTime) { + @NotEmpty final String commitTime) { return this.hoodieWriteClient.upsert(records, commitTime); } @@ -522,7 +525,7 @@ public void close() { } public JavaRDD> filterExists( - final JavaRDD> hoodieRecords) { + final JavaRDD> hoodieRecords) { return this.hoodieWriteClient.filterExists(hoodieRecords); } } @@ -531,17 +534,29 @@ public JavaRDD> filterExists( * Supported hoodie write operations. */ public enum HoodieSinkOp { - /** {@link HoodieWriteClient#insert(JavaRDD, String)}*/ + /** + * {@link HoodieWriteClient#insert(JavaRDD, String)} + */ INSERT, - /** {@link HoodieWriteClient#bulkInsert(JavaRDD, String)}*/ + /** + * {@link HoodieWriteClient#bulkInsert(JavaRDD, String)} + */ BULK_INSERT, - /** {@link HoodieWriteClient#insert(JavaRDD, String)} {@link HoodieWriteClient#filterExists(JavaRDD)}*/ + /** + * {@link HoodieWriteClient#insert(JavaRDD, String)} {@link HoodieWriteClient#filterExists(JavaRDD)} + */ DEDUP_INSERT, - /** {@link HoodieWriteClient#bulkInsert(JavaRDD, String)} {@link HoodieWriteClient#filterExists(JavaRDD)}*/ + /** + * {@link HoodieWriteClient#bulkInsert(JavaRDD, String)} {@link HoodieWriteClient#filterExists(JavaRDD)} + */ DEDUP_BULK_INSERT, - /** {@link com.uber.hoodie.HoodieWriteClient#upsert(org.apache.spark.api.java.JavaRDD, java.lang.String)}*/ + /** + * {@link com.uber.hoodie.HoodieWriteClient#upsert(org.apache.spark.api.java.JavaRDD, java.lang.String)} + */ UPSERT, - /** No operation */ + /** + * No operation + */ NO_OP } diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java index 3cdab5f..11e04cf 100644 --- a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java @@ -47,33 +47,14 @@ import java.io.IOException; import java.time.Instant; -import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import com.uber.marmaray.common.AvroPayload; import com.uber.marmaray.common.configuration.Configuration; import com.uber.marmaray.common.converters.data.HoodieSinkDataConverter; -class CustomHoodieSinkDataConverter extends HoodieSinkDataConverter { - CustomHoodieSinkDataConverter(Configuration conf, ErrorExtractor errorExtractor) { - 
super(conf, errorExtractor); - } - - @Override - protected String getRecordKey(AvroPayload avroPayload) { - return "Region"; - } - - @Override - protected String getPartitionPath(AvroPayload avroPayload) { - return "test"; - } -} - - /** * Job to load data from kafka to hoodie */ @@ -182,8 +163,8 @@ private void run(final String[] args) throws IOException { Optional.absent(), Optional.absent()); // Sink - HoodieSinkDataConverter hoodieSinkDataConverter = new CustomHoodieSinkDataConverter(conf, - new ErrorExtractor()); + HoodieSinkDataConverter hoodieSinkDataConverter = new HoodieSinkDataConverter(conf, new ErrorExtractor(), + hoodieConf); HoodieSink hoodieSink = new HoodieSink(hoodieConf, hoodieSinkDataConverter, jsc, HoodieSink.HoodieSinkOp.INSERT, metadataManager, Optional.absent()); diff --git a/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java b/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java index 3a0076d..f047840 100644 --- a/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java +++ b/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java @@ -156,11 +156,13 @@ public void testHoodieSinkWriteInsertWithoutMetadata() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), INSERT); final JavaRDD inputRDD = this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); @@ -201,11 +203,13 @@ public void testHoodieSinkWriteUpsertWithoutMetadata() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(this.conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), UPSERT); final JavaRDD inputRDD = this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); 
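// Illustrative sketch (not part of the patch): after this change a job no longer needs a
// HoodieSinkDataConverter subclass to supply the record key and partition path; both are read
// from HoodieConfiguration. Minimal wiring, with "my_table", "id" and "ts" as purely hypothetical
// table/field names and conf/schemaStr assumed to be in scope as in the surrounding code:
//
//     final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder("my_table")
//             .withTableName("my_table")
//             .withBasePath("/tmp/hoodie/my_table")   // hypothetical base path
//             .withSchema(schemaStr)
//             .withRecordKey("id")                    // field backing getRecordKey()
//             .withPartitionPath("ts")                // field backing getPartitionPath()
//             .withMetricsPrefix("test")
//             .enableMetrics(false)
//             .build();
//     final HoodieSinkDataConverter converter =
//             new HoodieSinkDataConverter(conf, new ErrorExtractor(), hoodieConf);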
@@ -243,11 +247,12 @@ public void testHoodieSinkWriteInsertWithMetadata() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(this.conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); final HoodieBasedMetadataManager hoodieBasedMetadataManager = new HoodieBasedMetadataManager(hoodieConf, new AtomicBoolean(true), this.jsc.get()); hoodieBasedMetadataManager.set("randomKey", new StringValue("randomValue")); @@ -291,11 +296,12 @@ public void testHoodieSinkWriteUpsertWithMetadata() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(this.conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); final HoodieBasedMetadataManager hoodieBasedMetadataManager = new HoodieBasedMetadataManager(hoodieConf, new AtomicBoolean(true), this.jsc.get()); hoodieBasedMetadataManager.set("randomKey", new StringValue("randomValue")); @@ -339,12 +345,13 @@ public void testHoodieSinkWriteDedupeInsert() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(this.conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr) - .withCombineBeforeInsert(true).withCombineBeforeUpsert(true).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withCombineBeforeInsert(true) + .withCombineBeforeUpsert(true).withRecordKey(RECORD_KEY).withPartitionPath(TS_KEY) + .enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); final JavaRDD inputRDD = 
this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); @@ -400,11 +407,14 @@ public void testHoodieSinkMetrics() throws IOException { final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); final String brokenSchemaStr = getSchema(TS_KEY, RECORD_KEY, 0, 0).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); + final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), INSERT); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); @@ -444,11 +454,14 @@ public void testUserDefinedCommitTime() throws IOException { final String basePath = FileTestUtil.getTempFolder(); final String tableName = "test-table"; final String schemaStr = getSchema(TS_KEY, RECORD_KEY, 4, 8).toString(); - final HoodieSinkDataConverter hoodieKeyGenerator = - new TSBasedHoodieSinkDataConverter(this.conf, RECORD_KEY, TS_KEY, TimeUnit.MILLISECONDS); + final HoodieConfiguration hoodieConf = - HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") + .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) + .withPartitionPath(TS_KEY).enableMetrics(false).build(); + final HoodieSinkDataConverter hoodieKeyGenerator = + new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); + final JavaRDD inputRDD = this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); From 1d7d5fbf1f0c4b1cef59402695e7b65381b097d8 Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Thu, 10 Oct 2019 22:44:31 +0530 Subject: [PATCH 08/11] Fix lint --- .../common/converters/data/HoodieSinkDataConverter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java index d7fd8c0..c6a5824 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/converters/data/HoodieSinkDataConverter.java @@ -59,7 +59,8 @@ public HoodieSinkDataConverter(@NonNull final Configuration conf, @NonNull final } public HoodieSinkDataConverter(@NonNull final Configuration conf, final String schema, - @NonNull final ErrorExtractor errorExtractor, HoodieConfiguration hoodieConfiguration) { + @NonNull final ErrorExtractor errorExtractor, + HoodieConfiguration hoodieConfiguration) { super(conf, errorExtractor); this.schema = schema; this.errorExtractor = errorExtractor; From 2aabd8e57a9eb287a6c2057cbc0092a7813c4b7e Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Thu, 10 Oct 2019 23:16:10 +0530 Subject: 
[PATCH 09/11] Move hoodie sink op to configuration --- .../configuration/HoodieConfiguration.java | 23 ++++++++ .../common/sinks/hoodie/HoodieErrorSink.java | 3 +- .../common/sinks/hoodie/HoodieSink.java | 6 +- .../examples/job/KafkaToHoodieJob.java | 4 +- .../marmaray/utilities/ErrorTableUtil.java | 3 +- .../common/sinks/hoodie/TestHoodieSink.java | 56 +++++++++---------- 6 files changed, 55 insertions(+), 40 deletions(-) diff --git a/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java b/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java index ed0cdde..0298851 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java @@ -27,6 +27,7 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.marmaray.common.exceptions.JobRuntimeException; import com.uber.marmaray.common.exceptions.MissingPropertyException; +import com.uber.marmaray.common.sinks.hoodie.HoodieSink; import com.uber.marmaray.utilities.ConfigUtil; import com.uber.marmaray.utilities.StringTypes; import lombok.Getter; @@ -75,6 +76,12 @@ public class HoodieConfiguration implements Serializable { * Partition path for Hoodie dataset */ public static final String HOODIE_PARTITION_PATH = HOODIE_COMMON_PROPERTY_PREFIX + "partition_path"; + + /** + * Sink operation for Hoodie dataset + */ + public static final String HOODIE_SINK_OP = HOODIE_COMMON_PROPERTY_PREFIX + "sink_op"; + /** * Flag to control whether it should combine before insert */ @@ -274,6 +281,17 @@ public Optional getHoodiePartitionPath() { return this.conf.getProperty(getTablePropertyKey(HOODIE_PARTITION_PATH, this.tableKey)); } + /** + * @return hoodie sink operation (defaults to BULK_INSERT when not configured) + */ + public HoodieSink.HoodieSinkOp getHoodieSinkOp() { + final Optional sinkOp = this.conf.getProperty(getTablePropertyKey(HOODIE_SINK_OP, this.tableKey)); + if (sinkOp.isPresent()) { + return HoodieSink.HoodieSinkOp.valueOf(sinkOp.get().toUpperCase()); + } + return HoodieSink.HoodieSinkOp.BULK_INSERT; + } + /** * @return hoodie metrics prefix. 
* */ @@ -531,6 +549,11 @@ public Builder withSchema(@NotEmpty final String schema) { return this; } + public Builder withSinkOp(@NotEmpty final String sinkOp) { + this.conf.setProperty(getTablePropertyKey(HOODIE_SINK_OP, this.tableKey), sinkOp); + return this; + } + public Builder withBulkInsertParallelism(final int parallelism) { this.conf.setProperty( getTablePropertyKey(HOODIE_BULKINSERT_PARALLELISM, this.tableKey), Integer.toString(parallelism)); diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieErrorSink.java b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieErrorSink.java index 7dc97b5..9a29856 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieErrorSink.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieErrorSink.java @@ -44,10 +44,9 @@ public class HoodieErrorSink extends HoodieSink { public HoodieErrorSink(@NonNull final HoodieConfiguration hoodieConf, @NonNull final HoodieSinkDataConverter hoodieSinkDataConverter, @NonNull final JavaSparkContext jsc, - @NonNull final HoodieSinkOp op, @NonNull final IMetadataManager metadataMgr, final boolean shouldSaveChangesInFuture) { - super(hoodieConf, hoodieSinkDataConverter, jsc, op, metadataMgr, shouldSaveChangesInFuture, Optional.absent()); + super(hoodieConf, hoodieSinkDataConverter, jsc, metadataMgr, shouldSaveChangesInFuture, Optional.absent()); } public void writeRecordsAndErrors(@NonNull final HoodieWriteResult result) { diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java index 0b342bb..6246f02 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieSink.java @@ -116,23 +116,21 @@ public class HoodieSink implements ISink, scala.Serializable { public HoodieSink(@NonNull final HoodieConfiguration hoodieConf, @NonNull final HoodieSinkDataConverter hoodieSinkDataConverter, @NonNull final JavaSparkContext jsc, - @NonNull final HoodieSinkOp op, @NonNull final IMetadataManager metadataMgr, @NonNull final Optional defaultDataPartitioner) { - this(hoodieConf, hoodieSinkDataConverter, jsc, op, metadataMgr, false, defaultDataPartitioner); + this(hoodieConf, hoodieSinkDataConverter, jsc, metadataMgr, false, defaultDataPartitioner); } public HoodieSink(@NonNull final HoodieConfiguration hoodieConf, @NonNull final HoodieSinkDataConverter hoodieSinkDataConverter, @NonNull final JavaSparkContext jsc, - @NonNull final HoodieSinkOp op, @NonNull final IMetadataManager metadataMgr, final boolean shouldSaveChangesInFuture, @NonNull final Optional defaultDataPartitioner) { this.hoodieConf = hoodieConf; this.hoodieSinkDataConverter = hoodieSinkDataConverter; this.jsc = jsc; - this.op = op; + this.op = hoodieConf.getHoodieSinkOp(); this.metadataMgr = metadataMgr; this.sinkStatMgr = new SinkStatManager(this.hoodieConf.getTableName(), this.metadataMgr); this.sinkStatMgr.init(); diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java index 11e04cf..5bc4c3e 100644 --- a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java @@ -165,8 +165,8 @@ private void run(final String[] args) throws IOException { // Sink 
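// Illustrative sketch (not part of the patch): the write operation is no longer passed to the
// HoodieSink constructor; HoodieSink now resolves it via hoodieConf.getHoodieSinkOp(), which
// falls back to BULK_INSERT when the sink_op property is unset. Unless this job's configuration
// sets that property, its effective operation therefore changes from the previous INSERT to
// BULK_INSERT. To keep the old behaviour, assuming the job builds its HoodieConfiguration with
// the same builder used in the tests of this patch, something like the following would be needed:
//
//     HoodieConfiguration.newBuilder(tableName)
//             // ... existing builder calls for this job ...
//             .withSinkOp("INSERT")   // parsed case-insensitively via HoodieSinkOp.valueOf(...)
//             .build();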
HoodieSinkDataConverter hoodieSinkDataConverter = new HoodieSinkDataConverter(conf, new ErrorExtractor(), hoodieConf); - HoodieSink hoodieSink = new HoodieSink(hoodieConf, hoodieSinkDataConverter, jsc, - HoodieSink.HoodieSinkOp.INSERT, metadataManager, Optional.absent()); + HoodieSink hoodieSink = new HoodieSink(hoodieConf, hoodieSinkDataConverter, jsc, metadataManager, + Optional.absent()); log.info("Initializing work unit calculator for job"); final IWorkUnitCalculator workUnitCalculator = new KafkaWorkUnitCalculator(kafkaSourceConf); diff --git a/marmaray/src/main/java/com/uber/marmaray/utilities/ErrorTableUtil.java b/marmaray/src/main/java/com/uber/marmaray/utilities/ErrorTableUtil.java index 0a0446f..29e2e34 100644 --- a/marmaray/src/main/java/com/uber/marmaray/utilities/ErrorTableUtil.java +++ b/marmaray/src/main/java/com/uber/marmaray/utilities/ErrorTableUtil.java @@ -120,8 +120,7 @@ public static void writeErrorRecordsToErrorTable(@NonNull final SparkContext sc, final HoodieBasedMetadataManager metadataManager = new HoodieBasedMetadataManager(hoodieConf, shouldSaveChanges, jsc); final HoodieSink hoodieSink = new HoodieErrorSink(hoodieConf, new DummyHoodieSinkDataConverter(), jsc, - HoodieSink.HoodieSinkOp.BULK_INSERT, metadataManager, - false); + metadataManager,false); JavaRDD errorRecords = errorData.getData().map(error -> generateGenericErrorRecord( errorExtractor, errorTableSchema, error, applicationId)); diff --git a/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java b/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java index f047840..f562bcb 100644 --- a/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java +++ b/marmaray/src/test/java/com/uber/marmaray/common/sinks/hoodie/TestHoodieSink.java @@ -86,16 +86,14 @@ class MockHoodieSink extends HoodieSink { private HoodieWriteClientWrapper mockWriteClient; public MockHoodieSink(@NonNull final HoodieConfiguration hoodieConf, - @NonNull final HoodieSinkDataConverter hoodieKeyGenerator, @NonNull final JavaSparkContext jsc, - @NonNull final HoodieSinkOp op) { - super(hoodieConf, hoodieKeyGenerator, jsc, op, new MemoryMetadataManager(), Optional.absent()); + @NonNull final HoodieSinkDataConverter hoodieKeyGenerator, @NonNull final JavaSparkContext jsc) { + super(hoodieConf, hoodieKeyGenerator, jsc, new MemoryMetadataManager(), Optional.absent()); } public MockHoodieSink(@NonNull final HoodieConfiguration hoodieConf, @NonNull final HoodieSinkDataConverter hoodieKeyGenerator, @NonNull final JavaSparkContext jsc, - @NonNull final HoodieSinkOp op, @NonNull final IMetadataManager metadataMgr) { - super(hoodieConf, hoodieKeyGenerator, jsc, op, metadataMgr, Optional.absent()); + super(hoodieConf, hoodieKeyGenerator, jsc, metadataMgr, Optional.absent()); } @Override @@ -122,11 +120,10 @@ public void testUpdateInsertParallelism() { final String schemaStr = getSchema("TS", "RECORD_KEY", 4, 8).toString(); final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + .withBasePath(basePath).withSchema(schemaStr).withSinkOp("NO_OP").enableMetrics(false).build(); final HoodieSink mockSink = - spy(new HoodieSink(hoodieConf, mock(HoodieSinkDataConverter.class), - mock(JavaSparkContext.class), HoodieSink.HoodieSinkOp.NO_OP, new NoOpMetadataManager(), - Optional.absent())); + spy(new HoodieSink(hoodieConf, 
mock(HoodieSinkDataConverter.class), mock(JavaSparkContext.class), + new NoOpMetadataManager(), Optional.absent())); when(mockSink.calculateNewBulkInsertParallelism(anyLong())).thenReturn(18); Assert.assertTrue(mockSink.updateInsertParallelism(1000)); Assert.assertEquals(18, hoodieConf.getInsertParallelism()); @@ -140,11 +137,10 @@ public void testUpdateBulkInsertParallelism() { final String schemaStr = getSchema("TS", "RECORD_KEY", 4, 8).toString(); final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") - .withBasePath(basePath).withSchema(schemaStr).enableMetrics(false).build(); + .withBasePath(basePath).withSchema(schemaStr).withSinkOp("NO_OP").enableMetrics(false).build(); final HoodieSink mockSink = - spy(new HoodieSink(hoodieConf, mock(HoodieSinkDataConverter.class), - mock(JavaSparkContext.class), HoodieSink.HoodieSinkOp.NO_OP, new NoOpMetadataManager(), - Optional.absent())); + spy(new HoodieSink(hoodieConf, mock(HoodieSinkDataConverter.class), mock(JavaSparkContext.class), + new NoOpMetadataManager(), Optional.absent())); when(mockSink.calculateNewBulkInsertParallelism(anyLong())).thenReturn(18); Assert.assertTrue(mockSink.updateBulkInsertParallelism(1000)); Assert.assertEquals(18, hoodieConf.getBulkInsertParallelism()); @@ -159,11 +155,11 @@ public void testHoodieSinkWriteInsertWithoutMetadata() throws IOException { final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) - .withPartitionPath(TS_KEY).enableMetrics(false).build(); + .withPartitionPath(TS_KEY).withSinkOp("INSERT").enableMetrics(false).build(); final HoodieSinkDataConverter hoodieKeyGenerator = new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), INSERT); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get()); final JavaRDD inputRDD = this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); final Map emptyTags = new HashMap<>(); @@ -206,11 +202,11 @@ public void testHoodieSinkWriteUpsertWithoutMetadata() throws IOException { final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) - .withPartitionPath(TS_KEY).enableMetrics(false).build(); + .withPartitionPath(TS_KEY).withSinkOp("UPSERT").enableMetrics(false).build(); final HoodieSinkDataConverter hoodieKeyGenerator = new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), UPSERT); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get()); final JavaRDD inputRDD = this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); final Map emptyTags = new HashMap<>(); @@ -250,13 +246,13 @@ public void testHoodieSinkWriteInsertWithMetadata() throws IOException { final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) - .withPartitionPath(TS_KEY).enableMetrics(false).build(); + 
.withPartitionPath(TS_KEY).withSinkOp("INSERT").enableMetrics(false).build(); final HoodieSinkDataConverter hoodieKeyGenerator = new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); final HoodieBasedMetadataManager hoodieBasedMetadataManager = new HoodieBasedMetadataManager(hoodieConf, new AtomicBoolean(true), this.jsc.get()); hoodieBasedMetadataManager.set("randomKey", new StringValue("randomValue")); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), INSERT, + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), hoodieBasedMetadataManager); final JavaRDD inputRDD = this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); @@ -299,13 +295,13 @@ public void testHoodieSinkWriteUpsertWithMetadata() throws IOException { final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) - .withPartitionPath(TS_KEY).enableMetrics(false).build(); + .withPartitionPath(TS_KEY).withSinkOp("UPSERT").enableMetrics(false).build(); final HoodieSinkDataConverter hoodieKeyGenerator = new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); final HoodieBasedMetadataManager hoodieBasedMetadataManager = new HoodieBasedMetadataManager(hoodieConf, new AtomicBoolean(true), this.jsc.get()); hoodieBasedMetadataManager.set("randomKey", new StringValue("randomValue")); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), UPSERT, + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), hoodieBasedMetadataManager); final JavaRDD inputRDD = @@ -348,14 +344,14 @@ public void testHoodieSinkWriteDedupeInsert() throws IOException { final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") .withBasePath(basePath).withSchema(schemaStr).withCombineBeforeInsert(true) - .withCombineBeforeUpsert(true).withRecordKey(RECORD_KEY).withPartitionPath(TS_KEY) - .enableMetrics(false).build(); + .withCombineBeforeUpsert(true).withRecordKey(RECORD_KEY).withSinkOp("DEDUP_INSERT") + .withPartitionPath(TS_KEY).enableMetrics(false).build(); final HoodieSinkDataConverter hoodieKeyGenerator = new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); final JavaRDD inputRDD = this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), DEDUP_INSERT); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get()); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); hoodieSink.setDataFeedMetrics(dfm); @@ -389,7 +385,7 @@ public void testHoodieSinkWriteDedupeInsert() throws IOException { // If we try to re-insert then it should find all the records as a a part filterExists test and should not // call bulkInsert. 
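// Illustrative sketch (not part of the patch): this test now relies on withSinkOp("DEDUP_INSERT")
// set on hoodieConf above instead of passing DEDUP_INSERT to the sink constructor. For that op
// the sink drops already-written records before inserting, roughly along the lines of
// dedupRecords(...) shown earlier in this series (names as used in this patch):
//
//     final JavaRDD deduped = writeClient.filterExists(hoodieRecords)
//             .persist(StorageLevel.DISK_ONLY());
//     writeClient.insert(deduped, commitTime);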
- final MockHoodieSink hoodieSink2 = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), DEDUP_INSERT); + final MockHoodieSink hoodieSink2 = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get()); hoodieSink.write(inputRDD); final HoodieWriteClientWrapper hoodieWriteClientWrapper2 = hoodieSink.getMockWriteClient(); @@ -411,11 +407,11 @@ public void testHoodieSinkMetrics() throws IOException { final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) - .withPartitionPath(TS_KEY).enableMetrics(false).build(); + .withPartitionPath(TS_KEY).withSinkOp("INSERT").enableMetrics(false).build(); final HoodieSinkDataConverter hoodieKeyGenerator = new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); - final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), INSERT); + final MockHoodieSink hoodieSink = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get()); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); hoodieSink.setDataFeedMetrics(dfm); @@ -458,14 +454,14 @@ public void testUserDefinedCommitTime() throws IOException { final HoodieConfiguration hoodieConf = HoodieConfiguration.newBuilder(tableName).withTableName(tableName).withMetricsPrefix("test") .withBasePath(basePath).withSchema(schemaStr).withRecordKey(RECORD_KEY) - .withPartitionPath(TS_KEY).enableMetrics(false).build(); + .withPartitionPath(TS_KEY).withSinkOp("BULK_INSERT").enableMetrics(false).build(); final HoodieSinkDataConverter hoodieKeyGenerator = new TSBasedHoodieSinkDataConverter(conf, hoodieConf, TimeUnit.MILLISECONDS); final JavaRDD inputRDD = this.jsc.get().parallelize(getRandomData(schemaStr, TS_KEY, RECORD_KEY, 10)); - final MockHoodieSink hoodieSink1 = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), BULK_INSERT); + final MockHoodieSink hoodieSink1 = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get()); final Map emptyTags = new HashMap<>(); final DataFeedMetrics dfm = new DataFeedMetrics(JOB_NAME, emptyTags); hoodieSink1.setDataFeedMetrics(dfm); @@ -481,7 +477,7 @@ public void testUserDefinedCommitTime() throws IOException { HoodieActiveTimeline.COMMIT_FORMATTER.format( new Date(new Date().getTime() - TimeUnit.DAYS.toMillis(365))); - final MockHoodieSink hoodieSink2 = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get(), BULK_INSERT); + final MockHoodieSink hoodieSink2 = new MockHoodieSink(hoodieConf, hoodieKeyGenerator, jsc.get()); hoodieSink2.setDataFeedMetrics(dfm); hoodieSink2.setCommitTime(com.google.common.base.Optional.of(customCommit)); From f3f017ec620c951a246a583930489fe01c28ffdd Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Thu, 10 Oct 2019 23:18:28 +0530 Subject: [PATCH 10/11] Fix logs --- .../java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java index 5bc4c3e..e62a659 100644 --- a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java @@ -178,7 +178,7 @@ private void run(final String[] args) throws IOException { jobManager.addJobDag(jobDag); - log.info("Running dispersal 
job"); + log.info("Running ingestion job"); try { jobManager.run(); JobUtil.raiseExceptionIfStatusFailed(jobManager.getJobManagerStatus()); @@ -195,7 +195,7 @@ private void run(final String[] args) throws IOException { reporters.report(configError); throw t; } - log.info("Dispersal job has been completed"); + log.info("Ingestion job has been completed"); final TimerMetric jobLatencyMetric = new TimerMetric(JobMetricNames.RUN_JOB_DAG_LATENCY_MS, metricTags, jobStartTime); From e1be8ffa9283ccc22194cba25b0cb75a37e3aa1f Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Thu, 10 Oct 2019 23:42:06 +0530 Subject: [PATCH 11/11] Add dummy commit --- .../java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java | 1 - 1 file changed, 1 deletion(-) diff --git a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java index 7ffb5ed..3161c4d 100644 --- a/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java +++ b/marmaray/src/main/java/com/uber/marmaray/examples/job/KafkaToHoodieJob.java @@ -80,7 +80,6 @@ protected String getPartitionPath(AvroPayload avroPayload) { */ @Slf4j public class KafkaToHoodieJob { - /** * Generic entry point *