Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Reverse replication Datatype IT #1603

Closed
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Using Base class in other ITs
darshan-sj committed Jun 25, 2024
commit f2b488facd07d0f168060c67506296a84015457d
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import com.google.cloud.spanner.Value;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.io.Resources;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
@@ -36,6 +37,9 @@
import java.util.Map;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.gcp.dataflow.FlexTemplateDataflowJobResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.apache.beam.it.jdbc.CustomMySQLResourceManager;
import org.apache.beam.it.jdbc.JDBCResourceManager;
import org.apache.beam.it.jdbc.conditions.JDBCRowsCheck;
@@ -63,6 +67,11 @@ public class GCSToSourceDbDatatypeIT extends GCSToSourceDbITBase {
private static HashSet<GCSToSourceDbDatatypeIT> testInstances = new HashSet<>();
private static PipelineLauncher.LaunchInfo writerJobInfo;
private static PipelineLauncher.LaunchInfo readerJobInfo;
public static SpannerResourceManager spannerResourceManager;
public static SpannerResourceManager spannerMetadataResourceManager;
public static List<CustomMySQLResourceManager> jdbcResourceManagers;
public static GcsResourceManager gcsResourceManager;
public static FlexTemplateDataflowJobResourceManager flexTemplateDataflowJobResourceManager;

/**
* Setup resource managers and Launch dataflow job once during the execution of this test class.
@@ -86,6 +95,22 @@ public void setUp() throws IOException {
}
}

/**
 * Provisions every external resource this test class needs: the main Spanner database
 * (with the DDL from {@code spannerDdlResource}), the Spanner metadata database, one
 * MySQL resource manager per shard, and a GCS bucket seeded with the shard config and
 * the session file.
 *
 * @param spannerDdlResource classpath resource containing the Spanner DDL
 * @param sessionFileResource classpath resource for the migration session file
 * @param numShards number of MySQL shard instances to create
 * @throws IOException if reading a resource or uploading to GCS fails
 */
public void setupResourceManagers(
    String spannerDdlResource, String sessionFileResource, int numShards) throws IOException {
  // Spanner databases: one for migrated data, one for pipeline metadata.
  spannerResourceManager = createSpannerDatabase(spannerDdlResource);
  spannerMetadataResourceManager = createSpannerMetadataDatabase();

  // One MySQL resource manager per logical shard.
  jdbcResourceManagers = new ArrayList<>();
  for (int shard = 0; shard < numShards; shard++) {
    CustomMySQLResourceManager shardManager =
        CustomMySQLResourceManager.builder(testName).build();
    jdbcResourceManagers.add(shardManager);
  }

  // Publish the shard configuration and session file where the pipelines expect them.
  gcsResourceManager = createGcsResourceManager();
  createAndUploadShardConfigToGcs(gcsResourceManager, jdbcResourceManagers);
  gcsResourceManager.uploadArtifact(
      "input/session.json", Resources.getResource(sessionFileResource).getPath());
}

/**
* Cleanup dataflow job and all the resources and resource managers.
*
@@ -96,7 +121,12 @@ public static void cleanUp() throws IOException {
for (GCSToSourceDbDatatypeIT instance : testInstances) {
instance.tearDownBase();
}
cleanupResourceManagers();
cleanupResourceManagers(
spannerResourceManager,
spannerMetadataResourceManager,
gcsResourceManager,
flexTemplateDataflowJobResourceManager,
jdbcResourceManagers);
}

@Test
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@
import com.google.gson.JsonObject;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -36,43 +35,25 @@
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.apache.beam.it.jdbc.CustomMySQLResourceManager;
import org.apache.beam.it.jdbc.JDBCResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GCSToSourceDbITBase extends TemplateTestBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also move the other IT tests to be derived from this base class?

private static final Logger LOG = LoggerFactory.getLogger(GCSToSourceDbITBase.class);

public static SpannerResourceManager spannerResourceManager;
public static SpannerResourceManager spannerMetadataResourceManager;
public static List<CustomMySQLResourceManager> jdbcResourceManagers;
public static GcsResourceManager gcsResourceManager;
public static FlexTemplateDataflowJobResourceManager flexTemplateDataflowJobResourceManager;

public void setupResourceManagers(
String spannerDdlResource, String sessionFileResource, int numShards) throws IOException {
spannerResourceManager = createSpannerDatabase(spannerDdlResource);
spannerMetadataResourceManager = createSpannerMetadataDatabase();

jdbcResourceManagers = new ArrayList<>();
for (int i = 0; i < numShards; ++i) {
jdbcResourceManagers.add(CustomMySQLResourceManager.builder(testName).build());
}

gcsResourceManager =
GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials)
.build();
createAndUploadShardConfigToGcs(gcsResourceManager, jdbcResourceManagers);
gcsResourceManager.uploadArtifact(
"input/session.json", Resources.getResource(sessionFileResource).getPath());
}

public static void cleanupResourceManagers() {
public static void cleanupResourceManagers(
SpannerResourceManager spannerResourceManager,
SpannerResourceManager spannerMetadataResourceManager,
GcsResourceManager gcsResourceManager,
FlexTemplateDataflowJobResourceManager flexTemplateDataflowJobResourceManager,
List<? extends JDBCResourceManager> jdbcResourceManagers) {
ResourceManagerUtils.cleanResources(
spannerResourceManager,
spannerMetadataResourceManager,
gcsResourceManager,
flexTemplateDataflowJobResourceManager);
for (CustomMySQLResourceManager jdbcResourceManager : jdbcResourceManagers) {
for (JDBCResourceManager jdbcResourceManager : jdbcResourceManagers) {
ResourceManagerUtils.cleanResources(jdbcResourceManager);
}
}
@@ -120,11 +101,6 @@ public void createAndUploadShardConfigToGcs(
shard.setPassword(jdbcResourceManagers.get(i).getPassword());
shard.setPort(String.valueOf(jdbcResourceManagers.get(i).getPort()));
shard.setDbName(jdbcResourceManagers.get(i).getDatabaseName());
// shard.setUser("root");
// shard.setHost("34.133.70.107");
// shard.setPassword("root");
// shard.setPort("3306");
// shard.setDbName("Shard1");
JsonObject jsObj = (JsonObject) new Gson().toJsonTree(shard).getAsJsonObject();
jsObj.remove("secretManagerUri"); // remove field secretManagerUri
ja.add(jsObj);
Original file line number Diff line number Diff line change
@@ -21,23 +21,15 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.utils.PipelineUtils;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.TemplateTestBase;
import org.apache.beam.it.gcp.dataflow.FlexTemplateDataflowJobResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
@@ -56,7 +48,7 @@
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
@TemplateIntegrationTest(GCSToSourceDb.class)
@RunWith(JUnit4.class)
public class GCSToSourceDbWithReaderIT extends TemplateTestBase {
public class GCSToSourceDbWithReaderIT extends GCSToSourceDbITBase {

private static final Logger LOG = LoggerFactory.getLogger(GCSToSourceDbWithReaderIT.class);

@@ -90,15 +82,15 @@ public void setUp() throws IOException {
jdbcResourceManager = CustomMySQLResourceManager.builder(testName).build();
createMySQLSchema(jdbcResourceManager);

gcsResourceManager =
GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials)
.build();
createAndUploadShardConfigToGcs(gcsResourceManager, jdbcResourceManager);
gcsResourceManager = createGcsResourceManager();
createAndUploadShardConfigToGcs(gcsResourceManager, Arrays.asList(jdbcResourceManager));
gcsResourceManager.uploadArtifact(
"input/session.json", Resources.getResource(SESSION_FILE_RESOURSE).getPath());

launchReaderDataflowJob();
launchWriterDataflowJob();
readerJobInfo =
launchReaderDataflowJob(
gcsResourceManager, spannerResourceManager, spannerMetadataResourceManager);
writerJobInfo = launchWriterDataflowJob(gcsResourceManager, spannerMetadataResourceManager);
}
}
}
@@ -153,37 +145,6 @@ private void assertRowInMySQL() throws InterruptedException {
assertThat(rows.get(0).get("name")).isEqualTo("FF");
}

/**
 * Creates the main ("rr-main-") Spanner database for this test and applies the DDL read
 * from the given classpath resource. The resource may contain multiple statements
 * separated by {@code ';'}; blank fragments are skipped.
 *
 * @param spannerDdlResourceFile classpath resource holding the DDL text
 * @return the resource manager owning the newly created database
 * @throws IOException if the DDL resource cannot be read
 */
private SpannerResourceManager createSpannerDatabase(String spannerDdlResourceFile)
    throws IOException {
  SpannerResourceManager manager =
      SpannerResourceManager.builder("rr-main-" + testName, PROJECT, REGION)
          .maybeUseStaticInstance()
          .build();
  // Collapse the resource file into one string, then split it back into statements.
  String allDdl =
      String.join(
              " ",
              Resources.readLines(
                  Resources.getResource(spannerDdlResourceFile), StandardCharsets.UTF_8))
          .trim();
  for (String statement : allDdl.split(";")) {
    if (!statement.isBlank()) {
      manager.executeDdlStatement(statement);
    }
  }
  return manager;
}

/**
 * Creates the ("rr-meta-") Spanner metadata database used by the pipelines.
 * The database only needs to exist, so a single placeholder table is created.
 *
 * @return the resource manager owning the metadata database
 * @throws IOException declared for symmetry with the other setup helpers
 */
private SpannerResourceManager createSpannerMetadataDatabase() throws IOException {
  SpannerResourceManager manager =
      SpannerResourceManager.builder("rr-meta-" + testName, PROJECT, REGION)
          .maybeUseStaticInstance()
          .build();
  manager.executeDdlStatement("create table t1(id INT64 ) primary key(id)");
  return manager;
}

private void createMySQLSchema(CustomMySQLResourceManager jdbcResourceManager) {
HashMap<String, String> columns = new HashMap<>();
columns.put("id", "INT NOT NULL");
@@ -192,67 +153,4 @@ private void createMySQLSchema(CustomMySQLResourceManager jdbcResourceManager) {

jdbcResourceManager.createTable(TABLE, schema);
}

/**
 * Launches the writer Dataflow job (the GCS-to-source-DB template under test) with the
 * session file, metadata database, shard config, and GCS input directory created during
 * setup, and stores its launch info in {@code writerJobInfo}.
 *
 * @throws IOException if launching the template fails
 */
private void launchWriterDataflowJob() throws IOException {
  // Plain HashMap instead of double-brace initialization: the anonymous-subclass
  // idiom creates an extra class and captures a reference to the enclosing instance.
  Map<String, String> params = new HashMap<>();
  params.put("sessionFilePath", getGcsPath("input/session.json", gcsResourceManager));
  params.put("spannerProjectId", PROJECT);
  params.put("metadataDatabase", spannerMetadataResourceManager.getDatabaseId());
  params.put("metadataInstance", spannerMetadataResourceManager.getInstanceId());
  params.put("sourceShardsFilePath", getGcsPath("input/shard.json", gcsResourceManager));
  params.put("runIdentifier", "run1");
  params.put("GCSInputDirectoryPath", getGcsPath("output", gcsResourceManager));

  String jobName = PipelineUtils.createJobName(testName);
  LaunchConfig.Builder options = LaunchConfig.builder(jobName, specPath);
  options.setParameters(params);
  // Run
  writerJobInfo = launchTemplate(options, false);
}

/**
 * Launches the reader Dataflow job (the Spanner change-streams-to-sharded-file-sink flex
 * template) pointed at the Spanner databases and GCS paths created during setup, and
 * stores its launch info in {@code readerJobInfo}.
 *
 * @throws IOException if building or launching the flex template fails
 */
private void launchReaderDataflowJob() throws IOException {
  // default parameters
  flexTemplateDataflowJobResourceManager =
      FlexTemplateDataflowJobResourceManager.builder(getClass().getSimpleName())
          .withTemplateName("Spanner_Change_Streams_to_Sharded_File_Sink")
          .withTemplateModulePath("v2/spanner-change-streams-to-sharded-file-sink")
          .addParameter("sessionFilePath", getGcsPath("input/session.json", gcsResourceManager))
          .addParameter("instanceId", spannerResourceManager.getInstanceId())
          .addParameter("databaseId", spannerResourceManager.getDatabaseId())
          .addParameter("spannerProjectId", PROJECT)
          .addParameter("metadataDatabase", spannerMetadataResourceManager.getDatabaseId())
          .addParameter("metadataInstance", spannerMetadataResourceManager.getInstanceId())
          .addParameter(
              "sourceShardsFilePath", getGcsPath("input/shard.json", gcsResourceManager))
          .addParameter("changeStreamName", "allstream")
          .addParameter("runIdentifier", "run1")
          // The reader writes here; the writer job consumes the same path as its input.
          .addParameter("gcsOutputDirectory", getGcsPath("output", gcsResourceManager))
          // Enable Dataflow Runner v2 via the experiments environment variable.
          .addEnvironmentVariable(
              "additionalExperiments", Collections.singletonList("use_runner_v2"))
          .build();
  // Run
  readerJobInfo = flexTemplateDataflowJobResourceManager.launchJob();
}

/**
 * Builds a single-shard configuration JSON from the MySQL resource manager's connection
 * details and uploads it to GCS as {@code input/shard.json} for the pipelines to read.
 *
 * @param gcsResourceManager bucket to upload the shard file to
 * @param jdbcResourceManager source of host/port/user/password/database for the shard
 * @throws IOException if creating the GCS artifact fails
 */
private void createAndUploadShardConfigToGcs(
    GcsResourceManager gcsResourceManager, CustomMySQLResourceManager jdbcResourceManager)
    throws IOException {
  Shard shard = new Shard();
  shard.setLogicalShardId("Shard1");
  shard.setUser(jdbcResourceManager.getUsername());
  shard.setHost(jdbcResourceManager.getHost());
  shard.setPassword(jdbcResourceManager.getPassword());
  shard.setPort(String.valueOf(jdbcResourceManager.getPort()));
  shard.setDbName(jdbcResourceManager.getDatabaseName());
  JsonObject jsObj = (JsonObject) new Gson().toJsonTree(shard).getAsJsonObject();
  jsObj.remove("secretManagerUri"); // remove field secretManagerUri
  // The shard file format is a JSON array, even for a single shard.
  JsonArray ja = new JsonArray();
  ja.add(jsObj);
  String shardFileContents = ja.toString();
  LOG.info("Shard file contents: {}", shardFileContents);
  gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
}
}
Original file line number Diff line number Diff line change
@@ -20,25 +20,22 @@

import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.utils.PipelineUtils;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.TemplateTestBase;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.apache.beam.it.jdbc.CustomMySQLResourceManager;
import org.apache.beam.it.jdbc.JDBCResourceManager;
import org.apache.beam.it.jdbc.conditions.JDBCRowsCheck;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -52,7 +49,7 @@
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
@TemplateIntegrationTest(GCSToSourceDb.class)
@RunWith(JUnit4.class)
public class GCSToSourceDbWithoutReaderIT extends TemplateTestBase {
public class GCSToSourceDbWithoutReaderIT extends GCSToSourceDbITBase {

private static final Logger LOG = LoggerFactory.getLogger(GCSToSourceDbWithoutReaderIT.class);

@@ -81,14 +78,12 @@ public void setUp() throws IOException {
jdbcResourceManager = CustomMySQLResourceManager.builder(testName).build();
createMySQLSchema(jdbcResourceManager);

gcsResourceManager =
GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials)
.build();
createAndUploadShardConfigToGcs(gcsResourceManager, jdbcResourceManager);
gcsResourceManager = createGcsResourceManager();
createAndUploadShardConfigToGcs(gcsResourceManager, Arrays.asList(jdbcResourceManager));
gcsResourceManager.uploadArtifact(
"input/session.json", Resources.getResource(SESSION_FILE_RESOURSE).getPath());

launchWriterDataflowJob();
jobInfo = launchWriterDataflowJob(gcsResourceManager, spannerMetadataResourceManager);
}
}
}
@@ -121,28 +116,17 @@ public void testGCSToSource() throws IOException, InterruptedException {

private void assertRowInMySQL() throws InterruptedException {
long rowCount = 0;
for (int i = 0; rowCount != 1 && i < 60; ++i) {
rowCount = jdbcResourceManager.getRowCount(TABLE);
LOG.info("Row count = {}, Waiting for 30s if row count not = 1", rowCount);
Thread.sleep(10000);
}
assertThat(rowCount).isEqualTo(1);
JDBCRowsCheck rowsCheck =
JDBCRowsCheck.builder(jdbcResourceManager, TABLE).setMinRows(1).setMaxRows(1).build();
PipelineOperator.Result result =
pipelineOperator()
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(5)), rowsCheck);
List<Map<String, Object>> rows = jdbcResourceManager.readTable(TABLE);
assertThat(rows).hasSize(1);
assertThat(rows.get(0).get("id")).isEqualTo(1);
assertThat(rows.get(0).get("name")).isEqualTo("FF");
}

