From 988b57762154c145076645b13f6b0f6eab9c5635 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Mon, 8 Feb 2021 14:44:16 +0000 Subject: [PATCH 1/5] Release version 6.3.1 --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index c3f76dae..91baabaa 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ group=io.aiven -version=6.4.0-SNAPSHOT +version=6.3.1 From 4f49aea75d5ceddbe2efb03934533d8379387e8d Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Mon, 8 Feb 2021 14:44:16 +0000 Subject: [PATCH 2/5] Bump version to 6.4.0-SNAPSHOT --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 91baabaa..c3f76dae 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ group=io.aiven -version=6.3.1 +version=6.4.0-SNAPSHOT From 0c46aea4659d65427ac1228905718780503bb370 Mon Sep 17 00:00:00 2001 From: Ivan Yurchenko Date: Mon, 8 Feb 2021 17:36:56 +0200 Subject: [PATCH 3/5] Add Github Actions workflow for making releases --- .github/workflows/create_release.yml | 77 ++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 .github/workflows/create_release.yml diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml new file mode 100644 index 00000000..59437bf1 --- /dev/null +++ b/.github/workflows/create_release.yml @@ -0,0 +1,77 @@ +name: Create release + +on: + workflow_dispatch: + inputs: + commit_hash: + description: "Hash of 'Release version x.y.z' commit" + required: true + +jobs: + build: + name: Create Release + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + ref: ${{ github.event.inputs.commit_hash }} + + - name: Check commit title and extract version + run: | + export commit_title=$(git log --pretty=format:%s -1 ${{ github.event.inputs.commit_hash }}) + echo "Commit title: $commit_title" + if [[ $commit_title =~ ^Release\ version\ [0-9]*\.[0-9]*\.[0-9]*$ ]]; then + echo "Valid commit title" + else + echo "Invalid commit title" + exit 1 + fi + export version=$(echo ${commit_title} | sed s/^Release\ version\ //g) + echo "Will use version ${version}" + echo "version=${version}" >> $GITHUB_ENV + + - name: Build + run: | + ./gradlew distTar distZip + + export tar_file=$(ls ./build/distributions/ | grep tar) + export zip_file=$(ls ./build/distributions/ | grep zip) + echo tar_file=${tar_file} >> $GITHUB_ENV + echo zip_file=${zip_file} >> $GITHUB_ENV + + echo tar_path=`realpath ./build/distributions/${tar_file}` >> $GITHUB_ENV + echo zip_path=`realpath ./build/distributions/${zip_file}` >> $GITHUB_ENV + + - name: Create release draft + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: "v${{ env.version }}" + release_name: "v${{ env.version }}" + body: | + *Fill in* + draft: true + prerelease: false + + - name: Upload tar + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: ${{ env.tar_path }} + asset_name: ${{ env.tar_file }} + asset_content_type: application/tar + + - name: Upload zip + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: ${{ env.zip_path }} + asset_name: ${{ env.zip_file }} + asset_content_type: 
application/zip

From 816a254bc75c8fb28ef8f3c4b082e5f15b3ef497 Mon Sep 17 00:00:00 2001
From: Andrey Pleskach
Date: Tue, 9 Feb 2021 10:58:25 +0100
Subject: [PATCH 4/5] Set up JDK version in workflow

The default JDK version in the workflow build is 8, while we use 11.
This change sets the JDK version to 11 for the build.
---
 .github/workflows/create_release.yml | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml
index 59437bf1..6793683a 100644
--- a/.github/workflows/create_release.yml
+++ b/.github/workflows/create_release.yml
@@ -12,11 +12,16 @@ jobs:
     name: Create Release
     runs-on: ubuntu-latest
     steps:
+      - name: Setup Java SDK
+        uses: actions/setup-java@v1.4.3
+        with:
+          java-version: 11
+
       - name: Checkout code
        uses: actions/checkout@v2
        with:
          ref: ${{ github.event.inputs.commit_hash }}
-
+
       - name: Check commit title and extract version
        run: |
          export commit_title=$(git log --pretty=format:%s -1 ${{ github.event.inputs.commit_hash }})

From 2c2960e15bd940010a27b16af41a483cec8705bf Mon Sep 17 00:00:00 2001
From: Andrey Pleskach
Date: Thu, 26 Nov 2020 12:07:25 +0100
Subject: [PATCH 5/5] Integration tests for PgSQL connector

---
 build.gradle                                  |  82 ++++++-
 checkstyle/suppressions.xml                   |   2 +-
 .../io/aiven/connect/jdbc/AbstractIT.java     | 143 ++++++++++++
 .../connect/jdbc/JdbcConnectService.java      | 113 ++++++++
 .../connect/jdbc/SchemaRegistryContainer.java |  46 ++++
 .../connect/jdbc/pg/AbstractPgSqlAwareIT.java |  91 ++++++++
 .../connect/jdbc/pg/PgSqlSinkConnectorIT.java | 205 ++++++++++++++++++
 .../jdbc/pg/PgSqlSourceConnectorIT.java       | 137 ++++++++++++
 8 files changed, 817 insertions(+), 2 deletions(-)
 create mode 100644 src/integration-test/java/io/aiven/connect/jdbc/AbstractIT.java
 create mode 100644 src/integration-test/java/io/aiven/connect/jdbc/JdbcConnectService.java
 create mode 100644 src/integration-test/java/io/aiven/connect/jdbc/SchemaRegistryContainer.java
 create mode 100644 src/integration-test/java/io/aiven/connect/jdbc/pg/AbstractPgSqlAwareIT.java
 create mode 100644 src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSinkConnectorIT.java
 create mode 100644 src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSourceConnectorIT.java

diff --git a/build.gradle b/build.gradle
index cbbdbfe7..f1440a9a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019 Aiven Oy
+ * Copyright 2020 Aiven Oy
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,10 +29,17 @@ plugins { // https://docs.gradle.org/current/userguide/publishing_maven.html id "maven-publish" + + // https://docs.gradle.org/current/userguide/idea_plugin.html + id "idea" } repositories { jcenter() + // For kafka-avro-serializer and kafka-connect-avro-converter + maven { + url "https://packages.confluent.io/maven" + } } java { @@ -103,6 +110,37 @@ publishing { ext { kafkaVersion = "2.2.0" slf4jVersion = "1.7.25" + confluentPlatformVersion = "4.1.4" +} + +sourceSets { + integrationTest { + java { + srcDirs = ['src/integration-test/java'] + } + resources { + srcDirs = ['src/integration-test/resources'] + } + + compileClasspath += sourceSets.main.output + configurations.testRuntimeClasspath + runtimeClasspath += output + compileClasspath + } +} + +idea { + module { + testSourceDirs += project.sourceSets.integrationTest.java.srcDirs + testSourceDirs += project.sourceSets.integrationTest.resources.srcDirs + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntimeClasspath +} + +test { + useJUnit() } dependencies { @@ -128,8 +166,50 @@ dependencies { testImplementation "org.apache.derby:derby:10.11.1.1" testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion" + integrationTestImplementation "org.apache.kafka:connect-api:$kafkaVersion" + integrationTestImplementation "org.apache.kafka:connect-runtime:$kafkaVersion" + integrationTestImplementation "org.apache.kafka:connect-json:$kafkaVersion" + integrationTestImplementation "org.apache.kafka:connect-transforms:$kafkaVersion" + integrationTestImplementation "io.confluent:kafka-avro-serializer:$confluentPlatformVersion" + integrationTestImplementation "io.confluent:kafka-connect-avro-converter:$confluentPlatformVersion" + integrationTestImplementation "org.apache.avro:avro:1.8.1" + + integrationTestImplementation 'org.junit.jupiter:junit-jupiter:5.7.1' + integrationTestRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.1' + + integrationTestImplementation "org.testcontainers:junit-jupiter:1.14.3" + integrationTestImplementation "org.testcontainers:testcontainers:1.14.3" + integrationTestImplementation "org.testcontainers:kafka:1.14.3" + integrationTestImplementation "org.testcontainers:postgresql:1.14.3" + + integrationTestImplementation "cloud.localstack:localstack-utils:0.2.5" + + integrationTestImplementation sourceSets.test.output +} + +task integrationTest(type: Test) { + description = 'Runs the integration tests.' + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath + + dependsOn test, distTar + + useJUnitPlatform() + + // Run always. + outputs.upToDateWhen { false } + + if (project.hasProperty("tablePollIntervalMs")) { + //pass table.poll.interval.ms + systemProperty("integration-test.table.poll.interval.ms", project.hasProperty("tablePollIntervalMs")) + } + + // Pass the distribution file path to the tests. 
+ systemProperty("integration-test.distribution.file.path", distTar.archiveFile.get().asFile.path) } + processResources { filesMatching('aiven-kafka-connect-jdbc-version.properties') { expand(version: version) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 61eff178..31b517d4 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -25,7 +25,7 @@ files="(BufferedRecords|DataConverter|DatabaseDialect|FieldsMetadata|HanaDialect|JdbcSourceTask|MySqlDatabaseDialect|OracleDatabaseDialect|PostgreSqlDatabaseDialect|PreparedStatementBinder|SqlServerDatabaseDialect|SqliteDatabaseDialect|TimestampIncrementingTableQuerier|VerticaDatabaseDialect|SapHanaDatabaseDialect|TableId|ColumnDefinition|TableMonitorThread).java"/> + files="(DbDialect|JdbcSourceTask|GenericDatabaseDialect|JdbcConnectService).java"/> diff --git a/src/integration-test/java/io/aiven/connect/jdbc/AbstractIT.java b/src/integration-test/java/io/aiven/connect/jdbc/AbstractIT.java new file mode 100644 index 00000000..f09671a6 --- /dev/null +++ b/src/integration-test/java/io/aiven/connect/jdbc/AbstractIT.java @@ -0,0 +1,143 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.connect.jdbc; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import cloud.localstack.docker.LocalstackDockerExtension; +import cloud.localstack.docker.annotation.LocalstackDockerProperties; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@ExtendWith(LocalstackDockerExtension.class) +@LocalstackDockerProperties(services = {"jdbc"}) +@Testcontainers +public abstract class AbstractIT { + + static final Logger LOGGER = LoggerFactory.getLogger(AbstractIT.class); + + protected static final String TEST_TOPIC_NAME = "test_pg_topic"; + + protected static KafkaProducer producer; + + @Container + protected KafkaContainer kafkaContainer = + new KafkaContainer() + .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false"); + + @Container + protected SchemaRegistryContainer schemaRegistryContainer = + new SchemaRegistryContainer(kafkaContainer); + + 
protected JdbcConnectService jdbcConnectService; + + @BeforeEach + void startKafka() throws Exception { + LOGGER.info("Configure Kafka connect plugins"); + final var pluginDir = setupPluginDir(); + setupKafka(); + setupKafkaConnect(pluginDir); + createProducer(); + } + + protected AdminClient adminClient() { + final Properties adminClientConfig = new Properties(); + adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + return AdminClient.create(adminClientConfig); + } + + private static Path setupPluginDir() throws Exception { + final var testDir = Files.createTempDirectory("aiven-kafka-connect-jdbc-test-"); + final var distFile = Paths.get(System.getProperty("integration-test.distribution.file.path")); + assert Files.exists(distFile); + + final var pluginDir = Paths.get(testDir.toString(), "plugins/aiven-kafka-connect-jdbc/"); + Files.createDirectories(pluginDir); + + final String cmd = String.format("tar -xf %s --strip-components=1 -C %s", + distFile.toString(), pluginDir.toString()); + final Process p = Runtime.getRuntime().exec(cmd); + assert p.waitFor() == 0; + return pluginDir; + } + + private void setupKafka() throws Exception { + LOGGER.info("Setup Kafka"); + try (final var adminClient = adminClient()) { + LOGGER.info("Create topic {}", TEST_TOPIC_NAME); + final NewTopic newTopic = new NewTopic(TEST_TOPIC_NAME, 4, (short) 1); + adminClient.createTopics(List.of(newTopic)).all().get(); + } + } + + private void setupKafkaConnect(final Path pluginDir) throws Exception { + LOGGER.info("Start Kafka Connect"); + jdbcConnectService = + new JdbcConnectService(kafkaContainer.getBootstrapServers(), pluginDir); + jdbcConnectService.start(); + } + + private void createProducer() { + LOGGER.info("Create kafka producer"); + final Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "io.confluent.kafka.serializers.KafkaAvroSerializer"); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "io.confluent.kafka.serializers.KafkaAvroSerializer"); + producerProps.put("schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl()); + producer = new KafkaProducer<>(producerProps); + } + + @AfterEach + void stopKafka() throws Exception { + jdbcConnectService.stop(); + } + + protected Future sendMessageAsync(final int partition, + final String key, + final GenericRecord value) { + final var msg = new ProducerRecord<>( + TEST_TOPIC_NAME, partition, + key == null ? null : key, + value == null ? null : value); + return producer.send(msg); + } + +} diff --git a/src/integration-test/java/io/aiven/connect/jdbc/JdbcConnectService.java b/src/integration-test/java/io/aiven/connect/jdbc/JdbcConnectService.java new file mode 100644 index 00000000..7e7031e7 --- /dev/null +++ b/src/integration-test/java/io/aiven/connect/jdbc/JdbcConnectService.java @@ -0,0 +1,113 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.connect.jdbc; + +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.Connect; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; +import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; +import org.apache.kafka.connect.util.FutureCallback; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JdbcConnectService { + + static final Logger LOGGER = LoggerFactory.getLogger(JdbcConnectService.class); + + private final String bootstrapServers; + + private final int offsetFlushInterval = 5000; + + private final Path pluginDir; + + private Herder herder; + + private Connect connect; + + public JdbcConnectService(final String bootstrapServers, final Path pluginDir) { + this.bootstrapServers = bootstrapServers; + this.pluginDir = pluginDir; + } + + void start() { + final Map workerProps = Map.of( + "bootstrap.servers", bootstrapServers, + "offset.flush.interval.ms", Integer.toString(offsetFlushInterval), + // These don't matter much (each connector sets its own converters), + // but need to be filled with valid classes. + "key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter", + "value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter", + "internal.key.converter", "org.apache.kafka.connect.json.JsonConverter", + "internal.key.converter.schemas.enable", "false", + "internal.value.converter", "org.apache.kafka.connect.json.JsonConverter", + "internal.value.converter.schemas.enable", "false", + // Don't need it since we'll memory MemoryOffsetBackingStore. 
+ "offset.storage.file.filename", "", + "plugin.path", pluginDir.toString()); + + final Time time = Time.SYSTEM; + final String workerId = "test-worker"; + final String kafkaClusterId = "test-cluster"; + + final var plugins = new Plugins(workerProps); + final var config = new StandaloneConfig(workerProps); + + final var worker = new Worker(workerId, time, plugins, config, new MemoryOffsetBackingStore()); + herder = new StandaloneHerder(worker, kafkaClusterId); + connect = new Connect(herder, new RestServer(config)); + + connect.start(); + } + + public void createConnector(final Map config) throws ExecutionException, InterruptedException { + assert herder != null; + + final FutureCallback> cb = + new FutureCallback<>((error, info) -> { + if (error != null) { + LOGGER.error("Failed to create job"); + } else { + LOGGER.info("Created connector {}", info.result().name()); + } + }); + herder.putConnectorConfig( + config.get(ConnectorConfig.NAME_CONFIG), + config, false, cb + ); + + final Herder.Created connectorInfoCreated = cb.get(); + assert connectorInfoCreated.created(); + } + + void stop() { + herder.stop(); + connect.stop(); + } + +} diff --git a/src/integration-test/java/io/aiven/connect/jdbc/SchemaRegistryContainer.java b/src/integration-test/java/io/aiven/connect/jdbc/SchemaRegistryContainer.java new file mode 100644 index 00000000..8590af94 --- /dev/null +++ b/src/integration-test/java/io/aiven/connect/jdbc/SchemaRegistryContainer.java @@ -0,0 +1,46 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.connect.jdbc; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.Base58; + +public final class SchemaRegistryContainer extends GenericContainer { + public static final int SCHEMA_REGISTRY_PORT = 8081; + + public SchemaRegistryContainer(final KafkaContainer kafka) { + this("5.2.1", kafka); + } + + public SchemaRegistryContainer(final String confluentPlatformVersion, final KafkaContainer kafka) { + super("confluentinc/cp-schema-registry:" + confluentPlatformVersion); + + dependsOn(kafka); + withNetwork(kafka.getNetwork()); + withNetworkAliases("schema-registry-" + Base58.randomString(6)); + + withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + String.format("PLAINTEXT://%s:%s", kafka.getNetworkAliases().get(0), 9092)); + withExposedPorts(SCHEMA_REGISTRY_PORT); + withEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost"); + } + + public String getSchemaRegistryUrl() { + return String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(SCHEMA_REGISTRY_PORT)); + } +} diff --git a/src/integration-test/java/io/aiven/connect/jdbc/pg/AbstractPgSqlAwareIT.java b/src/integration-test/java/io/aiven/connect/jdbc/pg/AbstractPgSqlAwareIT.java new file mode 100644 index 00000000..5a82f9a4 --- /dev/null +++ b/src/integration-test/java/io/aiven/connect/jdbc/pg/AbstractPgSqlAwareIT.java @@ -0,0 +1,91 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.connect.jdbc.pg; + +import java.sql.Connection; +import java.sql.DriverManager; + +import io.aiven.connect.jdbc.AbstractIT; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.postgresql.Driver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.Base58; + +public class AbstractPgSqlAwareIT extends AbstractIT { + + static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLContainer.class); + + static long tablePollIntervalMs = 5000; + + protected Connection pgConnection; + + static { + try { + Class.forName(Driver.class.getName()); + } catch (final ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + @Container + public PostgreSQLContainer pgSqlContainer = + new PostgreSQLContainer<>(PostgreSQLContainer.IMAGE + ":11.10") + .withDatabaseName("test-connector-db") + .withUsername("test-user") + .withPassword(Base58.randomString(10)); + + @BeforeAll + static void setVariables() throws Exception { + tablePollIntervalMs = Long.parseLong( + System.getProperty( + "integration-test.table.poll.interval.ms", + String.valueOf(tablePollIntervalMs)) + ); + } + + @BeforeEach + void startUp() throws Exception { + pgConnection = + DriverManager.getConnection( + pgSqlContainer.getJdbcUrl(), + pgSqlContainer.getUsername(), + pgSqlContainer.getPassword()); + LOGGER.info("Create test table"); + try (final var stm = pgConnection.createStatement()) { + // PgSQL for cast types JSON, JSONB and UUID + stm.execute( + String.format("CREATE TABLE %s (" + + " id INT PRIMARY KEY NOT NULL," + + " json_value JSON NOT NULL," + + " jsonb_value JSONB NOT NULL," + + " uuid_value UUID NOT NULL" + + " )", TEST_TOPIC_NAME)); + } + } + + @AfterEach + void closeDbConnection() throws Exception { + pgConnection.close(); + } + +} diff --git a/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSinkConnectorIT.java b/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSinkConnectorIT.java new file mode 100644 index 00000000..6c9d0626 --- /dev/null +++ b/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSinkConnectorIT.java @@ -0,0 +1,205 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.connect.jdbc.pg; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.RecordMetadata; + +import io.aiven.connect.jdbc.JdbcSinkConnector; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class PgSqlSinkConnectorIT extends AbstractPgSqlAwareIT { + + static final Schema VALUE_SCHEMA = new Schema.Parser().parse( + "{" + + "\"type\":\"record\"," + + "\"name\":\"pg_sql_types\"," + + "\"fields\":" + + "[" + + "{\"name\":\"id\",\"type\":\"int\"}," + + "{\"name\":\"json_value\", \"type\":\"string\"}," + + "{\"name\":\"jsonb_value\", \"type\":\"string\"}," + + "{\"name\":\"uuid_value\", \"type\":\"string\"}" + + "]}"); + + static final String SELECT_QUERY = + "SELECT id, json_value, jsonb_value, uuid_value FROM " + + TEST_TOPIC_NAME + + " ORDER BY id"; + + private final Random partitionRnd = new Random(); + + @Test + void pgSqlSupportTypesForInsertMode() throws Exception { + createSinkConnector("pgsql-sink-insert-mode-connector", "insert"); + final var sendFutures = new ArrayList>(); + final var expectedRecords = new ArrayList(); + int cnt = 0; + for (int i = 0; i < 3; i++) { + final var keyAndRecord = createKeyRecord(cnt); + expectedRecords.add(keyAndRecord.getValue()); + sendFutures.add( + sendMessageAsync( + partitionRnd.nextInt(4), + keyAndRecord.getLeft(), + keyAndRecord.getRight())); + cnt++; + } + producer.flush(); + + Thread.sleep(tablePollIntervalMs); + + for (final Future sendFuture : sendFutures) { + sendFuture.get(); + } + + System.out.println("expectedRecords: " + expectedRecords); + + try (final var stm = pgConnection.createStatement(); + final var rs = stm.executeQuery(SELECT_QUERY)) { + var counter = 0; + while (rs.next()) { + assertDbRecord(expectedRecords.get(counter), rs); + counter++; + } + } + + } + + @Test + void pgSqlSupportTypesSupportForUpdateMode() throws Exception { + createSinkConnector("pgsql-sink-connector-update-mode", "update"); + try (final var stm = pgConnection.createStatement()) { + final var insertSql = "INSERT INTO " + TEST_TOPIC_NAME + + "(id, json_value, jsonb_value, uuid_value) VALUES(1, '{}'::json, '{}'::jsonb, '" + + UUID.randomUUID() + "'::uuid)"; + stm.executeUpdate(insertSql); + } + + final var keyAndRecord = createKeyRecord(1); + final var sentMessage = + sendMessageAsync(partitionRnd.nextInt(4), keyAndRecord.getLeft(), keyAndRecord.getRight()); + + producer.flush(); + + Thread.sleep(tablePollIntervalMs); + + sentMessage.get(); + try (final var stm = pgConnection.createStatement(); + final var rs = stm.executeQuery(SELECT_QUERY)) { + while (rs.next()) { + assertDbRecord(keyAndRecord.getRight(), rs); + } + } + } + + @Test + void pgSqlSupportTypesForUpsertMode() throws Exception { + createSinkConnector("pgsql-sink-connector-upsert-mode", "upsert"); + try (final var stm = pgConnection.createStatement()) { + final var insertSql = "INSERT INTO " + TEST_TOPIC_NAME + + "(id, json_value, jsonb_value, uuid_value) VALUES(1, '{}'::json, '{}'::jsonb, '" + + UUID.randomUUID() + "'::uuid)"; + stm.executeUpdate(insertSql); + } + + final var expectedRecords = new ArrayList(); + final var 
updateKeyAndRecord = createKeyRecord(1); + final var insertKeyAndRecord = createKeyRecord(2); + expectedRecords.add(updateKeyAndRecord.getRight()); + expectedRecords.add(insertKeyAndRecord.getRight()); + final var sentMessages = + List.of( + sendMessageAsync( + partitionRnd.nextInt(4), + updateKeyAndRecord.getLeft(), + updateKeyAndRecord.getRight()), + sendMessageAsync( + partitionRnd.nextInt(4), + insertKeyAndRecord.getLeft(), + insertKeyAndRecord.getRight())); + producer.flush(); + Thread.sleep(tablePollIntervalMs); + for (final var sentMessage : sentMessages) { + sentMessage.get(); + } + try (final var stm = pgConnection.createStatement(); + final var rs = stm.executeQuery(SELECT_QUERY)) { + var counter = 0; + while (rs.next()) { + assertDbRecord(expectedRecords.get(counter), rs); + counter++; + } + } + } + + private void createSinkConnector(final String connectorName, final String insertMode) throws Exception { + LOGGER.info("Create sink connector"); + final var config = new HashMap(); + config.put("name", connectorName); + config.put("tasks.max", "1"); + config.put("connector.class", JdbcSinkConnector.class.getCanonicalName()); + config.put("insert.mode", insertMode); + config.put("batch.size", "1"); + config.put("pk.mode", "record_value"); + config.put("pk.fields", "id"); + config.put("topics", TEST_TOPIC_NAME); + config.put("connection.url", pgSqlContainer.getJdbcUrl()); + config.put("connection.user", pgSqlContainer.getUsername()); + config.put("connection.password", pgSqlContainer.getPassword()); + config.put("auto.create", "false"); + config.put("key.converter", "io.confluent.connect.avro.AvroConverter"); + config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl()); + config.put("value.converter", "io.confluent.connect.avro.AvroConverter"); + config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl()); + jdbcConnectService.createConnector(config); + } + + private Pair createKeyRecord(final int id) { + final var key = "key-" + id; + final GenericRecord value = new GenericData.Record(VALUE_SCHEMA); + value.put("id", id); + value.put("json_value", "{\"json_value\": " + id + "}"); + value.put("jsonb_value", "{\"jsonb_value\": " + id + "}"); + value.put("uuid_value", UUID.randomUUID().toString()); + return Pair.of(key, value); + } + + private void assertDbRecord(final GenericRecord expectedRecord, final ResultSet rs) throws SQLException { + LOGGER.info("Expected record: {}", expectedRecord); + assertEquals(Integer.valueOf(expectedRecord.get("id").toString()), rs.getInt("id")); + assertEquals(expectedRecord.get("json_value").toString(), rs.getString("json_value")); + assertEquals(expectedRecord.get("jsonb_value").toString(), rs.getString("jsonb_value")); + assertEquals(expectedRecord.get("uuid_value").toString(), rs.getString("uuid_value")); + } + +} diff --git a/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSourceConnectorIT.java b/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSourceConnectorIT.java new file mode 100644 index 00000000..81058ee8 --- /dev/null +++ b/src/integration-test/java/io/aiven/connect/jdbc/pg/PgSqlSourceConnectorIT.java @@ -0,0 +1,137 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.connect.jdbc.pg; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import io.aiven.connect.jdbc.JdbcSourceConnector; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.com.fasterxml.jackson.core.JsonProcessingException; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class PgSqlSourceConnectorIT extends AbstractPgSqlAwareIT { + + @BeforeEach + void createTableTestData() throws Exception { + final var insertSql = + "INSERT INTO " + TEST_TOPIC_NAME + + "(id, json_value, jsonb_value, uuid_value) " + + "VALUES(?, ?::json, ?::jsonb, ?::uuid)"; + try (final var stm = pgConnection.prepareStatement(insertSql)) { + var rowCounter = 0; + for (int i = 1; i < 4; i++) { + stm.setInt(1, i); + stm.setString(2, String.format("{\"json_value\": %s}", i)); + stm.setString(3, String.format("{\"jsonb_value\": %s}", i)); + stm.setObject(4, UUID.randomUUID()); + rowCounter += stm.executeUpdate(); + } + assert rowCounter == 3; + } + } + + @Test + void pgSqlSupportTypes() throws Exception { + final var config = new HashMap(); + config.put("name", "pg-types-source-connector"); + config.put("tasks.max", "1"); + config.put("mode", "incrementing"); + config.put("incrementing.column.name", "id"); + config.put("connector.class", JdbcSourceConnector.class.getCanonicalName()); + config.put("topic.prefix", ""); + config.put("tables", "pg_source_table"); + config.put("connection.url", pgSqlContainer.getJdbcUrl()); + config.put("connection.user", pgSqlContainer.getUsername()); + config.put("connection.password", pgSqlContainer.getPassword()); + config.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + config.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + + jdbcConnectService.createConnector(config); + + Thread.sleep(tablePollIntervalMs); + + final var records = consumerRecords(); + var counter = 1; + for (final var e : records.entrySet()) { + assertEquals(counter, e.getKey()); + assertEquals(String.format("{\"json_value\": %s}", counter), e.getValue().jsonValue); + assertEquals(String.format("{\"jsonb_value\": %s}", counter), e.getValue().jsonbValue); + assertNotNull(e.getValue().uuidValue); + UUID.fromString(e.getValue().uuidValue); // ignore result just verify that it's UUID value + counter++; + } + } + + private Map consumerRecords() throws JsonProcessingException { + final var kafkaProperties = new Properties(); + kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + 
kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "pg_source_table_group"); + kafkaProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + LOGGER.info("Create consumer with properties: {}", kafkaProperties); + final var consumer = new KafkaConsumer<>(kafkaProperties, new StringDeserializer(), new StringDeserializer()); + consumer.subscribe(List.of(TEST_TOPIC_NAME)); + final var objectMapper = new ObjectMapper(); + final var records = new TreeMap(Integer::compareTo); + try (final var c = consumer) { + for (final var record : c.poll(Duration.of(100, ChronoUnit.MILLIS))) { + final var json = objectMapper.readTree(record.value()); + final var payload = json.get("payload"); + records.put( + payload.get("id").asInt(), + new ConsumerRecord( + payload.get("json_value").asText(), + payload.get("jsonb_value").asText(), + payload.get("uuid_value").asText() + ) + ); + } + } + return records; + } + + static final class ConsumerRecord { + + final String jsonValue; + + final String jsonbValue; + + final String uuidValue; + + public ConsumerRecord(final String jsonValue, final String jsonbValue, final String uuidValue) { + this.jsonValue = jsonValue; + this.jsonbValue = jsonbValue; + this.uuidValue = uuidValue; + } + } + +}