Release lock before invoking callbacks (#55)
Issue: We were seeing a deadlock if one stream was activated from within the on_closed() callback of another stream. This happened because a lock protected continuation_table AND the on_closed() callback was invoked automatically, while that lock was still held, whenever an entry was removed from the table.

Solution: remove the automatic action when an entry is removed from the table, and instead invoke the on_closed() callback after the lock is released.
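For illustration, here is a minimal standalone sketch of the pattern this commit adopts. It is not the aws-c-event-stream API: the names (fake_continuation, clear_table_and_notify), the pthread mutex, and the fixed-size array table are hypothetical stand-ins. The idea is the same, though: mutate state and snapshot the entries while the lock is held, then invoke the user-visible on_closed() callbacks only after the lock is released, so a callback that re-enters the table (for example, to activate another stream) cannot deadlock.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for a continuation; not the library's real type. */
struct fake_continuation {
    bool is_closed;
    void (*on_closed)(struct fake_continuation *c);
};

#define TABLE_SIZE 8

static pthread_mutex_t s_table_lock = PTHREAD_MUTEX_INITIALIZER;
static struct fake_continuation *s_table[TABLE_SIZE];

/* Flip state and collect entries under the lock, but defer the callbacks
 * until after the lock is released, so re-entrant callbacks cannot deadlock. */
static void clear_table_and_notify(void) {
    struct fake_continuation *snapshot[TABLE_SIZE];
    size_t count = 0;

    pthread_mutex_lock(&s_table_lock);
    for (size_t i = 0; i < TABLE_SIZE; ++i) {
        if (s_table[i] != NULL) {
            s_table[i]->is_closed = true;   /* state change under lock */
            snapshot[count++] = s_table[i]; /* remember entry for later */
            s_table[i] = NULL;              /* remove from table */
        }
    }
    pthread_mutex_unlock(&s_table_lock);

    /* Lock is released: callbacks may safely take s_table_lock again. */
    for (size_t i = 0; i < count; ++i) {
        if (snapshot[i]->on_closed != NULL) {
            snapshot[i]->on_closed(snapshot[i]);
        }
    }
}

static void reentrant_on_closed(struct fake_continuation *c) {
    (void)c;
    /* Safe: the table lock is not held while this callback runs. */
    pthread_mutex_lock(&s_table_lock);
    /* ... e.g. activate another stream, touch the table ... */
    pthread_mutex_unlock(&s_table_lock);
    printf("on_closed ran without deadlocking\n");
}

int main(void) {
    static struct fake_continuation c = {.is_closed = false, .on_closed = reentrant_on_closed};
    s_table[0] = &c;
    clear_table_and_notify();
    return 0;
}
```

The actual fix below follows the same idea with two aws_hash_table_foreach() passes instead of a snapshot: one pass under the lock to mark continuations closed, and a second pass after unlocking to remove entries and fire their callbacks.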
graebm authored Dec 8, 2020
1 parent a81a287 commit 5bcc8b0
Showing 3 changed files with 128 additions and 45 deletions.
87 changes: 73 additions & 14 deletions source/event_stream_rpc_client.c
@@ -22,6 +22,8 @@

#endif

static void s_clear_continuation_table(struct aws_event_stream_rpc_client_connection *connection);

struct aws_event_stream_rpc_client_connection {
struct aws_allocator *allocator;
struct aws_hash_table continuation_table;
@@ -173,9 +175,8 @@ static void s_on_channel_shutdown_fn(
aws_atomic_store_int(&connection->is_open, 0u);

if (connection->bootstrap_owned) {
aws_mutex_lock(&connection->stream_lock);
aws_hash_table_clear(&connection->continuation_table);
aws_mutex_unlock(&connection->stream_lock);
s_clear_continuation_table(connection);

aws_event_stream_rpc_client_connection_acquire(connection);
connection->on_connection_shutdown(connection, error_code, connection->user_data);
aws_event_stream_rpc_client_connection_release(connection);
@@ -185,8 +186,21 @@
aws_event_stream_rpc_client_connection_release(connection);
}

static void s_continuation_destroy(void *value) {
struct aws_event_stream_rpc_client_continuation_token *token = value;
/* Set each continuation's is_closed=true.
* A lock MUST be held while calling this.
* For use with aws_hash_table_foreach(). */
static int s_mark_each_continuation_closed(void *context, struct aws_hash_element *p_element) {
(void)context;
struct aws_event_stream_rpc_client_continuation_token *continuation = p_element->value;

aws_atomic_store_int(&continuation->is_closed, 1U);

return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
}

/* Invoke continuation's on_closed() callback.
* A lock must NOT be held while calling this. */
static void s_complete_continuation(struct aws_event_stream_rpc_client_continuation_token *token) {
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"token=%p: token with stream-id %" PRIu32 ", purged from the stream table",
@@ -200,6 +214,34 @@ static void s_continuation_destroy(void *value) {
aws_event_stream_rpc_client_continuation_release(token);
}

static int s_complete_and_clear_each_continuation(void *context, struct aws_hash_element *p_element) {
(void)context;
struct aws_event_stream_rpc_client_continuation_token *continuation = p_element->value;

s_complete_continuation(continuation);

return AWS_COMMON_HASH_TABLE_ITER_DELETE | AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
}

/* Remove each continuation from hash-table and invoke its on_closed() callback.
* The connection->is_open must be set false before calling this. */
static void s_clear_continuation_table(struct aws_event_stream_rpc_client_connection *connection) {
AWS_ASSERT(!aws_event_stream_rpc_client_connection_is_open(connection));

/* Use lock to ensure synchronization with code that adds entries to table.
* Since connection was just marked closed, no further entries will be
* added to table once we acquire the lock. */
aws_mutex_lock(&connection->stream_lock);
aws_hash_table_foreach(&connection->continuation_table, s_mark_each_continuation_closed, NULL);
aws_mutex_unlock(&connection->stream_lock);

/* Now release lock before invoking callbacks.
* It's safe to alter the table now without a lock, since no further
* entries can be added, and we've gone through the critical section
* above to ensure synchronization */
aws_hash_table_foreach(&connection->continuation_table, s_complete_and_clear_each_continuation, NULL);
}

int aws_event_stream_rpc_client_connection_connect(
struct aws_allocator *allocator,
const struct aws_event_stream_rpc_client_connection_options *conn_options) {
@@ -240,7 +282,7 @@ int aws_event_stream_rpc_client_connection_connect(
aws_event_stream_rpc_hash_streamid,
aws_event_stream_rpc_streamid_eq,
NULL,
s_continuation_destroy)) {
NULL)) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: failed initializing continuation table with error %s.",
@@ -326,14 +368,13 @@ void aws_event_stream_rpc_client_connection_close(
(void *)connection,
aws_error_debug_str(shutdown_error_code));

if (aws_event_stream_rpc_client_connection_is_open(connection)) {
aws_atomic_store_int(&connection->is_open, 0U);
size_t expect_open = 1U;
if (aws_atomic_compare_exchange_int(&connection->is_open, &expect_open, 0U)) {
aws_channel_shutdown(connection->channel, shutdown_error_code);

if (!connection->bootstrap_owned) {
aws_mutex_lock(&connection->stream_lock);
aws_hash_table_clear(&connection->continuation_table);
aws_mutex_unlock(&connection->stream_lock);
s_clear_continuation_table(connection);

aws_event_stream_rpc_client_connection_release(connection);
}
} else {
@@ -386,8 +427,14 @@ static void s_on_protocol_message_written_fn(
(void *)message_args->continuation);
AWS_FATAL_ASSERT(message_args->continuation && "end stream flag was set but it wasn't on a continuation");
aws_atomic_store_int(&message_args->continuation->is_closed, 1U);

aws_mutex_lock(&message_args->connection->stream_lock);
aws_hash_table_remove(
&message_args->connection->continuation_table, &message_args->continuation->stream_id, NULL, NULL);
aws_mutex_unlock(&message_args->connection->stream_lock);

/* Lock must NOT be held while invoking callback */
s_complete_continuation(message_args->continuation);
}

message_args->flush_fn(error_code, message_args->user_data);
@@ -716,6 +763,9 @@ static void s_route_message_by_type(
aws_mutex_lock(&connection->stream_lock);
aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL);
aws_mutex_unlock(&connection->stream_lock);

/* Note that we do not invoke callback while holding lock */
s_complete_continuation(continuation);
}
} else {
if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
@@ -908,10 +958,19 @@ int aws_event_stream_rpc_client_continuation_activate(
aws_mutex_lock(&continuation->connection->stream_lock);

if (continuation->stream_id) {
AWS_LOGF_ERROR(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream has already been activated", (void *)continuation)
aws_raise_error(AWS_ERROR_INVALID_STATE);
goto clean_up;
}

/* Even though is_open is atomic, we need to hold a lock while checking it.
* This lets us coordinate with code that sets is_open to false. */
if (!aws_event_stream_rpc_client_connection_is_open(continuation->connection)) {
AWS_LOGF_ERROR(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream's connection is not open", (void *)continuation)
aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED);
goto clean_up;
}

/* we cannot update the connection's stream id until we're certain the message at least made it to the wire, because
* the next stream id must be consecutively increasing by 1. So send the message then update the connection state
* once we've made it to the wire. */
@@ -933,9 +992,6 @@
goto clean_up;
}

/* The continuation table gets a ref count on the continuation. Take it here. */
aws_event_stream_rpc_client_continuation_acquire(continuation);

if (s_send_protocol_message(
continuation->connection,
continuation,
@@ -954,6 +1010,9 @@
goto clean_up;
}

/* The continuation table gets a ref count on the continuation. Take it here. */
aws_event_stream_rpc_client_continuation_acquire(continuation);

continuation->connection->latest_stream_id = continuation->stream_id;
ret_val = AWS_OP_SUCCESS;

50 changes: 32 additions & 18 deletions tests/event_stream_rpc_client_connection_test.c
@@ -7,6 +7,7 @@
#include <aws/event-stream/private/event_stream_rpc_test_helper.h>

#include <aws/common/condition_variable.h>
#include <aws/common/device_random.h>
#include <aws/common/macros.h>
#include <aws/common/mutex.h>
#include <aws/io/channel_bootstrap.h>
@@ -16,7 +17,6 @@
#include <aws/testing/aws_test_harness.h>

static const char *s_test_host_name = "127.0.0.1";
static const uint16_t s_test_port = 30123;

struct test_data {
struct aws_allocator *allocator;
@@ -244,7 +244,7 @@ static int s_fixture_setup(struct aws_allocator *allocator, void *ctx) {
.shutdown_callback_fn = s_event_loop_shutdown_callback,
.shutdown_callback_user_data = test_data,
};
test_data->el_group = aws_event_loop_group_new_default(allocator, 0, &el_shutdown_options);
test_data->el_group = aws_event_loop_group_new_default(allocator, 1, &el_shutdown_options);
ASSERT_NOT_NULL(test_data->el_group);
test_data->server_bootstrap = aws_server_bootstrap_new(allocator, test_data->el_group);
ASSERT_NOT_NULL(test_data->server_bootstrap);
@@ -254,8 +254,13 @@
.shutdown_callback_user_data = test_data,
};

test_data->resolver =
aws_host_resolver_new_default(allocator, 1, test_data->el_group, &host_resolver_shutdown_options);
struct aws_host_resolver_default_options resolver_options = {
.el_group = test_data->el_group,
.max_entries = 1,
.shutdown_options = &host_resolver_shutdown_options,
};

test_data->resolver = aws_host_resolver_new_default(allocator, &resolver_options);
ASSERT_NOT_NULL(test_data->resolver);

struct aws_client_bootstrap_options client_bootstrap_options = {
@@ -275,19 +280,28 @@
.type = AWS_SOCKET_STREAM,
};

struct aws_event_stream_rpc_server_listener_options listener_options = {
.socket_options = &socket_options,
.host_name = s_test_host_name,
.port = s_test_port,
.bootstrap = test_data->server_bootstrap,
.user_data = test_data,
.on_new_connection = s_fixture_on_new_server_connection,
.on_connection_shutdown = s_fixture_on_server_connection_shutdown,
.on_destroy_callback = s_on_listener_destroy,
};

test_data->listener = aws_event_stream_rpc_server_new_listener(allocator, &listener_options);
ASSERT_NOT_NULL(test_data->listener);
/* Find a random open port */
uint16_t test_port = 0;
while (!test_data->listener) {
aws_device_random_u16(&test_port);
test_port |= 0x8000; /* Use high numbers */

struct aws_event_stream_rpc_server_listener_options listener_options = {
.socket_options = &socket_options,
.host_name = s_test_host_name,
.port = test_port,
.bootstrap = test_data->server_bootstrap,
.user_data = test_data,
.on_new_connection = s_fixture_on_new_server_connection,
.on_connection_shutdown = s_fixture_on_server_connection_shutdown,
.on_destroy_callback = s_on_listener_destroy,
};

test_data->listener = aws_event_stream_rpc_server_new_listener(allocator, &listener_options);
if (!test_data->listener) {
ASSERT_INT_EQUALS(AWS_IO_SOCKET_ADDRESS_IN_USE, aws_last_error());
}
}

test_data->allocator = allocator;

@@ -296,7 +310,7 @@ static int s_fixture_setup(struct aws_allocator *allocator, void *ctx) {
.user_data = test_data,
.bootstrap = test_data->client_bootstrap,
.host_name = s_test_host_name,
.port = s_test_port,
.port = test_port,
.on_connection_setup = s_client_on_connection_setup,
.on_connection_shutdown = s_client_on_connection_shutdown,
.on_connection_protocol_message = s_client_connection_protocol_message,
36 changes: 23 additions & 13 deletions tests/event_stream_rpc_server_connection_test.c
@@ -4,6 +4,7 @@
*/

#include <aws/common/condition_variable.h>
#include <aws/common/device_random.h>
#include <aws/common/macros.h>
#include <aws/common/mutex.h>
#include <aws/event-stream/private/event_stream_rpc_test_helper.h>
Expand Down Expand Up @@ -138,19 +139,28 @@ static int s_fixture_setup(struct aws_allocator *allocator, void *ctx) {
.type = AWS_SOCKET_STREAM,
};

struct aws_event_stream_rpc_server_listener_options listener_options = {
.socket_options = &socket_options,
.host_name = "127.0.0.1",
.port = 30123,
.bootstrap = test_data->server_bootstrap,
.user_data = test_data,
.on_new_connection = s_fixture_on_new_server_connection,
.on_connection_shutdown = s_fixture_on_server_connection_shutdown,
.on_destroy_callback = s_on_listener_destroy,
};

test_data->listener = aws_event_stream_rpc_server_new_listener(allocator, &listener_options);
ASSERT_NOT_NULL(test_data->listener);
/* Find a random open port */
uint16_t test_port = 0;
while (!test_data->listener) {
aws_device_random_u16(&test_port);
test_port |= 0x8000; /* Use high numbers */

struct aws_event_stream_rpc_server_listener_options listener_options = {
.socket_options = &socket_options,
.host_name = "127.0.0.1",
.port = test_port,
.bootstrap = test_data->server_bootstrap,
.user_data = test_data,
.on_new_connection = s_fixture_on_new_server_connection,
.on_connection_shutdown = s_fixture_on_server_connection_shutdown,
.on_destroy_callback = s_on_listener_destroy,
};

test_data->listener = aws_event_stream_rpc_server_new_listener(allocator, &listener_options);
if (!test_data->listener) {
ASSERT_INT_EQUALS(AWS_IO_SOCKET_ADDRESS_IN_USE, aws_last_error());
}
}

test_data->allocator = allocator;

Expand Down
