From 66f45e3a353449cba8e634e777d72f9bdae18c97 Mon Sep 17 00:00:00 2001 From: Dmytro Zagravskyi Date: Wed, 13 Nov 2019 15:37:56 +0200 Subject: [PATCH 1/2] Changed timestamp to UTC. Added statement list configuration --- pom.xml | 4 +-- .../cdc/source/sqlserver/CTInputDStream.java | 29 ++++++++++++++----- .../cdc/source/sqlserver/CTSQLServer.java | 5 ++-- .../source/sqlserver/CTSQLServerConfig.java | 17 +++++++++++ widgets/CTSQLServer-streamingsource.json | 26 +++++++++++++++++ 5 files changed, 69 insertions(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index 7aa0121..11e8908 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.cdap.plugin cdc-plugins jar - 2.0.1-SNAPSHOT + 2.0.5-SNAPSHOT cdc-plugins @@ -91,7 +91,7 @@ 4.1.16.Final 4.11 - system:cdap-data-streams[6.0.0,7.0.0) + system:cdap-data-streams[6.0.0-SNAPSHOT,7.0.0) ${project.basedir} true diff --git a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java index 1947c23..f48c7d9 100644 --- a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java +++ b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java @@ -58,14 +58,17 @@ public class CTInputDStream extends InputDStream { private long trackingOffset; private long failureStartTime; private boolean isFailing; + private boolean forceFailure; + private final Set operationsList; - CTInputDStream(StreamingContext ssc, JdbcRDD.ConnectionFactory connectionFactory, + CTInputDStream(StreamingContext ssc, JdbcRDD.ConnectionFactory connectionFactory, Set operationsList, Set tableWhitelist, long startingOffset, long maxRetrySeconds, int maxBatchSize) { super(ssc, ClassTag$.MODULE$.apply(StructuredRecord.class)); this.connectionFactory = connectionFactory; this.maxRetrySeconds = maxRetrySeconds; this.maxBatchSize = maxBatchSize; this.tableWhitelist = Collections.unmodifiableSet(new HashSet<>(tableWhitelist)); + this.operationsList = Collections.unmodifiableSet(new HashSet<>(operationsList)); // if not current tracking version is given initialize it to 0 trackingOffset = startingOffset; } @@ -90,7 +93,7 @@ public Option> compute(Time validTime) { private boolean shouldFail() { long timeElapsed = nowInSeconds() - failureStartTime; - return maxRetrySeconds == 0 || (isFailing && timeElapsed > maxRetrySeconds); + return forceFailure || maxRetrySeconds == 0 || (isFailing && timeElapsed > maxRetrySeconds); } private static long nowInSeconds() { @@ -114,6 +117,13 @@ private Option> doCompute(Time validTime) throws Exception long prev = trackingOffset; long cur = Math.min(getCurrentTrackingVersion(connection), prev + maxBatchSize); LOG.info("Fetching changes from {} to {}", prev, cur); + + if (cur < prev) { + this.forceFailure = true; + throw Throwables.propagate(new Exception("Current CT version is less than the previous", + new Error(String.format("Previous CT version: %s Current CT version: %s", prev, cur)))); + } + // get all the data changes (DML) for the ct enabled tables till current tracking version for (TableInformation tableInformation : tableInformations) { changeRDDs.add(getChangeData(tableInformation, prev, cur)); @@ -149,13 +159,15 @@ private JavaRDD getChangeData(TableInformation tableInformatio String stmt = String.format("SELECT [CT].[SYS_CHANGE_VERSION] as CHANGE_TRACKING_VERSION, " + "[CT].[SYS_CHANGE_CREATION_VERSION], " + "[CT].[SYS_CHANGE_OPERATION], " + - "CURRENT_TIMESTAMP as CDC_CURRENT_TIMESTAMP, " + + "SYSUTCDATETIME() as CDC_CURRENT_TIMESTAMP, " + "%s, %s FROM [%s] (nolock) " + "as [CI] RIGHT OUTER JOIN " + - "CHANGETABLE (CHANGES [%s], %s) as [CT] on %s where [CT]" + - ".[SYS_CHANGE_VERSION] > ? " + - "and [CT].[SYS_CHANGE_VERSION] <= ? ORDER BY [CT]" + - ".[SYS_CHANGE_VERSION]", + "CHANGETABLE (CHANGES [%s], %s) as [CT] on %s " + + "where [CT].[SYS_CHANGE_VERSION] > ? " + + "and [CT].[SYS_CHANGE_VERSION] <= ? " + + "and [CT].[SYS_CHANGE_OPERATION] IN " + + "('" + String.join("','", this.operationsList) + "') " + + "ORDER BY [CT].[SYS_CHANGE_VERSION]", getSelectColumns("CT", tableInformation.getPrimaryKeys()), getSelectColumns("CI", tableInformation.getValueColumnNames()), tableInformation.getName(), tableInformation.getName(), prev, @@ -163,7 +175,8 @@ private JavaRDD getChangeData(TableInformation tableInformatio LOG.debug("Querying for change data with statement {}", stmt); - //TODO Currently we are not partitioning the data. We should partition it for scalability + //TODO Currently we are not partitioning the data. We should p + //53™1artition it for scalability return JdbcRDD.create(getJavaSparkContext(), connectionFactory, stmt, prev, cur, 1, new ResultSetToDMLRecord(tableInformation)); } diff --git a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java index e71b7e8..a539bbb 100644 --- a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java +++ b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java @@ -137,8 +137,9 @@ public JavaDStream getStream(StreamingContext context) throws LOG.info("Creating change information dstream"); ClassTag tag = ClassTag$.MODULE$.apply(StructuredRecord.class); CTInputDStream dstream = new CTInputDStream(context.getSparkStreamingContext().ssc(), connectionFactory, - conf.getTableWhitelist(), conf.getSequenceStartNum(), - conf.getMaxRetrySeconds(), conf.getMaxBatchSize()); + conf.getOperationsList(), conf.getTableWhitelist(), + conf.getSequenceStartNum(), conf.getMaxRetrySeconds(), + conf.getMaxBatchSize()); return JavaDStream.fromDStream(dstream, tag) .mapToPair(structuredRecord -> new Tuple2<>("", structuredRecord)) // map the dstream with schema state store to detect changes in schema diff --git a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java index 4f72bfa..4fcfa37 100644 --- a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java +++ b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java @@ -44,6 +44,7 @@ public class CTSQLServerConfig extends CDCReferencePluginConfig { public static final String TABLE_WHITELIST = "tableWhitelist"; public static final String JDBC_PLUGIN_NAME = "jdbcPluginName"; public static final String CONNECTION_STRING = "connectionString"; + public static final String OPERATIONS_LIST = "operationsList"; @Name(HOST_NAME) @Description("SQL Server hostname. This is not required if a connection string was specified.") @@ -104,6 +105,11 @@ public class CTSQLServerConfig extends CDCReferencePluginConfig { @Nullable private final String connectionString; + @Name(OPERATIONS_LIST) + @Description("The list of operations to read (INSERT,UPDATE,DELETE)") + @Nullable + private final String operationsList; + public CTSQLServerConfig() { super(""); this.hostname = null; @@ -117,6 +123,7 @@ public CTSQLServerConfig() { this.tableWhitelist = null; this.jdbcPluginName = null; this.connectionString = null; + this.operationsList = null; } public String getHostname() { @@ -170,6 +177,11 @@ public String getConnectionString() { return String.format("jdbc:sqlserver://%s:%s;DatabaseName=%s", hostname, port, dbName); } + public Set getOperationsList() { + return operationsList == null ? Collections.emptySet() : + Arrays.stream(operationsList.split(",")).map(String::trim).collect(Collectors.toSet()); + } + @Override public void validate() { super.validate(); @@ -194,5 +206,10 @@ public void validate() { if (port != null && (port < 0 || port > 65535)) { throw new InvalidConfigPropertyException("Port number should be in range 0-65535", PORT); } + + if (operationsList == null || operationsList.length() == 0) { + throw new InvalidConfigPropertyException("Operations list couldn't be empty", OPERATIONS_LIST); + } + } } diff --git a/widgets/CTSQLServer-streamingsource.json b/widgets/CTSQLServer-streamingsource.json index 3ff4c9b..b8ee5c9 100644 --- a/widgets/CTSQLServer-streamingsource.json +++ b/widgets/CTSQLServer-streamingsource.json @@ -76,6 +76,32 @@ { "label": "Advanced", "properties": [ + { + "name": "operationsList", + "label": "Statements list", + "widget-type": "multi-select", + "widget-attributes": { + "delimiter": ",", + "defaultValue": [ + "I", + "U" + ], + "options": [ + { + "id": "I", + "label": "INSERT" + }, + { + "id": "U", + "label": "UPDATE" + }, + { + "id": "D", + "label": "DELETE" + } + ] + } + }, { "widget-type": "textbox", "label": "Max Retry Seconds", From 475fdb23c817a6dc1f7b91bd205de6afd1762c71 Mon Sep 17 00:00:00 2001 From: Dmytro Zagravskyi Date: Wed, 4 Dec 2019 13:25:01 +0200 Subject: [PATCH 2/2] added retention period configuration --- pom.xml | 2 +- .../cdc/source/sqlserver/CTInputDStream.java | 25 +++++++++++++--- .../cdc/source/sqlserver/CTSQLServer.java | 5 ++-- .../source/sqlserver/CTSQLServerConfig.java | 29 +++++++++++++++++++ widgets/CTSQLServer-streamingsource.json | 13 +++++++++ 5 files changed, 66 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 11e8908..5a0f447 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.cdap.plugin cdc-plugins jar - 2.0.5-SNAPSHOT + 2.0.6-SNAPSHOT cdc-plugins diff --git a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java index f48c7d9..842520f 100644 --- a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java +++ b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTInputDStream.java @@ -60,15 +60,20 @@ public class CTInputDStream extends InputDStream { private boolean isFailing; private boolean forceFailure; private final Set operationsList; + private int retentionDays; + private String retentionColumn; CTInputDStream(StreamingContext ssc, JdbcRDD.ConnectionFactory connectionFactory, Set operationsList, - Set tableWhitelist, long startingOffset, long maxRetrySeconds, int maxBatchSize) { + int retentionDays, String retentionColumn, Set tableWhitelist, long startingOffset, + long maxRetrySeconds, int maxBatchSize) { super(ssc, ClassTag$.MODULE$.apply(StructuredRecord.class)); this.connectionFactory = connectionFactory; this.maxRetrySeconds = maxRetrySeconds; this.maxBatchSize = maxBatchSize; this.tableWhitelist = Collections.unmodifiableSet(new HashSet<>(tableWhitelist)); this.operationsList = Collections.unmodifiableSet(new HashSet<>(operationsList)); + this.retentionDays = retentionDays; + this.retentionColumn = retentionColumn; // if not current tracking version is given initialize it to 0 trackingOffset = startingOffset; } @@ -116,7 +121,9 @@ private Option> doCompute(Time validTime) throws Exception // retrieve the current highest tracking version long prev = trackingOffset; long cur = Math.min(getCurrentTrackingVersion(connection), prev + maxBatchSize); - LOG.info("Fetching changes from {} to {}", prev, cur); + if (prev != cur) { + LOG.info("Fetching changes from {} to {}", prev, cur); + } if (cur < prev) { this.forceFailure = true; @@ -156,6 +163,13 @@ public void stop() { } private JavaRDD getChangeData(TableInformation tableInformation, long prev, long cur) { + String retentionFilter = ""; + if (!this.operationsList.contains("D") && this.retentionDays > 0 && this.retentionColumn != null) { + retentionFilter = String.format("OR [CT].[SYS_CHANGE_OPERATION] = 'D' " + + "AND [CT].%s > DATEADD(day, -%s, GETDATE()) ", + this.retentionColumn, this.retentionDays); + } + String stmt = String.format("SELECT [CT].[SYS_CHANGE_VERSION] as CHANGE_TRACKING_VERSION, " + "[CT].[SYS_CHANGE_CREATION_VERSION], " + "[CT].[SYS_CHANGE_OPERATION], " + @@ -165,8 +179,11 @@ private JavaRDD getChangeData(TableInformation tableInformatio "CHANGETABLE (CHANGES [%s], %s) as [CT] on %s " + "where [CT].[SYS_CHANGE_VERSION] > ? " + "and [CT].[SYS_CHANGE_VERSION] <= ? " + - "and [CT].[SYS_CHANGE_OPERATION] IN " + - "('" + String.join("','", this.operationsList) + "') " + + "and (" + + "[CT].[SYS_CHANGE_OPERATION] IN " + + "('" + String.join("','", this.operationsList) + "') " + + retentionFilter + + ") " + "ORDER BY [CT].[SYS_CHANGE_VERSION]", getSelectColumns("CT", tableInformation.getPrimaryKeys()), getSelectColumns("CI", tableInformation.getValueColumnNames()), diff --git a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java index a539bbb..47eedcb 100644 --- a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java +++ b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServer.java @@ -137,9 +137,8 @@ public JavaDStream getStream(StreamingContext context) throws LOG.info("Creating change information dstream"); ClassTag tag = ClassTag$.MODULE$.apply(StructuredRecord.class); CTInputDStream dstream = new CTInputDStream(context.getSparkStreamingContext().ssc(), connectionFactory, - conf.getOperationsList(), conf.getTableWhitelist(), - conf.getSequenceStartNum(), conf.getMaxRetrySeconds(), - conf.getMaxBatchSize()); + conf.getOperationsList(), conf.getRetentionDays(), conf.getRetentionColumn(), conf.getTableWhitelist(), + conf.getSequenceStartNum(), conf.getMaxRetrySeconds(), conf.getMaxBatchSize()); return JavaDStream.fromDStream(dstream, tag) .mapToPair(structuredRecord -> new Tuple2<>("", structuredRecord)) // map the dstream with schema state store to detect changes in schema diff --git a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java index 4fcfa37..99c6869 100644 --- a/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java +++ b/src/main/java/io/cdap/plugin/cdc/source/sqlserver/CTSQLServerConfig.java @@ -45,6 +45,8 @@ public class CTSQLServerConfig extends CDCReferencePluginConfig { public static final String JDBC_PLUGIN_NAME = "jdbcPluginName"; public static final String CONNECTION_STRING = "connectionString"; public static final String OPERATIONS_LIST = "operationsList"; + public static final String RETENTION_DAYS = "retentionDays"; + public static final String RETENTION_COLUMN = "retentionColumn"; @Name(HOST_NAME) @Description("SQL Server hostname. This is not required if a connection string was specified.") @@ -110,6 +112,17 @@ public class CTSQLServerConfig extends CDCReferencePluginConfig { @Nullable private final String operationsList; + @Name(RETENTION_DAYS) + @Description("Retention period for a table if DELETE operation is turned off. " + + "In this period all deletes will be imported. Set to 0 to ignore all deletes") + @Nullable + private final int retentionDays; + + @Name(RETENTION_COLUMN) + @Description("Timestamp column to use for retention period (should be a part of Primary Key)") + @Nullable + private final String retentionColumn; + public CTSQLServerConfig() { super(""); this.hostname = null; @@ -124,6 +137,8 @@ public CTSQLServerConfig() { this.jdbcPluginName = null; this.connectionString = null; this.operationsList = null; + this.retentionDays = 0; + this.retentionColumn = null; } public String getHostname() { @@ -182,6 +197,16 @@ public Set getOperationsList() { Arrays.stream(operationsList.split(",")).map(String::trim).collect(Collectors.toSet()); } + + public int getRetentionDays() { + return retentionDays; + } + + @Nullable + public String getRetentionColumn() { + return retentionColumn; + } + @Override public void validate() { super.validate(); @@ -211,5 +236,9 @@ public void validate() { throw new InvalidConfigPropertyException("Operations list couldn't be empty", OPERATIONS_LIST); } + if (retentionDays > 0 && retentionColumn == null) { + throw new InvalidConfigPropertyException("Please set the retention timestamp column", RETENTION_COLUMN); + } + } } diff --git a/widgets/CTSQLServer-streamingsource.json b/widgets/CTSQLServer-streamingsource.json index b8ee5c9..f26c8f4 100644 --- a/widgets/CTSQLServer-streamingsource.json +++ b/widgets/CTSQLServer-streamingsource.json @@ -102,6 +102,19 @@ ] } }, + { + "widget-type": "textbox", + "label": "Retention period (days)", + "name": "retentionDays", + "widget-attributes": { + "default": "0" + } + }, + { + "widget-type": "textbox", + "label": "Retention timestamp column", + "name": "retentionColumn" + }, { "widget-type": "textbox", "label": "Max Retry Seconds",