/**
 * Creates the Spanner metadata database used by the writer pipeline. The "rr-meta-"
 * prefix keeps its name distinct from the main test database.
 *
 * @return the resource manager owning the metadata database
 * @throws IOException declared for symmetry with the other setup helpers
 */
private SpannerResourceManager createSpannerMetadataDatabase() throws IOException {
  SpannerResourceManager metadataManager =
      SpannerResourceManager.builder("rr-meta-" + testName, PROJECT, REGION)
          .maybeUseStaticInstance()
          .build(); // DB name is appended with prefix to avoid clashes
  // The pipeline only needs the database to exist; create a placeholder table.
  String placeholderDdl = "create table t1(id INT64 ) primary key(id)";
  metadataManager.executeDdlStatement(placeholderDdl);
  return metadataManager;
}

private void createMySQLSchema(CustomMySQLResourceManager jdbcResourceManager) {
HashMap<String, String> columns = new HashMap<>();
columns.put("id", "INT NOT NULL");
@@ -151,45 +135,4 @@ private void createMySQLSchema(CustomMySQLResourceManager jdbcResourceManager) {

jdbcResourceManager.createTable(TABLE, schema);
}

/**
 * Launches the writer Dataflow job (the GCS-to-source-DB template under test) with a
 * fixed start timestamp and window duration, and stores its launch info in
 * {@code jobInfo}.
 *
 * @throws IOException if launching the template fails
 */
private void launchWriterDataflowJob() throws IOException {
  // Plain HashMap instead of double-brace initialization: the anonymous-subclass
  // idiom creates an extra class and captures a reference to the enclosing instance.
  Map<String, String> params = new HashMap<>();
  params.put("sessionFilePath", getGcsPath("input/session.json", gcsResourceManager));
  params.put("spannerProjectId", PROJECT);
  params.put("metadataDatabase", spannerMetadataResourceManager.getDatabaseId());
  params.put("metadataInstance", spannerMetadataResourceManager.getInstanceId());
  params.put("sourceShardsFilePath", getGcsPath("input/shard.json", gcsResourceManager));
  params.put("runIdentifier", "run1");
  params.put("GCSInputDirectoryPath", getGcsPath("output", gcsResourceManager));
  // Fixed start timestamp: this IT pre-stages input files rather than running a reader job.
  params.put("startTimestamp", "2024-05-13T08:43:10.000Z");
  params.put("windowDuration", "10s");

  String jobName = PipelineUtils.createJobName(testName);
  LaunchConfig.Builder options = LaunchConfig.builder(jobName, specPath);
  options.setParameters(params);
  // Run
  jobInfo = launchTemplate(options, false);
}

/**
 * Serializes the MySQL connection details into a one-element shard-config JSON array and
 * publishes it to GCS at {@code input/shard.json}.
 *
 * @param gcsResourceManager bucket to upload the shard file to
 * @param jdbcResourceManager source of host/port/user/password/database for the shard
 * @throws IOException if creating the GCS artifact fails
 */
private void createAndUploadShardConfigToGcs(
    GcsResourceManager gcsResourceManager, CustomMySQLResourceManager jdbcResourceManager)
    throws IOException {
  Shard shard = new Shard();
  shard.setLogicalShardId("Shard1");
  shard.setUser(jdbcResourceManager.getUsername());
  shard.setHost(jdbcResourceManager.getHost());
  shard.setPassword(jdbcResourceManager.getPassword());
  shard.setPort(String.valueOf(jdbcResourceManager.getPort()));
  shard.setDbName(jdbcResourceManager.getDatabaseName());

  JsonObject shardJson = (JsonObject) new Gson().toJsonTree(shard).getAsJsonObject();
  // secretManagerUri is not used by the test harness, so strip it from the config.
  shardJson.remove("secretManagerUri");

  // The shard file format is a JSON array, even when there is a single shard.
  JsonArray shardArray = new JsonArray();
  shardArray.add(shardJson);
  String shardFileContents = shardArray.toString();
  LOG.info("Shard file contents: {}", shardFileContents);
  gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
}
}
Original file line number Diff line number Diff line change
@@ -23,30 +23,23 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.PipelineUtils;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.TemplateTestBase;
import org.apache.beam.it.gcp.dataflow.FlexTemplateDataflowJobResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.apache.beam.it.jdbc.CustomMySQLResourceManager;
import org.apache.beam.it.jdbc.JDBCResourceManager;
import org.apache.beam.it.jdbc.conditions.JDBCRowsCheck;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -60,7 +53,7 @@
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
@TemplateIntegrationTest(GCSToSourceDb.class)
@RunWith(JUnit4.class)
public class TimezoneIT extends TemplateTestBase {
public class TimezoneIT extends GCSToSourceDbITBase {

private static final Logger LOG = LoggerFactory.getLogger(TimezoneIT.class);

@@ -94,15 +87,15 @@ public void setUp() throws IOException {
jdbcResourceManager = CustomMySQLResourceManager.builder(testName).build();
createMySQLSchema(jdbcResourceManager);

gcsResourceManager =
GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials)
.build();
createAndUploadShardConfigToGcs(gcsResourceManager, jdbcResourceManager);
gcsResourceManager = createGcsResourceManager();
createAndUploadShardConfigToGcs(gcsResourceManager, Arrays.asList(jdbcResourceManager));
gcsResourceManager.uploadArtifact(
"input/session.json", Resources.getResource(SESSION_FILE_RESOURSE).getPath());

launchReaderDataflowJob();
launchWriterDataflowJob();
readerJobInfo =
launchReaderDataflowJob(
gcsResourceManager, spannerResourceManager, spannerMetadataResourceManager);
writerJobInfo = launchWriterDataflowJob(gcsResourceManager, spannerMetadataResourceManager);
}
}
}
@@ -164,11 +157,11 @@ private void writeRowInSpanner() {
}

