Integration tests for PgSQL connector #67

Open
wants to merge 8 commits into master
82 changes: 82 additions & 0 deletions .github/workflows/create_release.yml
@@ -0,0 +1,82 @@
name: Create release

on:
  workflow_dispatch:
    inputs:
      commit_hash:
        description: "Hash of 'Release version x.y.z' commit"
        required: true

jobs:
  build:
    name: Create Release
    runs-on: ubuntu-latest
    steps:
      - name: Setup Java SDK
        uses: actions/setup-java@v1
        with:
          java-version: 11

      - name: Checkout code
        uses: actions/checkout@v2
        with:
          ref: ${{ github.event.inputs.commit_hash }}

      - name: Check commit title and extract version
        run: |
          export commit_title=$(git log --pretty=format:%s -1 ${{ github.event.inputs.commit_hash }})
          echo "Commit title: $commit_title"
          if [[ $commit_title =~ ^Release\ version\ [0-9]*\.[0-9]*\.[0-9]*$ ]]; then
            echo "Valid commit title"
          else
            echo "Invalid commit title"
            exit 1
          fi
          export version=$(echo ${commit_title} | sed s/^Release\ version\ //g)
          echo "Will use version ${version}"
          echo "version=${version}" >> $GITHUB_ENV

      - name: Build
        run: |
          ./gradlew distTar distZip

          export tar_file=$(ls ./build/distributions/ | grep tar)
          export zip_file=$(ls ./build/distributions/ | grep zip)
          echo tar_file=${tar_file} >> $GITHUB_ENV
          echo zip_file=${zip_file} >> $GITHUB_ENV

          echo tar_path=`realpath ./build/distributions/${tar_file}` >> $GITHUB_ENV
          echo zip_path=`realpath ./build/distributions/${zip_file}` >> $GITHUB_ENV

      - name: Create release draft
        id: create_release
        uses: actions/create-release@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        with:
          tag_name: "v${{ env.version }}"
          release_name: "v${{ env.version }}"
          body: |
            *Fill in*
          draft: true
          prerelease: false

      - name: Upload tar
        uses: actions/upload-release-asset@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        with:
          upload_url: ${{ steps.create_release.outputs.upload_url }}
          asset_path: ${{ env.tar_path }}
          asset_name: ${{ env.tar_file }}
          asset_content_type: application/tar

      - name: Upload zip
        uses: actions/upload-release-asset@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        with:
          upload_url: ${{ steps.create_release.outputs.upload_url }}
          asset_path: ${{ env.zip_path }}
          asset_name: ${{ env.zip_file }}
          asset_content_type: application/zip
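For reference, one way to trigger this workflow_dispatch workflow is via the GitHub CLI (a hedged example; the commit hash is a placeholder):

    gh workflow run create_release.yml -f commit_hash=<sha-of-release-commit>

It can equally be started from the Actions tab in the GitHub UI.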
82 changes: 81 additions & 1 deletion build.gradle
@@ -1,5 +1,5 @@
/*
 * Copyright 2019 Aiven Oy
 * Copyright 2020 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -29,10 +29,17 @@ plugins {

    // https://docs.gradle.org/current/userguide/publishing_maven.html
    id "maven-publish"

    // https://docs.gradle.org/current/userguide/idea_plugin.html
    id "idea"
}

repositories {
    jcenter()
    // For kafka-avro-serializer and kafka-connect-avro-converter
    maven {
        url "https://packages.confluent.io/maven"
    }
}

java {
@@ -103,6 +110,37 @@ publishing {
ext {
    kafkaVersion = "2.2.0"
    slf4jVersion = "1.7.25"
    confluentPlatformVersion = "4.1.4"
}

sourceSets {
    integrationTest {
        java {
            srcDirs = ['src/integration-test/java']
        }
        resources {
            srcDirs = ['src/integration-test/resources']
        }

        compileClasspath += sourceSets.main.output + configurations.testRuntimeClasspath
        runtimeClasspath += output + compileClasspath
    }
}

idea {
    module {
        testSourceDirs += project.sourceSets.integrationTest.java.srcDirs
        testSourceDirs += project.sourceSets.integrationTest.resources.srcDirs
    }
}

configurations {
    integrationTestImplementation.extendsFrom testImplementation
    integrationTestRuntime.extendsFrom testRuntimeClasspath
}

test {
    useJUnit()
}

dependencies {
@@ -128,8 +166,50 @@ dependencies {
testImplementation "org.apache.derby:derby:10.11.1.1"
testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion"

integrationTestImplementation "org.apache.kafka:connect-api:$kafkaVersion"
integrationTestImplementation "org.apache.kafka:connect-runtime:$kafkaVersion"
integrationTestImplementation "org.apache.kafka:connect-json:$kafkaVersion"
integrationTestImplementation "org.apache.kafka:connect-transforms:$kafkaVersion"
integrationTestImplementation "io.confluent:kafka-avro-serializer:$confluentPlatformVersion"
integrationTestImplementation "io.confluent:kafka-connect-avro-converter:$confluentPlatformVersion"
integrationTestImplementation "org.apache.avro:avro:1.8.1"

integrationTestImplementation 'org.junit.jupiter:junit-jupiter:5.7.1'
integrationTestRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.1'

integrationTestImplementation "org.testcontainers:junit-jupiter:1.14.3"
integrationTestImplementation "org.testcontainers:testcontainers:1.14.3"
integrationTestImplementation "org.testcontainers:kafka:1.14.3"
integrationTestImplementation "org.testcontainers:postgresql:1.14.3"

integrationTestImplementation "cloud.localstack:localstack-utils:0.2.5"

integrationTestImplementation sourceSets.test.output
}

task integrationTest(type: Test) {
    description = 'Runs the integration tests.'
    group = 'verification'
    testClassesDirs = sourceSets.integrationTest.output.classesDirs
    classpath = sourceSets.integrationTest.runtimeClasspath

    dependsOn test, distTar

    useJUnitPlatform()

    // Run always.
    outputs.upToDateWhen { false }

    if (project.hasProperty("tablePollIntervalMs")) {
        // Pass table.poll.interval.ms through to the tests.
        systemProperty("integration-test.table.poll.interval.ms", project.property("tablePollIntervalMs"))
    }
Comment on lines +203 to +206
Contributor

I'm not sure it's a good idea to put the burden of choosing this value (or even knowing the value must be chosen) on whoever is going to run the tests. For example, I frankly have no idea what I should insert here without some research. We just need one good enough value set as a constant in tests somewhere.
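A minimal sketch of the reviewer's suggestion (an assumption, not code from this PR): resolve the Gradle property with a constant fallback inside the integrationTest task, so whoever runs the tests never has to choose a value. The 5000 ms default is hypothetical.

    // Sketch: use a constant default when the property is not supplied.
    def pollIntervalMs = project.findProperty("tablePollIntervalMs") ?: "5000"  // hypothetical default
    systemProperty("integration-test.table.poll.interval.ms", pollIntervalMs)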


    // Pass the distribution file path to the tests.
    systemProperty("integration-test.distribution.file.path", distTar.archiveFile.get().asFile.path)
}
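With the task wired up as above, the integration tests can be run with the optional override, for example:

    ./gradlew integrationTest -PtablePollIntervalMs=5000

The property name matches the hasProperty check in the task; the value shown is illustrative.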


processResources {
    filesMatching('aiven-kafka-connect-jdbc-version.properties') {
        expand(version: version)
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -25,7 +25,7 @@
files="(BufferedRecords|DataConverter|DatabaseDialect|FieldsMetadata|HanaDialect|JdbcSourceTask|MySqlDatabaseDialect|OracleDatabaseDialect|PostgreSqlDatabaseDialect|PreparedStatementBinder|SqlServerDatabaseDialect|SqliteDatabaseDialect|TimestampIncrementingTableQuerier|VerticaDatabaseDialect|SapHanaDatabaseDialect|TableId|ColumnDefinition|TableMonitorThread).java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(DbDialect|JdbcSourceTask|GenericDatabaseDialect).java"/>
files="(DbDialect|JdbcSourceTask|GenericDatabaseDialect|JdbcConnectService).java"/>

<suppress checks="NPathComplexity"
files="(DataConverter|FieldsMetadata|JdbcSourceTask|GenericDatabaseDialect).java"/>
143 changes: 143 additions & 0 deletions src/integration-test/java/io/aiven/connect/jdbc/AbstractIT.java
@@ -0,0 +1,143 @@
/*
 * Copyright 2020 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.connect.jdbc;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import cloud.localstack.docker.LocalstackDockerExtension;
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@ExtendWith(LocalstackDockerExtension.class)
@LocalstackDockerProperties(services = {"jdbc"})
@Testcontainers
public abstract class AbstractIT {

    static final Logger LOGGER = LoggerFactory.getLogger(AbstractIT.class);

    protected static final String TEST_TOPIC_NAME = "test_pg_topic";

    protected static KafkaProducer<String, GenericRecord> producer;

    @Container
    protected KafkaContainer kafkaContainer =
            new KafkaContainer()
                    .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");

    @Container
    protected SchemaRegistryContainer schemaRegistryContainer =
            new SchemaRegistryContainer(kafkaContainer);

    protected JdbcConnectService jdbcConnectService;

    @BeforeEach
    void startKafka() throws Exception {
        LOGGER.info("Configure Kafka connect plugins");
        final var pluginDir = setupPluginDir();
        setupKafka();
        setupKafkaConnect(pluginDir);
        createProducer();
    }

    protected AdminClient adminClient() {
        final Properties adminClientConfig = new Properties();
        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
        return AdminClient.create(adminClientConfig);
    }

    private static Path setupPluginDir() throws Exception {
Contributor
@ivanyu ivanyu Feb 11, 2021

Technically could be run in some @BeforeAll instead of @BeforeEach
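A minimal sketch of the reviewer's suggestion (an assumption, not part of this diff; it would also need an org.junit.jupiter.api.BeforeAll import):

    // Sketch: unpack the distribution once per test class instead of before each test.
    private static Path pluginDir;

    @BeforeAll
    static void unpackDistribution() throws Exception {
        pluginDir = setupPluginDir();
    }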

        final var testDir = Files.createTempDirectory("aiven-kafka-connect-jdbc-test-");
        final var distFile = Paths.get(System.getProperty("integration-test.distribution.file.path"));
        assert Files.exists(distFile);

        final var pluginDir = Paths.get(testDir.toString(), "plugins/aiven-kafka-connect-jdbc/");
        Files.createDirectories(pluginDir);

        final String cmd = String.format("tar -xf %s --strip-components=1 -C %s",
                distFile.toString(), pluginDir.toString());
        final Process p = Runtime.getRuntime().exec(cmd);
        assert p.waitFor() == 0;
        return pluginDir;
    }

    private void setupKafka() throws Exception {
        LOGGER.info("Setup Kafka");
        try (final var adminClient = adminClient()) {
            LOGGER.info("Create topic {}", TEST_TOPIC_NAME);
            final NewTopic newTopic = new NewTopic(TEST_TOPIC_NAME, 4, (short) 1);
            adminClient.createTopics(List.of(newTopic)).all().get();
        }
    }

    private void setupKafkaConnect(final Path pluginDir) throws Exception {
        LOGGER.info("Start Kafka Connect");
        jdbcConnectService =
                new JdbcConnectService(kafkaContainer.getBootstrapServers(), pluginDir);
        jdbcConnectService.start();
    }

    private void createProducer() {
        LOGGER.info("Create kafka producer");
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        producerProps.put("schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
        producer = new KafkaProducer<>(producerProps);
    }

    @AfterEach
    void stopKafka() throws Exception {
        jdbcConnectService.stop();
    }

    protected Future<RecordMetadata> sendMessageAsync(final int partition,
                                                      final String key,
                                                      final GenericRecord value) {
        // key and value may be null; the producer accepts nulls directly,
        // so no conditional guards are needed.
        final var msg = new ProducerRecord<>(TEST_TOPIC_NAME, partition, key, value);
        return producer.send(msg);
    }

}
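For context, a hypothetical concrete test built on this base class (not part of this diff; PostgreSQLContainer comes from the org.testcontainers:postgresql dependency added in build.gradle, and the class, record, and assertion details are made up):

    @Testcontainers
    class PostgresSinkIT extends AbstractIT {

        @Container
        private final PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer<>();

        @Test
        void sinksRecordsIntoPostgres() throws Exception {
            // Produce a record; a sink connector configured against
            // postgresContainer.getJdbcUrl() would write it to a table.
            sendMessageAsync(0, "key-0", null).get();
            // ... assert on the table contents via postgresContainer.createConnection("") ...
        }
    }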