Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix bug where channel could outlive event loop #536

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/proof-alarm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Check
run: |
TMPFILE=$(mktemp)
echo "495506dac0244fd96f66dc1d11740db8 source/linux/epoll_event_loop.c" > $TMPFILE
echo "267fca6c3fc1f8cb58dc80e1926b23e0 source/linux/epoll_event_loop.c" > $TMPFILE
md5sum --check $TMPFILE

# No further steps if successful
Expand Down
18 changes: 18 additions & 0 deletions include/aws/io/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ struct aws_event_loop {
uint64_t latest_tick_start;
size_t current_tick_latency_sum;
struct aws_atomic_var next_flush_time;
struct aws_event_loop_group *owner;
void *impl_data;
};

Expand All @@ -125,6 +126,7 @@ struct aws_event_loop_local_object {
struct aws_event_loop_options {
aws_io_clock_fn *clock;
struct aws_thread_options *thread_options;
struct aws_event_loop_group *owner;
};

typedef struct aws_event_loop *(aws_new_event_loop_fn)(
Expand Down Expand Up @@ -190,6 +192,22 @@ struct aws_event_loop *aws_event_loop_new_default_with_options(
AWS_IO_API
void aws_event_loop_destroy(struct aws_event_loop *event_loop);

/**
* If this event loop belongs to an aws_event_loop_group, acquire a hold on the group
* preventing it from being cleaned up.
*
* If the event loop is not in a group, this function does nothing,
* but only internal tests should ever create a naked event loop without a group.
*/
AWS_IO_API
void aws_event_loop_acquire_hold_on_group(struct aws_event_loop *event_loop);

/**
* Release a hold on the event loop's group, allowing it to be cleaned up.
*/
AWS_IO_API
void aws_event_loop_release_hold_on_group(struct aws_event_loop *event_loop);

/**
* Initializes common event-loop data structures.
* This is only called from the *new() function of event loop implementations.
Expand Down
2 changes: 2 additions & 0 deletions source/bsd/kqueue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ struct aws_event_loop *aws_event_loop_new_default_with_options(
}
clean_up_event_loop_base = true;

event_loop->owner = options->owner;

struct kqueue_loop *impl = aws_mem_calloc(alloc, 1, sizeof(struct kqueue_loop));
if (!impl) {
goto clean_up;
Expand Down
20 changes: 12 additions & 8 deletions source/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ struct aws_channel {
size_t window_update_batch_emit_threshold;
struct aws_channel_task window_update_task;
bool read_back_pressure_enabled;
bool window_update_in_progress;
bool window_update_scheduled;
};

struct channel_setup_args {
Expand Down Expand Up @@ -214,7 +214,6 @@ struct aws_channel *aws_channel_new(struct aws_allocator *alloc, const struct aw

AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: Beginning creation and setup of new channel.", (void *)channel);
channel->alloc = alloc;
channel->loop = creation_args->event_loop;
channel->on_shutdown_completed = creation_args->on_shutdown_completed;
channel->shutdown_user_data = creation_args->shutdown_user_data;

Expand Down Expand Up @@ -256,6 +255,10 @@ struct aws_channel *aws_channel_new(struct aws_allocator *alloc, const struct aw
setup_args->on_setup_completed = creation_args->on_setup_completed;
setup_args->user_data = creation_args->setup_user_data;

/* keep loop alive until channel is destroyed */
channel->loop = creation_args->event_loop;
aws_event_loop_acquire_hold_on_group(channel->loop);

aws_task_init(&setup_args->task, s_on_channel_setup_complete, setup_args, "on_channel_setup_complete");
aws_event_loop_schedule_task_now(creation_args->event_loop, &setup_args->task);

Expand All @@ -278,8 +281,6 @@ static void s_cleanup_slot(struct aws_channel_slot *slot) {
}

void aws_channel_destroy(struct aws_channel *channel) {
AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: destroying channel.", (void *)channel);

aws_channel_release_hold(channel);
}

Expand Down Expand Up @@ -307,6 +308,8 @@ static void s_final_channel_deletion_task(struct aws_task *task, void *arg, enum

aws_channel_set_statistics_handler(channel, NULL);

aws_event_loop_release_hold_on_group(channel->loop);

aws_mem_release(channel->alloc, channel);
}

Expand All @@ -322,6 +325,7 @@ void aws_channel_release_hold(struct aws_channel *channel) {

if (prev_refcount == 1) {
/* Refcount is now 0, finish cleaning up channel memory. */
AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: destroying channel.", (void *)channel);
if (aws_channel_thread_is_callers_thread(channel)) {
s_final_channel_deletion_task(NULL, channel, AWS_TASK_STATUS_RUN_READY);
} else {
Expand Down Expand Up @@ -833,6 +837,8 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar
(void)channel_task;
struct aws_channel *channel = arg;

channel->window_update_scheduled = false;

if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
/* get the right-most slot to start the updates. */
struct aws_channel_slot *slot = channel->first;
Expand All @@ -852,15 +858,13 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar
"channel %p: channel update task failed with status %d",
(void *)slot->channel,
aws_last_error());
slot->channel->window_update_in_progress = false;
aws_channel_shutdown(channel, aws_last_error());
return;
}
}
slot = slot->adj_left;
}
}
channel->window_update_in_progress = false;
}

