[FLINK-34467] add lineage integration for jdbc connector
HuangZhenQiu committed Dec 1, 2024
1 parent 134d858 commit 76f3e11
Showing 29 changed files with 1,236 additions and 13 deletions.
.java-version (1 addition, 0 deletions)
@@ -0,0 +1 @@
11
JdbcInputFormat.java
@@ -24,17 +24,23 @@
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

@@ -53,6 +59,7 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;

/**
* InputFormat to read data from a database and generate Rows. The InputFormat has to be configured
@@ -107,7 +114,7 @@
@Deprecated
@Experimental
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
-        implements ResultTypeQueryable<Row> {
+        implements LineageVertexProvider, ResultTypeQueryable<Row> {

protected static final long serialVersionUID = 2L;
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);
@@ -344,6 +351,18 @@ public static JdbcInputFormatBuilder buildJdbcInputFormat() {
return new JdbcInputFormatBuilder();
}

@Override
public LineageVertex getLineageVertex() {
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
new DefaultTypeDatasetFacet(getProducedType());
String name = LineageUtils.nameOf(connectionProvider);
String namespace = LineageUtils.namespaceOf(connectionProvider);
LineageDataset dataset =
LineageUtils.datasetOf(name, namespace, Arrays.asList(defaultTypeDatasetFacet));
return LineageUtils.sourceLineageVertexOf(
Boundedness.BOUNDED, Collections.singleton(dataset));
}

/** Builder for {@link JdbcInputFormat}. */
public static class JdbcInputFormatBuilder {
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
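The pattern repeated throughout this commit is visible above: each source or sink implements LineageVertexProvider and assembles its vertex through the new LineageUtils helpers. Those helpers are added elsewhere in this change set and their bodies are not shown in this excerpt. The sketch below is a hypothetical reading of datasetOf and the two vertex factories, with signatures inferred from the call sites and behavior assumed from the FLIP-314 lineage interfaces; it is not the committed code.

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Sketch only: signatures inferred from the call sites in this diff. */
public final class LineageUtilsSketch {

    private LineageUtilsSketch() {}

    /** Bundles a name, a namespace, and facets into a LineageDataset. */
    public static LineageDataset datasetOf(
            String name, String namespace, List<LineageDatasetFacet> facets) {
        // The LineageDataset contract exposes facets as a map keyed by facet name.
        Map<String, LineageDatasetFacet> facetsByName =
                facets.stream()
                        .collect(Collectors.toMap(LineageDatasetFacet::name, Function.identity()));
        return new LineageDataset() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public String namespace() {
                return namespace;
            }

            @Override
            public Map<String, LineageDatasetFacet> facets() {
                return facetsByName;
            }
        };
    }

    /** Sink-side vertex: only the datasets being written. */
    public static LineageVertex lineageVertexOf(Collection<LineageDataset> datasets) {
        List<LineageDataset> copy = new ArrayList<>(datasets);
        return new LineageVertex() {
            @Override
            public List<LineageDataset> datasets() {
                return copy;
            }
        };
    }

    /** Source-side vertex: datasets plus boundedness (BOUNDED for the input formats here). */
    public static SourceLineageVertex sourceLineageVertexOf(
            Boundedness boundedness, Collection<LineageDataset> datasets) {
        List<LineageDataset> copy = new ArrayList<>(datasets);
        return new SourceLineageVertex() {
            @Override
            public Boundedness boundedness() {
                return boundedness;
            }

            @Override
            public List<LineageDataset> datasets() {
                return copy;
            }
        };
    }
}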
JdbcSink.java
@@ -34,7 +34,11 @@
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;

import java.io.IOException;
import java.util.Collection;
@@ -47,7 +51,9 @@
*/
@PublicEvolving
public class JdbcSink<IN>
-        implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommitable> {
+        implements LineageVertexProvider,
+                StatefulSink<IN, JdbcWriterState>,
+                TwoPhaseCommittingSink<IN, JdbcCommitable> {

private final DeliveryGuarantee deliveryGuarantee;
private final JdbcConnectionProvider connectionProvider;
@@ -113,4 +119,12 @@ public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
return new JdbcWriterStateSerializer();
}

@Override
public LineageVertex getLineageVertex() {
String name = LineageUtils.nameOf(queryStatement);
String namespace = LineageUtils.namespaceOf(connectionProvider);
LineageDataset dataset = LineageUtils.datasetOf(name, namespace, Collections.emptyList());
return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
}
}
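Note that the sink derives its dataset name from the JdbcQueryStatement rather than the connection provider. How nameOf and namespaceOf produce their strings is not visible in this excerpt; the following is a guess at the convention, assuming the JDBC URL acts as the namespace, which would also explain the getDbURL() accessor added to SimpleJdbcConnectionProvider later in this diff.

import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;

/** Sketch only: naming behavior assumed, not taken from the committed LineageUtils. */
public final class LineageNamingSketch {

    private LineageNamingSketch() {}

    /** Assumed: any stable descriptor (provider, query statement) yields a readable name. */
    public static String nameOf(Object descriptor) {
        // A real implementation would likely extract a table name from the
        // statement or the connection options; the class name is a stand-in here.
        return descriptor.getClass().getSimpleName();
    }

    /** Assumed: the JDBC URL identifies the database instance, so it serves as namespace. */
    public static String namespaceOf(JdbcConnectionProvider provider) {
        if (provider instanceof SimpleJdbcConnectionProvider) {
            return ((SimpleJdbcConnectionProvider) provider).getDbURL();
        }
        return "";
    }
}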
JdbcSource.java
@@ -40,20 +40,28 @@
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;

/** JDBC source. */
@PublicEvolving
public class JdbcSource<OUT>
-        implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
+        implements LineageVertexProvider,
+                Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
ResultTypeQueryable<OUT> {

private final Boundedness boundedness;
@@ -195,4 +203,15 @@ public boolean equals(Object o) {
&& deliveryGuarantee == that.deliveryGuarantee
&& Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings);
}

@Override
public LineageVertex getLineageVertex() {
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
new DefaultTypeDatasetFacet(getTypeInformation());
String name = LineageUtils.nameOf(connectionProvider);
String namespace = LineageUtils.namespaceOf(connectionProvider);
LineageDataset dataset =
LineageUtils.datasetOf(name, namespace, Arrays.asList(defaultTypeDatasetFacet));
return LineageUtils.sourceLineageVertexOf(boundedness, Collections.singleton(dataset));
}
}
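DefaultTypeDatasetFacet, which the sources above use to attach their produced TypeInformation to the dataset, is also introduced by this commit but its body is not shown here. A minimal sketch consistent with how it is constructed and read (the facet name constant is an assumption):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.Objects;

/** Sketch only: a facet carrying the type produced by a JDBC source. */
public class DefaultTypeDatasetFacet implements LineageDatasetFacet {

    // Assumed facet key; the committed code may use a different constant.
    public static final String TYPE_FACET_NAME = "type";

    private final TypeInformation<?> typeInformation;

    public DefaultTypeDatasetFacet(TypeInformation<?> typeInformation) {
        this.typeInformation = Objects.requireNonNull(typeInformation);
    }

    public TypeInformation<?> getTypeInformation() {
        return typeInformation;
    }

    @Override
    public String name() {
        return TYPE_FACET_NAME;
    }
}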
JdbcRowDataInputFormat.java
@@ -24,16 +24,22 @@
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;

@@ -51,11 +57,12 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;

/** InputFormat for {@link JdbcDynamicTableSource}. */
@Internal
public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit>
-        implements ResultTypeQueryable<RowData> {
+        implements LineageVertexProvider, ResultTypeQueryable<RowData> {

private static final long serialVersionUID = 2L;
private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataInputFormat.class);
@@ -296,6 +303,18 @@ public static Builder builder() {
return new Builder();
}

@Override
public LineageVertex getLineageVertex() {
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
new DefaultTypeDatasetFacet(getProducedType());
String name = LineageUtils.nameOf(connectionProvider);
String namespace = LineageUtils.namespaceOf(connectionProvider);
LineageDataset dataset =
LineageUtils.datasetOf(name, namespace, Arrays.asList(defaultTypeDatasetFacet));
return LineageUtils.sourceLineageVertexOf(
Boundedness.BOUNDED, Collections.singleton(dataset));
}

/** Builder for {@link JdbcRowDataInputFormat}. */
public static class Builder {
private JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
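JdbcRowDataInputFormat now reports exactly the same lineage shape as JdbcInputFormat above: a bounded source vertex carrying one dataset with a type facet. An illustrative AssertJ helper for a hypothetical test of that shared contract:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

final class LineageAssertions {

    private LineageAssertions() {}

    /** Checks the contract shared by JdbcInputFormat and JdbcRowDataInputFormat. */
    static void assertBoundedSingleDataset(LineageVertexProvider provider) {
        LineageVertex vertex = provider.getLineageVertex();
        assertThat(vertex).isInstanceOf(SourceLineageVertex.class);
        assertThat(((SourceLineageVertex) vertex).boundedness()).isEqualTo(Boundedness.BOUNDED);
        assertThat(vertex.datasets()).hasSize(1);
    }
}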
JdbcRowDataLookupFunction.java
@@ -20,18 +20,25 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +60,7 @@

/** A lookup function for {@link JdbcDynamicTableSource}. */
@Internal
-public class JdbcRowDataLookupFunction extends LookupFunction {
+public class JdbcRowDataLookupFunction extends LookupFunction implements LineageVertexProvider {

private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
private static final long serialVersionUID = 2L;
@@ -67,6 +74,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction {

private final List<String> resolvedPredicates;
private final Serializable[] pushdownParams;
private final RowType producedType;

private transient FieldNamedPreparedStatement statement;

@@ -106,12 +114,12 @@ public JdbcRowDataLookupFunction(
.getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
JdbcDialect jdbcDialect = options.getDialect();
this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType);
-        this.lookupKeyRowConverter =
-                jdbcDialect.getRowConverter(
-                        RowType.of(
-                                Arrays.stream(keyTypes)
-                                        .map(DataType::getLogicalType)
-                                        .toArray(LogicalType[]::new)));
+        this.producedType =
+                RowType.of(
+                        Arrays.stream(keyTypes)
+                                .map(DataType::getLogicalType)
+                                .toArray(LogicalType[]::new));
+        this.lookupKeyRowConverter = jdbcDialect.getRowConverter(producedType);
this.resolvedPredicates = resolvedPredicates;
this.pushdownParams = pushdownParams;
}
@@ -224,4 +232,18 @@ public void close() throws IOException {
public Connection getDbConnection() {
return connectionProvider.getConnection();
}

@Override
public LineageVertex getLineageVertex() {
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
new DefaultTypeDatasetFacet(
TypeConversions.fromDataTypeToLegacyInfo(
TypeConversions.fromLogicalToDataType(producedType)));
String name = LineageUtils.nameOf(connectionProvider);
String namespace = LineageUtils.namespaceOf(connectionProvider);
LineageDataset dataset =
LineageUtils.datasetOf(name, namespace, Arrays.asList(defaultTypeDatasetFacet));
return LineageUtils.sourceLineageVertexOf(
Boundedness.BOUNDED, Collections.singleton(dataset));
}
}
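The lookup function stores the key RowType it already builds for the key converter and, in getLineageVertex(), round-trips it through TypeConversions because DefaultTypeDatasetFacet expects TypeInformation rather than a logical type. The conversion in isolation (example key types chosen arbitrarily):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;

public class TypeRoundTripExample {
    public static void main(String[] args) {
        // A logical row type, as built from the lookup keys above.
        RowType keyType = RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType());

        // Logical type -> DataType -> legacy TypeInformation, as in getLineageVertex().
        TypeInformation<?> legacyInfo =
                TypeConversions.fromDataTypeToLegacyInfo(
                        TypeConversions.fromLogicalToDataType(keyType));

        System.out.println(legacyInfo);
    }
}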
SimpleJdbcConnectionProvider.java
@@ -149,4 +149,8 @@ public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
closeConnection();
return getOrEstablishConnection();
}

public String getDbURL() {
return this.jdbcOptions.getDbURL();
}
}
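The new getDbURL() accessor has no caller in the hunks shown here; presumably it is what allows LineageUtils.namespaceOf to derive a dataset namespace from the connection URL, as assumed in the naming sketch after the JdbcSink section.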
GenericJdbcSinkFunction.java
@@ -23,20 +23,25 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Collections;

/** A generic SinkFunction for JDBC. */
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
-        implements CheckpointedFunction, InputTypeConfigurable {
+        implements LineageVertexProvider, CheckpointedFunction, InputTypeConfigurable {
private final JdbcOutputFormat<T, ?, ?> outputFormat;
private JdbcOutputSerializer<T> serializer;

@@ -78,4 +83,12 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
JdbcOutputSerializer.of(
((TypeInformation<T>) type).createSerializer(executionConfig));
}

@Override
public LineageVertex getLineageVertex() {
String name = LineageUtils.nameOf(outputFormat.connectionProvider);
String namespace = LineageUtils.namespaceOf(outputFormat.connectionProvider);
LineageDataset dataset = LineageUtils.datasetOf(name, namespace, Collections.emptyList());
return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
}
}
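None of these providers consume their own vertices; the runtime gathers them into the job's lineage graph for listeners. For ad-hoc inspection, a small helper along these lines (illustrative, not part of the commit) renders what any provider above reports as namespace/name strings:

import org.apache.flink.streaming.api.lineage.LineageVertexProvider;

import java.util.List;
import java.util.stream.Collectors;

public final class LineageDebug {

    private LineageDebug() {}

    /** Renders each dataset of a provider's vertex as "namespace/name". */
    public static List<String> describe(LineageVertexProvider provider) {
        return provider.getLineageVertex().datasets().stream()
                .map(dataset -> dataset.namespace() + "/" + dataset.name())
                .collect(Collectors.toList());
    }
}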
(Diffs for the remaining changed files are not shown in this excerpt.)
