diff --git a/README.md b/README.md index 3c86f28..d151f47 100755 --- a/README.md +++ b/README.md @@ -415,6 +415,8 @@ Methods of kafka topic consumer object * key - the partitioner key that was specified, if one was specified. + * timestamp - the timestamp (ms granularity) it was written to the log + If an error is encountered the message will contain: * error - the kafka error string returned by *rd_kafka_err2str* @@ -453,6 +455,8 @@ Methods of kafka topic consumer object * key - the partitioner key that was specified, if one was specified. + * timestamp - the timestamp (ms granularity) it was written to the log + * *$topic* **consume_batch** *partition* *timeout* *count* *array* *code* Consume up to *count* messages or however many have come in less than that within *timeout* milliseconds. @@ -463,7 +467,7 @@ Methods of kafka topic consumer object This method returns number of rows processed, 0 if the end of the partition is reached. -* *$topic* **info** **topic** +* *$topic* **info** **name** Return the name of the topic. @@ -528,7 +532,7 @@ Requests events from the named topics. Topics may be regexps if they start with Remove the current subsciption list. -* *$subscriber* **assignments** +* *$subscriber* **assignment** List of topics and partitions assigned to this subscriber. @@ -573,25 +577,17 @@ Received Kafka Messages You specify the name of the array and if there isn't an error the array is populated with the following info from the message: -* payload - - The message payload. - -* partition - - The partition number. - -* key +* payload - The message payload. - The key will be provided if the (optional) key was specified when the message was produced. +* partition - The partition number. -* offset +* key - The key will be provided if the (optional) key was specified when the message was produced. - The offset. +* offset - The offset. -* topic +* topic - The name of the topic. - The name of the topic. +* timestamp - the log timestamp (milliseconds) Setting up a consumer --- diff --git a/generic/kafkatcl.c b/generic/kafkatcl.c index b41ce02..9fff1d7 100644 --- a/generic/kafkatcl.c +++ b/generic/kafkatcl.c @@ -3743,15 +3743,12 @@ kafkatcl_SubscriberEventCheckProc (ClientData clientData, int flags) { rd_kafka_message_t *message; Tcl_Interp *interp = kh->interp; - // polling with timeoutMS of 0 is nonblocking, which is ideal - // DON'T do this if you called rd_kafka_poll_set_consumer() - //rd_kafka_poll (kh->rk, 0); - // If we don't have a subscriber callback, leave subscriber messages alone. + // User must then explicitly read messages (via subscriber consume) frequently! if(!kh->subscriberCallback) return; - while((message = rd_kafka_consumer_poll(rk, 1))) { + while((message = rd_kafka_consumer_poll(rk, 0))) { rd_kafka_timestamp_type_t tstype; Tcl_WideInt timestamp = rd_kafka_message_timestamp(message, &tstype); Tcl_Obj *msgList = kafkatcl_message_to_tcl_list(interp, message, timestamp, tstype); @@ -3759,13 +3756,10 @@ kafkatcl_SubscriberEventCheckProc (ClientData clientData, int flags) { // We don't need this any more rd_kafka_message_destroy(message); -// fprintf(stderr, "%08lx: %s\n", (long)message, Tcl_GetString(msgList)); - - // Null message list means EOF - if(!msgList) - break; - - kafkatcl_invoke_callback_with_argument (interp, kh->subscriberCallback, msgList); + if(msgList) { + // Note - this increments and decrements the refcount on msgList. + kafkatcl_invoke_callback_with_argument (interp, kh->subscriberCallback, msgList); + } } } @@ -4419,8 +4413,12 @@ kafkatcl_createSubscriberObjectCommand (kafkatcl_objectClientData *ko, char *cmd return TCL_ERROR; } - // Need to do this OR call rd_kafka_poll() periodically. - rd_kafka_poll_set_consumer(rk); + // After this do not call rd_kafka_poll(), call rd_kafka_consumer_poll() instead + if(rd_kafka_poll_set_consumer(rk) != RD_KAFKA_RESP_ERR_NO_ERROR) { + // Only possible error is RD_KAFKA_RESP_ERR__UNKNOWN_GROUP + Tcl_SetObjResult (interp, Tcl_NewStringObj("Unexpected failure from rd_kafka_poll_set_consumer", -1)); + return TCL_ERROR; + } // finished kafka setup, save state