Skip to content

Commit

Permalink
Extract duplicated batching aspect to single class
Browse files Browse the repository at this point in the history
  • Loading branch information
fbiville committed Jan 17, 2024
1 parent 3a25418 commit e0eee3f
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 205 deletions.
77 changes: 77 additions & 0 deletions src/main/java/liquibase/ext/neo4j/change/BatchableChange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package liquibase.ext.neo4j.change;

import liquibase.Scope;
import liquibase.change.AbstractChange;
import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.ext.neo4j.database.Neo4jDatabase;
import liquibase.logging.Logger;
import liquibase.statement.SqlStatement;

abstract class BatchableChange extends AbstractChange {

protected Boolean enableBatchImport = Boolean.FALSE;

protected Long batchSize;

@Override
public ValidationErrors validate(Database database) {
ValidationErrors validation = new ValidationErrors(this);
if (enableBatchImport && getChangeSet().isRunInTransaction()) {
validation.addError("enableBatchImport can be true only if the enclosing change set's runInTransaction attribute is set to false");
}
if (!enableBatchImport && batchSize != null) {
validation.addError("batch size must be set only if enableBatchImport is set to true");
}
if (batchSize != null && batchSize <= 0) {
validation.addError("batch size, if set, must be strictly positive");
}
Neo4jDatabase neo4j = (Neo4jDatabase) database;
if (enableBatchImport && !neo4j.supportsCallInTransactions()) {
validation.addWarning("this version of Neo4j does not support CALL {} IN TRANSACTIONS, all batch import settings are ignored");
}
validation.addAll(super.validate(database));
return validation;
}

@Override
public SqlStatement[] generateStatements(Database database) {
Logger log = Scope.getCurrentScope().getLog(getClass());
boolean supportsCallInTransactions = ((Neo4jDatabase) database).supportsCallInTransactions();
if (supportsCallInTransactions && enableBatchImport) {
log.info("Running change in CALL {} IN TRANSACTIONS");
return generateBatchedStatements(database);
} else if (!supportsCallInTransactions) {
log.warning("This version of Neo4j does not support CALL {} IN TRANSACTIONS, the type rename is going to run in a single, possibly large and slow, transaction.\n" +
"Note: upgrade the Neo4j server or set the runInTransaction attribute of the enclosing change set to true to make this warning disappear.");
} else {
log.info("Running type rename in single transaction (set enableBatchImport to true to switch to CALL {} IN TRANSACTIONS)");
}
return generateUnbatchedStatements(database);
}

protected abstract SqlStatement[] generateBatchedStatements(Database database);

protected abstract SqlStatement[] generateUnbatchedStatements(Database database);


public Boolean getEnableBatchImport() {
return enableBatchImport;
}

public void setEnableBatchImport(Boolean enableBatchImport) {
this.enableBatchImport = enableBatchImport;
}

public Long getBatchSize() {
return batchSize;
}

public void setBatchSize(Long batchSize) {
this.batchSize = batchSize;
}

protected String cypherBatchSpec() {
return batchSize != null ? String.format(" OF %d ROWS", batchSize) : "";
}
}
80 changes: 16 additions & 64 deletions src/main/java/liquibase/ext/neo4j/change/InvertDirectionChange.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package liquibase.ext.neo4j.change;

import liquibase.Scope;
import liquibase.change.AbstractChange;
import liquibase.change.ChangeMetaData;
import liquibase.change.DatabaseChange;
import liquibase.database.Database;
import liquibase.exception.LiquibaseException;
import liquibase.exception.ValidationErrors;
import liquibase.ext.neo4j.database.Neo4jDatabase;
import liquibase.logging.Logger;
import liquibase.statement.SqlStatement;
import liquibase.statement.core.RawParameterizedSqlStatement;
import liquibase.statement.core.RawSqlStatement;

import java.util.List;
import java.util.Map;

@DatabaseChange(name = "invertDirection", priority = ChangeMetaData.PRIORITY_DEFAULT, description =
"The 'invertDirection' tag allows you to invert the direction of relationships.\n" +
Expand All @@ -24,18 +18,14 @@
"the relationships to merge. If the fragment is '(:Movie)<-[d:DIRECTED_BY]-(:Director {name: 'John Woo'})-[a:ACTED_IN]->(:Movie)', " +
"the output variable is either 'd' or 'a' depending on the relationships the inversion should affect.")

