From 3334843eb4e0e56c2565e481b2ef853d1cadde78 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Mon, 8 Apr 2024 10:36:11 -0700 Subject: [PATCH] Add async write() function - fixes "stalled" uploads deadlocking S3 Client (#418) --- include/aws/s3/private/s3_meta_request_impl.h | 40 +- include/aws/s3/s3.h | 1 + include/aws/s3/s3_client.h | 69 +- source/s3.c | 2 +- source/s3_auto_ranged_put.c | 7 + source/s3_client.c | 32 +- source/s3_meta_request.c | 252 ++++++- tests/CMakeLists.txt | 18 + tests/s3_asyncwrite_tests.c | 652 ++++++++++++++++++ .../s3_many_async_uploads_without_data_test.c | 184 ++--- 10 files changed, 1065 insertions(+), 192 deletions(-) create mode 100644 tests/s3_asyncwrite_tests.c diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h index d0cdf236c..3075973dc 100644 --- a/include/aws/s3/private/s3_meta_request_impl.h +++ b/include/aws/s3/private/s3_meta_request_impl.h @@ -143,14 +143,11 @@ struct aws_s3_meta_request { /* The meta request's outgoing body comes from one of these: * 1) request_body_async_stream: if set, then async stream 1 part at a time * 2) request_body_parallel_stream: if set, then stream multiple parts in parallel - * 3) initial_request_message's body_stream: else synchronously stream parts */ + * 3) request_body_using_async_writes: if set, then synchronously copy async_write data from 1 part at a time + * 4) initial_request_message's body_stream: else synchronously stream parts */ struct aws_async_input_stream *request_body_async_stream; struct aws_parallel_input_stream *request_body_parallel_stream; - - /* Whether to let this meta-request exceed the regular limits on num-request-being-prepared. - * This lets as many async-stream reads be pending as possible, reducing the chance of deadlock - * when the user can't control when data arrives. */ - bool maximize_async_stream_reads; + bool request_body_using_async_writes; /* Part size to use for uploads and downloads. 
Passed down by the creating client. */ const size_t part_size; @@ -234,6 +231,35 @@ struct aws_s3_meta_request { /* To track aws_s3_requests with cancellable HTTP streams */ struct aws_linked_list cancellable_http_streams_list; + /* Data for async-writes. + * Currently, for a given meta request, only 1 async-write is allowed at a time. + * + * When the user calls write(), they may not provide enough data for us to send an UploadPart. + * In that case, we copy the data to a buffer and immediately mark the write complete, + * so the user can write more data, so we finally get enough to send. */ + struct { + /* The future for whatever async-write is pending. + * If this is NULL, there isn't enough data to send another part. + * + * If this is non-NULL, 1+ part requests can be sent. + * When all the data has been processed, this future is completed + * and cleared, and we can accept another write() call. */ + struct aws_future_void *future; + + /* True once user passes `eof` to their final write() call */ + bool eof; + + /* Holds buffered data we can't immediately send. + * The length will always be less than part-size */ + struct aws_byte_buf buffered_data; + + /* Cursor/pointer to data from the most-recent write() call, which + * provides enough data (combined with any buffered_data) to send 1+ parts. + * If there's data leftover in unbuffered_cursor after these parts are sent, + * it's copied into buffered_data, and we wait for more writes... */ + struct aws_byte_cursor unbuffered_cursor; + } async_write; + } synced_data; /* Anything in this structure should only ever be accessed by the client on its process work event loop task. */ @@ -390,8 +416,6 @@ struct aws_future_bool *aws_s3_meta_request_read_body( uint64_t offset, struct aws_byte_buf *buffer); -bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request); - /* Set the meta request finish result as failed. 
This is meant to be called sometime before aws_s3_meta_request_finish. * Subsequent calls to this function or to aws_s3_meta_request_set_success_synced will not overwrite the end result of * the meta request. */ diff --git a/include/aws/s3/s3.h b/include/aws/s3/s3.h index 99b897c84..4dc0fe905 100644 --- a/include/aws/s3/s3.h +++ b/include/aws/s3/s3.h @@ -45,6 +45,7 @@ enum aws_s3_errors { AWS_ERROR_S3_INVALID_MEMORY_LIMIT_CONFIG, AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED, AWS_ERROR_S3_INTERNAL_PART_SIZE_MISMATCH_RETRYING_WITH_RANGE, + AWS_ERROR_S3_REQUEST_HAS_COMPLETED, AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID) }; diff --git a/include/aws/s3/s3_client.h b/include/aws/s3/s3_client.h index e7215b43c..fc7f36dd0 100644 --- a/include/aws/s3/s3_client.h +++ b/include/aws/s3/s3_client.h @@ -566,7 +566,8 @@ struct aws_s3_checksum_config { * There are several ways to pass the request's body data: * 1) If the data is already in memory, set the body-stream on `message`. * 2) If the data is on disk, set `send_filepath` for best performance. - * 3) If the data will be be produced in asynchronous chunks, set `send_async_stream`. + * 3) If the data is available, but copying each chunk is asynchronous, set `send_async_stream`. + * 4) If you're not sure when each chunk of data will be available, use `send_using_async_writes`. */ struct aws_s3_meta_request_options { /* The type of meta request we will be trying to accelerate. */ @@ -615,23 +616,27 @@ struct aws_s3_meta_request_options { * Optional - EXPERIMENTAL/UNSTABLE * If set, the request body comes from this async stream. * Use this when outgoing data will be produced in asynchronous chunks. + * The S3 client will read from the stream whenever it's ready to upload another chunk. + * + * WARNING: The S3 client can deadlock if many async streams are "stalled", + * never completing their async read. If you're not sure when (if ever) + * data will be ready, use `send_using_async_writes` instead. 
+ * Do not set if the body is being passed by other means (see note above). */ struct aws_async_input_stream *send_async_stream; /** - * NOT FOR PUBLIC USE + * Optional - EXPERIMENTAL/UNSTABLE + * Set this to send request body data using the async aws_s3_meta_request_write() function. + * Use this when outgoing data will be produced in asynchronous chunks, + * and you're not sure when (if ever) each chunk will be ready. * - * The S3 client can currently deadlock if too many uploads using - * `send_async_stream` are "stalled" and failing to provide data. - * Set this true to raise the number of "stalled" meta-requests the S3 client - * can tolerate before it deadlocks. The downside of setting this is that - * the S3 client will use as much memory as it is allowed. - * (see `aws_s3_client_config.memory_limit_in_bytes`). + * This only works with AWS_S3_META_REQUEST_TYPE_PUT_OBJECT. * - * This setting will be removed when a better solution is developed. + * Do not set if the body is being passed by other means (see note above). */ - bool maximize_async_stream_reads_internal_use_only; + bool send_using_async_writes; /** * Optional. @@ -829,6 +834,50 @@ struct aws_s3_meta_request *aws_s3_client_make_meta_request( struct aws_s3_client *client, const struct aws_s3_meta_request_options *options); +/** + * Write the next chunk of data. + * + * You must set `aws_s3_meta_request_options.send_using_async_writes` to use this function. + * + * This function is asynchronous, and returns a future (see <aws/io/future.h>). + * You may not call write() again until the future completes. + * + * If the future completes with an error code, then write() did not succeed + * and you should not call it again. If the future contains any error code, + * the meta request is guaranteed to finish soon (you don't need to worry about + * canceling the meta request yourself after a failed write). 
+ * A common error code is AWS_ERROR_S3_REQUEST_HAS_COMPLETED, indicating + * the meta request completed for reasons unrelated to the write() call + * (e.g. CreateMultipartUpload received a 403 Forbidden response). + * AWS_ERROR_INVALID_STATE usually indicates that you're calling write() + * incorrectly (e.g. not waiting for previous write to complete). + * + * You MUST keep the data in memory until the future completes. + * If you need to free the memory early, call aws_s3_meta_request_cancel(). + * cancel() will synchronously complete the future from any pending write with + * error code AWS_ERROR_S3_REQUEST_HAS_COMPLETED. + * + * You can wait any length of time between calls to write(). + * If there's not enough data to upload a part, the data will be copied + * to a buffer and the future will immediately complete. + * + * @param meta_request Meta request + * + * @param data The data to send. The data can be any size. + * + * @param eof Pass true to signal EOF (end of file). + * Do not call write() again after passing true. + * + * This function never returns NULL. + * + * WARNING: This feature is experimental. + */ +AWS_S3_API +struct aws_future_void *aws_s3_meta_request_write( + struct aws_s3_meta_request *meta_request, + struct aws_byte_cursor data, + bool eof); + /** * Increment the flow-control window, so that response data continues downloading. * diff --git a/source/s3.c b/source/s3.c index 61bc0ee3e..2ca1b3d75 100644 --- a/source/s3.c +++ b/source/s3.c @@ -46,7 +46,7 @@ static struct aws_error_info s_errors[] = { "Memory limit should be at least 1GiB. Part size and max part size should be smaller than memory limit."), AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED, "CreateSession call failed when signing with S3 Express."), AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INTERNAL_PART_SIZE_MISMATCH_RETRYING_WITH_RANGE, "part_size mismatch, possibly due to wrong object_size_hint. 
Retrying with Range instead of partNumber."), - + AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_REQUEST_HAS_COMPLETED, "Request has already completed, action cannot be performed."), }; /* clang-format on */ diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c index 941d8ae81..0b29421ad 100644 --- a/source/s3_auto_ranged_put.c +++ b/source/s3_auto_ranged_put.c @@ -418,6 +418,13 @@ static bool s_should_skip_scheduling_more_parts_based_on_flags( return auto_ranged_put->synced_data.num_parts_pending_read > 0; } + /* If doing async-writes, only allow a new part if there's a pending write-future, + * and no pending-reads yet to copy that data. */ + if (auto_ranged_put->base.request_body_using_async_writes == true) { + return (auto_ranged_put->base.synced_data.async_write.future == NULL) || + (auto_ranged_put->synced_data.num_parts_pending_read > 0); + } + /* If this is the conservative pass, only allow 1 pending-read. * Reads are serial anyway, so queuing up a whole bunch isn't necessarily a speedup. */ if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) { diff --git a/source/s3_client.c b/source/s3_client.c index aa07f9504..678abf9a8 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -1140,14 +1140,35 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default( if (options->send_filepath.len > 0) { ++body_source_count; } + if (options->send_using_async_writes == true) { + if (options->type != AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) { + /* TODO: we could support async-writes for DEFAULT type too, just takes work & testing */ + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "Could not create meta request." 
+ "send-using-data-writes can only be used with auto-ranged-put."); + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + if (content_length_found) { + /* TODO: we could support async-writes with content-length, just takes work & testing */ + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "Could not create meta request." + "send-using-data-writes can only be used when Content-Length is unknown."); + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + ++body_source_count; + } if (options->send_async_stream != NULL) { ++body_source_count; } if (body_source_count > 1) { AWS_LOGF_ERROR( AWS_LS_S3_META_REQUEST, - "Could not create auto-ranged-put meta request." - " More than one data source is set (filepath, async stream, body stream)."); + "Could not create meta request." + " More than one data source is set (filepath, async stream, body stream, data writes)."); aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); return NULL; } @@ -1716,13 +1737,6 @@ static bool s_s3_client_should_update_meta_request( } } - /* If maximize_async_stream_reads, let this meta-request ignore max_requests_prepare & max_requests_in_flight. - * We need to maximize the number of async-streams being read from, because the user has no idea - * when data will arrive to any of them. */ - if (meta_request->request_body_async_stream != NULL && meta_request->maximize_async_stream_reads) { - return true; - } - /** * If number of being-prepared + already-prepared-and-queued requests is more than the max that can * be in the preparation stage. 
diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index 76e2ad266..d33488371 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -4,6 +4,7 @@ */ #include "aws/s3/private/s3_auto_ranged_get.h" +#include "aws/s3/private/s3_auto_ranged_put.h" #include "aws/s3/private/s3_checksums.h" #include "aws/s3/private/s3_client_impl.h" #include "aws/s3/private/s3_meta_request_impl.h" @@ -77,6 +78,11 @@ static void s_s3_meta_request_send_request_finish( struct aws_http_stream *stream, int error_code); +static int s_s3_meta_request_read_from_pending_async_writes( + struct aws_s3_meta_request *meta_request, + struct aws_byte_buf *dest, + bool *eof); + void aws_s3_meta_request_lock_synced_data(struct aws_s3_meta_request *meta_request) { AWS_PRECONDITION(meta_request); @@ -261,8 +267,11 @@ int aws_s3_meta_request_init_base( meta_request->synced_data.read_window_running_total = client->initial_read_window; } - /* Set initial_meta_request, based on how the request's body is being passed in - * (we checked earlier that it's not being passed multiple ways) */ + /* Keep original message around, for headers, method, and synchronous body-stream (if any) */ + meta_request->initial_request_message = aws_http_message_acquire(options->message); + + /* If the request's body is being passed in some other way, set that up. 
+ * (we checked earlier that the request body is not being passed multiple ways) */ if (options->send_filepath.len > 0) { /* Create parallel read stream from file */ meta_request->request_body_parallel_stream = @@ -271,17 +280,12 @@ int aws_s3_meta_request_init_base( goto error; } - /* but keep original message around for headers, method, etc */ - meta_request->initial_request_message = aws_http_message_acquire(options->message); } else if (options->send_async_stream != NULL) { - /* Read from async body-stream, but keep original message around for headers, method, etc */ meta_request->request_body_async_stream = aws_async_input_stream_acquire(options->send_async_stream); - meta_request->initial_request_message = aws_http_message_acquire(options->message); - meta_request->maximize_async_stream_reads = options->maximize_async_stream_reads_internal_use_only; - } else { - /* Keep original message around, we'll read from its synchronous body-stream */ - meta_request->initial_request_message = aws_http_message_acquire(options->message); + } else if (options->send_using_async_writes == true) { + meta_request->request_body_using_async_writes = true; + aws_byte_buf_init(&meta_request->synced_data.async_write.buffered_data, allocator, 0); } meta_request->synced_data.next_streaming_part = 1; @@ -350,12 +354,28 @@ void aws_s3_meta_request_increment_read_window(struct aws_s3_meta_request *meta_ } void aws_s3_meta_request_cancel(struct aws_s3_meta_request *meta_request) { + struct aws_future_void *write_future_to_cancel = NULL; + /* BEGIN CRITICAL SECTION */ aws_s3_meta_request_lock_synced_data(meta_request); aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_CANCELED); aws_s3_meta_request_cancel_cancellable_requests_synced(meta_request, AWS_ERROR_S3_CANCELED); + if (meta_request->synced_data.async_write.future != NULL) { + write_future_to_cancel = meta_request->synced_data.async_write.future; + meta_request->synced_data.async_write.future = NULL; + } 
aws_s3_meta_request_unlock_synced_data(meta_request); /* END CRITICAL SECTION */ + + if (write_future_to_cancel != NULL) { + AWS_LOGF_TRACE( + AWS_LS_S3_META_REQUEST, "id=%p: write future complete due to cancellation", (void *)meta_request); + aws_future_void_set_error(write_future_to_cancel, AWS_ERROR_S3_REQUEST_HAS_COMPLETED); + aws_future_void_release(write_future_to_cancel); + } + + /* Schedule the work task, to continue processing the meta-request */ + aws_s3_client_schedule_process_work(meta_request->client); } int aws_s3_meta_request_pause( @@ -499,6 +519,8 @@ static void s_s3_meta_request_destroy(void *user_data) { aws_s3_meta_request_result_clean_up(meta_request, &meta_request->synced_data.finish_result); + aws_byte_buf_clean_up(&meta_request->synced_data.async_write.buffered_data); + if (meta_request->vtable != NULL) { AWS_LOGF_TRACE(AWS_LS_S3_META_REQUEST, "id=%p Calling virtual meta request destroy function.", log_id); meta_request->vtable->destroy(meta_request); @@ -1978,6 +2000,8 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request struct aws_linked_list release_request_list; aws_linked_list_init(&release_request_list); + struct aws_future_void *pending_async_write_future = NULL; + struct aws_s3_meta_request_result finish_result; AWS_ZERO_STRUCT(finish_result); @@ -2001,6 +2025,10 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request aws_linked_list_push_back(&release_request_list, &request->node); } + /* Clean out any pending async-write future */ + pending_async_write_future = meta_request->synced_data.async_write.future; + meta_request->synced_data.async_write.future = NULL; + finish_result = meta_request->synced_data.finish_result; AWS_ZERO_STRUCT(meta_request->synced_data.finish_result); @@ -2013,6 +2041,15 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request return; } + if (pending_async_write_future != NULL) { + AWS_LOGF_TRACE( + AWS_LS_S3_META_REQUEST, 
+ "id=%p: write future complete due to meta request's early finish", + (void *)meta_request); + aws_future_void_set_error(pending_async_write_future, AWS_ERROR_S3_REQUEST_HAS_COMPLETED); + pending_async_write_future = aws_future_void_release(pending_async_write_future); + } + while (!aws_linked_list_empty(&release_request_list)) { struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&release_request_list); struct aws_s3_request *release_request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node); @@ -2080,13 +2117,25 @@ struct aws_future_bool *aws_s3_meta_request_read_body( return aws_parallel_input_stream_read(meta_request->request_body_parallel_stream, offset, buffer); } + /* Further techniques are synchronous... */ + struct aws_future_bool *synchronous_read_future = aws_future_bool_new(meta_request->allocator); + + /* If using async-writes, call function which fills the buffer and/or hits EOF */ + if (meta_request->request_body_using_async_writes == true) { + bool eof = false; + if (s_s3_meta_request_read_from_pending_async_writes(meta_request, buffer, &eof) == AWS_OP_SUCCESS) { + aws_future_bool_set_result(synchronous_read_future, eof); + } else { + aws_future_bool_set_error(synchronous_read_future, aws_last_error()); + } + return synchronous_read_future; + } + /* Else synchronous aws_input_stream */ struct aws_input_stream *synchronous_stream = aws_http_message_get_body_stream(meta_request->initial_request_message); AWS_FATAL_ASSERT(synchronous_stream); - struct aws_future_bool *synchronous_read_future = aws_future_bool_new(meta_request->allocator); - /* Keep calling read() until we fill the buffer, or hit EOF */ struct aws_stream_status status = {.is_end_of_stream = false, .is_valid = true}; while ((buffer->len < buffer->capacity) && !status.is_end_of_stream) { @@ -2109,21 +2158,6 @@ struct aws_future_bool *aws_s3_meta_request_read_body( return synchronous_read_future; } -bool aws_s3_meta_request_body_has_no_more_data(const struct 
aws_s3_meta_request *meta_request) { - AWS_PRECONDITION(meta_request); - - struct aws_input_stream *initial_body_stream = - aws_http_message_get_body_stream(meta_request->initial_request_message); - AWS_FATAL_ASSERT(initial_body_stream); - - struct aws_stream_status status; - if (aws_input_stream_get_status(initial_body_stream, &status)) { - return true; - } - - return status.is_end_of_stream; -} - void aws_s3_meta_request_result_setup( struct aws_s3_meta_request *meta_request, struct aws_s3_meta_request_result *result, @@ -2154,6 +2188,170 @@ void aws_s3_meta_request_result_setup( result->error_code = error_code; } +struct aws_future_void *aws_s3_meta_request_write( + struct aws_s3_meta_request *meta_request, + struct aws_byte_cursor data, + bool eof) { + + struct aws_future_void *write_future = aws_future_void_new(meta_request->allocator); + + /* Set this true, while lock is held, if we're ready to send data */ + bool ready_to_send = false; + + /* Set this true, while lock is held, if write() was called illegally + * and the meta-request should terminate */ + bool illegal_usage_terminate_meta_request = false; + + /* BEGIN CRITICAL SECTION */ + aws_s3_meta_request_lock_synced_data(meta_request); + + if (aws_s3_meta_request_has_finish_result_synced(meta_request)) { + /* The meta-request is already complete */ + AWS_LOGF_DEBUG( + AWS_LS_S3_META_REQUEST, + "id=%p: Ignoring write(), the meta request is already complete.", + (void *)meta_request); + aws_future_void_set_error(write_future, AWS_ERROR_S3_REQUEST_HAS_COMPLETED); + + } else if (!meta_request->request_body_using_async_writes) { + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "id=%p: Illegal call to write(). The meta-request must be configured to send-using-data-writes.", + (void *)meta_request); + illegal_usage_terminate_meta_request = true; + + } else if (meta_request->synced_data.async_write.future != NULL) { + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "id=%p: Illegal call to write(). 
The previous write is not complete.", + (void *)meta_request); + illegal_usage_terminate_meta_request = true; + + } else if (meta_request->synced_data.async_write.eof) { + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, "id=%p: Illegal call to write(). EOF already set.", (void *)meta_request); + illegal_usage_terminate_meta_request = true; + + } else if (eof || (meta_request->synced_data.async_write.buffered_data.len + data.len >= meta_request->part_size)) { + /* This write makes us ready to send (EOF, or we have enough data now to send at least 1 part) */ + AWS_LOGF_TRACE( + AWS_LS_S3_META_REQUEST, + "id=%p: write(data=%zu, eof=%d) previously-buffered=%zu. Ready to upload part...", + (void *)meta_request, + data.len, + eof, + meta_request->synced_data.async_write.buffered_data.len); + + meta_request->synced_data.async_write.unbuffered_cursor = data; + meta_request->synced_data.async_write.eof = eof; + meta_request->synced_data.async_write.future = aws_future_void_acquire(write_future); + ready_to_send = true; + + } else { + /* Can't send yet. Buffer the data and complete its future, so we can get more data */ + AWS_LOGF_TRACE( + AWS_LS_S3_META_REQUEST, + "id=%p: write(data=%zu, eof=%d) previously-buffered=%zu. Buffering data, not enough to upload.", + (void *)meta_request, + data.len, + eof, + meta_request->synced_data.async_write.buffered_data.len); + + /* TODO: something smarter with this buffer: like get it from buffer-pool, + * or reserve exactly part-size, or reserve exactly how much we need */ + aws_byte_buf_append_dynamic(&meta_request->synced_data.async_write.buffered_data, &data); + + /* TODO: does a future that completes immediately risk stack overflow? + * If a user does tiny writes, and registers callbacks on the write-future, + * they'll fire synchronously. If the user repeats, the stack will just grow and grow. 
*/ + aws_future_void_set_result(write_future); + } + + if (illegal_usage_terminate_meta_request) { + aws_future_void_set_error(write_future, AWS_ERROR_INVALID_STATE); + aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_INVALID_STATE); + } + + aws_s3_meta_request_unlock_synced_data(meta_request); + /* END CRITICAL SECTION */ + + if (ready_to_send || illegal_usage_terminate_meta_request) { + /* Schedule the work task, to continue processing the meta-request */ + aws_s3_client_schedule_process_work(meta_request->client); + } + + return write_future; +} + +/* Copy pending async-write data into the buffer. + * This is only called when there's enough data for the next part. */ +static int s_s3_meta_request_read_from_pending_async_writes( + struct aws_s3_meta_request *meta_request, + struct aws_byte_buf *dest, + bool *eof) { + + *eof = false; + + struct aws_future_void *write_future_to_complete = NULL; + int error_code = 0; + + /* BEGIN CRITICAL SECTION */ + aws_s3_meta_request_lock_synced_data(meta_request); + + /* If user calls aws_s3_meta_request_cancel(), it will synchronously complete any pending async-writes. + * So if the write-future is unexpectedly gone, that's what happened, don't touch the data. 
*/ + if (meta_request->synced_data.async_write.future == NULL) { + error_code = AWS_ERROR_S3_CANCELED; + goto unlock; + } + + /* Buffered data should not exceed part-size */ + AWS_FATAL_ASSERT(dest->capacity - dest->len >= meta_request->synced_data.async_write.buffered_data.len); + + /* Copy all buffered data */ + aws_byte_buf_write_from_whole_buffer(dest, meta_request->synced_data.async_write.buffered_data); + meta_request->synced_data.async_write.buffered_data.len = 0; + + /* Copy as much unbuffered data as possible */ + aws_byte_buf_write_to_capacity(dest, &meta_request->synced_data.async_write.unbuffered_cursor); + + /* We should have filled the dest buffer, unless this is the final write */ + AWS_FATAL_ASSERT(dest->len == dest->capacity || meta_request->synced_data.async_write.eof); + + /* If we haven't received EOF, and there's not enough data in unbuffered_cursor to fill another part, + * then we need to move it into buffered_data, so we can complete the write's future and get more data */ + if (!meta_request->synced_data.async_write.eof && + meta_request->synced_data.async_write.unbuffered_cursor.len < meta_request->part_size) { + + aws_byte_buf_append_dynamic( + &meta_request->synced_data.async_write.buffered_data, + &meta_request->synced_data.async_write.unbuffered_cursor); + meta_request->synced_data.async_write.unbuffered_cursor.len = 0; + } + + /* If all unbuffered data is consumed (we sent it, or buffered it) then complete the write's future */ + if (meta_request->synced_data.async_write.unbuffered_cursor.len == 0) { + write_future_to_complete = meta_request->synced_data.async_write.future; + meta_request->synced_data.async_write.future = NULL; + + if (meta_request->synced_data.async_write.eof) { + *eof = true; + } + } +unlock: + aws_s3_meta_request_unlock_synced_data(meta_request); + /* END CRITICAL SECTION */ + + /* Don't hold locks while completing the future, it might trigger a user callback */ + if (write_future_to_complete != NULL) { + 
AWS_LOGF_TRACE(AWS_LS_S3_META_REQUEST, "id=%p: write future complete", (void *)meta_request); + aws_future_void_set_result(write_future_to_complete); + aws_future_void_release(write_future_to_complete); + } + + return error_code == 0 ? AWS_OP_SUCCESS : aws_raise_error(error_code); +} + void aws_s3_meta_request_result_clean_up( struct aws_s3_meta_request *meta_request, struct aws_s3_meta_request_result *result) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 016af657c..0d73de62e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -132,6 +132,24 @@ add_net_test_case(test_s3_many_async_uploads_without_data) add_net_test_case(test_s3_download_empty_file_with_checksum) add_net_test_case(test_s3_download_single_part_file_with_checksum) add_net_test_case(test_s3_download_multipart_file_with_checksum) +add_net_test_case(test_s3_asyncwrite_empty_file) +add_net_test_case(test_s3_asyncwrite_small_file_1_write) +add_net_test_case(test_s3_asyncwrite_small_file_1_write_then_eof) +add_net_test_case(test_s3_asyncwrite_small_file_many_writes) +add_net_test_case(test_s3_asyncwrite_1_part) +add_net_test_case(test_s3_asyncwrite_1_part_many_writes) +add_net_test_case(test_s3_asyncwrite_1_part_then_eof) +add_net_test_case(test_s3_asyncwrite_2_parts_1_write) +add_net_test_case(test_s3_asyncwrite_2_parts_2_partsize_writes) +add_net_test_case(test_s3_asyncwrite_2_parts_first_write_over_partsize) +add_net_test_case(test_s3_asyncwrite_2_parts_first_write_under_partsize) +add_net_test_case(test_s3_asyncwrite_tolerate_empty_writes) +add_net_test_case(test_s3_asyncwrite_write_from_future_callback) +add_net_test_case(test_s3_asyncwrite_fails_if_request_has_completed) +add_net_test_case(test_s3_asyncwrite_fails_if_write_after_eof) +add_net_test_case(test_s3_asyncwrite_fails_if_writes_overlap) +add_net_test_case(test_s3_asyncwrite_cancel_forces_completion) +add_net_test_case(test_s3_asyncwrite_cancel_sends_abort) if(ENABLE_MRAP_TESTS) 
add_net_test_case(test_s3_get_object_less_than_part_size_mrap) diff --git a/tests/s3_asyncwrite_tests.c b/tests/s3_asyncwrite_tests.c new file mode 100644 index 000000000..34c1ad2ff --- /dev/null +++ b/tests/s3_asyncwrite_tests.c @@ -0,0 +1,652 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include "s3_tester.h" + +#include +#include +#include +#include +#include +#include +#include + +#define TIMEOUT_NANOS ((uint64_t)AWS_TIMESTAMP_NANOS * 10) /* 10secs */ +#define PART_SIZE MB_TO_BYTES(5) + +struct asyncwrite_tester { + struct aws_allocator *allocator; + struct aws_s3_tester s3_tester; + struct aws_s3_client *client; + struct aws_s3_meta_request *meta_request; + struct aws_s3_meta_request_test_results test_results; + struct aws_byte_buf source_buf; +}; + +static int s_asyncwrite_tester_init( + struct asyncwrite_tester *tester, + struct aws_allocator *allocator, + size_t object_size) { + + AWS_ZERO_STRUCT(*tester); + tester->allocator = allocator; + + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester->s3_tester)); + + /* Create S3 client */ + struct aws_s3_client_config client_config = { + .part_size = PART_SIZE, + }; + ASSERT_SUCCESS(aws_s3_tester_bind_client( + &tester->s3_tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING)); + tester->client = aws_s3_client_new(allocator, &client_config); + ASSERT_NOT_NULL(tester->client); + + /* Create buffer of data to upload */ + aws_byte_buf_init(&tester->source_buf, allocator, object_size); + ASSERT_SUCCESS(aws_device_random_buffer(&tester->source_buf)); + + /* Create meta request */ + aws_s3_meta_request_test_results_init(&tester->test_results, allocator); + + struct aws_string *host_name = + aws_s3_tester_build_endpoint_string(allocator, &g_test_bucket_name, &g_test_s3_region); + struct aws_byte_cursor host_name_cursor = aws_byte_cursor_from_string(host_name); + struct aws_byte_buf object_path; + 
ASSERT_SUCCESS( + aws_s3_tester_upload_file_path_init(allocator, &object_path, aws_byte_cursor_from_c_str("/asyncwrite.bin"))); + + struct aws_http_message *message = aws_s3_test_put_object_request_new_without_body( + allocator, + &host_name_cursor, + g_test_body_content_type, + aws_byte_cursor_from_buf(&object_path), + object_size, + 0 /*flags*/); + + /* erase content-length header, because async-write doesn't currently support it */ + aws_http_headers_erase(aws_http_message_get_headers(message), g_content_length_header_name); + + struct aws_s3_checksum_config checksum_config = { + .checksum_algorithm = AWS_SCA_CRC32, + .location = AWS_SCL_TRAILER, + }; + + struct aws_s3_meta_request_options meta_request_options = { + .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, + .message = message, + .send_using_async_writes = true, + .checksum_config = &checksum_config, + }; + ASSERT_SUCCESS(aws_s3_tester_bind_meta_request(&tester->s3_tester, &meta_request_options, &tester->test_results)); + + tester->meta_request = aws_s3_client_make_meta_request(tester->client, &meta_request_options); + ASSERT_NOT_NULL(tester->meta_request); + + /* Clean up tmp variables */ + aws_string_destroy(host_name); + aws_byte_buf_clean_up(&object_path); + aws_http_message_release(message); + return 0; +} + +static int s_asyncwrite_tester_validate(struct asyncwrite_tester *tester) { + ASSERT_SUCCESS(aws_s3_tester_validate_put_object_results(&tester->test_results, 0 /*flags*/)); + + /* Validate the checksums, to be sure we uploaded what we meant to upload */ + ASSERT_TRUE(tester->test_results.upload_review.part_count > 0, "Update this code to handle whole-object checksum"); + struct aws_byte_cursor source_cursor = aws_byte_cursor_from_buf(&tester->source_buf); + for (size_t part_i = 0; part_i < tester->test_results.upload_review.part_count; ++part_i) { + /* calculate checksum of this part, from source_buffer */ + uint64_t part_size = tester->test_results.upload_review.part_sizes_array[part_i]; 
ASSERT_TRUE(part_size <= source_cursor.len); + ASSERT_TRUE(part_size < INT_MAX); + uint32_t crc32_val = aws_checksums_crc32(source_cursor.ptr, (int)part_size, 0x0 /*previousCrc32*/); + aws_byte_cursor_advance(&source_cursor, (size_t)part_size); + + /* base64-encode the big-endian representation of the CRC32 */ + uint32_t crc32_be_val = aws_hton32(crc32_val); + struct aws_byte_cursor crc32_be_cursor = {.ptr = (uint8_t *)&crc32_be_val, .len = sizeof(crc32_be_val)}; + struct aws_byte_buf crc32_base64_buf; + aws_byte_buf_init(&crc32_base64_buf, tester->allocator, 16); + ASSERT_SUCCESS(aws_base64_encode(&crc32_be_cursor, &crc32_base64_buf)); + + /* compare to what got sent */ + struct aws_string *sent_checksum = tester->test_results.upload_review.part_checksums_array[part_i]; + ASSERT_BIN_ARRAYS_EQUALS( + crc32_base64_buf.buffer, crc32_base64_buf.len, sent_checksum->bytes, sent_checksum->len); + + aws_byte_buf_clean_up(&crc32_base64_buf); + } + return 0; +} + +static int s_asyncwrite_tester_clean_up(struct asyncwrite_tester *tester) { + tester->meta_request = aws_s3_meta_request_release(tester->meta_request); + aws_s3_tester_wait_for_meta_request_shutdown(&tester->s3_tester); + aws_s3_meta_request_test_results_clean_up(&tester->test_results); + aws_byte_buf_clean_up(&tester->source_buf); + tester->client = aws_s3_client_release(tester->client); + aws_s3_tester_clean_up(&tester->s3_tester); + return 0; +} + +static int s_write(struct asyncwrite_tester *tester, struct aws_byte_cursor data, bool eof) { + /* use freshly allocated buffer for each write, so that we're likely to get memory violations + * if this data is used wrong internally. 
*/ + struct aws_byte_buf write_buf; + aws_byte_buf_init_cache_and_update_cursors(&write_buf, tester->allocator, &data, NULL); + + struct aws_future_void *write_future = aws_s3_meta_request_write(tester->meta_request, data, eof); + ASSERT_NOT_NULL(write_future); + ASSERT_TRUE(aws_future_void_wait(write_future, TIMEOUT_NANOS)); + aws_byte_buf_clean_up(&write_buf); + ASSERT_INT_EQUALS(0, aws_future_void_get_error(write_future)); + aws_future_void_release(write_future); + return 0; +} + +struct basic_asyncwrite_options { + /* Total size of object to upload */ + size_t object_size; + /* Max bytes per write(). If zero, defaults to object_size */ + size_t max_bytes_per_write; + /* If true, EOF is passed in a separate final empty write() */ + bool eof_requires_extra_write; +}; + +/* Common function for tests that do successful uploads, without too much weird stuff */ +static int s_basic_asyncwrite( + struct aws_allocator *allocator, + void *ctx, + const struct basic_asyncwrite_options *options) { + + (void)ctx; + struct asyncwrite_tester tester; + ASSERT_SUCCESS(s_asyncwrite_tester_init(&tester, allocator, options->object_size)); + + size_t max_bytes_per_write = options->max_bytes_per_write > 0 ? 
options->max_bytes_per_write : options->object_size; + bool eof = false; + struct aws_byte_cursor source_cursor = aws_byte_cursor_from_buf(&tester.source_buf); + while (source_cursor.len > 0) { + size_t bytes_to_write = aws_min_size(max_bytes_per_write, source_cursor.len); + struct aws_byte_cursor write_cursor = aws_byte_cursor_advance(&source_cursor, bytes_to_write); + if (source_cursor.len == 0 && !options->eof_requires_extra_write) { + eof = true; + } + + ASSERT_SUCCESS(s_write(&tester, write_cursor, eof)); + } + + /* Ensure EOF is sent (eof_requires_extra_write, or object_size==0) */ + if (!eof) { + ASSERT_SUCCESS(s_write(&tester, (struct aws_byte_cursor){0}, true /*eof*/)); + } + + /* Done */ + aws_s3_tester_wait_for_meta_request_finish(&tester.s3_tester); + ASSERT_SUCCESS(s_asyncwrite_tester_validate(&tester)); + ASSERT_SUCCESS(s_asyncwrite_tester_clean_up(&tester)); + return 0; +}; + +AWS_TEST_CASE(test_s3_asyncwrite_empty_file, s_test_s3_asyncwrite_empty_file) +static int s_test_s3_asyncwrite_empty_file(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = 0, + }; + return s_basic_asyncwrite(allocator, ctx, &options); +} + +AWS_TEST_CASE(test_s3_asyncwrite_small_file_1_write, s_test_s3_asyncwrite_small_file_1_write) +static int s_test_s3_asyncwrite_small_file_1_write(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = 100, + }; + return s_basic_asyncwrite(allocator, ctx, &options); +} + +/* In this test, the 1st write must be buffered, since it's less than part-size */ +AWS_TEST_CASE(test_s3_asyncwrite_small_file_1_write_then_eof, s_test_s3_asyncwrite_small_file_1_write_then_eof) +static int s_test_s3_asyncwrite_small_file_1_write_then_eof(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = 100, + .eof_requires_extra_write = true, + }; + return s_basic_asyncwrite(allocator, ctx, &options); 
+} + +/* In this test, we must buffer multiple writes, since their cumulative size is under part-size */ +AWS_TEST_CASE(test_s3_asyncwrite_small_file_many_writes, s_test_s3_asyncwrite_small_file_many_writes) +static int s_test_s3_asyncwrite_small_file_many_writes(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = 100, + .max_bytes_per_write = 1, + }; + return s_basic_asyncwrite(allocator, ctx, &options); +} + +/* 1 part-sized write */ +AWS_TEST_CASE(test_s3_asyncwrite_1_part, s_test_s3_asyncwrite_1_part) +static int s_test_s3_asyncwrite_1_part(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = PART_SIZE, + }; + return s_basic_asyncwrite(allocator, ctx, &options); +} + +/* Send 1 full part, but spread across many writes. + * This is just stressing data buffering. */ +AWS_TEST_CASE(test_s3_asyncwrite_1_part_many_writes, s_test_s3_asyncwrite_1_part_many_writes) +static int s_test_s3_asyncwrite_1_part_many_writes(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = PART_SIZE, + .max_bytes_per_write = PART_SIZE / 16, + }; + return s_basic_asyncwrite(allocator, ctx, &options); +} + +/* Send 1 full part, then a separate empty EOF write. + * This probably results in a second (empty) part being uploaded. */ +AWS_TEST_CASE(test_s3_asyncwrite_1_part_then_eof, s_test_s3_asyncwrite_1_part_then_eof) +static int s_test_s3_asyncwrite_1_part_then_eof(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = PART_SIZE, + .eof_requires_extra_write = true, + }; + return s_basic_asyncwrite(allocator, ctx, &options); +} + +/* Send 1 write, with enough data for 2 full parts. + * This stresses the case where a single write-future must persist while multiple parts are copied. 
*/ +AWS_TEST_CASE(test_s3_asyncwrite_2_parts_1_write, s_test_s3_asyncwrite_2_parts_1_write) +static int s_test_s3_asyncwrite_2_parts_1_write(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = PART_SIZE * 2, + }; + return s_basic_asyncwrite(allocator, ctx, &options); +} + +/* Send 2 part-sized writes. + * In this case, neither write needed to be buffered. */ +AWS_TEST_CASE(test_s3_asyncwrite_2_parts_2_partsize_writes, s_test_s3_asyncwrite_2_parts_2_partsize_writes) +static int s_test_s3_asyncwrite_2_parts_2_partsize_writes(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = PART_SIZE * 2, + .max_bytes_per_write = PART_SIZE, + }; + return s_basic_asyncwrite(allocator, ctx, &options); +} + +/* Send 2 full parts, but the first write is larger than part-size. + * This tests the case where the remainder of that first write needs to be buffered + * and then sent along with the data from the second write. */ +AWS_TEST_CASE( + test_s3_asyncwrite_2_parts_first_write_over_partsize, + s_test_s3_asyncwrite_2_parts_first_write_over_partsize) +static int s_test_s3_asyncwrite_2_parts_first_write_over_partsize(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = PART_SIZE * 2, + .max_bytes_per_write = PART_SIZE + 100, + }; + return s_basic_asyncwrite(allocator, ctx, &options); +} + +/* Send 2 full parts, but the first write is less than part-size. + * This tests the case where a part (the first one) is a combination of buffered and unbuffered data. 
*/ +AWS_TEST_CASE( + test_s3_asyncwrite_2_parts_first_write_under_partsize, + s_test_s3_asyncwrite_2_parts_first_write_under_partsize) +static int s_test_s3_asyncwrite_2_parts_first_write_under_partsize(struct aws_allocator *allocator, void *ctx) { + struct basic_asyncwrite_options options = { + .object_size = PART_SIZE * 2, + .max_bytes_per_write = PART_SIZE - 100, + }; + return s_basic_asyncwrite(allocator, ctx, &options); +} + +/* We don't explicitly bar empty writes, since it's reasonable to do an empty write with the EOF at the end. + * Let's make sure we can tolerate empty writes at other arbitrary points. */ +AWS_TEST_CASE(test_s3_asyncwrite_tolerate_empty_writes, s_test_s3_asyncwrite_tolerate_empty_writes) +static int s_test_s3_asyncwrite_tolerate_empty_writes(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + struct asyncwrite_tester tester; + ASSERT_SUCCESS(s_asyncwrite_tester_init(&tester, allocator, PART_SIZE /*object_size*/)); + + struct aws_byte_cursor source_cursor = aws_byte_cursor_from_buf(&tester.source_buf); + + /* empty write at start */ + struct aws_byte_cursor empty_data = {0}; + ASSERT_SUCCESS(s_write(&tester, empty_data, false /*eof*/)); + + /* write half the data */ + struct aws_byte_cursor next_chunk = aws_byte_cursor_advance(&source_cursor, PART_SIZE / 2); + ASSERT_SUCCESS(s_write(&tester, next_chunk, false /*eof*/)); + + /* empty write in the middle */ + ASSERT_SUCCESS(s_write(&tester, empty_data, false /*eof*/)); + + /* write up till we're 1 byte short of a full part */ + next_chunk = aws_byte_cursor_advance(&source_cursor, (PART_SIZE / 2) - 1); + ASSERT_SUCCESS(s_write(&tester, next_chunk, false /*eof*/)); + + /* empty write when we're just 1 byte away from having a full part to send */ + ASSERT_SUCCESS(s_write(&tester, empty_data, false /*eof*/)); + + /* write final byte, but don't send EOF yet */ + next_chunk = aws_byte_cursor_advance(&source_cursor, 1); + ASSERT_SUCCESS(s_write(&tester, next_chunk, false /*eof*/)); + + /* 
empty write at the end, but don't send EOF yet */ + ASSERT_SUCCESS(s_write(&tester, empty_data, false /*eof*/)); + + /* OK, finally send EOF */ + ASSERT_SUCCESS(s_write(&tester, empty_data, true /*eof*/)); + + /* Done */ + aws_s3_tester_wait_for_meta_request_finish(&tester.s3_tester); + ASSERT_SUCCESS(s_asyncwrite_tester_validate(&tester)); + ASSERT_SUCCESS(s_asyncwrite_tester_clean_up(&tester)); + return 0; +} + +struct asyncwrite_on_another_thread_ctx { + struct asyncwrite_tester *tester; + size_t max_bytes_per_write; + struct aws_byte_cursor source_cursor; + struct aws_future_void *write_future; +}; + +static void s_write_from_future_callback(void *user_data) { + struct asyncwrite_on_another_thread_ctx *thread_ctx = user_data; + + /* If there was a previous write, assert it succeeded */ + if (thread_ctx->write_future != NULL) { + AWS_FATAL_ASSERT(aws_future_void_get_error(thread_ctx->write_future) == 0); + thread_ctx->write_future = aws_future_void_release(thread_ctx->write_future); + } + + /* If that was the final write, we're done */ + if (thread_ctx->source_cursor.len == 0) { + return; + } + + /* Write next chunk */ + size_t bytes_to_write = aws_min_size(thread_ctx->source_cursor.len, thread_ctx->max_bytes_per_write); + struct aws_byte_cursor next_chunk = aws_byte_cursor_advance(&thread_ctx->source_cursor, bytes_to_write); + bool eof = thread_ctx->source_cursor.len == 0; + thread_ctx->write_future = aws_s3_meta_request_write(thread_ctx->tester->meta_request, next_chunk, eof); + + /* Register this function to run again when write completes */ + aws_future_void_register_callback(thread_ctx->write_future, s_write_from_future_callback, thread_ctx); +} + +/* This test tries to submit new writes from the write-future's completion callback, + * which often fires on another thread. 
*/ +AWS_TEST_CASE(test_s3_asyncwrite_write_from_future_callback, s_test_s3_asyncwrite_write_from_future_callback) +static int s_test_s3_asyncwrite_write_from_future_callback(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + struct asyncwrite_tester tester; + + /* Have a few parts, so we get more chances to write from a callback on another thread */ + ASSERT_SUCCESS(s_asyncwrite_tester_init(&tester, allocator, PART_SIZE * 4 /*object_size*/)); + + struct asyncwrite_on_another_thread_ctx on_another_thread_ctx = { + .tester = &tester, + .source_cursor = aws_byte_cursor_from_buf(&tester.source_buf), + /* Use writes that don't divide nicely into part-size. + * This way, we're passing some buffered data, and some unbuffered data, to each part. + * And getting back some buffered leftovers when the part completes. + * This pushes a lot of edge cases, and makes it likely we'll catch any threading bugs. */ + .max_bytes_per_write = (PART_SIZE / 2) + 1, + }; + + /* Kick off the recursive write-future completion callback loop */ + s_write_from_future_callback(&on_another_thread_ctx); + + /* Done */ + aws_s3_tester_wait_for_meta_request_finish(&tester.s3_tester); + ASSERT_SUCCESS(s_asyncwrite_tester_validate(&tester)); + ASSERT_SUCCESS(s_asyncwrite_tester_clean_up(&tester)); + return 0; +} + +/* This test checks that, if the meta request fails before write() is called, + * the write-future fails with AWS_ERROR_S3_REQUEST_HAS_COMPLETED */ +AWS_TEST_CASE(test_s3_asyncwrite_fails_if_request_has_completed, s_test_s3_asyncwrite_fails_if_request_has_completed) +static int s_test_s3_asyncwrite_fails_if_request_has_completed(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + struct asyncwrite_tester tester; + + ASSERT_SUCCESS(s_asyncwrite_tester_init(&tester, allocator, PART_SIZE /*object_size*/)); + + /* Cancel meta request before write() call */ + aws_s3_meta_request_cancel(tester.meta_request); + + struct aws_future_void *write_future = +
aws_s3_meta_request_write(tester.meta_request, aws_byte_cursor_from_buf(&tester.source_buf), true /*eof*/); + + ASSERT_TRUE(aws_future_void_wait(write_future, TIMEOUT_NANOS)); + + ASSERT_INT_EQUALS(AWS_ERROR_S3_REQUEST_HAS_COMPLETED, aws_future_void_get_error(write_future)); + write_future = aws_future_void_release(write_future); + + /* Done */ + aws_s3_tester_wait_for_meta_request_finish(&tester.s3_tester); + + /* The meta request's error-code should still be CANCELED, the failed write() shouldn't affect that */ + ASSERT_INT_EQUALS(AWS_ERROR_S3_CANCELED, tester.test_results.finished_error_code); + + ASSERT_SUCCESS(s_asyncwrite_tester_clean_up(&tester)); + return 0; +} + +AWS_TEST_CASE(test_s3_asyncwrite_fails_if_write_after_eof, s_test_s3_asyncwrite_fails_if_write_after_eof) +static int s_test_s3_asyncwrite_fails_if_write_after_eof(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + struct asyncwrite_tester tester; + + ASSERT_SUCCESS(s_asyncwrite_tester_init(&tester, allocator, PART_SIZE /*object_size*/)); + + /* Write the whole object, with EOF */ + ASSERT_SUCCESS(s_write(&tester, aws_byte_cursor_from_buf(&tester.source_buf), true /*eof*/)); + + /* Any more writes should fail with INVALID_STATE error */ + struct aws_byte_cursor empty_cursor = {0}; + struct aws_future_void *write_future = aws_s3_meta_request_write(tester.meta_request, empty_cursor, true /*eof*/); + ASSERT_TRUE(aws_future_void_wait(write_future, TIMEOUT_NANOS)); + ASSERT_INT_EQUALS(AWS_ERROR_INVALID_STATE, aws_future_void_get_error(write_future)); + write_future = aws_future_void_release(write_future); + + /* Done. 
Don't really care if the request completes successfully or not */ + aws_s3_tester_wait_for_meta_request_finish(&tester.s3_tester); + ASSERT_SUCCESS(s_asyncwrite_tester_clean_up(&tester)); + return 0; +} + +AWS_TEST_CASE(test_s3_asyncwrite_fails_if_writes_overlap, s_test_s3_asyncwrite_fails_if_writes_overlap) +static int s_test_s3_asyncwrite_fails_if_writes_overlap(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + struct asyncwrite_tester tester; + + /* Make it VERY likely that some writes will overlap by issuing a lot of them as fast as possible */ + enum { num_writes = 100 }; + ASSERT_SUCCESS(s_asyncwrite_tester_init(&tester, allocator, num_writes * PART_SIZE /*object_size*/)); + + bool had_overlapping_write = false; + struct aws_byte_cursor source_cursor = aws_byte_cursor_from_buf(&tester.source_buf); + bool eof = false; + while (!eof && !had_overlapping_write) { + struct aws_byte_cursor write_cursor = aws_byte_cursor_advance(&source_cursor, PART_SIZE); + eof = (source_cursor.len == 0); + struct aws_future_void *write_future = aws_s3_meta_request_write(tester.meta_request, write_cursor, eof); + int write_error_code = aws_future_void_is_done(write_future) ? 
aws_future_void_get_error(write_future) : 0; + aws_future_void_release(write_future); + + if (write_error_code != 0) { + /* INVALID_STATE is the error code for overlapping writes */ + ASSERT_INT_EQUALS(AWS_ERROR_INVALID_STATE, write_error_code); + had_overlapping_write = true; + } + } + + ASSERT_TRUE(had_overlapping_write); + + /* Any error from the write() call should result in the meta request terminating with INVALID_STATE error */ + aws_s3_tester_wait_for_meta_request_finish(&tester.s3_tester); + ASSERT_INT_EQUALS(AWS_ERROR_INVALID_STATE, tester.test_results.finished_error_code); + + ASSERT_SUCCESS(s_asyncwrite_tester_clean_up(&tester)); + return 0; +} + +struct cancel_forces_completion_ctx { + aws_thread_id_t thread_id; + struct aws_mutex mutex; + struct aws_atomic_var write_completed_during_cancel; +}; + +static void s_write_callback_for_cancel_forces_completion_test(void *user_data) { + struct cancel_forces_completion_ctx *cancellation_ctx = user_data; + + /* Check if this callback is firing synchronously from within aws_s3_meta_request_cancel() + * (same thread we called cancel() on, and can't get the lock we held while calling cancel()) */ + if (aws_thread_current_thread_id() == cancellation_ctx->thread_id) { + if (aws_mutex_try_lock(&cancellation_ctx->mutex) == AWS_OP_ERR) { + aws_atomic_store_int(&cancellation_ctx->write_completed_during_cancel, 1); + } + } +} + +/* Test that aws_s3_meta_request_cancel() will synchronously complete any pending write-futures. + * This behavior is important for any Rust Future that wraps our write() call. + * The Rust future could be dropped without waiting for the aws-c-s3's write() to complete. + * Rust wants an easy way to guarantee memory won't be touched anymore, so we gave cancel() this power. 
*/ +AWS_TEST_CASE(test_s3_asyncwrite_cancel_forces_completion, s_test_s3_asyncwrite_cancel_forces_completion) +static int s_test_s3_asyncwrite_cancel_forces_completion(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct cancel_forces_completion_ctx cancellation_ctx; + cancellation_ctx.thread_id = aws_thread_current_thread_id(); + aws_mutex_init(&cancellation_ctx.mutex); + aws_atomic_init_int(&cancellation_ctx.write_completed_during_cancel, 0); + + /* It's possible (but unlikely) that the write completes before cancel() is called, + * so we'll try this a bunch of times */ + bool write_completed_during_cancel = false; + for (size_t try_i = 0; try_i < 100 && !write_completed_during_cancel; ++try_i) { + /* Init loop */ + struct asyncwrite_tester tester; + ASSERT_SUCCESS(s_asyncwrite_tester_init(&tester, allocator, PART_SIZE * 3 /*object_size*/)); + + /* Kick off write (large, so write takes a while to complete, + * and EOF=false so the cancel() is sure to kill the meta request) */ + struct aws_future_void *write_future = + aws_s3_meta_request_write(tester.meta_request, aws_byte_cursor_from_buf(&tester.source_buf), false /*eof*/); + aws_future_void_register_callback( + write_future, s_write_callback_for_cancel_forces_completion_test, &cancellation_ctx); + + /* Call cancel() while holding a lock */ + aws_mutex_lock(&cancellation_ctx.mutex); + aws_s3_meta_request_cancel(tester.meta_request); + + /* We should be free to clean up the underlying memory after cancel() returns */ + aws_byte_buf_clean_up(&tester.source_buf); + + aws_mutex_unlock(&cancellation_ctx.mutex); + + aws_s3_tester_wait_for_meta_request_finish(&tester.s3_tester); + + write_completed_during_cancel = aws_atomic_load_int(&cancellation_ctx.write_completed_during_cancel) != 0; + + if (write_completed_during_cancel || aws_future_void_get_error(write_future) != 0) { + /* Any write failures are unrelated to the write() call itself, so get this particular error */ +
ASSERT_INT_EQUALS(AWS_ERROR_S3_REQUEST_HAS_COMPLETED, aws_future_void_get_error(write_future)); + } + + ASSERT_INT_EQUALS(AWS_ERROR_S3_CANCELED, tester.test_results.finished_error_code); + + aws_future_void_release(write_future); + + ASSERT_SUCCESS(s_asyncwrite_tester_clean_up(&tester)); + } + + ASSERT_TRUE(write_completed_during_cancel); + return 0; +} + +static int s_wait_for_sub_request_to_send( + struct asyncwrite_tester *tester, + enum aws_s3_request_type request_type, + uint64_t timeout) { + + uint64_t now; + ASSERT_SUCCESS(aws_high_res_clock_get_ticks(&now)); + const uint64_t timeout_timestamp = now + timeout; + const uint64_t sleep_between_checks = aws_timestamp_convert(100, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); + + bool request_sent = false; + while (!request_sent) { + aws_s3_tester_lock_synced_data(&tester->s3_tester); + for (size_t i = 0; i < aws_array_list_length(&tester->test_results.synced_data.metrics); ++i) { + struct aws_s3_request_metrics *metrics = NULL; + ASSERT_SUCCESS(aws_array_list_get_at(&tester->test_results.synced_data.metrics, (void **)&metrics, i)); + enum aws_s3_request_type request_type_i; + aws_s3_request_metrics_get_request_type(metrics, &request_type_i); + if (request_type_i == request_type) { + if (aws_s3_request_metrics_get_error_code(metrics) == 0) { + request_sent = true; + } + } + } + aws_s3_tester_unlock_synced_data(&tester->s3_tester); + + if (!request_sent) { + /* Check for timeout, then sleep a bit before checking again */ + ASSERT_SUCCESS(aws_high_res_clock_get_ticks(&now)); + ASSERT_TRUE( + now < timeout_timestamp, + "Timed out waiting for %s to be sent", + aws_s3_request_type_operation_name(request_type)); + aws_thread_current_sleep(sleep_between_checks); + } + } + return 0; +} + +/* Test that aws_s3_meta_request_cancel() will result in AbortMultipartUpload being sent. 
+ * This is a regression test: once upon a time cancel() forgot to trigger the client's update(), + * and so the meta-request would hang until something else kicked the update loop. */ +AWS_TEST_CASE(test_s3_asyncwrite_cancel_sends_abort, s_test_s3_asyncwrite_cancel_sends_abort) +static int s_test_s3_asyncwrite_cancel_sends_abort(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct asyncwrite_tester tester; + ASSERT_SUCCESS(s_asyncwrite_tester_init(&tester, allocator, PART_SIZE * 3 /*object_size*/)); + + const uint64_t one_sec_in_nanos = aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + + /* Wait for CreateMultipartUpload to be sent */ + ASSERT_SUCCESS(s_wait_for_sub_request_to_send( + &tester, AWS_S3_REQUEST_TYPE_CREATE_MULTIPART_UPLOAD, 10 * one_sec_in_nanos /*timeout*/)); + + /* Sleep a bit to ensure the client isn't doing anything, then cancel() */ + aws_thread_current_sleep(one_sec_in_nanos); + + aws_s3_meta_request_cancel(tester.meta_request); + + /* Wait for AbortMultipartUpload to be sent. + * Ugh if timeout is too long we risk some unrelated system updating the client and hiding this bug. + * But if timeout is too short CI will randomly fail on super slow systems.
*/ + ASSERT_SUCCESS(s_wait_for_sub_request_to_send( + &tester, AWS_S3_REQUEST_TYPE_ABORT_MULTIPART_UPLOAD, 5 * one_sec_in_nanos /*timeout*/)); + + /* Wait for meta request to complete */ + aws_s3_tester_wait_for_meta_request_finish(&tester.s3_tester); + + ASSERT_INT_EQUALS(AWS_ERROR_S3_CANCELED, tester.test_results.finished_error_code); + + ASSERT_SUCCESS(s_asyncwrite_tester_clean_up(&tester)); + return 0; +} diff --git a/tests/s3_many_async_uploads_without_data_test.c b/tests/s3_many_async_uploads_without_data_test.c index a90c3165e..e75b1abe9 100644 --- a/tests/s3_many_async_uploads_without_data_test.c +++ b/tests/s3_many_async_uploads_without_data_test.c @@ -23,99 +23,17 @@ */ /* Number of simultaneous upload meta-requests to create */ -/* TODO: when we come up with a real fix, increase to 1000 for all cases. - * But for now the memory_limit_in_bytes limits us, and it has a different default value for 32 and 64 bit */ -#if SIZE_BITS == 32 -# define MANY_ASYNC_UPLOADS_COUNT 80 -#else -# define MANY_ASYNC_UPLOADS_COUNT 200 -#endif +#define MANY_ASYNC_UPLOADS_COUNT 200 /* Number of bytes each meta-request should upload (small so this this doesn't take forever) */ -#define MANY_ASYNC_UPLOADS_OBJECT_SIZE 1 +#define MANY_ASYNC_UPLOADS_OBJECT_SIZE 100 + +/* Bytes per write */ +#define MANY_ASYNC_UPLOADS_BYTES_PER_WRITE 10 /* How long to spend doing nothing, before assuming we're deadlocked */ #define SEND_DATA_TIMEOUT_NANOS ((uint64_t)AWS_TIMESTAMP_NANOS * 10) /* 10secs */ -/* Singleton struct for this test, containing anything touched by helper functions. 
- * Lock must be held while touching anything in here */ -static struct many_async_uploads_test_data { - struct aws_mutex mutex; - - /* This cvar is notified whenever async-input-stream read() is called - * (at least one index of async_buffers[] or async_futures[] will be non-null) */ - struct aws_condition_variable cvar; - - /* The main thread waits on the cvar until async-input-stream read() is - * called for this meta-request */ - int waiting_on_upload_i; - - /* For each upload i: dest buffer from any pending async-input-stream read() */ - struct aws_byte_buf *async_buffers[MANY_ASYNC_UPLOADS_COUNT]; - - /* For each upload i: future from any pending async-input-stream read() */ - struct aws_future_bool *async_futures[MANY_ASYNC_UPLOADS_COUNT]; - - /* For each upload i: bytes uploaded so far */ - uint64_t bytes_uploaded[MANY_ASYNC_UPLOADS_COUNT]; - -} s_many_async_uploads_test_data; - -/* async-input-stream for this test */ -struct many_async_uploads_stream { - struct aws_async_input_stream base; - int upload_i; -}; - -static void s_many_async_uploads_stream_destroy(struct aws_async_input_stream *stream) { - struct many_async_uploads_stream *stream_impl = stream->impl; - aws_mem_release(stream->alloc, stream_impl); -} - -static struct aws_future_bool *s_many_async_uploads_stream_read( - struct aws_async_input_stream *stream, - struct aws_byte_buf *dest) { - - struct many_async_uploads_stream *stream_impl = stream->impl; - struct aws_future_bool *future = aws_future_bool_new(stream->alloc); - struct many_async_uploads_test_data *test_data = &s_many_async_uploads_test_data; - - /* Store the buffer and future */ - aws_mutex_lock(&test_data->mutex); - - AWS_FATAL_ASSERT(test_data->async_buffers[stream_impl->upload_i] == NULL); - test_data->async_buffers[stream_impl->upload_i] = dest; - - AWS_FATAL_ASSERT(test_data->async_futures[stream_impl->upload_i] == NULL); - test_data->async_futures[stream_impl->upload_i] = aws_future_bool_acquire(future); - - /* Alert the main 
thread that it may complete this async read */ - aws_condition_variable_notify_all(&test_data->cvar); - aws_mutex_unlock(&s_many_async_uploads_test_data.mutex); - - return future; -} - -static const struct aws_async_input_stream_vtable s_many_async_uploads_stream_vtable = { - .destroy = s_many_async_uploads_stream_destroy, - .read = s_many_async_uploads_stream_read, -}; - -static struct aws_async_input_stream *s_many_async_uploads_stream_new(struct aws_allocator *allocator, int upload_i) { - struct many_async_uploads_stream *stream_impl = - aws_mem_calloc(allocator, 1, sizeof(struct many_async_uploads_stream)); - aws_async_input_stream_init_base(&stream_impl->base, allocator, &s_many_async_uploads_stream_vtable, stream_impl); - stream_impl->upload_i = upload_i; - return &stream_impl->base; -} - -/* Return true if the desired meta-request is able to send data */ -static bool s_waiting_on_upload_i_predicate(void *user_data) { - (void)user_data; - struct many_async_uploads_test_data *test_data = &s_many_async_uploads_test_data; - return test_data->async_buffers[test_data->waiting_on_upload_i] != NULL; -} - /* See top of file for full description of what's going on in this test. 
*/ AWS_TEST_CASE(test_s3_many_async_uploads_without_data, s_test_s3_many_async_uploads_without_data) static int s_test_s3_many_async_uploads_without_data(struct aws_allocator *allocator, void *ctx) { @@ -130,18 +48,12 @@ static int s_test_s3_many_async_uploads_without_data(struct aws_allocator *alloc AWS_ZERO_STRUCT(client_options); ASSERT_SUCCESS(aws_s3_tester_client_new(&tester, &client_options, &client)); - struct many_async_uploads_test_data *test_data = &s_many_async_uploads_test_data; - aws_mutex_init(&test_data->mutex); - aws_condition_variable_init(&test_data->cvar); - - // struct aws_s3_meta_request *meta_requests[MANY_ASYNC_UPLOADS_COUNT]; + struct aws_s3_meta_request *meta_requests[MANY_ASYNC_UPLOADS_COUNT]; struct aws_s3_meta_request_test_results meta_request_test_results[MANY_ASYNC_UPLOADS_COUNT]; /* Create N upload meta-requests, each with an async-input-stream that * won't provide data until later in this test... */ for (int i = 0; i < MANY_ASYNC_UPLOADS_COUNT; ++i) { - struct aws_async_input_stream *async_stream = s_many_async_uploads_stream_new(allocator, i); - aws_s3_meta_request_test_results_init(&meta_request_test_results[i], allocator); struct aws_string *host_name = @@ -168,66 +80,66 @@ static int s_test_s3_many_async_uploads_without_data(struct aws_allocator *alloc struct aws_s3_meta_request_options options = { .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, .message = message, - .send_async_stream = async_stream, - /* TODO: come up with a real fix, this "internal_use_only" setting is just a temporary workaround. - * that lets us deal with 200+ "stalled" meta-requests. The client still deadlocks if you - * increase MANY_ASYNC_UPLOADS_COUNT to 1000. 
*/ - .maximize_async_stream_reads_internal_use_only = true, + .send_using_async_writes = true, }; ASSERT_SUCCESS(aws_s3_tester_bind_meta_request(&tester, &options, &meta_request_test_results[i])); - struct aws_s3_meta_request *meta_request = aws_s3_client_make_meta_request(client, &options); + meta_requests[i] = aws_s3_client_make_meta_request(client, &options); - /* Release stuff created in this loop. - * The s3_client will keep everything related to the meta-request alive until it completes */ + /* Release stuff created in this loop */ aws_string_destroy(host_name); aws_byte_buf_clean_up(&object_path); aws_http_message_release(message); - aws_async_input_stream_release(async_stream); - aws_s3_meta_request_release(meta_request); } /* Starting at the end, and working backwards, only provide data to one meta-request at a time. */ for (int i = MANY_ASYNC_UPLOADS_COUNT - 1; i >= 0; --i) { - bool upload_done = false; - - while (!upload_done) { - aws_mutex_lock(&test_data->mutex); - test_data->waiting_on_upload_i = i; - - /* Wait until meta-request i's async-input-stream read() is called */ - ASSERT_SUCCESS( - aws_condition_variable_wait_for_pred( - &test_data->cvar, - &test_data->mutex, - SEND_DATA_TIMEOUT_NANOS, - s_waiting_on_upload_i_predicate, - NULL), - "Timed out waiting to send data on upload %d/%d", - i + 1, - MANY_ASYNC_UPLOADS_COUNT); - /* OK, send data for meta-request i */ - struct aws_byte_buf *dest = test_data->async_buffers[i]; - test_data->async_buffers[i] = NULL; + struct aws_s3_meta_request *meta_request_i = meta_requests[i]; + + /* Perform sequential writes to meta_request_i, until EOF */ + size_t bytes_written = 0; + bool eof = false; + while (!eof) { + size_t bytes_to_write = + aws_min_size(MANY_ASYNC_UPLOADS_BYTES_PER_WRITE, MANY_ASYNC_UPLOADS_OBJECT_SIZE - bytes_written); - struct aws_future_bool *future = test_data->async_futures[i]; - test_data->async_futures[i] = NULL; + eof = (bytes_written + bytes_to_write) == MANY_ASYNC_UPLOADS_OBJECT_SIZE; 
- size_t space_available = dest->capacity - dest->len; - uint64_t bytes_remaining = MANY_ASYNC_UPLOADS_OBJECT_SIZE - test_data->bytes_uploaded[i]; - size_t bytes_to_send = (size_t)aws_min_u64(space_available, bytes_remaining); - ASSERT_TRUE(aws_byte_buf_write_u8_n(dest, 'z', bytes_to_send)); - test_data->bytes_uploaded[i] += bytes_to_send; - upload_done = test_data->bytes_uploaded[i] == MANY_ASYNC_UPLOADS_OBJECT_SIZE; - aws_mutex_unlock(&test_data->mutex); + /* use freshly allocated buffer for each write, so that we're likely to get memory violations + * if this data is used wrong internally. */ + struct aws_byte_buf tmp_data; + aws_byte_buf_init(&tmp_data, allocator, bytes_to_write); + aws_byte_buf_write_u8_n(&tmp_data, 'z', bytes_to_write); - aws_future_bool_set_result(future, upload_done); - aws_future_bool_release(future); + struct aws_future_void *write_future = + aws_s3_meta_request_write(meta_request_i, aws_byte_cursor_from_buf(&tmp_data), eof); + + ASSERT_TRUE( + aws_future_void_wait(write_future, SEND_DATA_TIMEOUT_NANOS), + "Timed out waiting to send data on upload %d/%d." + " After writing %zu bytes, timed out on write(data=%zu, eof=%d)", + i + 1, + MANY_ASYNC_UPLOADS_COUNT, + bytes_written, + bytes_to_write, + eof); + + /* write complete! 
*/ + aws_byte_buf_clean_up(&tmp_data); + + ASSERT_INT_EQUALS(0, aws_future_void_get_error(write_future)); + aws_future_void_release(write_future); + + bytes_written += bytes_to_write; } } /* Wait for everything to finish */ + for (int i = 0; i < MANY_ASYNC_UPLOADS_COUNT; ++i) { + meta_requests[i] = aws_s3_meta_request_release(meta_requests[i]); + } + aws_s3_tester_wait_for_meta_request_finish(&tester); aws_s3_tester_wait_for_meta_request_shutdown(&tester); @@ -239,8 +151,6 @@ static int s_test_s3_many_async_uploads_without_data(struct aws_allocator *alloc /* Cleanup */ aws_s3_client_release(client); aws_s3_tester_clean_up(&tester); - aws_condition_variable_clean_up(&test_data->cvar); - aws_mutex_clean_up(&test_data->mutex); return 0; }