Transactional API #78
Comments
We had a couple of discussions with @mr-swifter regarding a potential transactional API. The easiest thing would be to implement the calls "as is". However, there is some logic that would be used by more or less everyone, such as:
Omitting insignificant implementation details and making it more abstract, we were using the following code in send/commit and partially in abort transaction.
func commitTransaction(attempts: UInt, timeoutMs: Int = -1 /* wait until transaction.timeout.ms */) async -> Result<Void, KafkaError> {
    for _ in 0..<attempts {
        let error = await forBlockingFunc {
            rd_kafka_commit_transaction(self.kafkaHandle, Int32(timeoutMs))
        }
        /* check if the transaction completed successfully */
        if error == nil { return .success(()) }
        /* destroy the error in any case */
        defer { rd_kafka_error_destroy(error) }
        /* check if the error is retriable and retry */
        if rd_kafka_error_is_retriable(error) == 1 { continue }
        /* check if the transaction needs to be aborted */
        if rd_kafka_error_txn_requires_abort(error) == 1 {
            // at this point we cannot retry, client code should begin the transaction from scratch
            let res = await abortTransaction()
            return /* TransactionAbortedError/InconsistentStateError from `res` */
        }
        let description = String(cString: rd_kafka_error_string(error))
        let isFatal = (rd_kafka_error_is_fatal(error) == 1) // a fatal error requires a producer restart
        return /* Error + isFatal + description */
    }
    return /* out of attempts error */
}
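The decision order in the snippet above (success, then retriable, then abort-required, then everything else) can be tried out without librdkafka. The following is only a minimal sketch under stated assumptions: `TxnError`, `CommitOutcome`, and `commitWithRetry` are hypothetical names that appear neither in this issue nor in librdkafka, the three flags stand in for `rd_kafka_error_is_retriable`, `rd_kafka_error_txn_requires_abort`, and `rd_kafka_error_is_fatal`, and the function is synchronous for brevity, whereas the original is `async`.

```swift
/// Hypothetical stand-in for the flags librdkafka reports on a transaction error.
struct TxnError: Error, Equatable {
    var description: String
    var isRetriable = false
    var requiresAbort = false
    var isFatal = false
}

/// Hypothetical outcome type; the issue leaves the concrete error type open.
enum CommitOutcome: Equatable {
    case committed
    case aborted(reason: String)  // rolled back; the caller begins a new transaction
    case failed(TxnError)         // not retriable; `isFatal` means the producer must be recreated
    case outOfAttempts
}

/// Calls `commit` up to `attempts` times, using the same decision order as the
/// snippet above. `commit` returns nil on success. `abort` is invoked once if
/// the error says the transaction has to be aborted.
func commitWithRetry(
    attempts: UInt,
    commit: () -> TxnError?,
    abort: () -> Void
) -> CommitOutcome {
    for _ in 0..<attempts {
        // Success: nothing more to do.
        guard let error = commit() else { return .committed }
        // Retriable: try again while attempts remain.
        if error.isRetriable { continue }
        // Abort required: roll back and report it to the caller.
        if error.requiresAbort {
            abort()
            return .aborted(reason: error.description)
        }
        // Anything else is returned as-is, including the fatal flag.
        return .failed(error)
    }
    return .outOfAttempts
}
```

Passing the commit and abort steps as closures keeps the retry policy separate from the C calls, so the same loop could in principle be reused for send and abort, which the comment says share this logic.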
Transactional API is required to use Exactly Once Semantics provided by Kafka.
One idea for how to structure this:
- `KafkaTransactionalProducer`, which supports the regular `KafkaProducer` API and the transactional API (`beginTransaction`, `commitTransaction`, `abortTransaction`, etc.). Maybe it's simpler to make `KafkaProducer` a class and inherit the transactional producer from it to avoid code duplication.
- `KafkaTransactionalProducer` should have `send` and `sendOffset` within a transaction.
- `KafkaTransactionalProducer` should take `transactional.id` as a parameter.
- `KafkaTransactionalProducer` should call `rd_kafka_init_transactions(...)` and make sure it's initialised and not fenced by others.
- `KafkaTransactionalProducer` should handle retriable errors and try to recover. Possibly such errors also need to be delivered to an optional callback (as a notification).
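
To make the call ordering implied by the list above concrete, here is a rough in-memory sketch. It is not the proposed implementation: `SketchTransactionalProducer` and `TransactionStateError` are hypothetical names, no broker or librdkafka call is involved, messages are plain strings, and `initTransactions()` only marks the place where `rd_kafka_init_transactions(...)` would be called.

```swift
/// Hypothetical errors for calling the transactional API in the wrong order.
enum TransactionStateError: Error, Equatable {
    case notInitialized
    case alreadyInTransaction
    case noActiveTransaction
}

/// In-memory stand-in that only models the state rules: initialise first,
/// send only inside a transaction, and drop pending messages on abort.
final class SketchTransactionalProducer {
    private enum State { case uninitialized, ready, inTransaction }

    let transactionalID: String            // would map to `transactional.id`
    private var state = State.uninitialized
    private var pending: [String] = []
    private(set) var committed: [String] = []

    init(transactionalID: String) {
        self.transactionalID = transactionalID
    }

    /// Placeholder for the point where rd_kafka_init_transactions(...) would run.
    func initTransactions() {
        state = .ready
    }

    func beginTransaction() throws {
        switch state {
        case .uninitialized: throw TransactionStateError.notInitialized
        case .inTransaction: throw TransactionStateError.alreadyInTransaction
        case .ready: state = .inTransaction
        }
    }

    /// Buffers a message; it becomes visible only after a commit.
    func send(_ message: String) throws {
        guard state == .inTransaction else {
            throw TransactionStateError.noActiveTransaction
        }
        pending.append(message)
    }

    func commitTransaction() throws {
        guard state == .inTransaction else {
            throw TransactionStateError.noActiveTransaction
        }
        committed += pending
        pending.removeAll()
        state = .ready
    }

    func abortTransaction() throws {
        guard state == .inTransaction else {
            throw TransactionStateError.noActiveTransaction
        }
        pending.removeAll()
        state = .ready
    }
}
```

A real producer would also need `sendOffset`, fencing detection, and the retry handling discussed in the comments, none of which this sketch attempts.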