From b0f4bf39ed1d1521a1aca4a821da6e657e9d84ea Mon Sep 17 00:00:00 2001 From: Peter da Silva Date: Wed, 22 Aug 2018 19:46:36 +0000 Subject: [PATCH 1/6] Add "timestamps" to returned message list --- README.md | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 3c86f28..1bf1920 100755 --- a/README.md +++ b/README.md @@ -528,7 +528,7 @@ Requests events from the named topics. Topics may be regexps if they start with Remove the current subsciption list. -* *$subscriber* **assignments** +* *$subscriber* **assignment** List of topics and partitions assigned to this subscriber. @@ -573,25 +573,17 @@ Received Kafka Messages You specify the name of the array and if there isn't an error the array is populated with the following info from the message: -* payload +* payload - The message payload. - The message payload. +* partition - The partition number. -* partition +* key - The key will be provided if the (optional) key was specified when the message was produced. - The partition number. +* offset - The offset. -* key +* topic - The name of the topic. - The key will be provided if the (optional) key was specified when the message was produced. - -* offset - - The offset. - -* topic - - The name of the topic. +* timestamp - the log timestamp (milliseconds) Setting up a consumer --- From bc4f0d6cdedb03f407ebb9b04b85d19d8416960c Mon Sep 17 00:00:00 2001 From: Peter da Silva Date: Wed, 22 Aug 2018 20:04:22 +0000 Subject: [PATCH 2/6] Update timestamp for rest of APIs --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 1bf1920..48c8871 100755 --- a/README.md +++ b/README.md @@ -415,6 +415,8 @@ Methods of kafka topic consumer object * key - the partitioner key that was specified, if one was specified. + * timestamp - the timestamp (ms granularity) it was written to the log + If an error is encountered the message will contain: * error - the kafka error string returned by *rd_kafka_err2str* @@ -453,6 +455,8 @@ Methods of kafka topic consumer object * key - the partitioner key that was specified, if one was specified. + * timestamp - the timestamp (ms granularity) it was written to the log + * *$topic* **consume_batch** *partition* *timeout* *count* *array* *code* Consume up to *count* messages or however many have come in less than that within *timeout* milliseconds. From 6ec89d168e4ea5b318c474ff57de7e31fba40ee4 Mon Sep 17 00:00:00 2001 From: Peter da Silva Date: Wed, 22 Aug 2018 20:17:01 +0000 Subject: [PATCH 3/6] Fix docs - consumer/producer topic object info command was wrong. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 48c8871..d151f47 100755 --- a/README.md +++ b/README.md @@ -467,7 +467,7 @@ Methods of kafka topic consumer object This method returns number of rows processed, 0 if the end of the partition is reached. -* *$topic* **info** **topic** +* *$topic* **info** **name** Return the name of the topic. From ab2e789e4926804603519a8285665337a81b94f7 Mon Sep 17 00:00:00 2001 From: Peter da Silva Date: Fri, 7 Sep 2018 19:52:04 +0000 Subject: [PATCH 4/6] Keep informative comment. --- generic/kafkatcl.c | 1 + 1 file changed, 1 insertion(+) diff --git a/generic/kafkatcl.c b/generic/kafkatcl.c index b849f59..8256462 100644 --- a/generic/kafkatcl.c +++ b/generic/kafkatcl.c @@ -3738,6 +3738,7 @@ kafkatcl_SubscriberEventCheckProc (ClientData clientData, int flags) { if(!msgList) break; + // Note - this increments and decrements the refcount on msgList. kafkatcl_invoke_callback_with_argument (interp, kh->subscriberCallback, msgList); } } From ffcdff6031423aba3f8acdaeb780ab5ea440d298 Mon Sep 17 00:00:00 2001 From: Peter da Silva Date: Tue, 11 Sep 2018 15:43:05 +0000 Subject: [PATCH 5/6] Keep looking for work after EOF. --- generic/kafkatcl.c | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/generic/kafkatcl.c b/generic/kafkatcl.c index 8256462..7be2f16 100644 --- a/generic/kafkatcl.c +++ b/generic/kafkatcl.c @@ -3716,15 +3716,12 @@ kafkatcl_SubscriberEventCheckProc (ClientData clientData, int flags) { rd_kafka_message_t *message; Tcl_Interp *interp = kh->interp; - // polling with timeoutMS of 0 is nonblocking, which is ideal - // DON'T do this if you called rd_kafka_poll_set_consumer() - //rd_kafka_poll (kh->rk, 0); - // If we don't have a subscriber callback, leave subscriber messages alone. + // User must then explicitly read messages (via subscriber consume) frequently! if(!kh->subscriberCallback) return; - while((message = rd_kafka_consumer_poll(rk, 1))) { + while((message = rd_kafka_consumer_poll(rk, 0))) { rd_kafka_timestamp_type_t tstype; Tcl_WideInt timestamp = rd_kafka_message_timestamp(message, &tstype); Tcl_Obj *msgList = kafkatcl_message_to_tcl_list(interp, message, timestamp, tstype); @@ -3732,14 +3729,10 @@ kafkatcl_SubscriberEventCheckProc (ClientData clientData, int flags) { // We don't need this any more rd_kafka_message_destroy(message); -// fprintf(stderr, "%08lx: %s\n", (long)message, Tcl_GetString(msgList)); - - // Null message list means EOF - if(!msgList) - break; - - // Note - this increments and decrements the refcount on msgList. - kafkatcl_invoke_callback_with_argument (interp, kh->subscriberCallback, msgList); + if(msgList) { + // Note - this increments and decrements the refcount on msgList. + kafkatcl_invoke_callback_with_argument (interp, kh->subscriberCallback, msgList); + } } } From 9b78874c6ce98d24ff1d6d68a7157c899d5c9c66 Mon Sep 17 00:00:00 2001 From: Peter da Silva Date: Tue, 11 Sep 2018 15:43:34 +0000 Subject: [PATCH 6/6] Check for failure of rd_kafka_poll_set_consumer() --- generic/kafkatcl.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/generic/kafkatcl.c b/generic/kafkatcl.c index 7be2f16..0e6eaca 100644 --- a/generic/kafkatcl.c +++ b/generic/kafkatcl.c @@ -4386,8 +4386,12 @@ kafkatcl_createSubscriberObjectCommand (kafkatcl_objectClientData *ko, char *cmd return TCL_ERROR; } - // Need to do this OR call rd_kafka_poll() periodically. - rd_kafka_poll_set_consumer(rk); + // After this do not call rd_kafka_poll(), call rd_kafka_consumer_poll() instead + if(rd_kafka_poll_set_consumer(rk) != RD_KAFKA_RESP_ERR_NO_ERROR) { + // Only possible error is RD_KAFKA_RESP_ERR__UNKNOWN_GROUP + Tcl_SetObjResult (interp, Tcl_NewStringObj("Unexpected failure from rd_kafka_poll_set_consumer", -1)); + return TCL_ERROR; + } // finished kafka setup, save state