Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][io] Upgrade Debezium connector version to 2.6.1.Final #14

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ flexible messaging model and an intuitive client API.</description>
<json-smart.version>2.4.10</json-smart.version>
<opensearch.version>1.2.4</opensearch.version>
<elasticsearch-java.version>8.12.1</elasticsearch-java.version>
<debezium.version>1.9.7.Final</debezium.version>
<debezium.version>2.6.1.Final</debezium.version>
<debezium.postgresql.version>42.5.5</debezium.postgresql.version>
<debezium.mysql.version>8.0.30</debezium.mysql.version>
<!-- Override version that brings CVE-2022-3143 with debezium -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);

// database.history : implementation class for database history.
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY);

// database.history.pulsar.service.url
String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -52,12 +52,12 @@
import org.apache.pulsar.client.api.Schema;

/**
* A {@link DatabaseHistory} implementation that records schema changes as normal pulsar messages on the specified
* A {@link SchemaHistory} implementation that records schema changes as normal pulsar messages on the specified
* topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic.
*/
@Slf4j
@ThreadSafe
public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
public final class PulsarDatabaseHistory extends AbstractSchemaHistory {

public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
.withDisplayName("Database history topic name")
Expand Down Expand Up @@ -94,11 +94,11 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
.withValidation(Field::isOptional);

public static final Field.Set ALL_FIELDS = Field.setOf(
TOPIC,
SERVICE_URL,
CLIENT_BUILDER,
DatabaseHistory.NAME,
READER_CONFIG);
TOPIC,
SERVICE_URL,
CLIENT_BUILDER,
SchemaHistory.NAME,
READER_CONFIG);

private final ObjectMapper mapper = new ObjectMapper();
private final DocumentReader reader = DocumentReader.defaultReader();
Expand All @@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
public void configure(
Configuration config,
HistoryRecordComparator comparator,
DatabaseHistoryListener listener,
SchemaHistoryListener listener,
boolean useCatalogBeforeSchema) {
super.configure(config, comparator, listener, useCatalogBeforeSchema);
if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
Expand Down Expand Up @@ -148,7 +148,7 @@ public void configure(
}

// Copy the relevant portions of the configuration and add useful defaults ...
this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString());

log.info("Configure to store the debezium database history {} to pulsar topic {}",
dbHistoryName, topicName);
Expand Down Expand Up @@ -201,7 +201,7 @@ public void start() {
}

@Override
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
if (this.producer == null) {
throw new IllegalStateException("No producer is available. Ensure that 'start()'"
+ " is called before storing database history records.");
Expand All @@ -212,7 +212,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
try {
producer.send(record.toString());
} catch (PulsarClientException e) {
throw new DatabaseHistoryException(e);
throw new SchemaHistoryException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;

Expand Down Expand Up @@ -81,8 +81,8 @@ protected void cleanup() throws Exception {
private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception {
Configuration.Builder configBuidler = Configuration.create()
.with(PulsarDatabaseHistory.TOPIC, topicName)
.with(DatabaseHistory.NAME, "my-db-history")
.with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL);
.with(SchemaHistory.NAME, "my-db-history")
.with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL);

if (testWithClientBuilder) {
ClientBuilder builder = PulsarClient.builder().serviceUrl(brokerUrl.toString());
Expand All @@ -102,7 +102,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
}

// Start up the history ...
history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true);
history.start();

// Should be able to call start more than once ...
Expand Down Expand Up @@ -161,7 +161,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
// Stop the history (which should stop the producer) ...
history.stop();
history = new PulsarDatabaseHistory();
history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true);
// no need to start

