Crash (null-pointer access) in rd_kafka_metadata_cache_entry_by_id_cmp() during rd_avl_insert() #4778

Open
5 of 7 tasks
GerKr opened this issue Jul 9, 2024 · 2 comments · May be fixed by #4864

GerKr commented Jul 9, 2024

Description

After about two months of use, the application crashed. The crash dump shows that the fault occurred in rd_kafka_metadata_cache_entry_by_id_cmp(const void *_a, const void *_b), where _b is 0x0000000000000000. This leads to a write access to address 0x0000000000000088.
At the time of the rd_avl_insert(ravl, elm, ran) call, ravl contains:
ravl->ravl_root == 0x12eaf651500 (not a null pointer)
ravl->ravl_root->ran_height == 0x61657268 (read as ASCII in little-endian byte order this is "hrea", the start of the string "hread_0" visible in the memory dump below)
ravl->ravl_root->ran_elm == 0x0000000000000000 (null pointer!)

The memory that ravl->ravl_root points to looks as follows:
0f 00 00 00 00 00 00 00 00 40 89 b8 2e 01 00 00 68 72 65 61 64 5f 30 00 00 00 00 00 00 00 00 00 0f 00 00 00 00 00 00 00 00 74 e2 b6 2e 01 00 00 06 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 0f 00 00 00 00 00 00 00 00 65 74 43 68 61 6e
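To make the decoding above reproducible, here is a minimal sketch in C that reads the first 32 bytes of the dump as an AVL node. It assumes a 64-bit little-endian layout of two child pointers, a 32-bit height, 4 bytes of padding, and an element pointer. That layout is an assumption made for illustration; it is consistent with the values reported above but is not taken from the librdkafka headers. The names node_dump, decoded_node, read_le and decode_node are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* First 32 bytes of the dump quoted in the report. */
static const unsigned char node_dump[32] = {
    0x0f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, /* offset  0 */
    0x00, 0x40, 0x89, 0xb8, 0x2e, 0x01, 0x00, 0x00, /* offset  8 */
    0x68, 0x72, 0x65, 0x61, 0x64, 0x5f, 0x30, 0x00, /* offset 16 */
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00  /* offset 24 */
};

/* ASSUMED node layout (illustration only, not copied from librdkafka):
 * two child pointers, a 32-bit height, 4 padding bytes, element pointer. */
struct decoded_node {
    uint64_t child[2]; /* offsets 0 and 8 */
    uint32_t height;   /* offset 16 */
    uint64_t elm;      /* offset 24 */
    char text[9];      /* bytes 16..23 viewed as a C string */
};

/* Read an unsigned little-endian integer of nbytes (at most 8) bytes. */
static uint64_t read_le(const unsigned char *p, size_t nbytes) {
    uint64_t v = 0;
    assert(nbytes <= 8);
    for (size_t i = 0; i < nbytes; i++)
        v |= (uint64_t)p[i] << (8 * i);
    return v;
}

/* Decode 32 raw bytes using the assumed layout above. */
static struct decoded_node decode_node(const unsigned char *p) {
    struct decoded_node n;
    n.child[0] = read_le(p, 8);
    n.child[1] = read_le(p + 8, 8);
    n.height = (uint32_t)read_le(p + 16, 4);
    n.elm = read_le(p + 24, 8);
    memcpy(n.text, p + 16, 8); /* same bytes as height plus padding */
    n.text[8] = '\0';
    return n;
}
```

Calling decode_node(node_dump) yields height == 0x61657268, elm == 0 and text == "hread_0", matching the values read from the debugger. This suggests, but does not prove, that the node's memory had been reused for unrelated data by the time of the insert.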

The call stack is as follows:

librdkafka.dll!rd_kafka_metadata_cache_entry_by_id_cmp(const void * _a, const void * _b) Line 700 C
librdkafka.dll!rd_avl_insert_node(rd_avl_s * ravl, rd_avl_node_s * parent, rd_avl_node_s * ran, rd_avl_node_s * * existing) Line 104 C
librdkafka.dll!rd_kafka_metadata_cache_insert(rd_kafka_s * rk, const rd_kafka_metadata_topic * mtopic, const rd_kafka_metadata_topic_internal_s * metadata_internal_topic, __int64 now, __int64 ts_expires, unsigned char include_racks, rd_kafka_metadata_broker_internal_s * brokers_internal, unsigned __int64 broker_cnt) Line 380 C
librdkafka.dll!rd_kafka_metadata_cache_topic_update(rd_kafka_s * rk, const rd_kafka_metadata_topic * mdt, const rd_kafka_metadata_topic_internal_s * mdit, unsigned char propagate, unsigned char include_racks, rd_kafka_metadata_broker_internal_s * brokers, unsigned __int64 broker_cnt, unsigned char only_existing) Line 508 C
librdkafka.dll!rd_kafka_parse_Metadata0(rd_kafka_broker_s * rkb, rd_kafka_buf_s * request, rd_kafka_buf_s * rkbuf, rd_kafka_metadata_internal_s * * mdip, rd_list_s * request_topics, const char * reason) Line 857 C
librdkafka.dll!rd_kafka_parse_Metadata(rd_kafka_broker_s * rkb, rd_kafka_buf_s * request, rd_kafka_buf_s * rkbuf, rd_kafka_metadata_internal_s * * mdip) Line 1113 C
librdkafka.dll!rd_kafka_handle_Metadata(rd_kafka_s * rk, rd_kafka_broker_s * rkb, rd_kafka_resp_err_t err, rd_kafka_buf_s * rkbuf, rd_kafka_buf_s * request, void * opaque) Line 2490 C
librdkafka.dll!rd_kafka_buf_callback(rd_kafka_s * rk, rd_kafka_broker_s * rkb, rd_kafka_resp_err_t err, rd_kafka_buf_s * response, rd_kafka_buf_s * request) Line 512 C
librdkafka.dll!rd_kafka_buf_handle_op(rd_kafka_op_s * rko, rd_kafka_resp_err_t err) Line 453 C
librdkafka.dll!rd_kafka_op_handle_std(rd_kafka_s * rk, rd_kafka_q_s * rkq, rd_kafka_op_s * rko, int cb_type) Line 884 C
librdkafka.dll!rd_kafka_op_handle(rd_kafka_s * rk, rd_kafka_q_s * rkq, rd_kafka_op_s * rko, rd_kafka_q_cb_type_t cb_type, void * opaque, rd_kafka_op_res_t(*)(rd_kafka_s *, rd_kafka_q_s *, rd_kafka_op_s *, rd_kafka_q_cb_type_t, void *) callback) Line 916 C
librdkafka.dll!rd_kafka_q_serve(rd_kafka_q_s * rkq, int timeout_ms, int max_cnt, rd_kafka_q_cb_type_t cb_type, rd_kafka_op_res_t(*)(rd_kafka_s *, rd_kafka_q_s *, rd_kafka_op_s *, rd_kafka_q_cb_type_t, void *) callback, void * opaque) Line 581 C
librdkafka.dll!rd_kafka_thread_main(void * arg) Line 2138 C
librdkafka.dll!_thrd_wrapper_function(void * aArg) Line 589 C
kernel32.dll!00007ffd36527e94() Unknown
ntdll.dll!RtlUserThreadStart() Unknown

How to reproduce

I don't know how to reproduce it: it had not occurred during the previous two months of operation, so it appears to be a sporadic problem.
The crash dump, the matching PDB file, and the sources (v2.4.0) are available. On request I can provide the values of variables or memory dumps.

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): v2.4.0
  • Apache Kafka version: v2.8.1
  • librdkafka client configuration: N/A
  • Operating system: Windows Server 2019
  • Provide logs: call stack - see in "Description"
  • Provide broker log excerpts: N/A
  • Critical issue: the crash kills the application

GerKr commented Jul 9, 2024

For decoding the memory dump: I use a 64-bit release build of librdkafka.

marcin-krystianc commented

Hi,

I've also encountered this issue and can reproduce it, though not consistently. I've investigated the problem but don't yet fully understand it. However, I've identified that it was introduced between v2.3.0 and v2.4.0 (#4676), and I can confirm that it still affects v2.5.3.

How to reproduce it

My test scenario involves re-creating (removing and creating) many topics quickly and producing to newly created topics.

C#/C++
  • Start the test cluster
  • Run the KafkaTool:
     dotnet run --project KafkaTool.csproj -- \
     producer \
     --config allow.auto.create.topics=false \
     --config bootstrap.servers=localhost:40001,localhost:40002,localhost:40003 \
     --topics=3000 \
     --partitions=1 \
     --replication-factor=3 \
     --min-isr=2 \
     --messages-per-second=1000 \
     --config request.timeout.ms=180000 \
     --config message.timeout.ms=180000 \
     --config request.required.acks=-1 \
     --config enable.idempotence=true \
     --config max.in.flight.requests.per.connection=1 \
     --config topic.metadata.propagation.max.ms=60000 \
     --producers=1 \
     --recreate-topics-delay=700 \
     --recreate-topics-batch-size=500
    

Sample stack-trace

librdkafka.so!rd_kafka_metadata_cache_entry_by_id_cmp(const void * _a, const void * _b) (\workspace\librdkafka\src\rdkafka_metadata_cache.c:700)
librdkafka.so!rd_avl_insert_node(rd_avl_t * ravl, rd_avl_node_t * parent, rd_avl_node_t * ran, rd_avl_node_t ** existing) (\workspace\librdkafka\src\rdavl.c:104)
librdkafka.so!rd_avl_insert_node(rd_avl_t * ravl, rd_avl_node_t * parent, rd_avl_node_t * ran, rd_avl_node_t ** existing) (\workspace\librdkafka\src\rdavl.c:119)
librdkafka.so!rd_avl_insert(rd_avl_t * ravl, void * elm, rd_avl_node_t * ran) (\workspace\librdkafka\src\rdavl.h:211)
librdkafka.so!rd_kafka_metadata_cache_insert(rd_kafka_t * rk, const rd_kafka_metadata_topic_t * mtopic, const rd_kafka_metadata_topic_internal_t * metadata_internal_topic, rd_ts_t now, rd_ts_t ts_expires, rd_bool_t include_racks, rd_kafka_metadata_broker_internal_t * brokers_internal, size_t broker_cnt) (\workspace\librdkafka\src\rdkafka_metadata_cache.c:380)
librdkafka.so!rd_kafka_metadata_cache_topic_update(rd_kafka_t * rk, const rd_kafka_metadata_topic_t * mdt, const rd_kafka_metadata_topic_internal_t * mdit, rd_bool_t propagate, rd_bool_t include_racks, rd_kafka_metadata_broker_internal_t * brokers, size_t broker_cnt, rd_bool_t only_existing) (\workspace\librdkafka\src\rdkafka_metadata_cache.c:502)
librdkafka.so!rd_kafka_parse_Metadata0(rd_kafka_broker_t * rkb, rd_kafka_buf_t * request, rd_kafka_buf_t * rkbuf, rd_kafka_metadata_internal_t ** mdip, rd_list_t * request_topics, const char * reason) (\workspace\librdkafka\src\rdkafka_metadata.c:858)
librdkafka.so!rd_kafka_parse_Metadata(rd_kafka_broker_t * rkb, rd_kafka_buf_t * request, rd_kafka_buf_t * rkbuf, rd_kafka_metadata_internal_t ** mdip) (\workspace\librdkafka\src\rdkafka_metadata.c:1111)
librdkafka.so!rd_kafka_handle_Metadata(rd_kafka_t * rk, rd_kafka_broker_t * rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t * rkbuf, rd_kafka_buf_t * request, void * opaque) (\workspace\librdkafka\src\rdkafka_request.c:2546)
librdkafka.so!rd_kafka_buf_callback(rd_kafka_t * rk, rd_kafka_broker_t * rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t * response, rd_kafka_buf_t * request) (\workspace\librdkafka\src\rdkafka_buf.c:509)
librdkafka.so!rd_kafka_buf_handle_op(rd_kafka_op_t * rko, rd_kafka_resp_err_t err) (\workspace\librdkafka\src\rdkafka_buf.c:452)
librdkafka.so!rd_kafka_op_handle_std(rd_kafka_t * rk, rd_kafka_q_t * rkq, rd_kafka_op_t * rko, int cb_type) (\workspace\librdkafka\src\rdkafka_op.c:905)
librdkafka.so!rd_kafka_op_handle(rd_kafka_t * rk, rd_kafka_q_t * rkq, rd_kafka_op_t * rko, rd_kafka_q_cb_type_t cb_type, void * opaque, rd_kafka_q_serve_cb_t * callback) (\workspace\librdkafka\src\rdkafka_op.c:945)
librdkafka.so!rd_kafka_q_serve(rd_kafka_q_t * rkq, int timeout_ms, int max_cnt, rd_kafka_q_cb_type_t cb_type, rd_kafka_q_serve_cb_t * callback, void * opaque) (\workspace\librdkafka\src\rdkafka_queue.c:578)
librdkafka.so!rd_kafka_thread_main(void * arg) (\workspace\librdkafka\src\rdkafka.c:2143)
libc.so.6!start_thread(void * arg) (pthread_create.c:444)
libc.so.6!__GI___clone3() (clone3.S:78)

More details

I built the library with ASAN (AddressSanitizer), which helped me understand the problem better. In the rd_kafka_metadata_cache_delete function, the memory pointed to by rkmce is freed while it is still referenced from the rk_metadata_cache.rkmc_avl_by_id tree. ASAN reports a "use after free" error when the entire tree is traversed immediately after rkmce is freed, but not when it is traversed immediately before.
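The failure mode described above can be illustrated with a toy model in C. This is not librdkafka code and not a proposed patch: it replaces the AVL trees with two intrusive linked lists, and every name in it (entry, cache, cache_insert, cache_delete, by_id_contains) is hypothetical. It only sketches the general rule that an entry linked into two indexes must be unlinked from both before its memory is freed.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical cache entry that is linked into two indexes at once. */
struct entry {
    char name[32];
    int id;
    struct entry *next_by_name; /* link in the by-name index */
    struct entry *next_by_id;   /* link in the by-id index */
};

/* Two list heads stand in for the two trees of the real cache. */
struct cache {
    struct entry *by_name;
    struct entry *by_id;
};

/* Allocate an entry and link it into both indexes. */
static struct entry *cache_insert(struct cache *c, const char *name, int id) {
    struct entry *e = calloc(1, sizeof(*e));
    if (e == NULL)
        return NULL;
    strncpy(e->name, name, sizeof(e->name) - 1);
    e->id = id;
    e->next_by_name = c->by_name;
    c->by_name = e;
    e->next_by_id = c->by_id;
    c->by_id = e;
    return e;
}

/* Remove e from the by-name list, if present. */
static void unlink_by_name(struct cache *c, struct entry *e) {
    for (struct entry **pp = &c->by_name; *pp; pp = &(*pp)->next_by_name)
        if (*pp == e) { *pp = e->next_by_name; return; }
}

/* Remove e from the by-id list, if present. */
static void unlink_by_id(struct cache *c, struct entry *e) {
    for (struct entry **pp = &c->by_id; *pp; pp = &(*pp)->next_by_id)
        if (*pp == e) { *pp = e->next_by_id; return; }
}

/* Unlink from BOTH indexes before freeing. Skipping unlink_by_id() here
 * would leave a dangling pointer in the by-id index, so that a later
 * lookup or insert walking that index reads freed memory. */
static void cache_delete(struct cache *c, struct entry *e) {
    unlink_by_name(c, e);
    unlink_by_id(c, e);
    free(e);
}

/* Return 1 if the by-id index still holds an entry with this id. */
static int by_id_contains(const struct cache *c, int id) {
    for (const struct entry *e = c->by_id; e; e = e->next_by_id)
        if (e->id == id)
            return 1;
    return 0;
}
```

In this model, removing the unlink_by_id() call from cache_delete() reproduces the class of bug that ASAN flags: the next walk of the by-id index dereferences a freed entry, and the crash may surface much later, in whichever comparison happens to touch that memory.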
