Skip to content

Commit

Permalink
Keep looking for work after EOF.
Browse files Browse the repository at this point in the history
  • Loading branch information
resuna committed Sep 11, 2018
1 parent ab2e789 commit ffcdff6
Showing 1 changed file with 6 additions and 13 deletions.
19 changes: 6 additions & 13 deletions generic/kafkatcl.c
Original file line number Diff line number Diff line change
Expand Up @@ -3716,30 +3716,23 @@ kafkatcl_SubscriberEventCheckProc (ClientData clientData, int flags) {
rd_kafka_message_t *message;
Tcl_Interp *interp = kh->interp;

// polling with timeoutMS of 0 is nonblocking, which is ideal
// DON'T do this if you called rd_kafka_poll_set_consumer()
//rd_kafka_poll (kh->rk, 0);

// If we don't have a subscriber callback, leave subscriber messages alone.
// User must then explicitly read messages (via subscriber consume) frequently!
if(!kh->subscriberCallback)
return;

while((message = rd_kafka_consumer_poll(rk, 1))) {
while((message = rd_kafka_consumer_poll(rk, 0))) {
rd_kafka_timestamp_type_t tstype;
Tcl_WideInt timestamp = rd_kafka_message_timestamp(message, &tstype);
Tcl_Obj *msgList = kafkatcl_message_to_tcl_list(interp, message, timestamp, tstype);

// We don't need this any more
rd_kafka_message_destroy(message);

// fprintf(stderr, "%08lx: %s\n", (long)message, Tcl_GetString(msgList));

// Null message list means EOF
if(!msgList)
break;

// Note - this increments and decrements the refcount on msgList.
kafkatcl_invoke_callback_with_argument (interp, kh->subscriberCallback, msgList);
if(msgList) {
// Note - this increments and decrements the refcount on msgList.
kafkatcl_invoke_callback_with_argument (interp, kh->subscriberCallback, msgList);
}
}
}

Expand Down

0 comments on commit ffcdff6

Please sign in to comment.