// Recover from the very beginning to just past the first change ...
Expand Down Expand Up @@ -241,11 +241,11 @@ public void testExists() throws Exception {
Configuration config = Configuration.create()
.with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
.with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic")
.with(DatabaseHistory.NAME, "my-db-history")
.with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
.with(SchemaHistory.NAME, "my-db-history")
.with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
.build();

history.configure(config, null, DatabaseHistoryListener.NOOP, true);
history.configure(config, null, SchemaHistoryListener.NOOP, true);
history.start();

// dummytopic should not exist yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class DebeziumMongoDbContainer extends ChaosContainer<DebeziumMongoDbCont
public static final String NAME = "debezium-mongodb-example";

public static final Integer[] PORTS = { 27017 };
private static final String IMAGE_NAME = "debezium/example-mongodb:0.10";
private static final String IMAGE_NAME = "debezium/example-mongodb:2.6.1.Final";

public DebeziumMongoDbContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class DebeziumMySQLContainer extends ChaosContainer<DebeziumMySQLContaine
public static final String NAME = "debezium-mysql-example";
static final Integer[] PORTS = { 3306 };

private static final String IMAGE_NAME = "debezium/example-mysql:0.8";
private static final String IMAGE_NAME = "debezium/example-mysql:2.6.1.Final";

public DebeziumMySQLContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class DebeziumPostgreSqlContainer extends ChaosContainer<DebeziumPostgreS
public static final String NAME = "debezium-postgresql-example";
static final Integer[] PORTS = { 5432 };

private static final String IMAGE_NAME = "debezium/example-postgres:0.10";
private static final String IMAGE_NAME = "debezium/example-postgres:2.6.1.Final";

public DebeziumPostgreSqlContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,18 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> i
protected int numEntriesToInsert = 1;
protected int numEntriesExpectAfterStart = 9;

/*
*In Debezium 2.5, they introduced several new timestamp fields,
* ts_us, and ts_ns, which represent the millisecond-based time values in microseconds and nanoseconds respectively.
*/
public static final Set<String> DEBEZIUM_FIELD_SET = new HashSet<String>() {{
add("before");
add("after");
add("source");
add("op");
add("ts_ms");
add("ts_us");
add("ts_ns");
add("transaction");
}};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,21 @@ public DebeziumMongoDbSourceTester(PulsarCluster cluster) {
this.pulsarCluster = cluster;
pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;

sourceConfig.put("mongodb.hosts", "rs0/" + DebeziumMongoDbContainer.NAME + ":27017");
/*
*The `mongodb.connection.string` property replaces the deprecated `mongodb.hosts` property in release 2.2
* that was used to provide earlier versions of the connector with the host address of the configuration server replica.
* In the current release, use mongodb.connection.string to provide the connector with the addresses of MongoDB routers,
* also known as mongos.
*/
sourceConfig.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
sourceConfig.put("mongodb.connection.string", "mongodb://" + DebeziumMongoDbContainer.NAME + ":27017/?replicaSet=rs0");
sourceConfig.put("mongodb.name", "dbserver1");
sourceConfig.put("mongodb.user", "debezium");
sourceConfig.put("mongodb.password", "dbz");
sourceConfig.put("mongodb.task.id","1");
sourceConfig.put("database.include.list", "inventory");
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("mongodb.task.id", "1");
sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/mongodb");
sourceConfig.put("capture.mode", "oplog");
sourceConfig.put("topic.prefix", "dbserver1");
}

@Override
Expand All @@ -66,13 +72,16 @@ public void prepareSource() throws Exception {
log.info("debezium mongodb server already contains preconfigured data.");
}

/*
* mongo is deprecated in 2.6.1.Final release and now we have use mongosh instead
*/
@Override
public void prepareInsertEvent() throws Exception {
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
"mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"--eval 'db.products.find()'");
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
"mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"--eval 'db.products.insert({ " +
"_id : NumberLong(\"110\")," +
"name : \"test-debezium\"," +
Expand All @@ -84,20 +93,20 @@ public void prepareInsertEvent() throws Exception {
@Override
public void prepareDeleteEvent() throws Exception {
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
"mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"--eval 'db.products.find()'");
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
"mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"--eval 'db.products.deleteOne({name : \"test-debezium-update\"})'");
}

@Override
public void prepareUpdateEvent() throws Exception {
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
"mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"--eval 'db.products.find()'");
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
"mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " +
"--eval 'db.products.update({" +
"_id : 110}," +
"{$set:{name:\"test-debezium-update\", description: \"this is update description\"}})'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ public DebeziumMsSqlSourceTester(PulsarCluster cluster) {

pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;

sourceConfig.put("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");
sourceConfig.put("database.hostname", DebeziumMsSqlContainer.NAME);
sourceConfig.put("database.port", "1433");
sourceConfig.put("database.user", "sa");
sourceConfig.put("database.password", DebeziumMsSqlContainer.SA_PASSWORD);
sourceConfig.put("database.server.name", "mssql");
sourceConfig.put("database.dbname", "TestDB");
sourceConfig.put("snapshot.mode", "schema_only");
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("database.names", "TestDB");
sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/mssql");
sourceConfig.put("topic.prefix", "mssql");
sourceConfig.put("database.encrypt", "false");
sourceConfig.put("task.id", "1");
}

@Override
Expand Down Expand Up @@ -145,12 +147,12 @@ public int initialDelayForMsgReceive() {

@Override
public String keyContains() {
return "mssql.dbo.customers.Key";
return "mssql.TestDB.dbo.customers.Key";
}

@Override
public String valueContains() {
return "mssql.dbo.customers.Value";
return "mssql.TestDB.dbo.customers.Value";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* It reads binlog from MySQL, and store the debezium output into Pulsar.
* This test verify that the target topic contains wanted number messages.
*
* Debezium MySQL Container is "debezium/example-mysql:0.8",
* Debezium MySQL Container is "debezium/example-mysql:2.6.1.Final",
* which is a MySQL database server preconfigured with an inventory database.
*/
@Slf4j
Expand All @@ -53,20 +53,23 @@ public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassNam
this.pulsarCluster = cluster;
pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;

sourceConfig.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");
sourceConfig.put("database.hostname", DebeziumMySQLContainer.NAME);
sourceConfig.put("database.port", "3306");
sourceConfig.put("database.user", "debezium");
sourceConfig.put("database.password", "dbz");
sourceConfig.put("database.server.id", "184054");
sourceConfig.put("database.server.name", "dbserver1");
sourceConfig.put("database.whitelist", "inventory");
sourceConfig.put("database.include.list", "inventory");
if (!testWithClientBuilder) {
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl);
}
sourceConfig.put("key.converter", converterClassName);
sourceConfig.put("value.converter", converterClassName);
sourceConfig.put("topic.namespace", "debezium/mysql-" +
(converterClassName.endsWith("AvroConverter") ? "avro" : "json"));
sourceConfig.put("topic.prefix", "dbserver1");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public DebeziumOracleDbSourceTester(PulsarCluster cluster) {

pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;

sourceConfig.put("connector.class", "io.debezium.connector.oracle.OracleConnector");
sourceConfig.put("database.hostname", DebeziumOracleDbContainer.NAME);
sourceConfig.put("database.port", "1521");
sourceConfig.put("database.user", "dbzuser");
Expand All @@ -63,8 +64,9 @@ public DebeziumOracleDbSourceTester(PulsarCluster cluster) {
sourceConfig.put("snapshot.mode", "schema_only");

sourceConfig.put("schema.include.list", "inv");
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/oracle");
sourceConfig.put("topic.prefix", "XE");
}

@Override
Expand Down
Loading
Loading