From 2f23c4f11046bffffddf0677b5a9f17ff87d58cf Mon Sep 17 00:00:00 2001 From: mukesh-ctds Date: Fri, 26 Jul 2024 11:43:00 +0530 Subject: [PATCH 1/3] Updated debezium to 2.6.1.Final --- pom.xml | 2 +- .../pulsar/io/debezium/DebeziumSource.java | 2 +- .../io/debezium/PulsarDatabaseHistory.java | 30 +++++++++--------- .../debezium/PulsarDatabaseHistoryTest.java | 18 +++++------ .../containers/DebeziumMongoDbContainer.java | 2 +- .../containers/DebeziumMySQLContainer.java | 2 +- .../DebeziumPostgreSqlContainer.java | 2 +- .../integration/io/sources/SourceTester.java | 6 ++++ .../debezium/DebeziumMongoDbSourceTester.java | 31 ++++++++++++------- .../debezium/DebeziumMsSqlSourceTester.java | 14 +++++---- .../debezium/DebeziumMySqlSourceTester.java | 7 +++-- .../DebeziumOracleDbSourceTester.java | 4 ++- .../DebeziumPostgreSqlSourceTester.java | 6 ++-- .../debezium/PulsarDebeziumSourcesTest.java | 6 ++-- 14 files changed, 78 insertions(+), 54 deletions(-) diff --git a/pom.xml b/pom.xml index c497ea12e838b..ce051cbb233ed 100644 --- a/pom.xml +++ b/pom.xml @@ -201,7 +201,7 @@ flexible messaging model and an intuitive client API. 2.4.10 1.2.4 8.12.1 - 1.9.7.Final + 2.6.1.Final 42.5.5 8.0.30 diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java index 6c422c4f036a9..749e5db6934c9 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java @@ -88,7 +88,7 @@ public void open(Map config, SourceContext sourceContext) throws setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER); // database.history : implementation class for database history. - setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY); + setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY); // database.history.pulsar.service.url String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name()); diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java index 7ca0d309cf973..ecf1a15ccafb9 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java @@ -26,12 +26,12 @@ import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.document.DocumentReader; -import io.debezium.relational.history.AbstractDatabaseHistory; -import io.debezium.relational.history.DatabaseHistory; -import io.debezium.relational.history.DatabaseHistoryException; -import io.debezium.relational.history.DatabaseHistoryListener; +import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.SchemaHistory; +import io.debezium.relational.history.SchemaHistoryException; +import io.debezium.relational.history.SchemaHistoryListener; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -52,12 +52,12 @@ import org.apache.pulsar.client.api.Schema; /** - * A {@link DatabaseHistory} implementation that records schema changes as normal pulsar messages on the specified + * A {@link SchemaHistory} implementation that records schema changes as normal pulsar messages on the specified * topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic. */ @Slf4j @ThreadSafe -public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { +public final class PulsarDatabaseHistory extends AbstractSchemaHistory { public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic") .withDisplayName("Database history topic name") @@ -94,11 +94,11 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { .withValidation(Field::isOptional); public static final Field.Set ALL_FIELDS = Field.setOf( - TOPIC, - SERVICE_URL, - CLIENT_BUILDER, - DatabaseHistory.NAME, - READER_CONFIG); + TOPIC, + SERVICE_URL, + CLIENT_BUILDER, + SchemaHistory.NAME, + READER_CONFIG); private final ObjectMapper mapper = new ObjectMapper(); private final DocumentReader reader = DocumentReader.defaultReader(); @@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { public void configure( Configuration config, HistoryRecordComparator comparator, - DatabaseHistoryListener listener, + SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { super.configure(config, comparator, listener, useCatalogBeforeSchema); if (!config.validateAndRecord(ALL_FIELDS, logger::error)) { @@ -148,7 +148,7 @@ public void configure( } // Copy the relevant portions of the configuration and add useful defaults ... - this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString()); + this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString()); log.info("Configure to store the debezium database history {} to pulsar topic {}", dbHistoryName, topicName); @@ -201,7 +201,7 @@ public void start() { } @Override - protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException { + protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { if (this.producer == null) { throw new IllegalStateException("No producer is available. Ensure that 'start()'" + " is called before storing database history records."); @@ -212,7 +212,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException try { producer.send(record.toString()); } catch (PulsarClientException e) { - throw new DatabaseHistoryException(e); + throw new SchemaHistoryException(e); } } diff --git a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java index cf7290f53d186..95c91abdb1b6d 100644 --- a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java +++ b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java @@ -27,8 +27,8 @@ import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; -import io.debezium.relational.history.DatabaseHistory; -import io.debezium.relational.history.DatabaseHistoryListener; +import io.debezium.relational.history.SchemaHistory; +import io.debezium.relational.history.SchemaHistoryListener; import io.debezium.text.ParsingException; import io.debezium.util.Collect; @@ -81,8 +81,8 @@ protected void cleanup() throws Exception { private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception { Configuration.Builder configBuidler = Configuration.create() .with(PulsarDatabaseHistory.TOPIC, topicName) - .with(DatabaseHistory.NAME, "my-db-history") - .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL); + .with(SchemaHistory.NAME, "my-db-history") + .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL); if (testWithClientBuilder) { ClientBuilder builder = PulsarClient.builder().serviceUrl(brokerUrl.toString()); @@ -102,7 +102,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit } // Start up the history ... - history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true); + history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true); history.start(); // Should be able to call start more than once ... @@ -161,7 +161,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit // Stop the history (which should stop the producer) ... history.stop(); history = new PulsarDatabaseHistory(); - history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true); + history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true); // no need to start // Recover from the very beginning to just past the first change ... @@ -241,11 +241,11 @@ public void testExists() throws Exception { Configuration config = Configuration.create() .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString()) .with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic") - .with(DatabaseHistory.NAME, "my-db-history") - .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true) + .with(SchemaHistory.NAME, "my-db-history") + .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true) .build(); - history.configure(config, null, DatabaseHistoryListener.NOOP, true); + history.configure(config, null, SchemaHistoryListener.NOOP, true); history.start(); // dummytopic should not exist yet diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java index 481725d145b2a..bf6b8cd6462e2 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java @@ -25,7 +25,7 @@ public class DebeziumMongoDbContainer extends ChaosContainer i protected int numEntriesToInsert = 1; protected int numEntriesExpectAfterStart = 9; + /* + *In Debezium 2.5, they introduced several new timestamp fields, + * ts_us, and ts_ns, which represent the millisecond-based time values in microseconds and nanoseconds respectively. + */ public static final Set DEBEZIUM_FIELD_SET = new HashSet() {{ add("before"); add("after"); add("source"); add("op"); add("ts_ms"); + add("ts_us"); + add("ts_ns"); add("transaction"); }}; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java index 8c4a7a91a15fa..74014a3675157 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java @@ -42,15 +42,21 @@ public DebeziumMongoDbSourceTester(PulsarCluster cluster) { this.pulsarCluster = cluster; pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT; - sourceConfig.put("mongodb.hosts", "rs0/" + DebeziumMongoDbContainer.NAME + ":27017"); + /* + *The `mongodb.connection.string` property replaces the deprecated `mongodb.hosts` property in release 2.2 + * that was used to provide earlier versions of the connector with the host address of the configuration server replica. + * In the current release, use mongodb.connection.string to provide the connector with the addresses of MongoDB routers, + * also known as mongos. + */ + sourceConfig.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector"); + sourceConfig.put("mongodb.connection.string", "mongodb://" + DebeziumMongoDbContainer.NAME + ":27017/?replicaSet=rs0"); sourceConfig.put("mongodb.name", "dbserver1"); sourceConfig.put("mongodb.user", "debezium"); sourceConfig.put("mongodb.password", "dbz"); - sourceConfig.put("mongodb.task.id","1"); - sourceConfig.put("database.include.list", "inventory"); - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("mongodb.task.id", "1"); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("topic.namespace", "debezium/mongodb"); - sourceConfig.put("capture.mode", "oplog"); + sourceConfig.put("topic.prefix", "dbserver1"); } @Override @@ -66,13 +72,16 @@ public void prepareSource() throws Exception { log.info("debezium mongodb server already contains preconfigured data."); } + /* + * mongo is deprecated in 2.6.1.Final release and now we have use mongosh instead + */ @Override public void prepareInsertEvent() throws Exception { this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.find()'"); this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.insert({ " + "_id : NumberLong(\"110\")," + "name : \"test-debezium\"," + @@ -84,20 +93,20 @@ public void prepareInsertEvent() throws Exception { @Override public void prepareDeleteEvent() throws Exception { this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.find()'"); this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.deleteOne({name : \"test-debezium-update\"})'"); } @Override public void prepareUpdateEvent() throws Exception { this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.find()'"); this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.update({" + "_id : 110}," + "{$set:{name:\"test-debezium-update\", description: \"this is update description\"}})'"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java index a745cae60409d..de92a885b3de1 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java @@ -54,15 +54,17 @@ public DebeziumMsSqlSourceTester(PulsarCluster cluster) { pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT; + sourceConfig.put("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector"); sourceConfig.put("database.hostname", DebeziumMsSqlContainer.NAME); sourceConfig.put("database.port", "1433"); sourceConfig.put("database.user", "sa"); sourceConfig.put("database.password", DebeziumMsSqlContainer.SA_PASSWORD); - sourceConfig.put("database.server.name", "mssql"); - sourceConfig.put("database.dbname", "TestDB"); - sourceConfig.put("snapshot.mode", "schema_only"); - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("database.names", "TestDB"); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("topic.namespace", "debezium/mssql"); + sourceConfig.put("topic.prefix", "mssql"); + sourceConfig.put("database.encrypt", "false"); + sourceConfig.put("task.id", "1"); } @Override @@ -145,12 +147,12 @@ public int initialDelayForMsgReceive() { @Override public String keyContains() { - return "mssql.dbo.customers.Key"; + return "mssql.TestDB.dbo.customers.Key"; } @Override public String valueContains() { - return "mssql.dbo.customers.Value"; + return "mssql.TestDB.dbo.customers.Value"; } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java index 2f2bb8a2c0c25..0d4bab1560a69 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java @@ -32,7 +32,7 @@ * It reads binlog from MySQL, and store the debezium output into Pulsar. * This test verify that the target topic contains wanted number messages. * - * Debezium MySQL Container is "debezium/example-mysql:0.8", + * Debezium MySQL Container is "debezium/example-mysql:2.5.0.Final", * which is a MySQL database server preconfigured with an inventory database. */ @Slf4j @@ -53,6 +53,7 @@ public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassNam this.pulsarCluster = cluster; pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT; + sourceConfig.put("connector.class", "io.debezium.connector.mysql.MySqlConnector"); sourceConfig.put("database.hostname", DebeziumMySQLContainer.NAME); sourceConfig.put("database.port", "3306"); sourceConfig.put("database.user", "debezium"); @@ -60,13 +61,15 @@ public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassNam sourceConfig.put("database.server.id", "184054"); sourceConfig.put("database.server.name", "dbserver1"); sourceConfig.put("database.whitelist", "inventory"); + sourceConfig.put("database.include.list", "inventory"); if (!testWithClientBuilder) { - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); } sourceConfig.put("key.converter", converterClassName); sourceConfig.put("value.converter", converterClassName); sourceConfig.put("topic.namespace", "debezium/mysql-" + (converterClassName.endsWith("AvroConverter") ? "avro" : "json")); + sourceConfig.put("topic.prefix", "dbserver1"); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java index 17eaf319b3be2..abe1f3e95cd12 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java @@ -54,6 +54,7 @@ public DebeziumOracleDbSourceTester(PulsarCluster cluster) { pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT; + sourceConfig.put("connector.class", "io.debezium.connector.oracle.OracleConnector"); sourceConfig.put("database.hostname", DebeziumOracleDbContainer.NAME); sourceConfig.put("database.port", "1521"); sourceConfig.put("database.user", "dbzuser"); @@ -63,8 +64,9 @@ public DebeziumOracleDbSourceTester(PulsarCluster cluster) { sourceConfig.put("snapshot.mode", "schema_only"); sourceConfig.put("schema.include.list", "inv"); - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("topic.namespace", "debezium/oracle"); + sourceConfig.put("topic.prefix", "XE"); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java index 5f82cc52e7c40..881570b100dae 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java @@ -35,7 +35,7 @@ * It reads binlog from Postgres, and store the debezium output into Pulsar. * This test verify that the target topic contains wanted number messages. * - * Debezium Postgresql Container is "debezium/example-postgres:0.10", + * Debezium Postgresql Container is "debezium/example-postgres:2.5.0.Final", * which is a Postgresql database server preconfigured with an inventory database. */ @Slf4j @@ -65,6 +65,7 @@ public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) { pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT; + sourceConfig.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); sourceConfig.put("database.hostname", DebeziumPostgreSqlContainer.NAME); sourceConfig.put("database.port", "5432"); sourceConfig.put("database.user", "postgres"); @@ -74,8 +75,9 @@ public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) { sourceConfig.put("database.dbname", "postgres"); sourceConfig.put("schema.whitelist", "inventory"); sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom"); - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("topic.namespace", "debezium/postgresql"); + sourceConfig.put("topic.prefix", "dbserver1"); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java index 5c57c904fc77f..fdb9fe7b7baea 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java @@ -93,7 +93,7 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit + "-" + functionRuntimeType + "-" + randomName(8); // This is the binlog count that contained in mysql container. - final int numMessages = 47; + final int numMessages = 52; @Cleanup PulsarClient client = PulsarClient.builder() @@ -138,7 +138,7 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType + "-" + randomName(8); // This is the binlog count that contained in postgresql container. - final int numMessages = 26; + final int numMessages = 29; @Cleanup PulsarClient client = PulsarClient.builder() @@ -211,7 +211,7 @@ private void testDebeziumMsSqlConnect(String converterClassName, boolean jsonWit final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; final String outputTopicName = "debe-output-topic-name-" + testId.getAndIncrement(); - final String consumeTopicName = "debezium/mssql/mssql.dbo.customers"; + final String consumeTopicName = "debezium/mssql/mssql.TestDB.dbo.customers"; final String sourceName = "test-source-debezium-mssql-" + functionRuntimeType + "-" + randomName(8); final int numMessages = 1; From a102f7d2260ea0b3ca3a11b3a3041b6f604f3e31 Mon Sep 17 00:00:00 2001 From: mukesh-ctds Date: Mon, 29 Jul 2024 18:53:09 +0530 Subject: [PATCH 2/3] Updated to 2.7.0.Final --- pom.xml | 2 +- .../tests/integration/containers/DebeziumMongoDbContainer.java | 2 +- .../tests/integration/containers/DebeziumMySQLContainer.java | 2 +- .../integration/containers/DebeziumPostgreSqlContainer.java | 2 +- .../io/sources/debezium/DebeziumMySqlSourceTester.java | 2 +- .../io/sources/debezium/DebeziumPostgreSqlSourceTester.java | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index ce051cbb233ed..dfed0abaca831 100644 --- a/pom.xml +++ b/pom.xml @@ -201,7 +201,7 @@ flexible messaging model and an intuitive client API. 2.4.10 1.2.4 8.12.1 - 2.6.1.Final + 2.7.0.Final 42.5.5 8.0.30 diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java index bf6b8cd6462e2..bd7a4c0515b61 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java @@ -25,7 +25,7 @@ public class DebeziumMongoDbContainer extends ChaosContainer Date: Tue, 30 Jul 2024 13:31:03 +0530 Subject: [PATCH 3/3] Version downgraded to 2.6.1.Final --- pom.xml | 2 +- .../tests/integration/containers/DebeziumMongoDbContainer.java | 2 +- .../tests/integration/containers/DebeziumMySQLContainer.java | 2 +- .../integration/containers/DebeziumPostgreSqlContainer.java | 2 +- .../io/sources/debezium/DebeziumMySqlSourceTester.java | 2 +- .../io/sources/debezium/DebeziumPostgreSqlSourceTester.java | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index dfed0abaca831..ce051cbb233ed 100644 --- a/pom.xml +++ b/pom.xml @@ -201,7 +201,7 @@ flexible messaging model and an intuitive client API. 2.4.10 1.2.4 8.12.1 - 2.7.0.Final + 2.6.1.Final 42.5.5 8.0.30 diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java index bd7a4c0515b61..a8c968bbee91f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java @@ -25,7 +25,7 @@ public class DebeziumMongoDbContainer extends ChaosContainer