Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify dependency manager: remove unused code #2011

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@
String.format("Metadata for source %s", source.getName()), provider.queryMetadata());
sourceRows.add(sourceMetadata);
Schema sourceBeamSchema = sourceMetadata.getSchema();
processingQueue.addToQueue(
ArtifactType.source, false, source.getName(), defaultActionContext, sourceMetadata);
processingQueue.addToQueue(ArtifactType.source, source.getName(), defaultActionContext);

Check warning on line 295 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L295

Added line #L295 was not covered by tests
PCollection<Row> nullableSourceBeamRows = null;

////////////////////////////
Expand Down Expand Up @@ -338,7 +337,7 @@
List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_NODES, List.of()));
dependencies.add(
processingQueue.waitOnCollections(target.getDependencies(), nodeStepDescription));
processingQueue.resolveOutputs(target.getDependencies(), nodeStepDescription));

Check warning on line 340 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L340

Added line #L340 was not covered by tests

PCollection<Row> blockingReturn =
preInsertBeamRows
Expand All @@ -364,8 +363,7 @@
.computeIfAbsent(TargetType.NODE, (type) -> new ArrayList<>(nodeTargets.size()))
.add(blockingReturn);

processingQueue.addToQueue(
ArtifactType.node, false, target.getName(), blockingReturn, preInsertBeamRows);
processingQueue.addToQueue(ArtifactType.node, target.getName(), blockingReturn);

Check warning on line 366 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L366

Added line #L366 was not covered by tests
}

////////////////////////////
Expand Down Expand Up @@ -406,7 +404,7 @@
dependencyNames.add(target.getStartNodeReference());
dependencyNames.add(target.getEndNodeReference());
dependencies.add(
processingQueue.waitOnCollections(dependencyNames, relationshipStepDescription));
processingQueue.resolveOutputs(dependencyNames, relationshipStepDescription));

Check warning on line 407 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L407

Added line #L407 was not covered by tests

PCollection<Row> blockingReturn =
preInsertBeamRows
Expand All @@ -433,8 +431,7 @@
TargetType.RELATIONSHIP, (type) -> new ArrayList<>(relationshipTargets.size()))
.add(blockingReturn);
// serialize relationships
processingQueue.addToQueue(
ArtifactType.edge, false, target.getName(), blockingReturn, preInsertBeamRows);
processingQueue.addToQueue(ArtifactType.edge, target.getName(), blockingReturn);

Check warning on line 434 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L434

Added line #L434 was not covered by tests
}
////////////////////////////
// Custom query targets
Expand All @@ -452,8 +449,7 @@
List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_QUERIES, List.of()));
dependencies.add(
processingQueue.waitOnCollections(
target.getDependencies(), customQueryStepDescription));
processingQueue.resolveOutputs(target.getDependencies(), customQueryStepDescription));

Check warning on line 452 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L452

Added line #L452 was not covered by tests

PCollection<Row> blockingReturn =
nullableSourceBeamRows
Expand All @@ -478,12 +474,7 @@
targetRows
.computeIfAbsent(TargetType.QUERY, (type) -> new ArrayList<>(customQueryTargets.size()))
.add(blockingReturn);
processingQueue.addToQueue(
ArtifactType.custom_query,
false,
target.getName(),
blockingReturn,
nullableSourceBeamRows);
processingQueue.addToQueue(ArtifactType.custom_query, target.getName(), blockingReturn);

Check warning on line 477 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L477

Added line #L477 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,59 +32,20 @@
public class BeamBlock {

private static final Logger LOG = LoggerFactory.getLogger(BeamBlock.class);
private final List<PCollection<Row>> sourceQueue = new ArrayList<>();
private final List<PCollection<Row>> preloadActionQueue = new ArrayList<>();
private final List<PCollection<Row>> processActionQueue = new ArrayList<>();
private final List<PCollection<Row>> nodeQueue = new ArrayList<>();
private final List<PCollection<Row>> edgeQueue = new ArrayList<>();
private final List<PCollection<Row>> customQueue = new ArrayList<>();
private final Map<String, PCollection<Row>> executeAfterNamedQueue = new HashMap<>();
private final Map<String, PCollection<Row>> executionContexts = new HashMap<>();
private PCollection<Row> defaultCollection;
private final Map<String, PCollection<Row>> outputs = new HashMap<>();

Check warning on line 35 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L35

Added line #L35 was not covered by tests
private final PCollection<Row> defaultCollection;

public BeamBlock(PCollection<Row> defaultCollection) {
this.defaultCollection = defaultCollection;
}

public void addToQueue(
ArtifactType artifactType, boolean preload, String name, PCollection<Row> blockingReturn) {
addToQueue(artifactType, preload, name, blockingReturn, defaultCollection);
public void addToQueue(ArtifactType artifactType, String name, PCollection<Row> output) {
outputs.put(artifactType.name() + ":" + name, output);

Check warning on line 43 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L43

Added line #L43 was not covered by tests
}

public void addToQueue(
ArtifactType artifactType,
boolean preload,
String name,
PCollection<Row> blockingReturn,
PCollection<Row> executionContext) {
switch (artifactType) {
case action:
if (preload) {
preloadActionQueue.add(blockingReturn);
} else {
processActionQueue.add(blockingReturn);
}
break;
case source:
sourceQueue.add(blockingReturn);
break;
case node:
nodeQueue.add(blockingReturn);
break;
case edge:
edgeQueue.add(blockingReturn);
break;
case custom_query:
customQueue.add(blockingReturn);
break;
}
executeAfterNamedQueue.put(artifactType.name() + ":" + name, blockingReturn);
executionContexts.put(artifactType.name() + ":" + name, executionContext);
}

public PCollection<Row> waitOnCollections(
public PCollection<Row> resolveOutputs(
Collection<String> dependencies, String queuingDescription) {
List<PCollection<Row>> waitOnQueues = populateQueueForTargets(dependencies);
List<PCollection<Row>> waitOnQueues = resolveOutputs(dependencies);

Check warning on line 48 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L48

Added line #L48 was not covered by tests
if (waitOnQueues.isEmpty()) {
waitOnQueues.add(defaultCollection);
}
Expand All @@ -103,15 +64,16 @@
Flatten.pCollections());
}

private List<PCollection<Row>> populateQueueForTargets(Collection<String> dependencies) {
List<PCollection<Row>> waitOnQueues = new ArrayList<>();
private List<PCollection<Row>> resolveOutputs(Collection<String> dependencies) {
List<PCollection<Row>> outputs = new ArrayList<>();

Check warning on line 68 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L68

Added line #L68 was not covered by tests
for (String dependency : dependencies) {
for (ArtifactType type : ArtifactType.values()) {
if (executeAfterNamedQueue.containsKey(type + ":" + dependency)) {
waitOnQueues.add(executeAfterNamedQueue.get(type + ":" + dependency));
if (this.outputs.containsKey(type + ":" + dependency)) {
outputs.add(this.outputs.get(type + ":" + dependency));
break;

Check warning on line 73 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L72-L73

Added lines #L72 - L73 were not covered by tests
}
}
}
return waitOnQueues;
return outputs;

Check warning on line 77 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L77

Added line #L77 was not covered by tests
}
}
Loading