diff --git a/build.gradle b/build.gradle
index cbbdbfe7..f1440a9a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,5 +1,5 @@
/*
- * Copyright 2019 Aiven Oy
+ * Copyright 2020 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,10 +29,17 @@ plugins {
// https://docs.gradle.org/current/userguide/publishing_maven.html
id "maven-publish"
+
+ // https://docs.gradle.org/current/userguide/idea_plugin.html
+ id "idea"
}
repositories {
jcenter()
+ // For kafka-avro-serializer and kafka-connect-avro-converter
+ maven {
+ url "https://packages.confluent.io/maven"
+ }
}
java {
@@ -103,6 +110,37 @@ publishing {
ext {
kafkaVersion = "2.2.0"
slf4jVersion = "1.7.25"
+ confluentPlatformVersion = "4.1.4"
+}
+
+sourceSets {
+ integrationTest {
+ java {
+ srcDirs = ['src/integration-test/java']
+ }
+ resources {
+ srcDirs = ['src/integration-test/resources']
+ }
+
+ compileClasspath += sourceSets.main.output + configurations.testRuntimeClasspath
+ runtimeClasspath += output + compileClasspath
+ }
+}
+
+idea {
+ module {
+ testSourceDirs += project.sourceSets.integrationTest.java.srcDirs
+ testSourceDirs += project.sourceSets.integrationTest.resources.srcDirs
+ }
+}
+
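+// Let the integration tests reuse the unit-test dependency declarations.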
+configurations {
+ integrationTestImplementation.extendsFrom testImplementation
+ integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
+}
+
+test {
+ useJUnit()
}
dependencies {
@@ -128,8 +166,50 @@ dependencies {
testImplementation "org.apache.derby:derby:10.11.1.1"
testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion"
+ integrationTestImplementation "org.apache.kafka:connect-api:$kafkaVersion"
+ integrationTestImplementation "org.apache.kafka:connect-runtime:$kafkaVersion"
+ integrationTestImplementation "org.apache.kafka:connect-json:$kafkaVersion"
+ integrationTestImplementation "org.apache.kafka:connect-transforms:$kafkaVersion"
+ integrationTestImplementation "io.confluent:kafka-avro-serializer:$confluentPlatformVersion"
+ integrationTestImplementation "io.confluent:kafka-connect-avro-converter:$confluentPlatformVersion"
+ integrationTestImplementation "org.apache.avro:avro:1.8.1"
+
+ integrationTestImplementation 'org.junit.jupiter:junit-jupiter:5.7.1'
+ integrationTestRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.1'
+
+ integrationTestImplementation "org.testcontainers:junit-jupiter:1.14.3"
+ integrationTestImplementation "org.testcontainers:testcontainers:1.14.3"
+ integrationTestImplementation "org.testcontainers:kafka:1.14.3"
+ integrationTestImplementation "org.testcontainers:postgresql:1.14.3"
+
+ integrationTestImplementation "cloud.localstack:localstack-utils:0.2.5"
+
+ integrationTestImplementation sourceSets.test.output
+}
+
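+// Integration tests live in a separate task so unit tests stay fast;
+// run them with: ./gradlew integrationTest (optionally -PtablePollIntervalMs=<ms>).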
+task integrationTest(type: Test) {
+ description = 'Runs the integration tests.'
+ group = 'verification'
+ testClassesDirs = sourceSets.integrationTest.output.classesDirs
+ classpath = sourceSets.integrationTest.runtimeClasspath
+
+ dependsOn test, distTar
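+ // distTar builds the connector distribution whose path is passed to the tests below.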
+
+ useJUnitPlatform()
+
+ // Run always.
+ outputs.upToDateWhen { false }
+
+ if (project.hasProperty("tablePollIntervalMs")) {
+ // Pass table.poll.interval.ms through to the tests.
+ systemProperty("integration-test.table.poll.interval.ms", project.property("tablePollIntervalMs"))
+ }
+
+ // Pass the distribution file path to the tests.
+ systemProperty("integration-test.distribution.file.path", distTar.archiveFile.get().asFile.path)
}
+
processResources {
filesMatching('aiven-kafka-connect-jdbc-version.properties') {
expand(version: version)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 61eff178..31b517d4 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -25,7 +25,7 @@
- files="(BufferedRecords|DataConverter|DatabaseDialect|FieldsMetadata|HanaDialect|JdbcSourceTask|MySqlDatabaseDialect|OracleDatabaseDialect|PostgreSqlDatabaseDialect|PreparedStatementBinder|SqlServerDatabaseDialect|SqliteDatabaseDialect|TimestampIncrementingTableQuerier|VerticaDatabaseDialect|SapHanaDatabaseDialect|TableId|ColumnDefinition|TableMonitorThread).java"/>
+ files="(DbDialect|JdbcSourceTask|GenericDatabaseDialect|JdbcConnectService).java"/>
diff --git a/src/integration-test/java/io/aiven/connect/jdbc/AbstractIT.java b/src/integration-test/java/io/aiven/connect/jdbc/AbstractIT.java
new file mode 100644
index 00000000..f09671a6
--- /dev/null
+++ b/src/integration-test/java/io/aiven/connect/jdbc/AbstractIT.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2020 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.connect.jdbc;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import cloud.localstack.docker.LocalstackDockerExtension;
+import cloud.localstack.docker.annotation.LocalstackDockerProperties;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@ExtendWith(LocalstackDockerExtension.class)
+@LocalstackDockerProperties(services = {"jdbc"})
+@Testcontainers
+public abstract class AbstractIT {
+
+ static final Logger LOGGER = LoggerFactory.getLogger(AbstractIT.class);
+
+ protected static final String TEST_TOPIC_NAME = "test_pg_topic";
+
+ protected static KafkaProducer<String, GenericRecord> producer;
+
+ @Container
+ protected KafkaContainer kafkaContainer =
+ new KafkaContainer()
+ .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
+
+ @Container
+ protected SchemaRegistryContainer schemaRegistryContainer =
+ new SchemaRegistryContainer(kafkaContainer);
+
+ protected JdbcConnectService jdbcConnectService;
+
+ @BeforeEach
+ void startKafka() throws Exception {
+ LOGGER.info("Configure Kafka connect plugins");
+ final var pluginDir = setupPluginDir();
+ setupKafka();
+ setupKafkaConnect(pluginDir);
+ createProducer();
+ }
+
+ protected AdminClient adminClient() {
+ final Properties adminClientConfig = new Properties();
+ adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ return AdminClient.create(adminClientConfig);
+ }
+
+ private static Path setupPluginDir() throws Exception {
+ final var testDir = Files.createTempDirectory("aiven-kafka-connect-jdbc-test-");
+ final var distFile = Paths.get(System.getProperty("integration-test.distribution.file.path"));
+ assert Files.exists(distFile);
+
+ final var pluginDir = Paths.get(testDir.toString(), "plugins/aiven-kafka-connect-jdbc/");
+ Files.createDirectories(pluginDir);
+
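+ // Unpack the distribution produced by the distTar task (passed in via the
+ // integration-test.distribution.file.path system property) into the plugin dir.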
+ final String cmd = String.format("tar -xf %s --strip-components=1 -C %s",
+ distFile.toString(), pluginDir.toString());
+ final Process p = Runtime.getRuntime().exec(cmd);
+ assert p.waitFor() == 0;
+ return pluginDir;
+ }
+
+ private void setupKafka() throws Exception {
+ LOGGER.info("Setup Kafka");
+ try (final var adminClient = adminClient()) {
+ LOGGER.info("Create topic {}", TEST_TOPIC_NAME);
+ final NewTopic newTopic = new NewTopic(TEST_TOPIC_NAME, 4, (short) 1);
+ adminClient.createTopics(List.of(newTopic)).all().get();
+ }
+ }
+
+ private void setupKafkaConnect(final Path pluginDir) throws Exception {
+ LOGGER.info("Start Kafka Connect");
+ jdbcConnectService =
+ new JdbcConnectService(kafkaContainer.getBootstrapServers(), pluginDir);
+ jdbcConnectService.start();
+ }
+
+ private void createProducer() {
+ LOGGER.info("Create kafka producer");
+ final Map<String, Object> producerProps = new HashMap<>();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "io.confluent.kafka.serializers.KafkaAvroSerializer");
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "io.confluent.kafka.serializers.KafkaAvroSerializer");
+ producerProps.put("schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+ producer = new KafkaProducer<>(producerProps);
+ }
+
+ @AfterEach
+ void stopKafka() throws Exception {
+ jdbcConnectService.stop();
+ }
+
+ protected Future<RecordMetadata> sendMessageAsync(final int partition,
+ final String key,
+ final GenericRecord value) {
+ final var msg = new ProducerRecord<>(TEST_TOPIC_NAME, partition, key, value);
+ return producer.send(msg);
+ }
+
+}
diff --git a/src/integration-test/java/io/aiven/connect/jdbc/JdbcConnectService.java b/src/integration-test/java/io/aiven/connect/jdbc/JdbcConnectService.java
new file mode 100644
index 00000000..7e7031e7
--- /dev/null
+++ b/src/integration-test/java/io/aiven/connect/jdbc/JdbcConnectService.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2020 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.connect.jdbc;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcConnectService {
+
+ static final Logger LOGGER = LoggerFactory.getLogger(JdbcConnectService.class);
+
+ private final String bootstrapServers;
+
+ private final int offsetFlushInterval = 5000;
+
+ private final Path pluginDir;
+
+ private Herder herder;
+
+ private Connect connect;
+
+ public JdbcConnectService(final String bootstrapServers, final Path pluginDir) {
+ this.bootstrapServers = bootstrapServers;
+ this.pluginDir = pluginDir;
+ }
+
+ void start() {
+ final Map<String, String> workerProps = Map.of(
+ "bootstrap.servers", bootstrapServers,
+ "offset.flush.interval.ms", Integer.toString(offsetFlushInterval),
+ // These don't matter much (each connector sets its own converters),
+ // but need to be filled with valid classes.
+ "key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter",
+ "value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter",
+ "internal.key.converter", "org.apache.kafka.connect.json.JsonConverter",
+ "internal.key.converter.schemas.enable", "false",
+ "internal.value.converter", "org.apache.kafka.connect.json.JsonConverter",
+ "internal.value.converter.schemas.enable", "false",
+ // Not needed since we use a MemoryOffsetBackingStore.
+ "offset.storage.file.filename", "",
+ "plugin.path", pluginDir.toString());
+
+ final Time time = Time.SYSTEM;
+ final String workerId = "test-worker";
+ final String kafkaClusterId = "test-cluster";
+
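+ // Run an embedded standalone Connect worker in-process: a StandaloneHerder
+ // backed by an in-memory offset store, plus the REST server Connect requires.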
+ final var plugins = new Plugins(workerProps);
+ final var config = new StandaloneConfig(workerProps);
+
+ final var worker = new Worker(workerId, time, plugins, config, new MemoryOffsetBackingStore());
+ herder = new StandaloneHerder(worker, kafkaClusterId);
+ connect = new Connect(herder, new RestServer(config));
+
+ connect.start();
+ }
+
+ public void createConnector(final Map<String, String> config) throws ExecutionException, InterruptedException {
+ assert herder != null;
+
+ final FutureCallback<Herder.Created<ConnectorInfo>> cb =
+ new FutureCallback<>((error, info) -> {
+ if (error != null) {
+ LOGGER.error("Failed to create connector", error);
+ } else {
+ LOGGER.info("Created connector {}", info.result().name());
+ }
+ });
+ herder.putConnectorConfig(
+ config.get(ConnectorConfig.NAME_CONFIG),
+ config, false, cb
+ );
+
+ final Herder.Created<ConnectorInfo> connectorInfoCreated = cb.get();
+ assert connectorInfoCreated.created();
+ }
+
+ void stop() {
+ herder.stop();
+ connect.stop();
+ }
+
+}
diff --git a/src/integration-test/java/io/aiven/connect/jdbc/SchemaRegistryContainer.java b/src/integration-test/java/io/aiven/connect/jdbc/SchemaRegistryContainer.java
new file mode 100644
index 00000000..8590af94
--- /dev/null
+++ b/src/integration-test/java/io/aiven/connect/jdbc/SchemaRegistryContainer.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2020 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.connect.jdbc;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.Base58;
+
+public final class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
+ public static final int SCHEMA_REGISTRY_PORT = 8081;
+
+ public SchemaRegistryContainer(final KafkaContainer kafka) {
+ this("5.2.1", kafka);
+ }
+
+ public SchemaRegistryContainer(final String confluentPlatformVersion, final KafkaContainer kafka) {
+ super("confluentinc/cp-schema-registry:" + confluentPlatformVersion);
+
+ dependsOn(kafka);
+ withNetwork(kafka.getNetwork());
+ withNetworkAliases("schema-registry-" + Base58.randomString(6));
+
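+ // Testcontainers' KafkaContainer exposes an internal listener on port 9092
+ // that other containers reach via the broker's first network alias.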
+ withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
+ String.format("PLAINTEXT://%s:%s", kafka.getNetworkAliases().get(0), 9092));
+ withExposedPorts(SCHEMA_REGISTRY_PORT);
+ withEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost");
+ }
+
+ public String getSchemaRegistryUrl() {
+ return String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(SCHEMA_REGISTRY_PORT));
+ }
+}
diff --git a/src/integration-test/java/io/aiven/connect/jdbc/pg/AbstractPgSqlAwareIT.java b/src/integration-test/java/io/aiven/connect/jdbc/pg/AbstractPgSqlAwareIT.java
new file mode 100644
index 00000000..5247ea34
--- /dev/null
+++ b/src/integration-test/java/io/aiven/connect/jdbc/pg/AbstractPgSqlAwareIT.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2020 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.connect.jdbc.pg;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+
+import io.aiven.connect.jdbc.AbstractIT;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.postgresql.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.utility.Base58;
+
+public abstract class AbstractPgSqlAwareIT extends AbstractIT {
+
+ static final Logger LOGGER = LoggerFactory.getLogger(AbstractPgSqlAwareIT.class);
+
+ static long tablePollIntervalMs = 5000;
+
+ protected Connection pgConnection;
+
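+ // Explicitly register the PostgreSQL JDBC driver so DriverManager can
+ // resolve the container's JDBC URL.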
+ static {
+ try {
+ Class.forName(Driver.class.getName());
+ } catch (final ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Container
+ public PostgreSQLContainer<?> pgSqlContainer =
+ new PostgreSQLContainer<>(PostgreSQLContainer.IMAGE + ":11.10")
+ .withDatabaseName("test-connector-db")
+ .withUsername("test-user")
+ .withPassword(Base58.randomString(10));
+
+ @BeforeAll
+ static void setVariables() throws Exception {
+ tablePollIntervalMs = Long.parseLong(
+ System.getProperty(
+ "integration-test.table.poll.interval.ms",
+ String.valueOf(tablePollIntervalMs))
+ );
+ }
+
+ @BeforeEach
+ void startUp() throws Exception {
+ LOGGER.info("Create test table");
+ pgConnection =
+ DriverManager.getConnection(
+ pgSqlContainer.getJdbcUrl(),
+ pgSqlContainer.getUsername(),
+ pgSqlContainer.getPassword());
+ }
+
+ @AfterEach
+ void closeDbConnection() throws Exception {
+ pgConnection.close();
+ }
+
+}
diff --git a/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSinkConnectorIT.java b/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSinkConnectorIT.java
new file mode 100644
index 00000000..13e747f3
--- /dev/null
+++ b/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSinkConnectorIT.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2020 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.connect.jdbc.pg;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import io.aiven.connect.jdbc.JdbcSinkConnector;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PgSqlSinkConnectorIT extends AbstractPgSqlAwareIT {
+
+ static final Schema VALUE_SCHEMA = new Schema.Parser().parse(
+ "{"
+ + "\"type\":\"record\","
+ + "\"name\":\"pg_sql_types\","
+ + "\"fields\":"
+ + "["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ + "{\"name\":\"json_value\", \"type\":\"string\"},"
+ + "{\"name\":\"jsonb_value\", \"type\":\"string\"}"
+ + "]}");
+
+ static final String SELECT_QUERY = "SELECT id, json_value, jsonb_value FROM " + TEST_TOPIC_NAME + " ORDER BY id";
+
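+ // Spreads test messages across the four partitions created for TEST_TOPIC_NAME in AbstractIT.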
+ private final Random partitionRnd = new Random();
+
+ @BeforeEach
+ void createDbTable() throws Exception {
+ try (final var stm = pgConnection.createStatement()) {
+ final var createTableSql =
+ String.format(
+ "CREATE TABLE %s ("
+ + " id INT PRIMARY KEY NOT NULL,"
+ + " json_value JSON NOT NULL,"
+ + " jsonb_value JSONB NOT NULL"
+ + " )",
+ TEST_TOPIC_NAME
+ );
+ LOGGER.info("Create table: {}", createTableSql);
+ stm.execute(createTableSql);
+ }
+ }
+
+ /**
+ * Tests for PgSQL-specific cast types such as JSON, JSONB, ARRAY, UUID, etc.
+ */
+ @Test
+ void pgSqlSupportTypesForInsertMode() throws Exception {
+ createSinkConnector("pgsql-sink-insert-mode-connector", "insert");
+ final var sendFutures = new ArrayList<Future<RecordMetadata>>();
+ int cnt = 0;
+ for (int i = 0; i < 3; i++) {
+ final var keyAndRecord = createKeyRecord(cnt);
+ sendFutures.add(
+ sendMessageAsync(
+ partitionRnd.nextInt(4),
+ keyAndRecord.getLeft(),
+ keyAndRecord.getRight()));
+ cnt++;
+ }
+ producer.flush();
+
+ Thread.sleep(tablePollIntervalMs);
+
+ for (final Future<RecordMetadata> sendFuture : sendFutures) {
+ sendFuture.get();
+ }
+
+ try (final var stm = pgConnection.createStatement();
+ final var rs = stm.executeQuery(SELECT_QUERY)) {
+ var counter = 0;
+ while (rs.next()) {
+ assertDbRecord(
+ String.format(
+ "id=%s, json_value={\"json_value\": %s}, jsonb_value={\"jsonb_value\": %s}",
+ counter, counter, counter),
+ rs);
+ counter++;
+ }
+ }
+
+ }
+
+ @Test
+ void pgSqlSupportTypesForUpdateMode() throws Exception {
+ createSinkConnector("pgsql-sink-connector-update-mode", "update");
+ try (final var stm = pgConnection.createStatement()) {
+ final var insertSql = "INSERT INTO " + TEST_TOPIC_NAME
+ + "(id, json_value, jsonb_value) VALUES(1, '{}'::json, '{}'::jsonb)";
+ stm.executeUpdate(insertSql);
+ }
+
+ final var keyAndRecord = createKeyRecord(1);
+ final var sentMessage =
+ sendMessageAsync(partitionRnd.nextInt(4), keyAndRecord.getLeft(), keyAndRecord.getRight());
+
+ producer.flush();
+
+ Thread.sleep(tablePollIntervalMs);
+
+ sentMessage.get();
+ try (final var stm = pgConnection.createStatement();
+ final var rs = stm.executeQuery(SELECT_QUERY)) {
+ while (rs.next()) {
+ assertDbRecord(
+ "id=1, json_value={\"json_value\": 1}, jsonb_value={\"jsonb_value\": 1}",
+ rs);
+ }
+ }
+ }
+
+ @Test
+ void pgSqlSupportTypesForUpsertMode() throws Exception {
+ createSinkConnector("pgsql-sink-connector-upsert-mode", "upsert");
+ try (final var stm = pgConnection.createStatement()) {
+ final var insertSql = "INSERT INTO " + TEST_TOPIC_NAME
+ + "(id, json_value, jsonb_value) VALUES(1, '{}'::json, '{}'::jsonb)";
+ stm.executeUpdate(insertSql);
+ }
+
+ final var updateKeyAndRecord = createKeyRecord(1);
+ final var insertKeyAndRecord = createKeyRecord(2);
+ final var sentMessages =
+ List.of(
+ sendMessageAsync(
+ partitionRnd.nextInt(4),
+ updateKeyAndRecord.getLeft(),
+ updateKeyAndRecord.getRight()),
+ sendMessageAsync(
+ partitionRnd.nextInt(4),
+ insertKeyAndRecord.getLeft(),
+ insertKeyAndRecord.getRight()));
+ producer.flush();
+ Thread.sleep(tablePollIntervalMs);
+ for (final var sentMessage : sentMessages) {
+ sentMessage.get();
+ }
+ try (final var stm = pgConnection.createStatement();
+ final var rs = stm.executeQuery(SELECT_QUERY)) {
+ var counter = 1;
+ while (rs.next()) {
+ assertDbRecord(
+ String.format(
+ "id=%s, json_value={\"json_value\": %s}, jsonb_value={\"jsonb_value\": %s}",
+ counter, counter, counter
+ ), rs
+ );
+ counter++;
+ }
+ }
+ }
+
+ private void createSinkConnector(final String connectorName, final String insertMode) throws Exception {
+ LOGGER.info("Create sink connector");
+ final var config = new HashMap<String, String>();
+ config.put("name", connectorName);
+ config.put("tasks.max", "1");
+ config.put("connector.class", JdbcSinkConnector.class.getCanonicalName());
+ config.put("insert.mode", insertMode);
+ config.put("batch.size", "1");
+ config.put("pk.mode", "record_value");
+ config.put("pk.fields", "id");
+ config.put("topics", TEST_TOPIC_NAME);
+ config.put("connection.url", pgSqlContainer.getJdbcUrl());
+ config.put("connection.user", pgSqlContainer.getUsername());
+ config.put("connection.password", pgSqlContainer.getPassword());
+ config.put("auto.create", "false");
+ config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
+ config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+ config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
+ config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+ jdbcConnectService.createConnector(config);
+ }
+
+ private Pair<String, GenericRecord> createKeyRecord(final int id) {
+ final var key = "key-" + id;
+ final GenericRecord value = new GenericData.Record(VALUE_SCHEMA);
+ value.put("id", id);
+ value.put("json_value", "{\"json_value\": " + id + "}");
+ value.put("jsonb_value", "{\"jsonb_value\": " + id + "}");
+ return Pair.of(key, value);
+ }
+
+ private void assertDbRecord(final String expectedRecord, final ResultSet rs) throws SQLException {
+ final var dbRecord = String.format(
+ "id=%s, json_value=%s, jsonb_value=%s",
+ rs.getInt("id"),
+ rs.getString("json_value"),
+ rs.getString("jsonb_value")
+ );
+ LOGGER.info("DB record: {}", dbRecord);
+ assertEquals(expectedRecord, dbRecord);
+ }
+
+}
diff --git a/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSourceConnectorIT.java b/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSourceConnectorIT.java
new file mode 100644
index 00000000..65fa4465
--- /dev/null
+++ b/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSourceConnectorIT.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2020 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.connect.jdbc.pg;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import io.aiven.connect.jdbc.JdbcSourceConnector;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.fasterxml.jackson.core.JsonProcessingException;
+import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PgSqlSourceConnectorIT extends AbstractPgSqlAwareIT {
+
+ @BeforeEach
+ void createDbTable() throws Exception {
+ try (final var stm = pgConnection.createStatement()) {
+ stm.execute(
+ String.format("CREATE TABLE %s ("
+ + " id INT PRIMARY KEY NOT NULL,"
+ + " json_value JSON NOT NULL,"
+ + " jsonb_value JSONB NOT NULL"
+ + " )", TEST_TOPIC_NAME));
+ }
+ final var insertSql =
+ "INSERT INTO " + TEST_TOPIC_NAME
+ + "(id, json_value, jsonb_value) "
+ + "VALUES(?, ?::json, ?::jsonb)";
+ try (final var stm = pgConnection.prepareStatement(insertSql)) {
+ var rowCounter = 0;
+ for (int i = 1; i < 4; i++) {
+ stm.setInt(1, i);
+ stm.setString(2, String.format("{\"json_value\": %s}", i));
+ stm.setString(3, String.format("{\"jsonb_value\": %s}", i));
+ rowCounter += stm.executeUpdate();
+ }
+ assert rowCounter == 3;
+ }
+ }
+
+ @Test
+ void pgSqlSupportTypes() throws Exception {
+ final var config = new HashMap<String, String>();
+ config.put("name", "pg-types-source-connector");
+ config.put("tasks.max", "1");
+ config.put("mode", "incrementing");
+ config.put("incrementing.column.name", "id");
+ config.put("connector.class", JdbcSourceConnector.class.getCanonicalName());
+ config.put("topic.prefix", "");
+ config.put("tables", "pg_source_table");
+ config.put("connection.url", pgSqlContainer.getJdbcUrl());
+ config.put("connection.user", pgSqlContainer.getUsername());
+ config.put("connection.password", pgSqlContainer.getPassword());
+ config.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ config.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+
+ jdbcConnectService.createConnector(config);
+
+ Thread.sleep(tablePollIntervalMs);
+
+ final var records = consumerRecords();
+ var counter = 1;
+ for (final var e : records.entrySet()) {
+ assertEquals(counter, e.getKey());
+ assertEquals(String.format("{\"json_value\": %s}", counter), e.getValue().getLeft());
+ assertEquals(String.format("{\"jsonb_value\": %s}", counter), e.getValue().getRight());
+ counter++;
+ }
+ }
+
+ private Map<Integer, Pair<String, String>> consumerRecords() throws JsonProcessingException {
+ final var kafkaProperties = new Properties();
+ kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "pg_source_table_group");
+ kafkaProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ LOGGER.info("Create consumer with properties: {}", kafkaProperties);
+ final var consumer = new KafkaConsumer<>(kafkaProperties, new StringDeserializer(), new StringDeserializer());
+ consumer.subscribe(List.of(TEST_TOPIC_NAME));
+ final var objectMapper = new ObjectMapper();
+ final var records = new TreeMap<Integer, Pair<String, String>>(Integer::compareTo);
+ try (final var c = consumer) {
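+ // With JsonConverter and schemas enabled (the default), each record value is
+ // a {"schema": ..., "payload": ...} envelope; the row data lives under "payload".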
+ for (final var record : c.poll(Duration.of(100, ChronoUnit.MILLIS))) {
+ final var json = objectMapper.readTree(record.value());
+ final var payload = json.get("payload");
+ records.put(
+ payload.get("id").asInt(),
+ Pair.of(payload.get("json_value").asText(), payload.get("jsonb_value").asText())
+ );
+ }
+ }
+ return records;
+ }
+
+}