Skip to content
This repository has been archived by the owner on Dec 22, 2021. It is now read-only.

Changed timestamp zone to UTC. Force failure on CT version reset. Added statement list configuration #26

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>io.cdap.plugin</groupId>
<artifactId>cdc-plugins</artifactId>
<packaging>jar</packaging>
<version>2.0.1-SNAPSHOT</version>
<version>2.0.6-SNAPSHOT</version>
<name>cdc-plugins</name>

<licenses>
Expand Down Expand Up @@ -91,7 +91,7 @@
<netty.version>4.1.16.Final</netty.version>
<junit.version>4.11</junit.version>
<!-- properties for script build step that creates the config files for the artifacts -->
<app.parents>system:cdap-data-streams[6.0.0,7.0.0)</app.parents>
<app.parents>system:cdap-data-streams[6.0.0-SNAPSHOT,7.0.0)</app.parents>
<main.basedir>${project.basedir}</main.basedir>

<skip.performance.test>true</skip.performance.test>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,22 @@ public class CTInputDStream extends InputDStream<StructuredRecord> {
private long trackingOffset;
private long failureStartTime;
private boolean isFailing;
private boolean forceFailure;
private final Set<String> operationsList;
private int retentionDays;
private String retentionColumn;

CTInputDStream(StreamingContext ssc, JdbcRDD.ConnectionFactory connectionFactory,
Set<String> tableWhitelist, long startingOffset, long maxRetrySeconds, int maxBatchSize) {
CTInputDStream(StreamingContext ssc, JdbcRDD.ConnectionFactory connectionFactory, Set<String> operationsList,
int retentionDays, String retentionColumn, Set<String> tableWhitelist, long startingOffset,
long maxRetrySeconds, int maxBatchSize) {
super(ssc, ClassTag$.MODULE$.apply(StructuredRecord.class));
this.connectionFactory = connectionFactory;
this.maxRetrySeconds = maxRetrySeconds;
this.maxBatchSize = maxBatchSize;
this.tableWhitelist = Collections.unmodifiableSet(new HashSet<>(tableWhitelist));
this.operationsList = Collections.unmodifiableSet(new HashSet<>(operationsList));
this.retentionDays = retentionDays;
this.retentionColumn = retentionColumn;
// if not current tracking version is given initialize it to 0
trackingOffset = startingOffset;
}
Expand All @@ -90,7 +98,7 @@ public Option<RDD<StructuredRecord>> compute(Time validTime) {

private boolean shouldFail() {
long timeElapsed = nowInSeconds() - failureStartTime;
return maxRetrySeconds == 0 || (isFailing && timeElapsed > maxRetrySeconds);
return forceFailure || maxRetrySeconds == 0 || (isFailing && timeElapsed > maxRetrySeconds);
}

private static long nowInSeconds() {
Expand All @@ -113,7 +121,16 @@ private Option<RDD<StructuredRecord>> doCompute(Time validTime) throws Exception
// retrieve the current highest tracking version
long prev = trackingOffset;
long cur = Math.min(getCurrentTrackingVersion(connection), prev + maxBatchSize);
LOG.info("Fetching changes from {} to {}", prev, cur);
if (prev != cur) {
LOG.info("Fetching changes from {} to {}", prev, cur);
}

if (cur < prev) {
this.forceFailure = true;
throw Throwables.propagate(new Exception("Current CT version is less than the previous",
new Error(String.format("Previous CT version: %s Current CT version: %s", prev, cur))));
}

// get all the data changes (DML) for the ct enabled tables till current tracking version
for (TableInformation tableInformation : tableInformations) {
changeRDDs.add(getChangeData(tableInformation, prev, cur));
Expand Down Expand Up @@ -146,24 +163,37 @@ public void stop() {
}

private JavaRDD<StructuredRecord> getChangeData(TableInformation tableInformation, long prev, long cur) {
String retentionFilter = "";
if (!this.operationsList.contains("D") && this.retentionDays > 0 && this.retentionColumn != null) {
retentionFilter = String.format("OR [CT].[SYS_CHANGE_OPERATION] = 'D' " +
"AND [CT].%s > DATEADD(day, -%s, GETDATE()) ",
this.retentionColumn, this.retentionDays);
}

String stmt = String.format("SELECT [CT].[SYS_CHANGE_VERSION] as CHANGE_TRACKING_VERSION, " +
"[CT].[SYS_CHANGE_CREATION_VERSION], " +
"[CT].[SYS_CHANGE_OPERATION], " +
"CURRENT_TIMESTAMP as CDC_CURRENT_TIMESTAMP, " +
"SYSUTCDATETIME() as CDC_CURRENT_TIMESTAMP, " +
"%s, %s FROM [%s] (nolock) " +
"as [CI] RIGHT OUTER JOIN " +
"CHANGETABLE (CHANGES [%s], %s) as [CT] on %s where [CT]" +
".[SYS_CHANGE_VERSION] > ? " +
"and [CT].[SYS_CHANGE_VERSION] <= ? ORDER BY [CT]" +
".[SYS_CHANGE_VERSION]",
"CHANGETABLE (CHANGES [%s], %s) as [CT] on %s " +
"where [CT].[SYS_CHANGE_VERSION] > ? " +
"and [CT].[SYS_CHANGE_VERSION] <= ? " +
"and (" +
"[CT].[SYS_CHANGE_OPERATION] IN " +
"('" + String.join("','", this.operationsList) + "') " +
retentionFilter +
") " +
"ORDER BY [CT].[SYS_CHANGE_VERSION]",
getSelectColumns("CT", tableInformation.getPrimaryKeys()),
getSelectColumns("CI", tableInformation.getValueColumnNames()),
tableInformation.getName(), tableInformation.getName(), prev,
getJoinCondition(tableInformation.getPrimaryKeys()));

LOG.debug("Querying for change data with statement {}", stmt);

//TODO Currently we are not partitioning the data. We should partition it for scalability
//TODO Currently we are not partitioning the data. We should partition it for scalability
return JdbcRDD.create(getJavaSparkContext(), connectionFactory, stmt, prev, cur, 1,
new ResultSetToDMLRecord(tableInformation));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws
LOG.info("Creating change information dstream");
ClassTag<StructuredRecord> tag = ClassTag$.MODULE$.apply(StructuredRecord.class);
CTInputDStream dstream = new CTInputDStream(context.getSparkStreamingContext().ssc(), connectionFactory,
conf.getTableWhitelist(), conf.getSequenceStartNum(),
conf.getMaxRetrySeconds(), conf.getMaxBatchSize());
conf.getOperationsList(), conf.getRetentionDays(), conf.getRetentionColumn(), conf.getTableWhitelist(),
conf.getSequenceStartNum(), conf.getMaxRetrySeconds(), conf.getMaxBatchSize());
return JavaDStream.fromDStream(dstream, tag)
.mapToPair(structuredRecord -> new Tuple2<>("", structuredRecord))
// map the dstream with schema state store to detect changes in schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public class CTSQLServerConfig extends CDCReferencePluginConfig {
public static final String TABLE_WHITELIST = "tableWhitelist";
public static final String JDBC_PLUGIN_NAME = "jdbcPluginName";
public static final String CONNECTION_STRING = "connectionString";
public static final String OPERATIONS_LIST = "operationsList";
public static final String RETENTION_DAYS = "retentionDays";
public static final String RETENTION_COLUMN = "retentionColumn";

@Name(HOST_NAME)
@Description("SQL Server hostname. This is not required if a connection string was specified.")
Expand Down Expand Up @@ -104,6 +107,22 @@ public class CTSQLServerConfig extends CDCReferencePluginConfig {
@Nullable
private final String connectionString;

@Name(OPERATIONS_LIST)
@Description("The list of operations to read (INSERT,UPDATE,DELETE)")
@Nullable
private final String operationsList;

@Name(RETENTION_DAYS)
@Description("Retention period for a table if DELETE operation is turned off. " +
"In this period all deletes will be imported. Set to 0 to ignore all deletes")
@Nullable
private final int retentionDays;

@Name(RETENTION_COLUMN)
@Description("Timestamp column to use for retention period (should be a part of Primary Key)")
@Nullable
private final String retentionColumn;

public CTSQLServerConfig() {
super("");
this.hostname = null;
Expand All @@ -117,6 +136,9 @@ public CTSQLServerConfig() {
this.tableWhitelist = null;
this.jdbcPluginName = null;
this.connectionString = null;
this.operationsList = null;
this.retentionDays = 0;
this.retentionColumn = null;
}

public String getHostname() {
Expand Down Expand Up @@ -170,6 +192,21 @@ public String getConnectionString() {
return String.format("jdbc:sqlserver://%s:%s;DatabaseName=%s", hostname, port, dbName);
}

/**
 * Returns the configured operations as a set of trimmed tokens.
 *
 * <p>The raw {@code operationsList} property is a comma-separated string. Each token is trimmed, and
 * blank tokens (produced by an empty property value, doubled commas, or whitespace-only entries) are
 * dropped so that callers never receive an empty-string operation.
 *
 * @return the set of non-blank operation tokens, or an empty set when the property is not set
 */
public Set<String> getOperationsList() {
  if (operationsList == null) {
    return Collections.emptySet();
  }
  return Arrays.stream(operationsList.split(","))
    .map(String::trim)
    // "".split(",") yields a single empty token; "I,,U" and "I, " yield blank tokens as well
    .filter(operation -> !operation.isEmpty())
    .collect(Collectors.toSet());
}


/**
 * Returns the configured retention period, in days.
 *
 * @return the value of the {@code retentionDays} property
 */
public int getRetentionDays() {
  return this.retentionDays;
}

/**
 * Returns the name of the timestamp column used for the retention period.
 *
 * @return the value of the {@code retentionColumn} property, or {@code null} when it is not set
 */
@Nullable
public String getRetentionColumn() {
  return this.retentionColumn;
}

@Override
public void validate() {
super.validate();
Expand All @@ -194,5 +231,14 @@ public void validate() {
if (port != null && (port < 0 || port > 65535)) {
throw new InvalidConfigPropertyException("Port number should be in range 0-65535", PORT);
}

if (operationsList == null || operationsList.length() == 0) {
throw new InvalidConfigPropertyException("Operations list couldn't be empty", OPERATIONS_LIST);
}

if (retentionDays > 0 && retentionColumn == null) {
throw new InvalidConfigPropertyException("Please set the retention timestamp column", RETENTION_COLUMN);
}

}
}
39 changes: 39 additions & 0 deletions widgets/CTSQLServer-streamingsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,45 @@
{
"label": "Advanced",
"properties": [
{
"name": "operationsList",
"label": "Statements list",
"widget-type": "multi-select",
"widget-attributes": {
"delimiter": ",",
"defaultValue": [
"I",
"U"
],
"options": [
{
"id": "I",
"label": "INSERT"
},
{
"id": "U",
"label": "UPDATE"
},
{
"id": "D",
"label": "DELETE"
}
]
}
},
{
"widget-type": "textbox",
"label": "Retention period (days)",
"name": "retentionDays",
"widget-attributes": {
"default": "0"
}
},
{
"widget-type": "textbox",
"label": "Retention timestamp column",
"name": "retentionColumn"
},
{
"widget-type": "textbox",
"label": "Max Retry Seconds",
Expand Down