Multiple threads processing #342

Open
wants to merge 3 commits into base: develop

Conversation

IlyaTsoi
Contributor

I think the current code doesn't work properly; I have verified this with logs. It runs ClickhouseBatchRunnable sequentially instead of in parallel. We should create several ClickhouseBatchRunnable tasks via the scheduleAtFixedRate method to make them work in parallel. This single-task behavior is described in the ScheduledThreadPoolExecutor docs for scheduleAtFixedRate: if an execution of a task takes longer than its period, subsequent executions may start late, but will never run concurrently.
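A minimal sketch of the proposed change, assuming hypothetical names (maxWorkers, flushIntervalMs, this.records and this.config are placeholders, and the ClickHouseBatchRunnable constructor arguments are assumed, not the connector's actual signature):

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

...

// scheduleAtFixedRate never runs concurrent executions of a single task,
// so N independent tasks are required for N-way parallelism.
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(maxWorkers);
for (int i = 0; i < maxWorkers; i++) {
    scheduler.scheduleAtFixedRate(
            new ClickHouseBatchRunnable(this.records, this.config), // assumed constructor
            0, flushIntervalMs, TimeUnit.MILLISECONDS);
}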

IlyaTsoi and others added 3 commits October 24, 2023 23:49
It would be better if it didn't depend on the hard-coded <server>.<database>.<table> template
It runs ClickhouseBatchRunnable sequentially, instead of in parallel.
We should create several ClickhouseBatchRunnable tasks via the scheduleAtFixedRate method to make them work in parallel.
Such single-task behavior is described in the ScheduledThreadPoolExecutor docs for the scheduleAtFixedRate method.
@AdamKatzDev
Contributor

AdamKatzDev commented Nov 15, 2023

It is likely that this code will cause race conditions (e.g. records inserted/updated/removed in an order different from the binlog); you would have to implement per-topic locks to avoid that (and to avoid your issue in #350).

I've actually tried to implement a similar lock (though it seems it wasn't needed) in ClickHouseBatchRunnable:

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

...

private final ConcurrentHashMap<String, Lock> topicLocks; // should be passed in constructor and shared

...

for (Map.Entry<String, Queue<ClickHouseStruct>> entry : this.records.entrySet()) {
    String topicName = entry.getKey();
    Queue<ClickHouseStruct> queue = entry.getValue();
    if (!queue.isEmpty()) {
        // computeIfAbsent creates the lock atomically on first use, avoiding a
        // separate containsKey/putIfAbsent/get sequence.
        Lock lock = this.topicLocks.computeIfAbsent(topicName, k -> new ReentrantLock());
        // tryLock() skips a topic that another batch thread is already processing
        // instead of blocking this runnable.
        if (lock.tryLock()) {
            try {
                processRecordsByTopic(topicName, queue);
            } finally {
                lock.unlock();
            }
        }
    }
}

@IlyaTsoi
Contributor Author

IlyaTsoi commented Nov 23, 2023

@AdamKatzDev I think the order of insertions is not important. There is a version column that helps ClickHouse determine the order in which rows were changed. Unfortunately, locks won't make queue processing any faster.

@AdamKatzDev
Contributor

@IlyaTsoi I couldn't figure out at what moment the version column is generated; the correctness of the version itself might depend on the order in which records are processed. There is also an issue caused by records arriving before or after a DDL (failures when inserting records with a different schema, lost or corrupted data after truncates).

@aadant @subkanthi could you please tell at what moment the row version is generated, and how?

@IlyaTsoi
Contributor Author

IlyaTsoi commented Nov 23, 2023

@AdamKatzDev If I'm not mistaken, this value is taken from the Debezium message field ts_ms. From the Debezium docs for the MySQL connector:

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

You are right about DDL handling. I haven't explored the current logic yet.

@AdamKatzDev
Contributor

@IlyaTsoi if you are correct, then this versioning scheme won't work for very hot data.

https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree

If two inserted rows have the same version number, the last inserted row is the one kept.

The chances of the same row being updated within the same millisecond can be quite high, depending on the load.
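A hypothetical illustration of the tie (the values below are made up):

long tsFirstUpdate  = 1_700_000_000_123L; // ts_ms of the first update to a row
long tsSecondUpdate = 1_700_000_000_123L; // second update lands in the same millisecond
// Both rows carry the same version, so ReplacingMergeTree keeps whichever one
// was inserted last -- and parallel batches make that insertion order arbitrary.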

@aadant
Collaborator

aadant commented Nov 23, 2023

@IlyaTsoi @AdamKatzDev for the MySQL case the version is increasing, as it contains a snowflake ID plus a counter coming from the GTID.
You would need the equivalent for SQL Server or PostgreSQL.
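A minimal sketch of the idea; the actual bit layout used by the connector's SnowFlakeId.generate is an assumption here, not the real implementation:

// Illustrative only: millisecond timestamp in the high bits, a GTID-derived
// counter in the low bits, so two changes within the same millisecond still
// get strictly increasing versions. The 22-bit counter width is assumed.
static long snowflakeVersion(long tsMs, long gtid) {
    return (tsMs << 22) | (gtid & 0x3FFFFFL);
}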

Regarding the DDL, all table writers are flushed before the DDL is applied with the sink-connector-lightweight. @subkanthi can confirm.

If you have a test case that is failing or losing data, please report it as a separate issue, ideally with a test case. Thanks!

@AdamKatzDev
Contributor

@aadant

@IlyaTsoi @AdamKatzDev for the MySQL case the version is increasing, as it contains a snowflake ID plus a counter coming from the GTID.

So it looks like it is safe to insert data in an arbitrary order; if there are same-millisecond collisions, the GTID should take care of them.

Regarding the DDL, all table writers are flushed before the DDL is applied with the sink-connector-lightweight. @subkanthi can confirm.

I remember that code. Looks safe.

@IlyaTsoi it looks like DDL operations are not a problem after all. MySQL replication is not an issue either, since it is safe to insert rows in any order, as you assumed. The only issue is other databases, as @aadant mentioned: their version value is just a millisecond timestamp.

if (columnNameToIndexMap.containsKey(versionColumn)) {
    if (record.getGtid() != -1) {
        if (this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.SNOWFLAKE_ID.toString())) {
            // GTID present and snowflake IDs enabled: version combines ts_ms and the GTID.
            ps.setLong(columnNameToIndexMap.get(versionColumn),
                    SnowFlakeId.generate(record.getTs_ms(), record.getGtid()));
        } else {
            // GTID present, snowflake IDs disabled: version is the GTID itself.
            ps.setLong(columnNameToIndexMap.get(versionColumn), record.getGtid());
        }
    } else {
        // No GTID (e.g. non-MySQL sources): version falls back to the millisecond timestamp.
        ps.setLong(columnNameToIndexMap.get(versionColumn), record.getTs_ms());
    }
}

It looks like there is an equivalent of the GTID in both PostgreSQL and SQL Server, called the LSN and the Commit LSN respectively, that could be used instead (see the sketch below).
In the worst case, this multi-threading mechanism should only be enabled for MySQL.
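A minimal sketch of the PostgreSQL side (a hypothetical helper, not part of the connector), assuming the LSN arrives in its textual hi/lo form:

// Convert a textual PostgreSQL LSN such as "16/B374D848" (two 32-bit hex halves)
// into a single monotonically increasing 64-bit value usable as a row version,
// playing the same role as the GTID-based version on the MySQL side.
static long lsnToVersion(String lsn) {
    String[] parts = lsn.split("/");
    return (Long.parseLong(parts[0], 16) << 32) | Long.parseLong(parts[1], 16);
}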

@IlyaTsoi
Contributor Author

It looks like there is an equivalent of the GTID in both PostgreSQL and SQL Server, called the LSN and the Commit LSN respectively, that could be used instead.

Hm, that should be checked. It would be nice if it worked.
