Skip to content

Commit

Permalink
Simplify dependency manager: remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
fbiville committed Nov 14, 2024
1 parent c67dd1e commit 442c4bd
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,7 @@ public void run() {
String.format("Metadata for source %s", source.getName()), provider.queryMetadata());
sourceRows.add(sourceMetadata);
Schema sourceBeamSchema = sourceMetadata.getSchema();
processingQueue.addToQueue(
ArtifactType.source, false, source.getName(), defaultActionContext, sourceMetadata);
processingQueue.addToQueue(ArtifactType.source, source.getName(), defaultActionContext);

Check warning on line 290 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L290

Added line #L290 was not covered by tests
PCollection<Row> nullableSourceBeamRows = null;

////////////////////////////
Expand Down Expand Up @@ -333,7 +332,7 @@ public void run() {
List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_NODES, List.of()));
dependencies.add(
processingQueue.waitOnCollections(target.getDependencies(), nodeStepDescription));
processingQueue.resolveOutputs(target.getDependencies(), nodeStepDescription));

Check warning on line 335 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L335

Added line #L335 was not covered by tests

PCollection<Row> blockingReturn =
preInsertBeamRows
Expand All @@ -359,8 +358,7 @@ public void run() {
.computeIfAbsent(TargetType.NODE, (type) -> new ArrayList<>(nodeTargets.size()))
.add(blockingReturn);

processingQueue.addToQueue(
ArtifactType.node, false, target.getName(), blockingReturn, preInsertBeamRows);
processingQueue.addToQueue(ArtifactType.node, target.getName(), blockingReturn);

Check warning on line 361 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L361

Added line #L361 was not covered by tests
}

////////////////////////////
Expand Down Expand Up @@ -396,8 +394,7 @@ public void run() {
List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_RELATIONSHIPS, List.of()));
dependencies.add(
processingQueue.waitOnCollections(
target.getDependencies(), relationshipStepDescription));
processingQueue.resolveOutputs(target.getDependencies(), relationshipStepDescription));

Check warning on line 397 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L397

Added line #L397 was not covered by tests

PCollection<Row> blockingReturn =
preInsertBeamRows
Expand All @@ -424,8 +421,7 @@ public void run() {
TargetType.RELATIONSHIP, (type) -> new ArrayList<>(relationshipTargets.size()))
.add(blockingReturn);
// serialize relationships
processingQueue.addToQueue(
ArtifactType.edge, false, target.getName(), blockingReturn, preInsertBeamRows);
processingQueue.addToQueue(ArtifactType.edge, target.getName(), blockingReturn);

Check warning on line 424 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L424

Added line #L424 was not covered by tests
}
////////////////////////////
// Custom query targets
Expand All @@ -443,8 +439,7 @@ public void run() {
List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_QUERIES, List.of()));
dependencies.add(
processingQueue.waitOnCollections(
target.getDependencies(), customQueryStepDescription));
processingQueue.resolveOutputs(target.getDependencies(), customQueryStepDescription));

Check warning on line 442 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L442

Added line #L442 was not covered by tests

PCollection<Row> blockingReturn =
nullableSourceBeamRows
Expand All @@ -469,12 +464,7 @@ public void run() {
targetRows
.computeIfAbsent(TargetType.QUERY, (type) -> new ArrayList<>(customQueryTargets.size()))
.add(blockingReturn);
processingQueue.addToQueue(
ArtifactType.custom_query,
false,
target.getName(),
blockingReturn,
nullableSourceBeamRows);
processingQueue.addToQueue(ArtifactType.custom_query, target.getName(), blockingReturn);

Check warning on line 467 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/templates/GoogleCloudToNeo4j.java#L467

Added line #L467 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,58 +31,19 @@
public class BeamBlock {

private static final Logger LOG = LoggerFactory.getLogger(BeamBlock.class);
private final List<PCollection<Row>> sourceQueue = new ArrayList<>();
private final List<PCollection<Row>> preloadActionQueue = new ArrayList<>();
private final List<PCollection<Row>> processActionQueue = new ArrayList<>();
private final List<PCollection<Row>> nodeQueue = new ArrayList<>();
private final List<PCollection<Row>> edgeQueue = new ArrayList<>();
private final List<PCollection<Row>> customQueue = new ArrayList<>();
private final Map<String, PCollection<Row>> executeAfterNamedQueue = new HashMap<>();
private final Map<String, PCollection<Row>> executionContexts = new HashMap<>();
private PCollection<Row> defaultCollection;
private final Map<String, PCollection<Row>> outputs = new HashMap<>();

Check warning on line 34 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L34

Added line #L34 was not covered by tests
private final PCollection<Row> defaultCollection;

public BeamBlock(PCollection<Row> defaultCollection) {
this.defaultCollection = defaultCollection;
}

public void addToQueue(
ArtifactType artifactType, boolean preload, String name, PCollection<Row> blockingReturn) {
addToQueue(artifactType, preload, name, blockingReturn, defaultCollection);
public void addToQueue(ArtifactType artifactType, String name, PCollection<Row> output) {
outputs.put(artifactType.name() + ":" + name, output);

Check warning on line 42 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L42

Added line #L42 was not covered by tests
}

public void addToQueue(
ArtifactType artifactType,
boolean preload,
String name,
PCollection<Row> blockingReturn,
PCollection<Row> executionContext) {
switch (artifactType) {
case action:
if (preload) {
preloadActionQueue.add(blockingReturn);
} else {
processActionQueue.add(blockingReturn);
}
break;
case source:
sourceQueue.add(blockingReturn);
break;
case node:
nodeQueue.add(blockingReturn);
break;
case edge:
edgeQueue.add(blockingReturn);
break;
case custom_query:
customQueue.add(blockingReturn);
break;
}
executeAfterNamedQueue.put(artifactType.name() + ":" + name, blockingReturn);
executionContexts.put(artifactType.name() + ":" + name, executionContext);
}

public PCollection<Row> waitOnCollections(List<String> dependencies, String queuingDescription) {
List<PCollection<Row>> waitOnQueues = populateQueueForTargets(dependencies);
public PCollection<Row> resolveOutputs(List<String> dependencies, String queuingDescription) {
List<PCollection<Row>> waitOnQueues = resolveOutputs(dependencies);

Check warning on line 46 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L46

Added line #L46 was not covered by tests
if (waitOnQueues.isEmpty()) {
waitOnQueues.add(defaultCollection);
}
Expand All @@ -101,15 +62,16 @@ public PCollection<Row> waitOnCollections(List<String> dependencies, String queu
Flatten.pCollections());
}

private List<PCollection<Row>> populateQueueForTargets(List<String> dependencies) {
List<PCollection<Row>> waitOnQueues = new ArrayList<>();
private List<PCollection<Row>> resolveOutputs(List<String> dependencies) {
List<PCollection<Row>> outputs = new ArrayList<>();

Check warning on line 66 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L66

Added line #L66 was not covered by tests
for (String dependency : dependencies) {
for (ArtifactType type : ArtifactType.values()) {
if (executeAfterNamedQueue.containsKey(type + ":" + dependency)) {
waitOnQueues.add(executeAfterNamedQueue.get(type + ":" + dependency));
if (this.outputs.containsKey(type + ":" + dependency)) {
outputs.add(this.outputs.get(type + ":" + dependency));
break;

Check warning on line 71 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L70-L71

Added lines #L70 - L71 were not covered by tests
}
}
}
return waitOnQueues;
return outputs;

Check warning on line 75 in v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/utils/BeamBlock.java#L75

Added line #L75 was not covered by tests
}
}

0 comments on commit 442c4bd

Please sign in to comment.