private void assertRowInMySQL() throws InterruptedException {
JDBCRowsCheck rowsCheck =
JDBCRowsCheck.builder(jdbcResourceManager, TABLE).setMinRows(3).setMaxRows(3).build();
PipelineOperator.Result result =
pipelineOperator()
.waitForCondition(
createConfig(writerJobInfo, Duration.ofMinutes(10)),
() -> jdbcResourceManager.getRowCount(TABLE) == 3);
.waitForCondition(createConfig(writerJobInfo, Duration.ofMinutes(10)), rowsCheck);
assertThatResult(result).meetsConditions();
List<Map<String, Object>> rows =
jdbcResourceManager.runSQLQuery("SELECT id,time_colm FROM Users ORDER BY id");
@@ -184,37 +177,6 @@ private void assertRowInMySQL() throws InterruptedException {
.isEqualTo(java.sql.Timestamp.valueOf("2024-02-03 06:00:00.0"));
}

/**
 * Creates the main ("rr-main-") Spanner database for this test and applies the DDL read
 * from the given classpath resource. The resource may contain multiple statements
 * separated by {@code ';'}; blank fragments are skipped.
 *
 * @param spannerDdlResourceFile classpath resource holding the DDL text
 * @return the resource manager owning the newly created database
 * @throws IOException if the DDL resource cannot be read
 */
private SpannerResourceManager createSpannerDatabase(String spannerDdlResourceFile)
    throws IOException {
  SpannerResourceManager spannerResourceManager =
      SpannerResourceManager.builder("rr-main-" + testName, PROJECT, REGION)
          .maybeUseStaticInstance()
          .build();
  // Join the file's lines into one string, then split it back into statements on ';'.
  String ddl =
      String.join(
          " ",
          Resources.readLines(
              Resources.getResource(spannerDdlResourceFile), StandardCharsets.UTF_8));
  ddl = ddl.trim();
  String[] ddls = ddl.split(";");
  for (String d : ddls) {
    if (!d.isBlank()) {
      spannerResourceManager.executeDdlStatement(d);
    }
  }
  return spannerResourceManager;
}

/**
 * Creates the ("rr-meta-") Spanner metadata database used by the pipelines. The database
 * only needs to exist, so a single placeholder table is created.
 *
 * @return the resource manager owning the metadata database
 * @throws IOException declared for symmetry with the other setup helpers
 */
private SpannerResourceManager createSpannerMetadataDatabase() throws IOException {
  SpannerResourceManager spannerMetadataResourceManager =
      SpannerResourceManager.builder("rr-meta-" + testName, PROJECT, REGION)
          .maybeUseStaticInstance()
          .build();
  // Placeholder table; the pipelines manage their own metadata tables.
  String dummy = "create table t1(id INT64 ) primary key(id)";
  spannerMetadataResourceManager.executeDdlStatement(dummy);
  return spannerMetadataResourceManager;
}

private void createMySQLSchema(CustomMySQLResourceManager jdbcResourceManager) {
HashMap<String, String> columns = new HashMap<>();
columns.put("id", "INT NOT NULL");
@@ -223,68 +185,4 @@ private void createMySQLSchema(CustomMySQLResourceManager jdbcResourceManager) {

jdbcResourceManager.createTable(TABLE, schema);
}

/**
 * Launches the writer Dataflow job (the GCS-to-source-DB template under test) with a
 * {@code sourceDbTimezoneOffset} of +10:00 so timestamp conversion can be asserted, and
 * stores its launch info in {@code writerJobInfo}.
 *
 * @throws IOException if launching the template fails
 */
private void launchWriterDataflowJob() throws IOException {
  // Plain HashMap instead of double-brace initialization: the anonymous-subclass
  // idiom creates an extra class and captures a reference to the enclosing instance.
  Map<String, String> params = new HashMap<>();
  params.put("sessionFilePath", getGcsPath("input/session.json", gcsResourceManager));
  params.put("spannerProjectId", PROJECT);
  params.put("metadataDatabase", spannerMetadataResourceManager.getDatabaseId());
  params.put("metadataInstance", spannerMetadataResourceManager.getInstanceId());
  params.put("sourceShardsFilePath", getGcsPath("input/shard.json", gcsResourceManager));
  params.put("runIdentifier", "run1");
  // Offset under test: source DB clock is treated as +10:00 relative to UTC.
  params.put("sourceDbTimezoneOffset", "+10:00");
  params.put("GCSInputDirectoryPath", getGcsPath("output", gcsResourceManager));

  String jobName = PipelineUtils.createJobName(testName);
  LaunchConfig.Builder options = LaunchConfig.builder(jobName, specPath);
  options.setParameters(params);
  // Run
  writerJobInfo = launchTemplate(options, false);
}

/**
 * Launches the reader Dataflow job (the Spanner change-streams-to-sharded-file-sink flex
 * template) pointed at the Spanner databases and GCS paths created during setup, and
 * stores its launch info in {@code readerJobInfo}.
 *
 * @throws IOException if building or launching the flex template fails
 */
private void launchReaderDataflowJob() throws IOException {
  // default parameters
  flexTemplateDataflowJobResourceManager =
      FlexTemplateDataflowJobResourceManager.builder(getClass().getSimpleName())
          .withTemplateName("Spanner_Change_Streams_to_Sharded_File_Sink")
          .withTemplateModulePath("v2/spanner-change-streams-to-sharded-file-sink")
          .addParameter("sessionFilePath", getGcsPath("input/session.json", gcsResourceManager))
          .addParameter("instanceId", spannerResourceManager.getInstanceId())
          .addParameter("databaseId", spannerResourceManager.getDatabaseId())
          .addParameter("spannerProjectId", PROJECT)
          .addParameter("metadataDatabase", spannerMetadataResourceManager.getDatabaseId())
          .addParameter("metadataInstance", spannerMetadataResourceManager.getInstanceId())
          .addParameter(
              "sourceShardsFilePath", getGcsPath("input/shard.json", gcsResourceManager))
          .addParameter("changeStreamName", "allstream")
          .addParameter("runIdentifier", "run1")
          // The reader writes here; the writer job consumes the same path as its input.
          .addParameter("gcsOutputDirectory", getGcsPath("output", gcsResourceManager))
          // Enable Dataflow Runner v2 via the experiments environment variable.
          .addEnvironmentVariable(
              "additionalExperiments", Collections.singletonList("use_runner_v2"))
          .build();
  // Run
  readerJobInfo = flexTemplateDataflowJobResourceManager.launchJob();
}

/**
 * Builds a single-shard configuration JSON from the MySQL resource manager's connection
 * details and uploads it to GCS as {@code input/shard.json} for the pipelines to read.
 *
 * @param gcsResourceManager bucket to upload the shard file to
 * @param jdbcResourceManager source of host/port/user/password/database for the shard
 * @throws IOException if creating the GCS artifact fails
 */
private void createAndUploadShardConfigToGcs(
    GcsResourceManager gcsResourceManager, CustomMySQLResourceManager jdbcResourceManager)
    throws IOException {
  Shard shard = new Shard();
  shard.setLogicalShardId("Shard1");
  shard.setUser(jdbcResourceManager.getUsername());
  shard.setHost(jdbcResourceManager.getHost());
  shard.setPassword(jdbcResourceManager.getPassword());
  shard.setPort(String.valueOf(jdbcResourceManager.getPort()));
  shard.setDbName(jdbcResourceManager.getDatabaseName());
  JsonObject jsObj = (JsonObject) new Gson().toJsonTree(shard).getAsJsonObject();
  jsObj.remove("secretManagerUri"); // remove field secretManagerUri
  // The shard file format is a JSON array, even when there is a single shard.
  JsonArray ja = new JsonArray();
  ja.add(jsObj);
  String shardFileContents = ja.toString();
  LOG.info("Shard file contents: {}", shardFileContents);
  gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
}
}