Skip to content

Commit

Permalink
Merge branch 'master' into creator-subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
resuna committed Nov 26, 2018
2 parents 81cad5c + 9b78874 commit a9deb8b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 30 deletions.
28 changes: 12 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ Methods of kafka topic consumer object

* key - the partitioner key that was specified, if one was specified.

* timestamp - the timestamp (ms granularity) it was written to the log

If an error is encountered the message will contain:

* error - the kafka error string returned by *rd_kafka_err2str*
Expand Down Expand Up @@ -453,6 +455,8 @@ Methods of kafka topic consumer object

* key - the partitioner key that was specified, if one was specified.

* timestamp - the timestamp (ms granularity) it was written to the log

* *$topic* **consume_batch** *partition* *timeout* *count* *array* *code*

Consume up to *count* messages or however many have come in less than that within *timeout* milliseconds.
Expand All @@ -463,7 +467,7 @@ Methods of kafka topic consumer object

This method returns number of rows processed, 0 if the end of the partition is reached.

* *$topic* **info** **topic**
* *$topic* **info** **name**

Return the name of the topic.

Expand Down Expand Up @@ -528,7 +532,7 @@ Requests events from the named topics. Topics may be regexps if they start with

Remove the current subsciption list.

* *$subscriber* **assignments**
* *$subscriber* **assignment**

List of topics and partitions assigned to this subscriber.

Expand Down Expand Up @@ -573,25 +577,17 @@ Received Kafka Messages

You specify the name of the array and if there isn't an error the array is populated with the following info from the message:

* payload

The message payload.

* partition

The partition number.

* key
* payload - The message payload.

The key will be provided if the (optional) key was specified when the message was produced.
* partition - The partition number.

* offset
* key - The key will be provided if the (optional) key was specified when the message was produced.

The offset.
* offset - The offset.

* topic
* topic - The name of the topic.

The name of the topic.
* timestamp - the log timestamp (milliseconds)

Setting up a consumer
---
Expand Down
26 changes: 12 additions & 14 deletions generic/kafkatcl.c
Original file line number Diff line number Diff line change
Expand Up @@ -3743,29 +3743,23 @@ kafkatcl_SubscriberEventCheckProc (ClientData clientData, int flags) {
rd_kafka_message_t *message;
Tcl_Interp *interp = kh->interp;

// polling with timeoutMS of 0 is nonblocking, which is ideal
// DON'T do this if you called rd_kafka_poll_set_consumer()
//rd_kafka_poll (kh->rk, 0);

// If we don't have a subscriber callback, leave subscriber messages alone.
// User must then explicitly read messages (via subscriber consume) frequently!
if(!kh->subscriberCallback)
return;

while((message = rd_kafka_consumer_poll(rk, 1))) {
while((message = rd_kafka_consumer_poll(rk, 0))) {
rd_kafka_timestamp_type_t tstype;
Tcl_WideInt timestamp = rd_kafka_message_timestamp(message, &tstype);
Tcl_Obj *msgList = kafkatcl_message_to_tcl_list(interp, message, timestamp, tstype);

// We don't need this any more
rd_kafka_message_destroy(message);

// fprintf(stderr, "%08lx: %s\n", (long)message, Tcl_GetString(msgList));

// Null message list means EOF
if(!msgList)
break;

kafkatcl_invoke_callback_with_argument (interp, kh->subscriberCallback, msgList);
if(msgList) {
// Note - this increments and decrements the refcount on msgList.
kafkatcl_invoke_callback_with_argument (interp, kh->subscriberCallback, msgList);
}
}
}

Expand Down Expand Up @@ -4419,8 +4413,12 @@ kafkatcl_createSubscriberObjectCommand (kafkatcl_objectClientData *ko, char *cmd
return TCL_ERROR;
}

// Need to do this OR call rd_kafka_poll() periodically.
rd_kafka_poll_set_consumer(rk);
// After this do not call rd_kafka_poll(), call rd_kafka_consumer_poll() instead
if(rd_kafka_poll_set_consumer(rk) != RD_KAFKA_RESP_ERR_NO_ERROR) {
// Only possible error is RD_KAFKA_RESP_ERR__UNKNOWN_GROUP
Tcl_SetObjResult (interp, Tcl_NewStringObj("Unexpected failure from rd_kafka_poll_set_consumer", -1));
return TCL_ERROR;
}

// finished kafka setup, save state

Expand Down

0 comments on commit a9deb8b

Please sign in to comment.