Skip to content

Commit

Permalink
Ensure that the continuation completion callbacks and its decref only…
Browse files Browse the repository at this point in the history
… get performed once no matter what the situation (#101)
  • Loading branch information
bretambrose authored Jul 3, 2023
1 parent 0c45575 commit ec1716c
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions source/event_stream_rpc_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct aws_event_stream_rpc_client_continuation_token {
void *user_data;
struct aws_atomic_var ref_count;
struct aws_atomic_var is_closed;
struct aws_atomic_var is_complete;
};

static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data);
Expand Down Expand Up @@ -201,17 +202,21 @@ static int s_mark_each_continuation_closed(void *context, struct aws_hash_elemen
/* Invoke continuation's on_closed() callback.
* A lock must NOT be hold while calling this */
static void s_complete_continuation(struct aws_event_stream_rpc_client_continuation_token *token) {
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"token=%p: token with stream-id %" PRIu32 ", purged from the stream table",
(void *)token,
token->stream_id);
size_t expect_not_complete = 0U;
if (aws_atomic_compare_exchange_int(&token->is_complete, &expect_not_complete, 1U)) {

if (token->stream_id) {
token->closed_fn(token, token->user_data);
}
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"token=%p: completing continuation with stream-id %" PRIu32,
(void *)token,
token->stream_id);

if (token->stream_id) {
token->closed_fn(token, token->user_data);
}

aws_event_stream_rpc_client_continuation_release(token);
aws_event_stream_rpc_client_continuation_release(token);
}
}

static int s_complete_and_clear_each_continuation(void *context, struct aws_hash_element *p_element) {
Expand Down Expand Up @@ -917,6 +922,7 @@ struct aws_event_stream_rpc_client_continuation_token *aws_event_stream_rpc_clie
aws_event_stream_rpc_client_connection_acquire(continuation->connection);
aws_atomic_init_int(&continuation->ref_count, 1);
aws_atomic_init_int(&continuation->is_closed, 0);
aws_atomic_init_int(&continuation->is_complete, 0);
continuation->continuation_fn = continuation_options->on_continuation;
continuation->closed_fn = continuation_options->on_continuation_closed;
continuation->user_data = continuation_options->user_data;
Expand Down

0 comments on commit ec1716c

Please sign in to comment.