diff --git a/source/event_stream_rpc_client.c b/source/event_stream_rpc_client.c index 1a292a8..dfadfa6 100644 --- a/source/event_stream_rpc_client.c +++ b/source/event_stream_rpc_client.c @@ -52,6 +52,7 @@ struct aws_event_stream_rpc_client_continuation_token { void *user_data; struct aws_atomic_var ref_count; struct aws_atomic_var is_closed; + struct aws_atomic_var is_complete; }; static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data); @@ -201,17 +202,21 @@ static int s_mark_each_continuation_closed(void *context, struct aws_hash_elemen /* Invoke continuation's on_closed() callback. * A lock must NOT be hold while calling this */ static void s_complete_continuation(struct aws_event_stream_rpc_client_continuation_token *token) { - AWS_LOGF_DEBUG( - AWS_LS_EVENT_STREAM_RPC_CLIENT, - "token=%p: token with stream-id %" PRIu32 ", purged from the stream table", - (void *)token, - token->stream_id); + size_t expect_not_complete = 0U; + if (aws_atomic_compare_exchange_int(&token->is_complete, &expect_not_complete, 1U)) { - if (token->stream_id) { - token->closed_fn(token, token->user_data); - } + AWS_LOGF_DEBUG( + AWS_LS_EVENT_STREAM_RPC_CLIENT, + "token=%p: completing continuation with stream-id %" PRIu32, + (void *)token, + token->stream_id); + + if (token->stream_id) { + token->closed_fn(token, token->user_data); + } - aws_event_stream_rpc_client_continuation_release(token); + aws_event_stream_rpc_client_continuation_release(token); + } } static int s_complete_and_clear_each_continuation(void *context, struct aws_hash_element *p_element) { @@ -917,6 +922,7 @@ struct aws_event_stream_rpc_client_continuation_token *aws_event_stream_rpc_clie aws_event_stream_rpc_client_connection_acquire(continuation->connection); aws_atomic_init_int(&continuation->ref_count, 1); aws_atomic_init_int(&continuation->is_closed, 0); + aws_atomic_init_int(&continuation->is_complete, 0); continuation->continuation_fn = continuation_options->on_continuation; continuation->closed_fn = continuation_options->on_continuation_closed; continuation->user_data = continuation_options->user_data;