Skip to content

Commit

Permalink
Merge pull request #3968 from lcpopa/master
Browse files Browse the repository at this point in the history
Janus Connector - Adding transaction commits to lineage queries
  • Loading branch information
planetf1 authored Oct 27, 2020
2 parents 100fe80 + a0e6f6c commit 652a3ed
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void initializeGraphDB(AuditLog auditLog) throws OpenLineageException {
errorCode.getUserAction());
}

this.helper = new LineageGraphConnectorHelper(g);
this.helper = new LineageGraphConnectorHelper(g, graphFactory.isSupportingTransactions());

} catch (JanusConnectorException error) {
log.error("The Lineage graph could not be initialized due to an error", error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.odpi.openmetadata.governanceservers.openlineage.model.LineageVertex;
import org.odpi.openmetadata.governanceservers.openlineage.model.LineageVerticesAndEdges;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -87,15 +89,19 @@

public class LineageGraphConnectorHelper {

private GraphTraversalSource g;
private String[] glossaryTermAndClassificationEdges = {EDGE_LABEL_SEMANTIC_ASSIGNMENT, EDGE_LABEL_RELATED_TERM,
private static final Logger log = LoggerFactory.getLogger(LineageGraphConnectorHelper.class);

private final GraphTraversalSource g;
private final boolean supportingTransactions;
private final String[] glossaryTermAndClassificationEdges = {EDGE_LABEL_SEMANTIC_ASSIGNMENT, EDGE_LABEL_RELATED_TERM,
EDGE_LABEL_SYNONYM, EDGE_LABEL_ANTONYM, EDGE_LABEL_REPLACEMENT_TERM, EDGE_LABEL_TRANSLATION, EDGE_LABEL_IS_A_RELATIONSHIP,
EDGE_LABEL_CLASSIFICATION, EDGE_LABEL_TERM_CATEGORIZATION};
private String[] relationalColumnAndClassificationEdges = {NESTED_SCHEMA_ATTRIBUTE, EDGE_LABEL_CLASSIFICATION, EDGE_LABEL_SEMANTIC_ASSIGNMENT};
private String[] tabularColumnAndClassificationEdges = {ATTRIBUTE_FOR_SCHEMA, EDGE_LABEL_CLASSIFICATION, EDGE_LABEL_SEMANTIC_ASSIGNMENT};
private final String[] relationalColumnAndClassificationEdges = {NESTED_SCHEMA_ATTRIBUTE, EDGE_LABEL_CLASSIFICATION, EDGE_LABEL_SEMANTIC_ASSIGNMENT};
private final String[] tabularColumnAndClassificationEdges = {ATTRIBUTE_FOR_SCHEMA, EDGE_LABEL_CLASSIFICATION, EDGE_LABEL_SEMANTIC_ASSIGNMENT};

public LineageGraphConnectorHelper(GraphTraversalSource graphTraversalSource) {
public LineageGraphConnectorHelper(GraphTraversalSource graphTraversalSource, boolean supportingTransactions) {
this.g = graphTraversalSource;
this.supportingTransactions = supportingTransactions;
}

/**
Expand All @@ -113,17 +119,31 @@ public Optional<LineageVerticesAndEdges> ultimateSource(String guid, boolean inc
return Optional.empty();
}
String edgeLabel = edgeLabelOptional.get();

Graph sourceGraph = (Graph)
g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).
until(inE(edgeLabel).count().is(0)).
repeat((Traversal) inE(edgeLabel).subgraph("subGraph").outV().simplePath()).
cap("subGraph").next();

List<Vertex> sourcesList = g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).
until(inE(edgeLabel).count().is(0)).
repeat(inE(edgeLabel).outV().simplePath()).
dedup().toList();
Graph sourceGraph = null;
List<Vertex> sourcesList = null;

try {
sourceGraph = (Graph)
g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).
until(inE(edgeLabel).count().is(0)).
repeat((Traversal) inE(edgeLabel).subgraph("subGraph").outV().simplePath()).
cap("subGraph").next();

sourcesList = g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).
until(inE(edgeLabel).count().is(0)).
repeat(inE(edgeLabel).outV().simplePath()).
dedup().toList();

if(supportingTransactions){
g.tx().commit();
}
}catch (Exception e){
if(supportingTransactions) {
g.tx().rollback();
}
log.error("Exception while querying ultimate source horizontal lineage of guid " + guid + ". Executed rollback.");
log.error("Message: " + e.getMessage());
}

return Optional.of(getCondensedLineage(guid, g, sourceGraph, getLineageVertices(sourcesList), SOURCE_CONDENSATION, includeProcesses));
}
Expand All @@ -142,17 +162,31 @@ public Optional<LineageVerticesAndEdges> ultimateDestination(String guid, boolea
return Optional.empty();
}
String edgeLabel = edgeLabelOptional.get();

Graph destinationGraph = (Graph)
g.V().has(PROPERTY_KEY_ENTITY_GUID, guid)
.until(outE(edgeLabel).count().is(0))
.repeat((Traversal) outE(edgeLabel).subgraph("subGraph").inV().simplePath())
.cap("subGraph").next();

List<Vertex> destinationsList = g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).
until(outE(edgeLabel).count().is(0)).
repeat(outE(edgeLabel).inV().simplePath()).
dedup().toList();
Graph destinationGraph = null;
List<Vertex> destinationsList = null;

try{
destinationGraph = (Graph)
g.V().has(PROPERTY_KEY_ENTITY_GUID, guid)
.until(outE(edgeLabel).count().is(0))
.repeat((Traversal) outE(edgeLabel).subgraph("subGraph").inV().simplePath())
.cap("subGraph").next();

destinationsList = g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).
until(outE(edgeLabel).count().is(0)).
repeat(outE(edgeLabel).inV().simplePath()).
dedup().toList();

if(supportingTransactions){
g.tx().commit();
}
}catch (Exception e){
if(supportingTransactions) {
g.tx().rollback();
}
log.error("Exception while querying ultimate destination horizontal lineage of guid " + guid + ". Executed rollback.");
log.error("Message: " + e.getMessage());
}

return Optional.of(getCondensedLineage(guid, g, destinationGraph, getLineageVertices(destinationsList), DESTINATION_CONDENSATION,
includeProcesses));
Expand All @@ -172,16 +206,28 @@ public Optional<LineageVerticesAndEdges> endToEnd(String guid, boolean includePr
return Optional.empty();
}
String edgeLabel = edgeLabelOptional.get();


Graph endToEndGraph = (Graph)
g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).
union(
until(inE(edgeLabel).count().is(0)).
repeat((Traversal) inE(edgeLabel).subgraph("subGraph").outV().simplePath()),
until(outE(edgeLabel).count().is(0)).
repeat((Traversal) outE(edgeLabel).subgraph("subGraph").inV().simplePath())
).cap("subGraph").next();
Graph endToEndGraph = null;

try{
endToEndGraph = (Graph)
g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).
union(
until(inE(edgeLabel).count().is(0)).
repeat((Traversal) inE(edgeLabel).subgraph("subGraph").outV().simplePath()),
until(outE(edgeLabel).count().is(0)).
repeat((Traversal) outE(edgeLabel).subgraph("subGraph").inV().simplePath())
).cap("subGraph").next();

if(supportingTransactions){
g.tx().commit();
}
}catch (Exception e){
if(supportingTransactions) {
g.tx().rollback();
}
log.error("Exception while querying end to end horizontal lineage of guid " + guid + ". Executed rollback.");
log.error("Message: " + e.getMessage());
}

return Optional.of(getLineageVerticesAndEdges(endToEndGraph, includeProcesses));
}
Expand Down Expand Up @@ -227,8 +273,22 @@ public Optional<LineageVerticesAndEdges> sourceAndDestination(String guid, boole
*/
private Optional<LineageVerticesAndEdges> glossaryVerticalLineage(String guid) {

Graph subGraph = (Graph) g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).bothE(glossaryTermAndClassificationEdges)
.subgraph("s").cap("s").next();
Graph subGraph = null;

try{
subGraph = (Graph) g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).bothE(glossaryTermAndClassificationEdges)
.subgraph("s").cap("s").next();

if(supportingTransactions){
g.tx().commit();
}
}catch (Exception e){
if(supportingTransactions) {
g.tx().rollback();
}
log.error("Exception while querying glossary term vertical lineage of guid " + guid + ". Executed rollback.");
log.error("Message: " + e.getMessage());
}

return Optional.of(getLineageVerticesAndEdges(subGraph, true));
}
Expand All @@ -243,8 +303,22 @@ private Optional<LineageVerticesAndEdges> glossaryVerticalLineage(String guid) {
*/
private Optional<LineageVerticesAndEdges> relationalColumnVerticalLineage(String guid) {

Graph subGraph = (Graph) g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).bothE(relationalColumnAndClassificationEdges)
.subgraph("s").cap("s").next();
Graph subGraph = null;

try{
subGraph = (Graph) g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).bothE(relationalColumnAndClassificationEdges)
.subgraph("s").cap("s").next();

if(supportingTransactions){
g.tx().commit();
}
}catch (Exception e){
if(supportingTransactions) {
g.tx().rollback();
}
log.error("Exception while querying relational column vertical lineage of guid " + guid + ". Executed rollback.");
log.error("Message: " + e.getMessage());
}

return Optional.of(getLineageVerticesAndEdges(subGraph, true));
}
Expand All @@ -259,8 +333,22 @@ private Optional<LineageVerticesAndEdges> relationalColumnVerticalLineage(String
*/
private Optional<LineageVerticesAndEdges> tabularColumnVerticalLineage(String guid ) {

Graph subGraph = (Graph) g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).bothE(tabularColumnAndClassificationEdges)
.subgraph("s").bothV().inE(ASSET_SCHEMA_TYPE).subgraph("s").cap("s").next();
Graph subGraph = null;

try{
subGraph = (Graph) g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).bothE(tabularColumnAndClassificationEdges)
.subgraph("s").bothV().inE(ASSET_SCHEMA_TYPE).subgraph("s").cap("s").next();

if(supportingTransactions){
g.tx().commit();
}
}catch (Exception e){
if(supportingTransactions) {
g.tx().rollback();
}
log.error("Exception while querying tabular column vertical lineage of guid " + guid + ". Executed rollback.");
log.error("Message: " + e.getMessage());
}

return Optional.of(getLineageVerticesAndEdges(subGraph, true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class LineageGraphConnectorHelperTest {
public static void beforeClass() {
Graph graph = JanusGraphFactory.build().set("storage.backend", "inmemory").open();
GraphTraversalSource g = graph.traversal();
mainGraphConnector = new LineageGraphConnectorHelper(g);
mainGraphConnector = new LineageGraphConnectorHelper(g, true);

addColumnLineageData(g);

Expand Down

0 comments on commit 652a3ed

Please sign in to comment.