Skip to content

Commit

Permalink
Simplify dependency manager: remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
fbiville committed Dec 16, 2024
1 parent cd28d01 commit 4b0d74c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ public void run() {
String.format("Metadata for source %s", source.getName()), provider.queryMetadata());
sourceRows.add(sourceMetadata);
Schema sourceBeamSchema = sourceMetadata.getSchema();
processingQueue.addToQueue(
ArtifactType.source, false, source.getName(), defaultActionContext, sourceMetadata);
processingQueue.addToQueue(ArtifactType.source, source.getName(), defaultActionContext);

Check warning on line 295 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L295

Added line #L295 was not covered by tests
PCollection<Row> nullableSourceBeamRows = null;

////////////////////////////
Expand Down Expand Up @@ -338,7 +337,7 @@ public void run() {
List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_NODES, List.of()));
dependencies.add(
processingQueue.waitOnCollections(target.getDependencies(), nodeStepDescription));
processingQueue.resolveOutputs(target.getDependencies(), nodeStepDescription));

Check warning on line 340 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L340

Added line #L340 was not covered by tests

PCollection<Row> blockingReturn =
preInsertBeamRows
Expand All @@ -364,8 +363,7 @@ public void run() {
.computeIfAbsent(TargetType.NODE, (type) -> new ArrayList<>(nodeTargets.size()))
.add(blockingReturn);

processingQueue.addToQueue(
ArtifactType.node, false, target.getName(), blockingReturn, preInsertBeamRows);
processingQueue.addToQueue(ArtifactType.node, target.getName(), blockingReturn);

Check warning on line 366 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L366

Added line #L366 was not covered by tests
}

////////////////////////////
Expand Down Expand Up @@ -406,7 +404,7 @@ public void run() {
dependencyNames.add(target.getStartNodeReference());
dependencyNames.add(target.getEndNodeReference());
dependencies.add(
processingQueue.waitOnCollections(dependencyNames, relationshipStepDescription));
processingQueue.resolveOutputs(dependencyNames, relationshipStepDescription));

Check warning on line 407 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L407

Added line #L407 was not covered by tests

PCollection<Row> blockingReturn =
preInsertBeamRows
Expand All @@ -433,8 +431,7 @@ public void run() {
TargetType.RELATIONSHIP, (type) -> new ArrayList<>(relationshipTargets.size()))
.add(blockingReturn);
// serialize relationships
processingQueue.addToQueue(
ArtifactType.edge, false, target.getName(), blockingReturn, preInsertBeamRows);
processingQueue.addToQueue(ArtifactType.edge, target.getName(), blockingReturn);

Check warning on line 434 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L434

Added line #L434 was not covered by tests
}
////////////////////////////
// Custom query targets
Expand All @@ -452,8 +449,7 @@ public void run() {
List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_QUERIES, List.of()));
dependencies.add(
processingQueue.waitOnCollections(
target.getDependencies(), customQueryStepDescription));
processingQueue.resolveOutputs(target.getDependencies(), customQueryStepDescription));

Check warning on line 452 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L452

Added line #L452 was not covered by tests

PCollection<Row> blockingReturn =
nullableSourceBeamRows
Expand All @@ -478,12 +474,7 @@ public void run() {
targetRows
.computeIfAbsent(TargetType.QUERY, (type) -> new ArrayList<>(customQueryTargets.size()))
.add(blockingReturn);
processingQueue.addToQueue(
ArtifactType.custom_query,
false,
target.getName(),
blockingReturn,
nullableSourceBeamRows);
processingQueue.addToQueue(ArtifactType.custom_query, target.getName(), blockingReturn);

Check warning on line 477 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L477

Added line #L477 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,59 +32,20 @@
public class BeamBlock {

private static final Logger LOG = LoggerFactory.getLogger(BeamBlock.class);
private final List<PCollection<Row>> sourceQueue = new ArrayList<>();
private final List<PCollection<Row>> preloadActionQueue = new ArrayList<>();
private final List<PCollection<Row>> processActionQueue = new ArrayList<>();
private final List<PCollection<Row>> nodeQueue = new ArrayList<>();
private final List<PCollection<Row>> edgeQueue = new ArrayList<>();
private final List<PCollection<Row>> customQueue = new ArrayList<>();
private final Map<String, PCollection<Row>> executeAfterNamedQueue = new HashMap<>();
private final Map<String, PCollection<Row>> executionContexts = new HashMap<>();
private PCollection<Row> defaultCollection;
private final Map<String, PCollection<Row>> outputs = new HashMap<>();

Check warning on line 35 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L35

Added line #L35 was not covered by tests
private final PCollection<Row> defaultCollection;

public BeamBlock(PCollection<Row> defaultCollection) {
this.defaultCollection = defaultCollection;
}

public void addToQueue(
ArtifactType artifactType, boolean preload, String name, PCollection<Row> blockingReturn) {
addToQueue(artifactType, preload, name, blockingReturn, defaultCollection);
public void addToQueue(ArtifactType artifactType, String name, PCollection<Row> output) {
outputs.put(artifactType.name() + ":" + name, output);

Check warning on line 43 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L43

Added line #L43 was not covered by tests
}

public void addToQueue(
ArtifactType artifactType,
boolean preload,
String name,
PCollection<Row> blockingReturn,
PCollection<Row> executionContext) {
switch (artifactType) {
case action:
if (preload) {
preloadActionQueue.add(blockingReturn);
} else {
processActionQueue.add(blockingReturn);
}
break;
case source:
sourceQueue.add(blockingReturn);
break;
case node:
nodeQueue.add(blockingReturn);
break;
case edge:
edgeQueue.add(blockingReturn);
break;
case custom_query:
customQueue.add(blockingReturn);
break;
}
executeAfterNamedQueue.put(artifactType.name() + ":" + name, blockingReturn);
executionContexts.put(artifactType.name() + ":" + name, executionContext);
}

public PCollection<Row> waitOnCollections(
public PCollection<Row> resolveOutputs(
Collection<String> dependencies, String queuingDescription) {
List<PCollection<Row>> waitOnQueues = populateQueueForTargets(dependencies);
List<PCollection<Row>> waitOnQueues = resolveOutputs(dependencies);

Check warning on line 48 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L48

Added line #L48 was not covered by tests
if (waitOnQueues.isEmpty()) {
waitOnQueues.add(defaultCollection);
}
Expand All @@ -103,15 +64,16 @@ public PCollection<Row> waitOnCollections(
Flatten.pCollections());
}

private List<PCollection<Row>> populateQueueForTargets(Collection<String> dependencies) {
List<PCollection<Row>> waitOnQueues = new ArrayList<>();
private List<PCollection<Row>> resolveOutputs(Collection<String> dependencies) {
List<PCollection<Row>> outputs = new ArrayList<>();

Check warning on line 68 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L68

Added line #L68 was not covered by tests
for (String dependency : dependencies) {
for (ArtifactType type : ArtifactType.values()) {
if (executeAfterNamedQueue.containsKey(type + ":" + dependency)) {
waitOnQueues.add(executeAfterNamedQueue.get(type + ":" + dependency));
if (this.outputs.containsKey(type + ":" + dependency)) {
outputs.add(this.outputs.get(type + ":" + dependency));
break;

Check warning on line 73 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L72-L73

Added lines #L72 - L73 were not covered by tests
}
}
}
return waitOnQueues;
return outputs;

Check warning on line 77 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L77

Added line #L77 was not covered by tests
}
}

0 comments on commit 4b0d74c

Please sign in to comment.