[server/kv] Fix out-of-order exception after deleting a non-existent row #238

Open · wants to merge 2 commits into base: main
@@ -215,6 +215,39 @@ void testPutAndLookup() throws Exception {
         verifyPutAndLookup(table2, schema, new Object[] {"a", 1});
     }

+    @Test
+    void testDeleteNotExistRow() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", "test_delete_not_exist_row");
+        TableDescriptor tableDescriptor = DATA1_TABLE_INFO_PK.getTableDescriptor();
+        Schema schema = tableDescriptor.getSchema();
+        createTable(tablePath, tableDescriptor, false);
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.getUpsertWriter();
+            // delete a non-existent row
+            InternalRow row1 =
+                    compactedRow(tableDescriptor.getSchema().toRowType(), new Object[] {1, "2"});
+            upsertWriter.delete(row1).get();
+            // then insert the row
+            upsertWriter.upsert(row1).get();
+
+            // delete another non-existent row
+            InternalRow row2 =
+                    compactedRow(tableDescriptor.getSchema().toRowType(), new Object[] {2, "2"});
+            upsertWriter.delete(row2).get();
+            upsertWriter.upsert(row2).get();
+
+            // look up the rows
+            RowType rowType = DATA1_SCHEMA_PK.toRowType();
+            assertThatRow(table.lookup(keyRow(schema, new Object[] {1, "2"})).get().getRow())
+                    .withSchema(rowType)
+                    .isEqualTo(row1);
+            assertThatRow(table.lookup(keyRow(schema, new Object[] {2, "2"})).get().getRow())
+                    .withSchema(rowType)
+                    .isEqualTo(row2);
+        }
+    }
+
     @Test
     void testLookupForNotReadyTable() throws Exception {
         TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1");
@@ -260,7 +260,6 @@ public LogAppendInfo putAsLeader(
                 ValueDecoder valueDecoder =
                         new ValueDecoder(readContext.getRowDecoder(schemaId));

-                int appendedRecordCount = 0;
                 for (KvRecord kvRecord : kvRecords.records(readContext)) {
                     byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
                     KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
@@ -278,13 +277,11 @@
                             // if newRow is null, it means the row should be deleted
                             if (newRow == null) {
                                 walBuilder.append(RowKind.DELETE, oldRow);
-                                appendedRecordCount += 1;
                                 kvPreWriteBuffer.delete(key, logOffset++);
                             } else {
                                 // otherwise, it's a partial update, should produce -U,+U
                                 walBuilder.append(RowKind.UPDATE_BEFORE, oldRow);
                                 walBuilder.append(RowKind.UPDATE_AFTER, newRow);
-                                appendedRecordCount += 2;
                                 kvPreWriteBuffer.put(
                                         key,
                                         ValueEncoder.encodeValue(schemaId, newRow),
@@ -302,7 +299,6 @@
                                 updateRow(oldRow, kvRecord.getRow(), partialUpdater);
                         walBuilder.append(RowKind.UPDATE_BEFORE, oldRow);
                         walBuilder.append(RowKind.UPDATE_AFTER, newRow);
-                        appendedRecordCount += 2;
                         // logOffset is for -U, logOffset + 1 is for +U, we need to use
                         // the log offset for +U
                         kvPreWriteBuffer.put(
@@ -316,30 +312,14 @@
                             // of the input row are set to null.
                             BinaryRow newRow = kvRecord.getRow();
                             walBuilder.append(RowKind.INSERT, newRow);
-                            appendedRecordCount += 1;
                             kvPreWriteBuffer.put(
                                     key,
                                     ValueEncoder.encodeValue(schemaId, newRow),
                                     logOffset++);
                         }
                     }
                 }

-            // if appendedRecordCount is 0, it means there is no record to append, we
-            // should not append.
-            if (appendedRecordCount > 0) {
-                // now, we can build the full log.
-                return logTablet.appendAsLeader(walBuilder.build());
-            } else {
-                return new LogAppendInfo(
-                        logEndOffsetOfPrevBatch - 1,
-                        logEndOffsetOfPrevBatch - 1,
-                        0L,
-                        0L,
-                        0,
-                        0,
-                        false);
Member commented:

cc @swuferhong , do you remember which cases this was introduced to fix?

Besides, could you help review this PR? How do the Kafka client and server handle the sequence id when the produced messages are empty?
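
For context, a minimal sketch of the sequence bookkeeping this question is about; all names below are hypothetical and not actual Fluss or Kafka APIs. An idempotent writer bumps its per-writer sequence for every acknowledged batch, and the server validates that each incoming batch carries exactly the next sequence:

```java
// Hypothetical sketch of per-writer sequence validation (not real Fluss/Kafka code).
class WriterState {
    private int lastSeq = -1;

    // server-side check: each new batch must carry exactly lastSeq + 1
    void validate(int batchSeq) {
        if (batchSeq != lastSeq + 1) {
            throw new IllegalStateException(
                    "out-of-order sequence: expected " + (lastSeq + 1) + ", got " + batchSeq);
        }
        lastSeq = batchSeq;
    }
}
```

If the leader acknowledges a batch it never appends, the client still advances its sequence, so any writer state rebuilt purely from the log will expect a lower sequence and reject the next batch.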

Collaborator (author) commented:

It was added to fix the LogSegmentOffsetOverflowException thrown from LogSegment#ensureOffsetInRange when an empty log is appended, since the largestOffset will be 0. So I skipped appending empty records to avoid that, but it causes the replicas' writer state to go out of sync.

Let's see how Kafka solves this...
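
A rough sketch of the range check being described, assuming a Kafka-style segment layout; the names and numbers are illustrative. With no records in the batch, the largest offset stays at its default of 0, which falls below the segment's base offset and fails the relative-offset check:

```java
// Illustrative only: why appending an "empty" batch can trip a segment's
// offset-range check when the largest offset defaults to 0.
public class OffsetRangeSketch {
    static void ensureOffsetInRange(long baseOffset, long largestOffset) {
        long relative = largestOffset - baseOffset;
        if (relative < 0 || relative > Integer.MAX_VALUE) {
            // corresponds to the LogSegmentOffsetOverflowException mentioned above
            throw new IllegalStateException(
                    "offset " + largestOffset + " out of range for base offset " + baseOffset);
        }
    }

    public static void main(String[] args) {
        // empty batch: largestOffset stayed 0, base offset is 1000 -> throws
        ensureOffsetInRange(1000L, 0L);
    }
}
```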

-            }
+            return logTablet.appendAsLeader(walBuilder.build());
         } catch (Throwable t) {
             // While encounter error here, the CDC logs may fail writing to disk,
             // and the client probably will resend the batch. If we do not remove the
@@ -619,14 +619,18 @@ private LogAppendInfo append(MemoryLogRecords records, boolean needAssignOffsetA
                 appendInfo.setMaxTimestamp(duplicatedBatch.timestamp);
                 appendInfo.setStartOffsetOfMaxTimestamp(startOffset);
             } else {
-                // Append the records, and increment the local log end offset immediately after
-                // append because write to the transaction index below may fail, and we want to
-                // ensure that the offsets of future appends still grow monotonically.
-                localLog.append(
-                        appendInfo.lastOffset(),
-                        appendInfo.maxTimestamp(),
-                        appendInfo.startOffsetOfMaxTimestamp(),
-                        validRecords);
+                // if there are records to append
+                if (appendInfo.lastOffset() >= appendInfo.firstOffset()) {
Member commented:

Skipping the log write is problematic, because the writer state of the replicas goes out of sync with the leader.

Besides, this if condition doesn't take effect, because the method already returns at the beginning when appendInfo.shallowCount() == 0.
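
A self-contained toy simulation of the desync risk, under the assumption (not verified against the Fluss codebase) that followers rebuild writer state solely from replicated log batches; all names are hypothetical:

```java
// Toy illustration: the leader acks a batch it never appends, so a failed-over
// leader that rebuilt its state from the log expects a lower sequence than the
// client will send next.
import java.util.ArrayList;
import java.util.List;

public class WriterStateDesyncSketch {
    // a replica derives the next expected sequence from appended batches only
    static int expectedNextSeq(List<Integer> replicatedLog) {
        return replicatedLog.isEmpty() ? 0 : replicatedLog.get(replicatedLog.size() - 1) + 1;
    }

    public static void main(String[] args) {
        List<Integer> replicatedLog = new ArrayList<>();
        replicatedLog.add(0);      // seq 0 appended and replicated
        int clientNextSeq = 1;

        // leader acks seq 1 but skips the append (empty WAL batch)
        clientNextSeq = 2;

        // follower becomes leader; it only ever saw the replicated log
        int expected = expectedNextSeq(replicatedLog);   // 1
        System.out.println("new leader expects seq " + expected
                + ", client sends seq " + clientNextSeq + " -> rejected as out of order");
    }
}
```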

Collaborator (author) commented:

I hadn't noticed that the replicas' writer state would go out of sync...
Btw, this method won't return at the beginning, because appendInfo.shallowCount() won't be 0 here.

+                    // Append the records, and increment the local log end offset immediately after
+                    // append because write to the transaction index below may fail, and we want to
+                    // ensure that the offsets of future appends still grow monotonically.
+                    localLog.append(
+                            appendInfo.lastOffset(),
+                            appendInfo.maxTimestamp(),
+                            appendInfo.startOffsetOfMaxTimestamp(),
+                            validRecords);
+                }

                 updateHighWatermarkWithLogEndOffset();

                 // update the writer state.