public class InvertDirectionChange extends AbstractChange {
public class InvertDirectionChange extends BatchableChange {

private String fragment;

private String outputVariable;

private String type;

private Boolean enableBatchImport = Boolean.FALSE;

private Long batchSize;

@Override
public boolean supports(Database database) {
return database instanceof Neo4jDatabase;
Expand All @@ -55,19 +45,6 @@ public ValidationErrors validate(Database database) {
if ("__rel__".equals(outputVariable)) {
validation.addError(String.format("outputVariable %s clashes with the reserved variable name: __rel__. outputVariable must be renamed and fragment accordingly updated", outputVariable));
}
if (enableBatchImport && getChangeSet().isRunInTransaction()) {
validation.addError("enableBatchImport can be true only if the enclosing change set's runInTransaction attribute is set to false");
}
if (!enableBatchImport && batchSize != null) {
validation.addError("batch size must be set only if enableBatchImport is set to true");
}
if (batchSize != null && batchSize <= 0) {
validation.addError("batch size, if set, must be strictly positive");
}
Neo4jDatabase neo4j = (Neo4jDatabase) database;
if (enableBatchImport && !neo4j.supportsCallInTransactions()) {
validation.addWarning("this version of Neo4j does not support CALL {} IN TRANSACTIONS, all batch import settings are ignored");
}
validation.addAll(super.validate(database));
return validation;
}
Expand All @@ -80,29 +57,21 @@ public String getConfirmationMessage() {
return String.format("the direction of relationships with type %s has been inverted", type); }

@Override
public SqlStatement[] generateStatements(Database database) {
Logger log = Scope.getCurrentScope().getLog(getClass());
boolean supportsCallInTransactions = ((Neo4jDatabase) database).supportsCallInTransactions();
if (supportsCallInTransactions && enableBatchImport) {
log.info("Running type rename in CALL {} IN TRANSACTIONS");
String batchSpec = batchSize != null ? String.format(" OF %d ROWS", batchSize) : "";
String cypher = String.format("%s " +
"CALL { " +
" WITH __rel__ " +
" MATCH (__start__) WHERE id(__start__) = id(startNode(__rel__)) " +
" MATCH (__end__) WHERE id(__end__) = id(endNode(__rel__)) " +
" CREATE (__start__)<-[__newrel__:`%s`]-(__end__) " +
" SET __newrel__ = properties(__rel__) " +
" DELETE __rel__ " +
"} IN TRANSACTIONS%s", queryStart(), type, batchSpec);
return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, type)};
}
if (!supportsCallInTransactions) {
log.warning("This version of Neo4j does not support CALL {} IN TRANSACTIONS, the type rename is going to run in a single, possibly large and slow, transaction.\n" +
"Note: upgrade the Neo4j server or set the runInTransaction attribute of the enclosing change set to true to make this warning disappear.");
} else {
log.info("Running type rename in single transaction (set enableBatchImport to true to switch to CALL {} IN TRANSACTIONS)");
}
protected SqlStatement[] generateBatchedStatements(Database database) {
String cypher = String.format("%s " +
"CALL { " +
" WITH __rel__ " +
" MATCH (__start__) WHERE id(__start__) = id(startNode(__rel__)) " +
" MATCH (__end__) WHERE id(__end__) = id(endNode(__rel__)) " +
" CREATE (__start__)<-[__newrel__:`%s`]-(__end__) " +
" SET __newrel__ = properties(__rel__) " +
" DELETE __rel__ " +
"} IN TRANSACTIONS%s", queryStart(), type, cypherBatchSpec());
return new SqlStatement[]{new RawParameterizedSqlStatement(cypher, type)};
}

@Override
protected SqlStatement[] generateUnbatchedStatements(Database database) {
String cypher = String.format("%s " +
"MATCH (__start__) WHERE id(__start__) = id(startNode(__rel__)) " +
"MATCH (__end__) WHERE id(__end__) = id(endNode(__rel__)) " +
Expand Down Expand Up @@ -135,23 +104,6 @@ public String getOutputVariable() {
public void setOutputVariable(String outputVariable) {
this.outputVariable = outputVariable;
}

public Boolean getEnableBatchImport() {
return enableBatchImport;
}

public void setEnableBatchImport(Boolean enableBatchImport) {
this.enableBatchImport = enableBatchImport;
}

public Long getBatchSize() {
return batchSize;
}

public void setBatchSize(Long batchSize) {
this.batchSize = batchSize;
}

private String queryStart() {
if (fragment != null) {
return String.format("MATCH %s WITH %s AS __rel__ WHERE type(__rel__) = $1", fragment, outputVariable);
Expand Down
59 changes: 9 additions & 50 deletions src/main/java/liquibase/ext/neo4j/change/RenameLabelChange.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"'renameLabel' also defines the 'outputVariable' attribute. This attribute denotes the variable used in the pattern for\n" +
"the nodes to merge. If the fragment is '(m:Movie)<-[:DIRECTED_BY]-(d:Director {name: 'John Woo'})', " +
"the output variable is either 'm' or 'd' depending on the nodes the rename should affect.")
public class RenameLabelChange extends AbstractChange {
public class RenameLabelChange extends BatchableChange {

private String from;

Expand All @@ -29,10 +29,6 @@ public class RenameLabelChange extends AbstractChange {

private String outputVariable;

private Boolean enableBatchImport = Boolean.FALSE;

private Long batchSize;

@Override
public boolean supports(Database database) {
return database instanceof Neo4jDatabase;
Expand All @@ -55,19 +51,6 @@ public ValidationErrors validate(Database database) {
if ("__node__".equals(outputVariable)) {
validation.addError("__node__ is a reserved variable name, outputVariable must be renamed and fragment accordingly updated");
}
if (enableBatchImport && getChangeSet().isRunInTransaction()) {
validation.addError("enableBatchImport can be true only if the enclosing change set's runInTransaction attribute is set to false");
}
if (!enableBatchImport && batchSize != null) {
validation.addError("batch size must be set only if enableBatchImport is set to true");
}
if (batchSize != null && batchSize <= 0) {
validation.addError("batch size, if set, must be strictly positive");
}
Neo4jDatabase neo4j = (Neo4jDatabase) database;
if (enableBatchImport && !neo4j.supportsCallInTransactions()) {
validation.addWarning("this version of Neo4j does not support CALL {} IN TRANSACTIONS, all batch import settings are ignored");
}
validation.addAll(super.validate(database));
return validation;
}
Expand All @@ -81,21 +64,14 @@ public String getConfirmationMessage() {
}

@Override
public SqlStatement[] generateStatements(Database database) {
Logger log = Scope.getCurrentScope().getLog(getClass());
boolean supportsCallInTransactions = ((Neo4jDatabase) database).supportsCallInTransactions();
if (supportsCallInTransactions && enableBatchImport) {
log.info("Running label rename in CALL {} IN TRANSACTIONS");
String batchSpec = batchSize != null ? String.format(" OF %d ROWS", batchSize) : "";
String cypher = String.format("%s CALL {WITH __node__ SET __node__:`%s` REMOVE __node__:`%s`} IN TRANSACTIONS%s", queryStart(), to, from, batchSpec);
return new SqlStatement[]{new RawSqlStatement(cypher)};
}
if (!supportsCallInTransactions) {
log.warning("This version of Neo4j does not support CALL {} IN TRANSACTIONS, the label rename is going to run in a single, possibly large and slow, transaction.\n" +
"Note: set the runInTransaction attribute of the enclosing change set to true to make this warning disappear.");
} else {
log.info("Running label rename in single transaction (set enableBatchImport to true to switch to CALL {} IN TRANSACTIONS)");
}
protected SqlStatement[] generateBatchedStatements(Database database) {
String cypher = String.format("%s CALL {WITH __node__ SET __node__:`%s` REMOVE __node__:`%s`} IN TRANSACTIONS%s",
queryStart(), to, from, cypherBatchSpec());
return new SqlStatement[]{new RawSqlStatement(cypher)};
}

@Override
protected SqlStatement[] generateUnbatchedStatements(Database database) {
String cypher = String.format("%s SET __node__:`%s` REMOVE __node__:`%s`", queryStart(), to, from);
return new SqlStatement[]{new RawSqlStatement(cypher)};
}
Expand All @@ -116,14 +92,6 @@ public void setTo(String to) {
this.to = to;
}

public Long getBatchSize() {
return batchSize;
}

public void setBatchSize(Long batchSize) {
this.batchSize = batchSize;
}

public String getFragment() {
return fragment;
}
Expand All @@ -139,15 +107,6 @@ public String getOutputVariable() {
public void setOutputVariable(String outputVariable) {
this.outputVariable = outputVariable;
}

public void setEnableBatchImport(Boolean enableBatchImport) {
this.enableBatchImport = enableBatchImport;
}

public Boolean isEnableBatchImport() {
return enableBatchImport;
}

private String queryStart() {
if (fragment != null) {
return String.format("MATCH %s WITH %s AS __node__ WHERE __node__:`%s`", fragment, outputVariable, from);
Expand Down
Loading

0 comments on commit e0eee3f

Please sign in to comment.