int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t window) {
Expand All @@ -869,9 +873,9 @@ int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t
slot->current_window_update_batch_size =
aws_add_size_saturating(slot->current_window_update_batch_size, window);

if (!slot->channel->window_update_in_progress &&
if (!slot->channel->window_update_scheduled &&
slot->window_size <= slot->channel->window_update_batch_emit_threshold) {
slot->channel->window_update_in_progress = true;
slot->channel->window_update_scheduled = true;
aws_channel_task_init(
&slot->channel->window_update_task, s_window_update_task, slot->channel, "window update task");
aws_channel_schedule_task_now(slot->channel, &slot->channel->window_update_task);
Expand Down
18 changes: 18 additions & 0 deletions source/event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

#include <aws/io/event_loop.h>
#include <aws/io/logging.h>

#include <aws/common/clock.h>
#include <aws/common/device_random.h>
Expand Down Expand Up @@ -124,6 +125,7 @@ static struct aws_event_loop_group *s_event_loop_group_new(
struct aws_event_loop_options options = {
.clock = clock,
.thread_options = &thread_options,
.owner = el_group,
};

if (pin_threads) {
Expand Down Expand Up @@ -385,6 +387,22 @@ void aws_event_loop_destroy(struct aws_event_loop *event_loop) {
event_loop->vtable->destroy(event_loop);
}

void aws_event_loop_acquire_hold_on_group(struct aws_event_loop *event_loop) {
AWS_ASSERT(event_loop);

if (event_loop->owner) {
aws_event_loop_group_acquire(event_loop->owner);
} else {
AWS_LOGF_WARN(AWS_LS_IO_EVENT_LOOP, "id=%p: Event loop does not belong to a group.", (void *)event_loop);
}
}

void aws_event_loop_release_hold_on_group(struct aws_event_loop *event_loop) {
if (event_loop && event_loop->owner) {
aws_event_loop_group_release(event_loop->owner);
}
}

int aws_event_loop_fetch_local_object(
struct aws_event_loop *event_loop,
void *key,
Expand Down
2 changes: 2 additions & 0 deletions source/linux/epoll_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ struct aws_event_loop *aws_event_loop_new_default_with_options(
goto clean_up_loop;
}

loop->owner = options->owner;

struct epoll_loop *epoll_loop = aws_mem_calloc(alloc, 1, sizeof(struct epoll_loop));
if (!epoll_loop) {
goto cleanup_base_loop;
Expand Down
2 changes: 2 additions & 0 deletions source/windows/iocp/iocp_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ struct aws_event_loop *aws_event_loop_new_default_with_options(
}
clean_up_event_loop_base = true;

event_loop->owner = options->owner;

impl = aws_mem_calloc(alloc, 1, sizeof(struct iocp_loop));
if (!impl) {
goto clean_up;
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ add_test_case(channel_tasks_serialized_run)
add_test_case(channel_rejects_post_shutdown_tasks)
add_test_case(channel_cancels_pending_tasks)
add_test_case(channel_duplicate_shutdown)
add_test_case(channel_keeps_event_loop_group_alive)
add_net_test_case(channel_connect_some_hosts_timeout)

add_net_test_case(test_default_with_ipv6_lookup)
Expand Down
91 changes: 88 additions & 3 deletions tests/channel_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
struct channel_setup_test_args {
struct aws_mutex mutex;
struct aws_condition_variable condition_variable;
bool setup_completed; /* protected by mutex */
bool shutdown_completed; /* protected by mutex */
int error_code; /* protected by mutex */
bool setup_completed; /* protected by mutex */
bool shutdown_completed; /* protected by mutex */
int error_code; /* protected by mutex */
bool event_loop_group_shutdown_completed; /* protected by mutex (not used by all tests) */
enum aws_task_status task_status;
};

Expand Down Expand Up @@ -59,6 +60,19 @@ static int s_channel_setup_create_and_wait(
return AWS_OP_SUCCESS;
}

/* Shutdown-complete callback for the event-loop group: records completion
 * under the mutex and wakes every waiter on the test's condition variable. */
static void s_event_loop_group_on_shutdown_complete(void *user_data) {
    struct channel_setup_test_args *args = user_data;

    aws_mutex_lock(&args->mutex);
    args->event_loop_group_shutdown_completed = true;
    aws_condition_variable_notify_all(&args->condition_variable);
    aws_mutex_unlock(&args->mutex);
}

static bool s_event_loop_group_shutdown_completed_predicate(void *arg) {
struct channel_setup_test_args *setup_test_args = (struct channel_setup_test_args *)arg;
return setup_test_args->event_loop_group_shutdown_completed;
}

static int s_test_channel_setup(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
struct aws_event_loop *event_loop = aws_event_loop_new_default(allocator, aws_high_res_clock_get_ticks);
Expand Down Expand Up @@ -616,6 +630,77 @@ static int s_test_channel_duplicate_shutdown(struct aws_allocator *allocator, vo

AWS_TEST_CASE(channel_duplicate_shutdown, s_test_channel_duplicate_shutdown)

/* This is a regression test. The channel previously did nothing to keep the event loop alive.
 * So if the event-loop group was released before the channel, the loops would get destroyed,
 * then the channel would try to schedule its own destruction task on the loop and crash. */
static int s_test_channel_keeps_event_loop_group_alive(struct aws_allocator *allocator, void *ctx) {
    (void)ctx;
    aws_io_library_init(allocator);

    /* Shared state between this thread and the loop/shutdown callbacks; fields protected by .mutex. */
    struct channel_setup_test_args test_args = {
        .mutex = AWS_MUTEX_INIT,
        .condition_variable = AWS_CONDITION_VARIABLE_INIT,
    };

    /* Group with a single loop; its shutdown callback flips event_loop_group_shutdown_completed. */
    struct aws_shutdown_callback_options event_loop_group_shutdown_options = {
        .shutdown_callback_fn = s_event_loop_group_on_shutdown_complete,
        .shutdown_callback_user_data = &test_args,
    };
    struct aws_event_loop_group *event_loop_group =
        aws_event_loop_group_new_default(allocator, 1, &event_loop_group_shutdown_options);
    ASSERT_NOT_NULL(event_loop_group);

    struct aws_event_loop *event_loop = aws_event_loop_group_get_next_loop(event_loop_group);

    struct aws_channel_options channel_options = {
        .on_setup_completed = s_channel_setup_test_on_setup_completed,
        .setup_user_data = &test_args,
        .on_shutdown_completed = s_channel_test_shutdown,
        .shutdown_user_data = &test_args,
        .event_loop = event_loop,
    };

    struct aws_channel *channel = NULL;
    ASSERT_SUCCESS(s_channel_setup_create_and_wait(allocator, &channel_options, &test_args, &channel));

    /* shut down channel, but don't clean it up yet */
    aws_channel_shutdown(channel, 0);

    /* block until the channel's shutdown callback has fired */
    ASSERT_SUCCESS(aws_mutex_lock(&test_args.mutex));
    ASSERT_SUCCESS(aws_condition_variable_wait_pred(
        &test_args.condition_variable, &test_args.mutex, s_channel_test_shutdown_predicate, &test_args));
    ASSERT_SUCCESS(aws_mutex_unlock(&test_args.mutex));

    /* release event loop group before channel */
    aws_event_loop_group_release(event_loop_group);

    /* wait a bit to ensure the event-loop-group doesn't shut down (because channel has a hold on it) */
    uint64_t wait_time = aws_timestamp_convert(500, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
    ASSERT_SUCCESS(aws_mutex_lock(&test_args.mutex));
    /* The timed wait MUST fail (time out): group shutdown completing here would mean
     * the channel failed to hold the group alive — the original crash scenario. */
    ASSERT_FAILS(
        aws_condition_variable_wait_for_pred(
            &test_args.condition_variable,
            &test_args.mutex,
            wait_time,
            s_event_loop_group_shutdown_completed_predicate,
            &test_args),
        "Channel failed to keep event loop alive");
    ASSERT_SUCCESS(aws_mutex_unlock(&test_args.mutex));

    /* release channel for destruction */
    aws_channel_destroy(channel);

    /* event loop group should shut down now */
    ASSERT_SUCCESS(aws_mutex_lock(&test_args.mutex));
    ASSERT_SUCCESS(aws_condition_variable_wait_pred(
        &test_args.condition_variable, &test_args.mutex, s_event_loop_group_shutdown_completed_predicate, &test_args));
    ASSERT_SUCCESS(aws_mutex_unlock(&test_args.mutex));

    aws_io_library_clean_up();
    return AWS_OP_SUCCESS;
}
AWS_TEST_CASE(channel_keeps_event_loop_group_alive, s_test_channel_keeps_event_loop_group_alive)

struct channel_connect_test_args {
struct aws_mutex *mutex;
struct aws_condition_variable cv;
Expand Down