AKCORE-81: Experiment with read-committed share groups #1146

Open: wants to merge 2 commits into base kip-932
@@ -18,6 +18,7 @@

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
@@ -26,6 +27,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
@@ -39,11 +41,15 @@

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;

/**
* {@link ShareCompletedFetch} represents a {@link RecordBatch batch} of {@link Record records}
@@ -56,11 +62,14 @@ public class ShareCompletedFetch {

final TopicIdPartition partition;
final ShareFetchResponseData.PartitionData partitionData;
final IsolationLevel isolationLevel;
final short requestVersion;

private final Logger log;
private final BufferSupplier decompressionBufferSupplier;
private final Iterator<? extends RecordBatch> batches;
private final Set<Long> abortedProducerIds;
private final PriorityQueue<ShareFetchResponseData.AbortedTransaction> abortedTransactions;
private RecordBatch currentBatch;
private Record lastRecord;
private CloseableIterator<Record> records;
@@ -75,14 +84,18 @@ public class ShareCompletedFetch {
final BufferSupplier decompressionBufferSupplier,
final TopicIdPartition partition,
final ShareFetchResponseData.PartitionData partitionData,
final IsolationLevel isolationLevel,
final short requestVersion) {
this.log = logContext.logger(org.apache.kafka.clients.consumer.internals.ShareCompletedFetch.class);
this.decompressionBufferSupplier = decompressionBufferSupplier;
this.partition = partition;
this.partitionData = partitionData;
this.isolationLevel = isolationLevel;
this.requestVersion = requestVersion;
this.batches = ShareFetchResponse.recordsOrFail(partitionData).batches().iterator();
this.acquiredRecordList = buildAcquiredRecordList(partitionData.acquiredRecords());
this.abortedProducerIds = new HashSet<>();
this.abortedTransactions = abortedTransactions(partitionData);
}

private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
@@ -302,6 +315,20 @@ private Record nextFetchedRecord(final boolean checkCrcs) {
currentBatch = batches.next();
maybeEnsureValid(currentBatch, checkCrcs);

if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
consumeAbortedTransactionsUpTo(currentBatch.lastOffset());

long producerId = currentBatch.producerId();
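// An ABORT control marker ends the aborted transaction for this producer;
// any other batch from a producer in the aborted set is skipped wholesale.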
if (containsAbortMarker(currentBatch)) {
abortedProducerIds.remove(producerId);
} else if (isBatchAborted(currentBatch)) {
log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
"offsets {} to {}",
partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
continue;
}
}

records = currentBatch.streamingIterator(decompressionBufferSupplier);
} else {
Record record = records.next();
@@ -348,6 +375,43 @@ private void maybeCloseRecordStream() {
}
}

private void consumeAbortedTransactionsUpTo(long offset) {
if (abortedTransactions == null)
return;

while (!abortedTransactions.isEmpty() && abortedTransactions.peek().firstOffset() <= offset) {
ShareFetchResponseData.AbortedTransaction abortedTransaction = abortedTransactions.poll();
abortedProducerIds.add(abortedTransaction.producerId());
}
}

private boolean isBatchAborted(RecordBatch batch) {
return batch.isTransactional() && abortedProducerIds.contains(batch.producerId());
}

private PriorityQueue<ShareFetchResponseData.AbortedTransaction> abortedTransactions(ShareFetchResponseData.PartitionData partition) {
if (partition.abortedTransactions() == null || partition.abortedTransactions().isEmpty())
return null;

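// Sort by first offset so entries can be drained incrementally as batches advance.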
PriorityQueue<ShareFetchResponseData.AbortedTransaction> abortedTransactions = new PriorityQueue<>(
partition.abortedTransactions().size(), Comparator.comparingLong(ShareFetchResponseData.AbortedTransaction::firstOffset)
);
abortedTransactions.addAll(partition.abortedTransactions());
return abortedTransactions;
}

private boolean containsAbortMarker(RecordBatch batch) {
if (!batch.isControlBatch())
return false;

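// The control type is encoded in the key of the batch's first control record.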
Iterator<Record> batchIterator = batch.iterator();
if (!batchIterator.hasNext())
return false;

Record firstRecord = batchIterator.next();
return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
}

private static class OffsetAndDeliveryCount {
final long offset;
final short deliveryCount;
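The batch filtering added above follows the pattern the classic consumer applies for READ_COMMITTED fetches: aborted transactions arrive sorted by first offset, a producer ID becomes "aborted" once the fetch position passes that offset, and the ABORT control marker retires it again. A minimal standalone sketch of that state machine (the AbortedTxn and Batch records below are illustrative stand-ins for the PR's types, not part of this change):

import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

public class AbortedTxnFilterSketch {
    // Illustrative stand-in for ShareFetchResponseData.AbortedTransaction.
    record AbortedTxn(long producerId, long firstOffset) {}

    // Illustrative stand-in for the RecordBatch fields this logic reads.
    record Batch(long producerId, long lastOffset, boolean transactional, boolean abortMarker) {}

    private final PriorityQueue<AbortedTxn> pending;
    private final Set<Long> abortedProducers = new HashSet<>();

    AbortedTxnFilterSketch(List<AbortedTxn> abortedTxns) {
        this.pending = new PriorityQueue<>(Comparator.comparingLong(AbortedTxn::firstOffset));
        this.pending.addAll(abortedTxns);
    }

    /** True if the batch's records must not be returned under READ_COMMITTED. */
    boolean shouldSkip(Batch batch) {
        // Activate every aborted transaction starting at or before this batch's last offset.
        while (!pending.isEmpty() && pending.peek().firstOffset() <= batch.lastOffset()) {
            abortedProducers.add(pending.poll().producerId());
        }
        if (batch.abortMarker()) {
            // The ABORT marker closes the aborted transaction for this producer.
            abortedProducers.remove(batch.producerId());
            return true; // in this sketch the marker batch is dropped; it holds only control records
        }
        return batch.transactional() && abortedProducers.contains(batch.producerId());
    }
}

Feeding batches through shouldSkip() in offset order reproduces the skip decisions made in nextFetchedRecord() above.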
@@ -20,6 +20,7 @@
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@@ -51,7 +52,7 @@
* represent the {@link SubscriptionState#fetchablePartitions(Predicate)} based on the share group
* consumer's assignment.
*/
public class ShareFetchRequestManager implements RequestManager, MemberStateListener {
public class ShareFetchRequestManager implements RequestManager, ShareMemberStateListener {

private final Logger log;
private final LogContext logContext;
@@ -65,6 +66,7 @@ public class ShareFetchRequestManager implements RequestManager, MemberStateList
private final FetchMetricsManager metricsManager;
private final IdempotentCloser idempotentCloser = new IdempotentCloser();
private Uuid memberId;
private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;

ShareFetchRequestManager(final LogContext logContext,
final String groupId,
@@ -215,6 +217,7 @@ private void handleShareFetchSuccess(Node fetchTarget,
BufferSupplier.create(),
partition,
partitionData,
isolationLevel,
requestVersion);
shareFetchBuffer.add(completedFetch);
shareFetchBuffer.handleAcknowledgementResponses(partition, Errors.forCode(partitionData.acknowledgeErrorCode()));
@@ -333,13 +336,16 @@ public void close() {
}

@Override
public void onMemberEpochUpdated(Optional<Integer> memberEpochOpt, Optional<String> memberIdOpt) {
public void onMemberEpochUpdated(Optional<Integer> memberEpochOpt, Optional<String> memberIdOpt, Optional<IsolationLevel> isolationLevelOpt) {
// Only set the member ID once for now - changes will be handled in AKCORE-57
if (memberId == null) {
if (memberIdOpt.isPresent()) {
memberId = Uuid.fromString(memberIdOpt.get());
}
}
if (isolationLevelOpt.isPresent()) {
isolationLevel = isolationLevelOpt.get();
}
}

@FunctionalInterface
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.IsolationLevel;

import java.util.Optional;

/**
* Listener for getting notified of member ID, epoch and isolation level changes.
*/
public interface ShareMemberStateListener {

/**
* Called whenever the member ID or epoch changes, with the new values received from the
* broker, or with empty values if the member is no longer part of the group (because it
* was fenced, left the group or failed).
*
* @param memberEpoch New member epoch received from the broker. Empty if the member is
* not part of the group anymore.
* @param memberId Current member ID. Empty if the member is not part of the group.
* @param isolationLevel Isolation level of the group. Empty if the member is not part
* of the group.
*/
void onMemberEpochUpdated(Optional<Integer> memberEpoch,
Optional<String> memberId,
Optional<IsolationLevel> isolationLevel);
}
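With ShareFetchRequestManager registered through ShareMembershipManager#registerStateListener (see below), heartbeat-driven state fans out to it automatically. A toy example of the callback contract, using a lambda since the interface has a single abstract method (the values passed are made up for illustration):

import org.apache.kafka.clients.consumer.internals.ShareMemberStateListener;
import org.apache.kafka.common.IsolationLevel;

import java.util.Optional;

public class ListenerSketch {
    public static void main(String[] args) {
        ShareMemberStateListener listener = (epoch, memberId, isolationLevel) ->
                System.out.printf("epoch=%s member=%s isolation=%s%n",
                        epoch.map(String::valueOf).orElse("<none>"),
                        memberId.orElse("<none>"),
                        isolationLevel.map(IsolationLevel::toString).orElse("<none>"));

        // Simulate the notification sent after a successful heartbeat...
        listener.onMemberEpochUpdated(Optional.of(1), Optional.of("member-1"),
                Optional.of(IsolationLevel.READ_COMMITTED));
        // ...and the one sent when the member leaves the group or is fenced.
        listener.onMemberEpochUpdated(Optional.empty(), Optional.empty(), Optional.empty());
    }
}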
@@ -24,6 +24,7 @@
import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@@ -119,6 +120,12 @@ public class ShareMembershipManager implements RequestManager {
*/
private String memberId = "";

/**
* Isolation level of the share group, received in a heartbeat response when joining the
* group specified in {@link #groupId}.
*/
private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;

/**
* Current epoch of the member. It will be set to 0 by the member, and provided to the server
* on the heartbeat request, to join the group. It will be then maintained by the server,
@@ -197,7 +204,7 @@ public class ShareMembershipManager implements RequestManager {
* values received from the broker, or values cleared due to member leaving the group, getting
* fenced or failing).
*/
private final List<MemberStateListener> stateUpdatesListeners;
private final List<ShareMemberStateListener> stateUpdatesListeners;

/**
* Optional client telemetry reporter which sends client telemetry data to the broker. This
@@ -306,7 +313,7 @@ public void onHeartbeatResponseReceived(ShareGroupHeartbeatResponseData response
}

this.memberId = response.memberId();
updateMemberEpoch(response.memberEpoch());
updateMemberEpoch(response.memberEpoch(), IsolationLevel.forId(response.isolationLevel()));

ShareGroupHeartbeatResponseData.Assignment assignment = response.assignment();

@@ -412,7 +419,7 @@ public void transitionToFatal() {
MemberState previousState = state;
transitionTo(MemberState.FATAL);
log.error("Member {} with epoch {} transitioned to {} state", memberId, memberEpoch, MemberState.FATAL);
notifyEpochChange(Optional.empty(), Optional.empty());
notifyEpochChange(Optional.empty(), Optional.empty(), Optional.empty());

if (previousState == MemberState.UNSUBSCRIBED) {
log.debug("Member {} with epoch {} got fatal error from the broker but it already " +
@@ -546,7 +553,7 @@ void transitionToSendingLeaveGroup() {
memberId);
return;
}
updateMemberEpoch(ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH);
updateMemberEpoch(ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, IsolationLevel.READ_UNCOMMITTED);
currentAssignment = new HashMap<>();
transitionTo(MemberState.LEAVING);
}
@@ -556,8 +563,8 @@ void transitionToSendingLeaveGroup() {
* This also includes the latest member ID in the notification. If the member fails or leaves
* the group, this will be invoked with empty epoch and member ID.
*/
private void notifyEpochChange(Optional<Integer> epoch, Optional<String> memberId) {
stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId));
private void notifyEpochChange(Optional<Integer> epoch, Optional<String> memberId, Optional<IsolationLevel> isolationLevel) {
stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId, isolationLevel));
}

/**
@@ -956,19 +963,20 @@ private void clearPendingAssignmentsAndLocalNamesCache() {
}

private void resetEpoch() {
updateMemberEpoch(ShareGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH);
updateMemberEpoch(ShareGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH, IsolationLevel.READ_UNCOMMITTED);
}

private void updateMemberEpoch(int newEpoch) {
private void updateMemberEpoch(int newEpoch, IsolationLevel newIsolationLevel) {
boolean newEpochReceived = this.memberEpoch != newEpoch;
this.memberEpoch = newEpoch;
isolationLevel = newIsolationLevel;
// Simply notify based on epoch change only, given that the member will never receive a
// new member ID without an epoch (member ID is only assigned when it joins the group).
if (newEpochReceived) {
if (memberEpoch > 0) {
notifyEpochChange(Optional.of(memberEpoch), Optional.ofNullable(memberId));
notifyEpochChange(Optional.of(memberEpoch), Optional.ofNullable(memberId), Optional.of(isolationLevel));
} else {
notifyEpochChange(Optional.empty(), Optional.empty());
notifyEpochChange(Optional.empty(), Optional.empty(), Optional.empty());
}
}
}
@@ -1038,7 +1046,7 @@ boolean reconciliationInProgress() {
*
* @param listener Listener to invoke.
*/
public void registerStateListener(MemberStateListener listener) {
public void registerStateListener(ShareMemberStateListener listener) {
if (listener == null) {
throw new IllegalArgumentException("State updates listener cannot be null");
}
@@ -52,6 +52,13 @@
"The last offset of this batch of acquired records." },
{ "name": "DeliveryCount", "type": "int16", "versions": "0+",
"about": "The delivery count of this batch of acquired records." }
]},
{ "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "0+", "nullableVersions": "0+", "ignorable": true,
"about": "The aborted transactions.", "fields": [
{ "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
"about": "The producer id associated with the aborted transaction." },
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
"about": "The first offset in the aborted transaction." }
]}
]}
]},
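For reviewers wondering how the new array is populated: the generated message classes expose fluent setters, so a test- or broker-side construction would look roughly like the sketch below. The setter names follow the code generator's conventions for the schema above; treat them as assumed rather than verified against this branch.

import org.apache.kafka.common.message.ShareFetchResponseData;

import java.util.List;

class AbortedTxnResponseSketch {
    // Sketch: a partition response advertising one aborted transaction
    // (producerId 42, starting at offset 100).
    static ShareFetchResponseData.PartitionData partitionWithAbortedTxn() {
        return new ShareFetchResponseData.PartitionData()
                .setPartitionIndex(0)
                .setAbortedTransactions(List.of(
                        new ShareFetchResponseData.AbortedTransaction()
                                .setProducerId(42L)
                                .setFirstOffset(100L)));
    }
}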
@@ -40,6 +40,8 @@
"about": "The member epoch." },
{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
"about": "The heartbeat interval in milliseconds." },
{ "name": "IsolationLevel", "type": "int8", "versions": "0+", "default": "0", "ignorable": true,
"about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records" },
{ "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided; the assignment otherwise.", "fields": [
{ "name": "Error", "type": "int8", "versions": "0+",
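On the client side, the new int8 maps onto the existing IsolationLevel enum via IsolationLevel.forId, which is exactly what onHeartbeatResponseReceived does above. A tiny illustration:

import org.apache.kafka.common.IsolationLevel;

public class IsolationLevelSketch {
    public static void main(String[] args) {
        // 0 -> READ_UNCOMMITTED (the default), 1 -> READ_COMMITTED,
        // matching the field documentation in the schema above.
        System.out.println(IsolationLevel.forId((byte) 0)); // READ_UNCOMMITTED
        System.out.println(IsolationLevel.forId((byte) 1)); // READ_COMMITTED
    }
}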
@@ -18,6 +18,7 @@

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
@@ -263,6 +264,7 @@ private ShareCompletedFetch newShareCompletedFetch(ShareFetchResponseData.Partit
BufferSupplier.create(),
TIP,
partitionData,
IsolationLevel.READ_UNCOMMITTED,
ApiKeys.SHARE_FETCH.latestVersion());
}

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
@@ -152,6 +153,7 @@ private ShareCompletedFetch completedFetch(TopicIdPartition tp) {
BufferSupplier.create(),
tp,
partitionData,
IsolationLevel.READ_UNCOMMITTED,
ApiKeys.SHARE_FETCH.latestVersion());
}

@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
@@ -335,6 +336,7 @@ private ShareCompletedFetch build() {
BufferSupplier.create(),
topicAPartition0,
partitionData,
IsolationLevel.READ_UNCOMMITTED,
ApiKeys.SHARE_FETCH.latestVersion());
}
}