
Kafka to Hoodie support #23

Open: wants to merge 13 commits into master
HoodieConfiguration.java
@@ -27,6 +27,7 @@
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.marmaray.common.exceptions.JobRuntimeException;
import com.uber.marmaray.common.exceptions.MissingPropertyException;
import com.uber.marmaray.common.sinks.hoodie.HoodieSink;
import com.uber.marmaray.utilities.ConfigUtil;
import com.uber.marmaray.utilities.StringTypes;
import lombok.Getter;
@@ -65,6 +66,22 @@ public class HoodieConfiguration implements Serializable {
* Schema for Hoodie dataset
*/
public static final String HOODIE_AVRO_SCHEMA = HOODIE_COMMON_PROPERTY_PREFIX + "schema";

/**
* Record Key for Hoodie dataset
*/
public static final String HOODIE_RECORD_KEY = HOODIE_COMMON_PROPERTY_PREFIX + "record_key";

/**
* Partition path for Hoodie dataset
*/
public static final String HOODIE_PARTITION_PATH = HOODIE_COMMON_PROPERTY_PREFIX + "partition_path";

/**
 * Sink operation for Hoodie dataset
 */
public static final String HOODIE_SINK_OP = HOODIE_COMMON_PROPERTY_PREFIX + "sink_op";

/**
* Flag to control whether it should combine before insert
*/
@@ -250,6 +267,31 @@ public String getTableName() {
return this.getConf().getProperty(getTablePropertyKey(HOODIE_TABLE_NAME, this.tableKey)).get();
}

/**
* @return hoodie record key.
*/
public Optional<String> getHoodieRecordKey() {
    return this.getConf().getProperty(getTablePropertyKey(HOODIE_RECORD_KEY, this.tableKey));
}

/**
* @return hoodie partition path.
*/
public Optional<String> getHoodiePartitionPath() {
    return this.getConf().getProperty(getTablePropertyKey(HOODIE_PARTITION_PATH, this.tableKey));
}

/**
 * @return hoodie sink operation; defaults to {@code BULK_INSERT} when not configured.
*/
public HoodieSink.HoodieSinkOp getHoodieSinkOp() {
    final Optional<String> sinkOp = this.getConf().getProperty(getTablePropertyKey(HOODIE_SINK_OP, this.tableKey));
    if (sinkOp.isPresent()) {
        return HoodieSink.HoodieSinkOp.valueOf(sinkOp.get().toUpperCase());
    }
    return HoodieSink.HoodieSinkOp.BULK_INSERT;
}

/**
* @return hoodie metrics prefix.
* */
@@ -492,11 +534,26 @@ public Builder withBasePath(@NotEmpty final String basePath) {
return this;
}

public Builder withRecordKey(@NotEmpty final String recordKey) {
this.conf.setProperty(getTablePropertyKey(HOODIE_RECORD_KEY, this.tableKey), recordKey);
return this;
}

public Builder withPartitionPath(@NotEmpty final String partitionPath) {
this.conf.setProperty(getTablePropertyKey(HOODIE_PARTITION_PATH, this.tableKey), partitionPath);
return this;
}

public Builder withSchema(@NotEmpty final String schema) {
this.conf.setProperty(getTablePropertyKey(HOODIE_AVRO_SCHEMA, this.tableKey), schema);
return this;
}

public Builder withSinkOp(@NotEmpty final String sinkOp) {
this.conf.setProperty(getTablePropertyKey(HOODIE_SINK_OP, this.tableKey), sinkOp);
return this;
}

public Builder withBulkInsertParallelism(final int parallelism) {
this.conf.setProperty(
getTablePropertyKey(HOODIE_BULKINSERT_PARALLELISM, this.tableKey), Integer.toString(parallelism));
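The new keys and builder methods move the record key, partition path, and sink operation into per-table configuration. Below is a minimal usage sketch (not part of the diff): the table key "user_events" and the field names are hypothetical, and it ignores any other properties, such as table name or base path, that build() may require in a real job.

import com.uber.marmaray.common.configuration.Configuration;
import com.uber.marmaray.common.configuration.HoodieConfiguration;
import com.uber.marmaray.common.sinks.hoodie.HoodieSink;

public class HoodieConfigSketch {
    public static void main(final String[] args) {
        final HoodieConfiguration hoodieConf = HoodieConfiguration
                .newBuilder(new Configuration(), "user_events") // hypothetical table key
                .withRecordKey("user_id")       // Avro field backing the HoodieKey record key
                .withPartitionPath("event_ts")  // Avro field backing the partition path
                .withSinkOp("upsert")           // parsed case-insensitively via HoodieSinkOp.valueOf
                .build();

        // Falls back to BULK_INSERT when the sink_op property is not set.
        final HoodieSink.HoodieSinkOp op = hoodieConf.getHoodieSinkOp();
        System.out.println(op); // UPSERT, assuming the enum defines that constant
    }
}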
DummyHoodieSinkDataConverter.java
@@ -19,6 +19,7 @@

import com.uber.marmaray.common.AvroPayload;
import com.uber.marmaray.common.configuration.Configuration;
import com.uber.marmaray.common.configuration.HoodieConfiguration;
import com.uber.marmaray.utilities.ErrorExtractor;

import lombok.NonNull;
@@ -29,7 +30,9 @@
*/
public class DummyHoodieSinkDataConverter extends HoodieSinkDataConverter {
public DummyHoodieSinkDataConverter() {
-        super(new Configuration(), new ErrorExtractor());
+        super(new Configuration(), new ErrorExtractor(), HoodieConfiguration.newBuilder(new Configuration(),
+                "test").build());
}

@Override
HoodieSinkDataConverter.java
@@ -22,7 +22,9 @@
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.marmaray.common.AvroPayload;
import com.uber.marmaray.common.configuration.Configuration;
import com.uber.marmaray.common.configuration.HoodieConfiguration;
import com.uber.marmaray.common.converters.converterresult.ConverterResult;
import com.uber.marmaray.common.exceptions.InvalidDataException;
import com.uber.marmaray.common.exceptions.MissingPropertyException;
import com.uber.marmaray.common.metrics.DataFeedMetrics;
import com.uber.marmaray.common.metrics.JobMetrics;
import com.uber.marmaray.common.sinks.hoodie.HoodieSink;
@@ -35,27 +37,34 @@
import java.util.Collections;
import java.util.List;

import com.google.common.base.Optional;

/**
* {@link HoodieSinkDataConverter} extends {@link SinkDataConverter}
* This class is used by {@link HoodieSink} to generate {@link com.uber.hoodie.common.model.HoodieRecord} from
* {@link com.uber.marmaray.common.AvroPayload}.
*/
-public abstract class HoodieSinkDataConverter extends SinkDataConverter<Schema, HoodieRecord<HoodieRecordPayload>> {
+public class HoodieSinkDataConverter extends SinkDataConverter<Schema, HoodieRecord<HoodieRecordPayload>> {

// store the schema as a string since Schema doesn't serialize. Used in extended classes.
protected String schema;
private final ErrorExtractor errorExtractor;
private final HoodieConfiguration hoodieConfiguration;

-    public HoodieSinkDataConverter(@NonNull final Configuration conf, @NonNull final ErrorExtractor errorExtractor) {
+    public HoodieSinkDataConverter(@NonNull final Configuration conf, @NonNull final ErrorExtractor errorExtractor,
+                                   @NonNull final HoodieConfiguration hoodieConfiguration) {
super(conf, errorExtractor);
this.errorExtractor = errorExtractor;
this.hoodieConfiguration = hoodieConfiguration;
}

public HoodieSinkDataConverter(@NonNull final Configuration conf, final String schema,
-                                   @NonNull final ErrorExtractor errorExtractor) {
+                                   @NonNull final ErrorExtractor errorExtractor,
+                                   @NonNull final HoodieConfiguration hoodieConfiguration) {
super(conf, errorExtractor);
this.schema = schema;
this.errorExtractor = errorExtractor;
this.hoodieConfiguration = hoodieConfiguration;
}

@Override
@@ -82,15 +91,35 @@ protected final List<ConverterResult<AvroPayload, HoodieRecord<HoodieRecordPaylo
*
* @param payload {@link AvroPayload}.
*/
-    protected abstract String getRecordKey(@NonNull final AvroPayload payload) throws Exception;
+    protected String getRecordKey(@NonNull final AvroPayload payload) throws Exception {
+        final Optional<String> hoodieRecordKey = this.hoodieConfiguration.getHoodieRecordKey();
+        if (hoodieRecordKey.isPresent()) {
+            final Object recordKeyFieldVal = payload.getData().get(hoodieRecordKey.get());
+            if (recordKeyFieldVal == null) {
+                throw new InvalidDataException("required field is missing: " + hoodieRecordKey.get());
+            }
+            return recordKeyFieldVal.toString();
+        }
+        throw new MissingPropertyException("hoodie record key is not configured");
+    }

/**
 * Generates the partition path needed for {@link HoodieKey}, by default from the partition-path field configured
 * in {@link HoodieConfiguration}. Subclasses may override this to customize the path.
*
* @param payload {@link AvroPayload}.
*/
-    protected abstract String getPartitionPath(@NonNull final AvroPayload payload) throws Exception;
+    protected String getPartitionPath(@NonNull final AvroPayload payload) throws Exception {
+        final Optional<String> hoodiePartitionPath = this.hoodieConfiguration.getHoodiePartitionPath();
+        if (hoodiePartitionPath.isPresent()) {
+            final Object partitionFieldVal = payload.getData().get(hoodiePartitionPath.get());
+            if (partitionFieldVal == null) {
+                throw new InvalidDataException("required field is missing: " + hoodiePartitionPath.get());
+            }
+            return partitionFieldVal.toString();
+        }
+        throw new MissingPropertyException("hoodie partition path is not configured");
+    }

protected HoodieRecordPayload getPayload(@NonNull final AvroPayload payload) {
return new HoodieAvroPayload(java.util.Optional.of(payload.getData()));
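The two default implementations above do the same thing: look the configured field name up in the payload's underlying Avro record and fail loudly if it is absent. A self-contained sketch of that lookup (not part of the diff), substituting a plain GenericRecord for AvroPayload and a hard-coded field name for the value getHoodieRecordKey() would supply:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class RecordKeyLookupSketch {
    public static void main(final String[] args) {
        final Schema schema = SchemaBuilder.record("Event").fields()
                .requiredString("user_id")
                .requiredLong("event_ts")
                .endRecord();
        final GenericRecord record = new GenericData.Record(schema);
        record.put("user_id", "u-42");
        record.put("event_ts", 1546300800L);

        // Mirrors getRecordKey: fetch the configured field, reject records that lack it.
        final String recordKeyField = "user_id"; // stand-in for HoodieConfiguration#getHoodieRecordKey
        final Object value = record.get(recordKeyField);
        if (value == null) {
            throw new IllegalStateException("required field is missing: " + recordKeyField);
        }
        System.out.println(value); // u-42
    }
}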
TSBasedHoodieSinkDataConverter.java
@@ -18,6 +18,7 @@

import com.uber.marmaray.common.AvroPayload;
import com.uber.marmaray.common.configuration.Configuration;
import com.uber.marmaray.common.configuration.HoodieConfiguration;
import com.uber.marmaray.common.exceptions.InvalidDataException;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.marmaray.utilities.HoodieSinkConverterErrorExtractor;
@@ -33,47 +34,29 @@
/**
* {@link TSBasedHoodieSinkDataConverter} extends {@link HoodieSinkDataConverter}
*
- * This class generates {@link HoodieKey} from given {@link AvroPayload}. The passed in {@link AvroPayload} requires
- * {@link #partitionFieldName} with timestamp in {@link #timeUnit}.
- *
- * {@link AvroPayload} also requires a {@link #recordKeyFieldName} which should be the primary key for the record.
+ * This class generates the partition path from a given {@link AvroPayload}. The partition-path field configured in
+ * {@link HoodieConfiguration} must contain a timestamp in {@link #timeUnit}.
*/
@Slf4j
public class TSBasedHoodieSinkDataConverter extends HoodieSinkDataConverter {

public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat(DATE_PARTITION_FORMAT);
-    @NotEmpty
-    private final String recordKeyFieldName;
-    @NotEmpty
-    private final String partitionFieldName;

@NonNull
private final TimeUnit timeUnit;

public TSBasedHoodieSinkDataConverter(@NonNull final Configuration conf,
-                                          @NotEmpty final String recordKeyFieldName, @NotEmpty final String partitionFieldName,
-                                          @NonNull final TimeUnit timeUnit) {
-        super(conf, new HoodieSinkConverterErrorExtractor());
-        this.recordKeyFieldName = recordKeyFieldName;
-        this.partitionFieldName = partitionFieldName;
+                                          @NonNull final HoodieConfiguration hoodieConfiguration,
+                                          @NonNull final TimeUnit timeUnit) {
+        super(conf, new HoodieSinkConverterErrorExtractor(), hoodieConfiguration);
this.timeUnit = timeUnit;
}

-    @Override
-    protected String getRecordKey(@NonNull final AvroPayload payload) throws Exception {
-        final Object recordKeyFieldVal = payload.getField(recordKeyFieldName);
-        if (recordKeyFieldVal == null) {
-            throw new InvalidDataException("required field is missing:" + recordKeyFieldName);
-        }
-        return recordKeyFieldVal.toString();
-    }

@Override
protected String getPartitionPath(final AvroPayload payload) throws Exception {
-        final Object partitionFieldVal = payload.getField(partitionFieldName);
-        if (partitionFieldVal == null) {
-            throw new InvalidDataException("required field is missing:" + partitionFieldName);
-        }
-        final Date date = new Date(this.timeUnit.toMillis((long) Double.parseDouble(partitionFieldVal.toString())));
+        final String partitionFieldVal = super.getPartitionPath(payload);
+        final Date date = new Date(this.timeUnit.toMillis((long) Double.parseDouble(partitionFieldVal)));
return PARTITION_FORMATTER.format(date);
}
}
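After this change the override only converts the already-extracted timestamp into a date-based partition path. The arithmetic in isolation (not part of the diff); "yyyy/MM/dd" is an assumed stand-in for marmaray's DATE_PARTITION_FORMAT constant:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class PartitionPathSketch {
    public static void main(final String[] args) {
        final SimpleDateFormat formatter = new SimpleDateFormat("yyyy/MM/dd");
        final TimeUnit timeUnit = TimeUnit.SECONDS;  // unit the converter was constructed with
        final String fieldValue = "1546300800";      // epoch seconds read from the partition-path field

        // Double.parseDouble tolerates decimal strings before the cast truncates to long.
        final Date date = new Date(timeUnit.toMillis((long) Double.parseDouble(fieldValue)));
        System.out.println(formatter.format(date)); // 2019/01/01 in UTC; local timezone may shift the date
    }
}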
AbstractKafkaSchemaServiceReader.java
@@ -0,0 +1,28 @@
package com.uber.marmaray.common.schema.kafka;


import com.uber.marmaray.common.schema.ISchemaService;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import lombok.NonNull;

import java.io.Serializable;

@Slf4j
public abstract class AbstractKafkaSchemaServiceReader implements ISchemaService.ISchemaServiceReader, Serializable {
private final String schemaString;
private transient Schema schema;

AbstractKafkaSchemaServiceReader(@NonNull final Schema schema) {
this.schemaString = schema.toString();
this.schema = schema;
log.info("Kafka Schema service reader initialised with schema {}", schemaString);
}

Schema getSchema() {
if (this.schema == null) {
this.schema = new Schema.Parser().parse(this.schemaString);
}
return this.schema;
}
}
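Storing the schema as a string is what lets this reader travel through Java serialization: Avro's Schema is not serializable here (hence the transient field), so only schemaString is shipped, and getSchema() re-parses it lazily on the remote side. A stand-alone sketch of the same pattern (not part of the diff):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class SchemaSerializationSketch {
    // Minimal stand-in for the reader: only the string form crosses the wire.
    static class Holder implements Serializable {
        private final String schemaString;
        private transient Schema schema;

        Holder(final Schema schema) {
            this.schemaString = schema.toString();
            this.schema = schema;
        }

        Schema getSchema() {
            if (this.schema == null) {
                this.schema = new Schema.Parser().parse(this.schemaString);
            }
            return this.schema;
        }
    }

    public static void main(final String[] args) throws Exception {
        final Schema schema = SchemaBuilder.record("Event").fields().requiredString("id").endRecord();
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        final ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(new Holder(schema));
        oos.flush();

        final Holder copy = (Holder) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        // The transient field came back null; getSchema() transparently re-parses.
        System.out.println(copy.getSchema().equals(schema)); // true
    }
}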
KafkaSchemaAvroServiceReader.java
@@ -0,0 +1,28 @@
package com.uber.marmaray.common.schema.kafka;

import com.uber.marmaray.common.exceptions.InvalidDataException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import java.io.IOException;

public class KafkaSchemaAvroServiceReader extends AbstractKafkaSchemaServiceReader {
public KafkaSchemaAvroServiceReader(Schema schema) {
super(schema);
}

@Override
public GenericRecord read(byte[] buffer) throws InvalidDataException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(getSchema());
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(buffer, null);
try {
return datumReader.read(null, decoder);
} catch (IOException e) {
throw new InvalidDataException("Error decoding data", e);
}
}
}
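A round-trip sketch for the binary reader (not part of the diff). Note what it implies about the Kafka topic: messages must be raw datum-encoded Avro with exactly this schema; there is no container-file header and no schema-registry wire prefix to fall back on.

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class AvroReaderSketch {
    public static void main(final String[] args) throws Exception {
        final Schema schema = SchemaBuilder.record("Event").fields()
                .requiredString("user_id")
                .requiredLong("event_ts")
                .endRecord();
        final GenericRecord record = new GenericData.Record(schema);
        record.put("user_id", "u-42");
        record.put("event_ts", 1546300800L);

        // Encode to the raw binary layout the reader expects.
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        final KafkaSchemaAvroServiceReader reader = new KafkaSchemaAvroServiceReader(schema);
        final GenericRecord decoded = reader.read(out.toByteArray());
        System.out.println(decoded.get("user_id")); // u-42
    }
}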
KafkaSchemaJSONServiceReader.java
@@ -0,0 +1,28 @@
package com.uber.marmaray.common.schema.kafka;

import com.uber.marmaray.common.exceptions.InvalidDataException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class KafkaSchemaJSONServiceReader extends AbstractKafkaSchemaServiceReader {
public KafkaSchemaJSONServiceReader(Schema schema) {
super(schema);
}

@Override
public GenericRecord read(byte[] buffer) throws InvalidDataException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(getSchema());
try {
            final JsonDecoder decoder = DecoderFactory.get()
                    .jsonDecoder(getSchema(), new String(buffer, StandardCharsets.UTF_8));
return datumReader.read(null, decoder);
} catch (IOException e) {
throw new InvalidDataException("Error decoding data", e);
}
}
}
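The JSON counterpart, with one caveat worth knowing: JsonDecoder consumes Avro's JSON encoding, which coincides with plain JSON for records of primitive fields but requires {"type": value} wrappers for union fields, so nullable fields in incoming JSON may need preprocessing. A usage sketch (not part of the diff):

import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;

public class JsonReaderSketch {
    public static void main(final String[] args) throws Exception {
        final Schema schema = SchemaBuilder.record("Event").fields()
                .requiredString("user_id")
                .requiredLong("event_ts")
                .endRecord();
        final KafkaSchemaJSONServiceReader reader = new KafkaSchemaJSONServiceReader(schema);

        final byte[] message = "{\"user_id\": \"u-42\", \"event_ts\": 1546300800}"
                .getBytes(StandardCharsets.UTF_8);
        final GenericRecord decoded = reader.read(message);
        System.out.println(decoded.get("event_ts")); // 1546300800
    }
}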
HoodieErrorSink.java
@@ -44,10 +44,9 @@ public class HoodieErrorSink extends HoodieSink {
public HoodieErrorSink(@NonNull final HoodieConfiguration hoodieConf,
@NonNull final HoodieSinkDataConverter hoodieSinkDataConverter,
@NonNull final JavaSparkContext jsc,
-                           @NonNull final HoodieSinkOp op,
@NonNull final IMetadataManager metadataMgr,
final boolean shouldSaveChangesInFuture) {
-        super(hoodieConf, hoodieSinkDataConverter, jsc, op, metadataMgr, shouldSaveChangesInFuture, Optional.absent());
+        super(hoodieConf, hoodieSinkDataConverter, jsc, metadataMgr, shouldSaveChangesInFuture, Optional.absent());
}

public void writeRecordsAndErrors(@NonNull final HoodieWriteResult result) {
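With the HoodieSinkOp parameter dropped from this constructor, the write operation is steered through configuration instead; HoodieSink presumably reads it via getHoodieSinkOp(), though that method body is outside this diff. A sketch of the caller-side change, with a hypothetical "error_table" table key:

import com.uber.marmaray.common.configuration.Configuration;
import com.uber.marmaray.common.configuration.HoodieConfiguration;
import com.uber.marmaray.common.sinks.hoodie.HoodieSink;

public class ErrorSinkOpSketch {
    public static void main(final String[] args) {
        final HoodieConfiguration hoodieConf = HoodieConfiguration
                .newBuilder(new Configuration(), "error_table")
                .withSinkOp("bulk_insert") // replaces the removed constructor argument
                .build();
        // Leaving sink_op unset yields the same default the old code passed explicitly.
        System.out.println(hoodieConf.getHoodieSinkOp() == HoodieSink.HoodieSinkOp.BULK_INSERT); // true
    }
}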