Skip to content

Commit

Permalink
Merge pull request #29 from flightaware/creator-subcommand
Browse files Browse the repository at this point in the history
Add subcommand to objects to pass back to creators
  • Loading branch information
resuna authored Jan 21, 2019
2 parents 2c06f29 + a9deb8b commit c09cdd3
Showing 1 changed file with 28 additions and 1 deletion.
29 changes: 28 additions & 1 deletion generic/kafkatcl.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ kafkatcl_subscriber_poll(kafkatcl_handleClientData *kh);
void
kafkatcl_consume_stop_all_partitions (kafkatcl_topicClientData *kt);

int
kafkatcl_handleObjectObjCmd(ClientData cData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[]);

// DEBUG
#ifdef DEBUGPRINTF
void kafkatcl_dump_topic_partition_list(rd_kafka_topic_partition_list_t *topics)
Expand Down Expand Up @@ -2160,13 +2163,15 @@ kafkatcl_handle_topic_info (Tcl_Interp *interp, kafkatcl_topicClientData *kt, in
"name",
"partitions",
"consistent_partition",
"output_queue_length",
NULL
};

enum subOptions {
SUBOPT_NAME,
SUBOPT_PARTITIONS,
SUBOPT_CONSISTENT_PARTITION
SUBOPT_CONSISTENT_PARTITION,
SUBOPT_OUTPUT_QUEUE_LENGTH
};

// argument must be one of the subOptions defined above
Expand Down Expand Up @@ -2229,6 +2234,16 @@ kafkatcl_handle_topic_info (Tcl_Interp *interp, kafkatcl_topicClientData *kt, in
Tcl_SetObjResult (interp, Tcl_NewIntObj (whichPartition));
break;
}

case SUBOPT_OUTPUT_QUEUE_LENGTH: {
if (objc != 3) {
Tcl_WrongNumArgs (interp, 3, objv, "");
return TCL_ERROR;
}

Tcl_SetObjResult (interp, Tcl_NewIntObj (rd_kafka_outq_len (kh->rk)));
break;
}
}
return TCL_OK;
}
Expand Down Expand Up @@ -2530,6 +2545,7 @@ kafkatcl_topicConsumerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj
"start",
"start_queue",
"stop",
"creator",
"delete",
"consume_start",
"consume_start_queue",
Expand All @@ -2544,6 +2560,7 @@ kafkatcl_topicConsumerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj
OPT_CONSUME_START,
OPT_CONSUME_START_QUEUE,
OPT_CONSUME_STOP,
OPT_CREATOR,
OPT_DELETE,
OPT_LEGACY_CONSUME_START,
OPT_LEGACY_CONSUME_START_QUEUE,
Expand Down Expand Up @@ -2670,6 +2687,10 @@ kafkatcl_topicConsumerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj
break;
}

case OPT_CREATOR: {
return kafkatcl_handleObjectObjCmd(kt->kh, interp, objc-1, objv+1);
}

case OPT_INFO: {
return kafkatcl_handle_topic_info (interp, kt, objc, objv);
}
Expand Down Expand Up @@ -2800,6 +2821,7 @@ kafkatcl_topicProducerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj
"produce",
"produce_batch",
"info",
"creator",
"delete",
NULL
};
Expand All @@ -2808,6 +2830,7 @@ kafkatcl_topicProducerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj
OPT_PRODUCE,
OPT_PRODUCE_BATCH,
OPT_INFO,
OPT_CREATOR,
OPT_DELETE
};

Expand Down Expand Up @@ -2924,6 +2947,10 @@ kafkatcl_topicProducerObjectObjCmd(ClientData cData, Tcl_Interp *interp, int obj
break;
}

case OPT_CREATOR: {
return kafkatcl_handleObjectObjCmd(kt->kh, interp, objc-1, objv+1);
}

case OPT_INFO: {
return kafkatcl_handle_topic_info (interp, kt, objc, objv);
}
Expand Down

0 comments on commit c09cdd3

Please sign in to comment.