Integration tests for PgSQL connector
willyborankin committed Feb 8, 2021
1 parent c682109 commit 5ea08e4
Showing 8 changed files with 814 additions and 2 deletions.
82 changes: 81 additions & 1 deletion build.gradle
@@ -1,5 +1,5 @@
/*
- * Copyright 2019 Aiven Oy
+ * Copyright 2020 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,10 +29,17 @@ plugins {

// https://docs.gradle.org/current/userguide/publishing_maven.html
id "maven-publish"

// https://docs.gradle.org/current/userguide/idea_plugin.html
id "idea"
}

repositories {
jcenter()
// For kafka-avro-serializer and kafka-connect-avro-converter
maven {
url "https://packages.confluent.io/maven"
}
}

java {
@@ -103,6 +110,37 @@ publishing {
ext {
kafkaVersion = "2.2.0"
slf4jVersion = "1.7.25"
confluentPlatformVersion = "4.1.4"
}

sourceSets {
integrationTest {
java {
srcDirs = ['src/integration-test/java']
}
resources {
srcDirs = ['src/integration-test/resources']
}

compileClasspath += sourceSets.main.output + configurations.testRuntimeClasspath
runtimeClasspath += output + compileClasspath
}
}

idea {
module {
testSourceDirs += project.sourceSets.integrationTest.java.srcDirs
testSourceDirs += project.sourceSets.integrationTest.resources.srcDirs
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntimeClasspath
}

test {
useJUnit()
}

dependencies {
@@ -128,8 +166,50 @@ dependencies {
testImplementation "org.apache.derby:derby:10.11.1.1"
testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion"

integrationTestImplementation "org.apache.kafka:connect-api:$kafkaVersion"
integrationTestImplementation "org.apache.kafka:connect-runtime:$kafkaVersion"
integrationTestImplementation "org.apache.kafka:connect-json:$kafkaVersion"
integrationTestImplementation "org.apache.kafka:connect-transforms:$kafkaVersion"
integrationTestImplementation "io.confluent:kafka-avro-serializer:$confluentPlatformVersion"
integrationTestImplementation "io.confluent:kafka-connect-avro-converter:$confluentPlatformVersion"
integrationTestImplementation "org.apache.avro:avro:1.8.1"

integrationTestImplementation 'org.junit.jupiter:junit-jupiter:5.7.1'
integrationTestRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.1'

integrationTestImplementation "org.testcontainers:junit-jupiter:1.14.3"
integrationTestImplementation "org.testcontainers:testcontainers:1.14.3"
integrationTestImplementation "org.testcontainers:kafka:1.14.3"
integrationTestImplementation "org.testcontainers:postgresql:1.14.3"

integrationTestImplementation "cloud.localstack:localstack-utils:0.2.5"

integrationTestImplementation sourceSets.test.output
}

task integrationTest(type: Test) {
description = 'Runs the integration tests.'
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath

dependsOn test, distTar

useJUnitPlatform()

// Always re-run the integration tests.
outputs.upToDateWhen { false }

if (project.hasProperty("tablePollIntervalMs")) {
// Pass table.poll.interval.ms through to the tests.
systemProperty("integration-test.table.poll.interval.ms", project.property("tablePollIntervalMs"))
}

// Pass the distribution file path to the tests.
systemProperty("integration-test.distribution.file.path", distTar.archiveFile.get().asFile.path)
}
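With the task above in place, the integration tests run separately from the unit tests. Assuming the standard Gradle wrapper, they are invoked as ./gradlew integrationTest, and a poll interval can be passed with -PtablePollIntervalMs=5000 (the value here is illustrative); the task forwards it to the tests as the integration-test.table.poll.interval.ms system property.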


processResources {
filesMatching('aiven-kafka-connect-jdbc-version.properties') {
expand(version: version)
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -25,7 +25,7 @@
files="(BufferedRecords|DataConverter|DatabaseDialect|FieldsMetadata|HanaDialect|JdbcSourceTask|MySqlDatabaseDialect|OracleDatabaseDialect|PostgreSqlDatabaseDialect|PreparedStatementBinder|SqlServerDatabaseDialect|SqliteDatabaseDialect|TimestampIncrementingTableQuerier|VerticaDatabaseDialect|SapHanaDatabaseDialect|TableId|ColumnDefinition|TableMonitorThread).java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(DbDialect|JdbcSourceTask|GenericDatabaseDialect).java"/>
files="(DbDialect|JdbcSourceTask|GenericDatabaseDialect|JdbcConnectService).java"/>

<suppress checks="NPathComplexity"
files="(DataConverter|FieldsMetadata|JdbcSourceTask|GenericDatabaseDialect).java"/>
144 changes: 144 additions & 0 deletions src/integration-test/java/io/aiven/connect/jdbc/AbstractIT.java
@@ -0,0 +1,144 @@
/*
* Copyright 2020 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.connect.jdbc;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

import cloud.localstack.docker.LocalstackDockerExtension;
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@ExtendWith(LocalstackDockerExtension.class)
@LocalstackDockerProperties(services = {"jdbc"})
@Testcontainers
public abstract class AbstractIT {

static final Logger LOGGER = LoggerFactory.getLogger(AbstractIT.class);

protected static final String TEST_TOPIC_NAME = "test_pg_topic";

protected static KafkaProducer<String, GenericRecord> producer;

@Container
protected KafkaContainer kafkaContainer =
new KafkaContainer()
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");

@Container
protected SchemaRegistryContainer schemaRegistryContainer =
new SchemaRegistryContainer(kafkaContainer);

protected JdbcConnectService jdbcConnectService;

@BeforeEach
void startKafka() throws Exception {
LOGGER.info("Configure Kafka connect plugins");
final var pluginDir = setupPluginDir();
setupKafka();
setupKafkaConnect(pluginDir);
createProducer();
}

protected AdminClient adminClient() {
final Properties adminClientConfig = new Properties();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
return AdminClient.create(adminClientConfig);
}

private static Path setupPluginDir() throws Exception {
final var testDir = Files.createTempDirectory("aiven-kafka-connect-jdbc-test-");
final var distFile = Paths.get(System.getProperty("integration-test.distribution.file.path"));
assert Files.exists(distFile);

final var pluginDir = Paths.get(testDir.toString(), "plugins/aiven-kafka-connect-jdbc/");
Files.createDirectories(pluginDir);

final String cmd = String.format("tar -xf %s --strip-components=1 -C %s",
distFile.toString(), pluginDir.toString());
final Process p = Runtime.getRuntime().exec(cmd);
assert p.waitFor() == 0;
return pluginDir;
}

private void setupKafka() throws Exception {
LOGGER.info("Setup Kafka");
try (final var adminClient = adminClient()) {
LOGGER.info("Create topic {}", TEST_TOPIC_NAME);
final NewTopic newTopic = new NewTopic(TEST_TOPIC_NAME, 4, (short) 1);
adminClient.createTopics(List.of(newTopic)).all().get();
}
}

private void setupKafkaConnect(final Path pluginDir) throws Exception {
LOGGER.info("Start Kafka Connect");
jdbcConnectService =
new JdbcConnectService(kafkaContainer.getBootstrapServers(), pluginDir);
jdbcConnectService.start();
}

private void createProducer() {
LOGGER.info("Create kafka producer");
final Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put("schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
producer = new KafkaProducer<>(producerProps);
}

@AfterEach
void stopKafka() throws Exception {
jdbcConnectService.stop();
}

protected Future<RecordMetadata> sendMessageAsync(final int partition,
final String key,
final GenericRecord value) {
final var msg = new ProducerRecord<>(TEST_TOPIC_NAME, partition, key, value);
return producer.send(msg);
}

}
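A concrete test would extend AbstractIT and use the inherited producer helpers. The following is a sketch of how such a test might look, not part of this commit: the class name, Avro schema, and record values are illustrative assumptions.

package io.aiven.connect.jdbc;

import java.util.concurrent.TimeUnit;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import org.junit.jupiter.api.Test;

// Hypothetical concrete integration test; names and values are illustrative.
public class ExamplePgIT extends AbstractIT {

    private static final Schema VALUE_SCHEMA = SchemaBuilder.record("TestValue")
            .fields()
            .requiredString("name")
            .endRecord();

    @Test
    void sendsAvroMessagesToTestTopic() throws Exception {
        final GenericRecord value = new GenericData.Record(VALUE_SCHEMA);
        value.put("name", "some-name");
        // sendMessageAsync and producer are provided by AbstractIT.
        sendMessageAsync(0, "key-0", value).get(30, TimeUnit.SECONDS);
        producer.flush();
        // ...assertions against the connector's output would follow here...
    }
}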
114 changes: 114 additions & 0 deletions src/integration-test/java/io/aiven/connect/jdbc/JdbcConnectService.java
@@ -0,0 +1,114 @@
/*
* Copyright 2020 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.connect.jdbc;

import java.nio.file.Path;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.FutureCallback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcConnectService {

static final Logger LOGGER = LoggerFactory.getLogger(JdbcConnectService.class);

private final String bootstrapServers;

private final int offsetFlushInterval = 5000;

private final Path pluginDir;

private Herder herder;

private Connect connect;

public JdbcConnectService(final String bootstrapServers, final Path pluginDir) {
this.bootstrapServers = bootstrapServers;
this.pluginDir = pluginDir;
}

void start() {
final Map<String, String> workerProps = Map.of(
"bootstrap.servers", bootstrapServers,
"offset.flush.interval.ms", Integer.toString(offsetFlushInterval),
// These don't matter much (each connector sets its own converters),
// but need to be filled with valid classes.
"key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter",
"internal.key.converter", "org.apache.kafka.connect.json.JsonConverter",
"internal.key.converter.schemas.enable", "false",
"internal.value.converter", "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter.schemas.enable", "false",
// Not needed since we use a MemoryOffsetBackingStore.
"offset.storage.file.filename", "",
"plugin.path", pluginDir.toString());

final Time time = Time.SYSTEM;
final String workerId = "test-worker";
final String kafkaClusterId = "test-cluster";

final var plugins = new Plugins(workerProps);
final var config = new StandaloneConfig(workerProps);

final var worker = new Worker(workerId, time, plugins, config, new MemoryOffsetBackingStore());
herder = new StandaloneHerder(worker, kafkaClusterId);
connect = new Connect(herder, new RestServer(config));

connect.start();
}

public void createConnector(final Map<String, String> config) throws ExecutionException, InterruptedException {
assert herder != null;

final FutureCallback<Herder.Created<ConnectorInfo>> cb =
new FutureCallback<>((error, info) -> {
if (error != null) {
LOGGER.error("Failed to create job");
} else {
LOGGER.info("Created connector {}", info.result().name());
}
});
herder.putConnectorConfig(
config.get(ConnectorConfig.NAME_CONFIG),
config, false, cb
);

final Herder.Created<ConnectorInfo> connectorInfoCreated = cb.get();
assert connectorInfoCreated.created();
}

void stop() {
herder.stop();
connect.stop();
}

}
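A test in the same package can then stand up the service and submit a standalone connector definition through createConnector. A minimal usage sketch follows; the bootstrap address, plugin path, and all connector settings are illustrative assumptions, not values from this commit.

// Hypothetical values throughout; runs inside a test method that declares throws Exception.
final JdbcConnectService connectService =
        new JdbcConnectService("localhost:9092", Paths.get("/tmp/plugins"));
connectService.start();

final Map<String, String> connectorConfig = Map.of(
        "name", "pg-sink-connector",
        "connector.class", "io.aiven.connect.jdbc.JdbcSinkConnector",
        "topics", "test_pg_topic",
        "connection.url", "jdbc:postgresql://localhost:5432/test",
        "auto.create", "true");
connectService.createConnector(connectorConfig);

// ...exercise and verify the connector, then shut everything down...
connectService.stop();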
