Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
Parameterising DDL creation timeout in Avro to Spanner Import Dataflo…
Browse files Browse the repository at this point in the history
…w Template.

PiperOrigin-RevId: 416244500
  • Loading branch information
cloud-teleport committed Dec 14, 2021
1 parent 021c0b2 commit c43aab8
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public interface Options extends PipelineOptions {
void setSpannerProjectId(ValueProvider<String> value);

void setWaitUntilFinish(boolean value);

@Description("DDL creation timeout.")
@Default.Integer(30)
ValueProvider<Integer> getDDLCreationTimeoutInMinutes();

void setDDLCreationTimeoutInMinutes(ValueProvider<Integer> value);
}

public static void main(String[] args) {
Expand All @@ -112,7 +118,8 @@ public static void main(String[] args) {
options.getInputDir(),
options.getWaitForIndexes(),
options.getWaitForForeignKeys(),
options.getEarlyIndexCreateFlag()));
options.getEarlyIndexCreateFlag(),
options.getDDLCreationTimeoutInMinutes()));

PipelineResult result = p.run();
if (options.getWaitUntilFinish()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,21 @@ public class ImportTransform extends PTransform<PBegin, PDone> {
private final ValueProvider<Boolean> waitForIndexes;
private final ValueProvider<Boolean> waitForForeignKeys;
private final ValueProvider<Boolean> earlyIndexCreateFlag;
private final ValueProvider<Integer> ddlCreationTimeoutInMinutes;

public ImportTransform(
SpannerConfig spannerConfig,
ValueProvider<String> importDirectory,
ValueProvider<Boolean> waitForIndexes,
ValueProvider<Boolean> waitForForeignKeys,
ValueProvider<Boolean> earlyIndexCreateFlag) {
ValueProvider<Boolean> earlyIndexCreateFlag,
ValueProvider<Integer> ddlCreationTimeoutInMinutes) {
this.spannerConfig = spannerConfig;
this.importDirectory = importDirectory;
this.waitForIndexes = waitForIndexes;
this.waitForForeignKeys = waitForForeignKeys;
this.earlyIndexCreateFlag = earlyIndexCreateFlag;
this.ddlCreationTimeoutInMinutes = ddlCreationTimeoutInMinutes;
}

@Override
Expand Down Expand Up @@ -164,7 +167,8 @@ public void processElement(ProcessContext c) {
avroDdlView,
informationSchemaView,
manifestView,
earlyIndexCreateFlag));
earlyIndexCreateFlag,
ddlCreationTimeoutInMinutes));

final PCollection<Ddl> ddl = createTableOutput.get(CreateTables.getDdlObjectTag());
final PCollectionView<List<String>> pendingIndexes =
Expand Down Expand Up @@ -342,6 +346,7 @@ private static class CreateTables extends PTransform<PBegin, PCollectionTuple> {
private final PCollectionView<Ddl> informationSchemaView;
private final PCollectionView<Export> manifestView;
private final ValueProvider<Boolean> earlyIndexCreateFlag;
private final ValueProvider<Integer> ddlCreationTimeoutInMinutes;

private transient ExposedSpannerAccessor spannerAccessor;

Expand Down Expand Up @@ -372,12 +377,14 @@ public CreateTables(
PCollectionView<List<KV<String, String>>> avroSchemasView,
PCollectionView<Ddl> informationSchemaView,
PCollectionView<Export> manifestView,
ValueProvider<Boolean> earlyIndexCreateFlag) {
ValueProvider<Boolean> earlyIndexCreateFlag,
ValueProvider<Integer> ddlCreationTimeoutInMinutes) {
this.spannerConfig = spannerConfig;
this.avroSchemasView = avroSchemasView;
this.informationSchemaView = informationSchemaView;
this.manifestView = manifestView;
this.earlyIndexCreateFlag = earlyIndexCreateFlag;
this.ddlCreationTimeoutInMinutes = ddlCreationTimeoutInMinutes;
}

@Override
Expand Down Expand Up @@ -485,8 +492,7 @@ public void processElement(ProcessContext c) {
ddlStatements,
null);
try {
// TODO: Wait till operation is complete.
op.get(30, TimeUnit.MINUTES);
op.get(ddlCreationTimeoutInMinutes.get(), TimeUnit.MINUTES);
} catch (InterruptedException
| ExecutionException
| TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ private void runTest() {
source,
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(true)));
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(30)));
PipelineResult importResult = importPipeline.run();
importResult.waitUntilFinish();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,8 @@ private void exportAndImportDb(
source,
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(true)));
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(30)));
PipelineResult importResult = importPipeline.run();
importResult.waitUntilFinish();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ private void exportAndImportDbAtTime(
source,
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(true)));
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(30)));
PipelineResult importResult = importPipeline.run();
importResult.waitUntilFinish();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ private void runTest(Schema avroSchema, String spannerSchema, Iterable<GenericRe
ValueProvider.StaticValueProvider.of(manifestFileLocation),
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(true)));
ValueProvider.StaticValueProvider.of(true),
ValueProvider.StaticValueProvider.of(30)));
PipelineResult importResult = importPipeline.run();
importResult.waitUntilFinish();

Expand Down

0 comments on commit c43aab8

Please sign in to comment.