diff --git a/docs/memory_aware_request_execution.md b/docs/memory_aware_request_execution.md new file mode 100644 index 000000000..9d2dd4934 --- /dev/null +++ b/docs/memory_aware_request_execution.md @@ -0,0 +1,76 @@ +CRT S3 client was designed with throughput as a primary goal. As such, the client +scales resource usage, such as number of parallel requests in flight, to achieve +target throughput. The client creates buffers to hold data it is sending or +receiving for each request and scaling requests in flight has direct impact on +memory used. In practice, setting high target throughput or larger part size can +lead to high observed memory usage. + +To mitigate high memory usages, memory reuse improvements were recently added to +the client along with options to limit max memory used. The following sections +will go into more detail on aspects of those changes and how the affect the +client. + +### Memory Reuse +At the basic level, CRT S3 client starts with a meta request for operation like +put or get, breaks it into smaller part-sized requests and executes those in +parallel. CRT S3 client used to allocate part sized buffer for each of those +requests and release it right after the request was done. That approach, +resulted in a lot of very short lived allocations and allocator thrashing, +overall leading to memory use spikes considerably higher than whats needed. To +address that, the client is switching to a pooled buffer approach, discussed +below. + +Note: approach described below is work in progress and concentrates on improving +the common cases (default 8mb part sizes and part sizes smaller than 64mb). + +Several observations about the client usage of buffers: +- Client does not automatically switch to buffers above default 8mb for upload, until + upload passes 10,000 parts (~80 GB). +- Get operations always use either the configured part size or default of 8mb. + Part size for get is not adjusted, since there is no 10,000 part limitation. +- Both Put and Get operations go through fill and drain phases. Ex. for Put, the + client first schedules a number of reads to 'fil' the buffers from the source + and as those reads complete, the buffer are send over to the networking layer + are 'drained' +- individual uploadParts or ranged gets operations typically have a similar + lifespan (with some caveats). in practice part buffers are acquired/released + in bulk at the same time + +The buffer pooling takes advantage of some of those allocation patterns and +works as follows. +The memory is split into primary and secondary areas. Secondary area is used for +requests with part size bigger than a predefined value (currently 4 times part size) +allocations from it got directly to allocator and are effectively old way of +doing things. + +Primary memory area is split into blocks of fixed size (part size if defined or +8mb if not times 16). Blocks are allocated on demand. Each block is logically +subdivided into part sized chunks. Pool allocates and releases in chunk sizes +only, and supports acquiring several chunks (up to 4) at once. + +Blocks are kept around while there are ongoing requests and are released async, +when there is low pressure on memory. + +### Scheduling +Running out of memory is a terminal condition within CRT and in general its not +practical to try to set overall memory limit on all allocations, since it +dramatically increases the complexity of the code that deals with cases where +only part of a memory was allocated for a task. + +Comparatively, majority of memory usage within S3 Client comes from buffers +allocated for Put/Get parts. So to control memory usage, the client will +concentrate on controlling the number of buffers allocated. Effectively, this +boils down to a back pressure mechanism of limiting the number of parts +scheduled as memory gets closer to the limit. Memory used for other resources, +ex. http connections data, various supporting structures, are not actively +controlled and instead some memory is taken out from overall limit. + +Overall, scheduling does a best-effort memory limiting. At the time of +scheduling, the client reserves memory by using buffer pool ticketing mechanism. +Buffer is acquired from the pool using the ticket as close to the usage as +possible (this approach peaks at lower mem usage than preallocating all mem +upfront because buffers cannot be used right away, ex reading from file will +fill buffers slower than they are sent, leading to decent amount of buffer reuse) +Reservation mechanism is approximate and in some cases can lead to actual memory +usage being higher once tickets are redeemed. The client reserves some memory to +mitigate overflows like that. diff --git a/include/aws/s3/private/s3_buffer_pool.h b/include/aws/s3/private/s3_buffer_pool.h new file mode 100644 index 000000000..431abea75 --- /dev/null +++ b/include/aws/s3/private/s3_buffer_pool.h @@ -0,0 +1,133 @@ +#ifndef AWS_S3_BUFFER_ALLOCATOR_H +#define AWS_S3_BUFFER_ALLOCATOR_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include + +/* + * S3 buffer pool. + * Buffer pool used for pooling part sized buffers for Put/Get operations. + * Provides additional functionally for limiting overall memory used. + * High-level buffer pool usage flow: + * - Create buffer with overall memory limit and common buffer size, aka chunk + * size (typically part size configured on client) + * - For each request: + * -- call reserve to acquire ticket for future buffer acquisition. this will + * mark memory reserved, but would not allocate it. if reserve call hits + * memory limit, it fails and reservation hold is put on the whole buffer + * pool. (aws_s3_buffer_pool_remove_reservation_hold can be used to remove + * reservation hold). + * -- once request needs memory, it can exchange ticket for a buffer using + * aws_s3_buffer_pool_acquire_buffer. this operation never fails, even if it + * ends up going over memory limit. + * -- buffer lifetime is tied to the ticket. so once request is done with the + * buffer, ticket is released and buffer returns back to the pool. + */ + +AWS_EXTERN_C_BEGIN + +struct aws_s3_buffer_pool; +struct aws_s3_buffer_pool_ticket; + +struct aws_s3_buffer_pool_usage_stats { + /* Effective Max memory limit. Memory limit value provided during construction minus + * buffer reserved for overhead of the pool */ + size_t mem_limit; + + /* How much mem is used in primary storage. includes memory used by blocks + * that are waiting on all allocs to release before being put back in circulation. */ + size_t primary_used; + /* Overall memory allocated for blocks. */ + size_t primary_allocated; + /* Reserved memory. Does not account for how that memory will map into + * blocks and in practice can be lower than used memory. */ + size_t primary_reserved; + /* Number of blocks allocated in primary. */ + size_t primary_num_blocks; + + /* Secondary mem used. Accurate, maps directly to base allocator. */ + size_t secondary_used; + /* Secondary mem reserved. Accurate, maps directly to base allocator. */ + size_t secondary_reserved; +}; + +/* + * Create new buffer pool. + * chunk_size - specifies the size of memory that will most commonly be acquired + * from the pool (typically part size). + * mem_limit - limit on how much mem buffer pool can use. once limit is hit, + * buffers can no longer be reserved from (reservation hold is placed on the pool). + * Returns buffer pool pointer on success and NULL on failure. + */ +AWS_S3_API struct aws_s3_buffer_pool *aws_s3_buffer_pool_new( + struct aws_allocator *allocator, + size_t chunk_size, + size_t mem_limit); + +/* + * Destroys buffer pool. + * Does nothing if buffer_pool is NULL. + */ +AWS_S3_API void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool); + +/* + * Reserves memory from the pool for later use. + * Best effort and can potentially reserve memory slightly over the limit. + * Reservation takes some memory out of the available pool, but does not + * allocate it right away. + * On success ticket will be returned. + * On failure NULL is returned, error is raised and reservation hold is placed + * on the buffer. Any further reservations while hold is active will fail. + * Remove reservation hold to unblock reservations. + */ +AWS_S3_API struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve( + struct aws_s3_buffer_pool *buffer_pool, + size_t size); + +/* + * Whether pool has a reservation hold. + */ +AWS_S3_API bool aws_s3_buffer_pool_has_reservation_hold(struct aws_s3_buffer_pool *buffer_pool); + +/* + * Remove reservation hold on pool. + */ +AWS_S3_API void aws_s3_buffer_pool_remove_reservation_hold(struct aws_s3_buffer_pool *buffer_pool); + +/* + * Trades in the ticket for a buffer. + * Cannot fail and can over allocate above mem limit if reservation was not accurate. + * Using the same ticket twice will return the same buffer. + * Buffer is only valid until the ticket is released. + */ +AWS_S3_API struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer( + struct aws_s3_buffer_pool *buffer_pool, + struct aws_s3_buffer_pool_ticket *ticket); + +/* + * Releases the ticket. + * Any buffers associated with the ticket are invalidated. + */ +AWS_S3_API void aws_s3_buffer_pool_release_ticket( + struct aws_s3_buffer_pool *buffer_pool, + struct aws_s3_buffer_pool_ticket *ticket); + +/* + * Get pool memory usage stats. + */ +AWS_S3_API struct aws_s3_buffer_pool_usage_stats aws_s3_buffer_pool_get_usage(struct aws_s3_buffer_pool *buffer_pool); + +/* + * Trims all unused mem from the pool. + * Warning: fairly slow operation, do not use in critical path. + * TODO: partial trimming? ex. only trim down to 50% of max? + */ +AWS_S3_API void aws_s3_buffer_pool_trim(struct aws_s3_buffer_pool *buffer_pool); + +AWS_EXTERN_C_END + +#endif /* AWS_S3_BUFFER_ALLOCATOR_H */ diff --git a/include/aws/s3/private/s3_client_impl.h b/include/aws/s3/private/s3_client_impl.h index d19f2ae22..8652d5095 100644 --- a/include/aws/s3/private/s3_client_impl.h +++ b/include/aws/s3/private/s3_client_impl.h @@ -196,6 +196,8 @@ struct aws_s3_upload_part_timeout_stats { struct aws_s3_client { struct aws_allocator *allocator; + struct aws_s3_buffer_pool *buffer_pool; + struct aws_s3_client_vtable *vtable; struct aws_ref_count ref_count; @@ -340,6 +342,9 @@ struct aws_s3_client { /* Task for processing requests from meta requests on connections. */ struct aws_task process_work_task; + /* Task for trimming buffer bool. */ + struct aws_task trim_buffer_pool_task; + /* Number of endpoints currently allocated. Used during clean up to know how many endpoints are still in * memory.*/ uint32_t num_endpoints_allocated; @@ -378,6 +383,9 @@ struct aws_s3_client { /* Number of requests currently being prepared. */ uint32_t num_requests_being_prepared; + + /* Whether or not work processing is currently scheduled. */ + uint32_t trim_buffer_pool_task_scheduled : 1; } threaded_data; }; diff --git a/include/aws/s3/private/s3_request.h b/include/aws/s3/private/s3_request.h index b141018fe..ae00a9f18 100644 --- a/include/aws/s3/private/s3_request.h +++ b/include/aws/s3/private/s3_request.h @@ -12,6 +12,7 @@ #include #include +#include #include struct aws_http_message; @@ -22,6 +23,7 @@ enum aws_s3_request_flags { AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS = 0x00000001, AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY = 0x00000002, AWS_S3_REQUEST_FLAG_ALWAYS_SEND = 0x00000004, + AWS_S3_REQUEST_FLAG_PART_SIZE_REQUEST_BODY = 0x00000008, }; /** @@ -112,6 +114,8 @@ struct aws_s3_request { * retried.*/ struct aws_byte_buf request_body; + struct aws_s3_buffer_pool_ticket *ticket; + /* Beginning range of this part. */ /* TODO currently only used by auto_range_get, could be hooked up to auto_range_put as well. */ uint64_t part_range_start; @@ -184,7 +188,10 @@ struct aws_s3_request { uint32_t record_response_headers : 1; /* When true, the response body buffer will be allocated in the size of a part. */ - uint32_t part_size_response_body : 1; + uint32_t has_part_size_response_body : 1; + + /* When true, the request body buffer will be allocated in the size of a part. */ + uint32_t has_part_size_request_body : 1; /* When true, this request is being tracked by the client for limiting the amount of in-flight-requests/stats. */ uint32_t tracked_by_client : 1; diff --git a/include/aws/s3/s3.h b/include/aws/s3/s3.h index d30ca9407..c931f1b15 100644 --- a/include/aws/s3/s3.h +++ b/include/aws/s3/s3.h @@ -41,6 +41,7 @@ enum aws_s3_errors { AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH, AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED, AWS_ERROR_S3_FILE_MODIFIED, + AWS_ERROR_S3_EXCEEDS_MEMORY_LIMIT, AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID) }; diff --git a/include/aws/s3/s3_client.h b/include/aws/s3/s3_client.h index 9f7970a6a..71f95d75d 100644 --- a/include/aws/s3/s3_client.h +++ b/include/aws/s3/s3_client.h @@ -344,6 +344,9 @@ struct aws_s3_client_config { /* Throughput target in Gbps that we are trying to reach. */ double throughput_target_gbps; + /* How much memory can we use. */ + size_t memory_limit_in_bytes; + /* Retry strategy to use. If NULL, a default retry strategy will be used. */ struct aws_retry_strategy *retry_strategy; diff --git a/source/s3.c b/source/s3.c index 20237bf5a..4e8dd8347 100644 --- a/source/s3.c +++ b/source/s3.c @@ -41,6 +41,7 @@ static struct aws_error_info s_errors[] = { AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH, "Request body length must match Content-Length header."), AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED, "RequestTimeTooSkewed error received from S3."), AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_FILE_MODIFIED, "The file was modified during upload."), + AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_EXCEEDS_MEMORY_LIMIT, "Request was not created due to used memory exceeding memory limit."), }; /* clang-format on */ diff --git a/source/s3_auto_ranged_get.c b/source/s3_auto_ranged_get.c index df15aea1e..e338d7f4a 100644 --- a/source/s3_auto_ranged_get.c +++ b/source/s3_auto_ranged_get.c @@ -177,13 +177,21 @@ static bool s_s3_auto_ranged_get_update( meta_request, AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_HEAD_OBJECT, 0, - AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY); + AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS); request->discovers_object_size = true; auto_ranged_get->synced_data.head_object_sent = true; } } else if (auto_ranged_get->synced_data.num_parts_requested == 0) { + + struct aws_s3_buffer_pool_ticket *ticket = + aws_s3_buffer_pool_reserve(meta_request->client->buffer_pool, meta_request->part_size); + + if (ticket == NULL) { + goto has_work_remaining; + } + /* If we aren't using a head object, then discover the size of the object while trying to get the * first part. */ request = aws_s3_request_new( @@ -192,6 +200,7 @@ static bool s_s3_auto_ranged_get_update( 1, AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY); + request->ticket = ticket; request->part_range_start = 0; request->part_range_end = meta_request->part_size - 1; /* range-end is inclusive */ request->discovers_object_size = true; @@ -253,12 +262,21 @@ static bool s_s3_auto_ranged_get_update( auto_ranged_get->synced_data.read_window_warning_issued = 0; } + struct aws_s3_buffer_pool_ticket *ticket = + aws_s3_buffer_pool_reserve(meta_request->client->buffer_pool, meta_request->part_size); + + if (ticket == NULL) { + goto has_work_remaining; + } + request = aws_s3_request_new( meta_request, AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART, auto_ranged_get->synced_data.num_parts_requested + 1, AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY); + request->ticket = ticket; + aws_s3_get_part_range( auto_ranged_get->synced_data.object_range_start, auto_ranged_get->synced_data.object_range_end, @@ -412,10 +430,11 @@ static struct aws_future_void *s_s3_auto_ranged_get_prepare_request(struct aws_s /* Success! */ AWS_LOGF_DEBUG( AWS_LS_S3_META_REQUEST, - "id=%p: Created request %p for part %d", + "id=%p: Created request %p for part %d part sized %d", (void *)meta_request, (void *)request, - request->part_number); + request->part_number, + request->has_part_size_response_body); success = true; diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c index 78b6452f7..adcfb086f 100644 --- a/source/s3_auto_ranged_put.c +++ b/source/s3_auto_ranged_put.c @@ -556,29 +556,36 @@ static bool s_s3_auto_ranged_put_update( if (should_create_next_part_request) { - /* Allocate a request for another part. */ - request = aws_s3_request_new( - meta_request, - AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART, - 0, - AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS); + struct aws_s3_buffer_pool_ticket *ticket = + aws_s3_buffer_pool_reserve(meta_request->client->buffer_pool, meta_request->part_size); - request->part_number = auto_ranged_put->threaded_update_data.next_part_number; + if (ticket != NULL) { + /* Allocate a request for another part. */ + request = aws_s3_request_new( + meta_request, + AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART, + 0, + AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_REQUEST_BODY); + + request->part_number = auto_ranged_put->threaded_update_data.next_part_number; - /* If request was previously uploaded, we prepare it to ensure checksums still match, - * but ultimately it gets marked no-op and we don't send it */ - request->was_previously_uploaded = request_previously_uploaded; + /* If request was previously uploaded, we prepare it to ensure checksums still match, + * but ultimately it gets marked no-op and we don't send it */ + request->was_previously_uploaded = request_previously_uploaded; - ++auto_ranged_put->threaded_update_data.next_part_number; - ++auto_ranged_put->synced_data.num_parts_started; - ++auto_ranged_put->synced_data.num_parts_pending_read; + request->ticket = ticket; - AWS_LOGF_DEBUG( - AWS_LS_S3_META_REQUEST, - "id=%p: Returning request %p for part %d", - (void *)meta_request, - (void *)request, - request->part_number); + ++auto_ranged_put->threaded_update_data.next_part_number; + ++auto_ranged_put->synced_data.num_parts_started; + ++auto_ranged_put->synced_data.num_parts_pending_read; + + AWS_LOGF_DEBUG( + AWS_LS_S3_META_REQUEST, + "id=%p: Returning request %p for part %d", + (void *)meta_request, + (void *)request, + request->part_number); + } goto has_work_remaining; } @@ -942,7 +949,12 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request * /* Read the body */ uint64_t offset = 0; size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset); - aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size); + if (request->request_body.capacity == 0) { + AWS_FATAL_ASSERT(request->ticket); + request->request_body = + aws_s3_buffer_pool_acquire_buffer(request->meta_request->client->buffer_pool, request->ticket); + request->request_body.capacity = request_body_size; + } part_prep->asyncstep_read_part = aws_s3_meta_request_read_body(meta_request, offset, &request->request_body); aws_future_bool_register_callback( @@ -970,17 +982,21 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) { if (error_code != AWS_ERROR_SUCCESS) { AWS_LOGF_ERROR( AWS_LS_S3_META_REQUEST, - "id=%p: Failed reading request body, error %d (%s)", + "id=%p: Failed reading request body, error %d (%s) req len %zu req cap %zu", (void *)meta_request, error_code, - aws_error_str(error_code)); + aws_error_str(error_code), + request->request_body.len, + request->request_body.capacity); goto on_done; } /* Reading succeeded. */ bool is_body_stream_at_end = aws_future_bool_get_result(part_prep->asyncstep_read_part); + uint64_t offset = 0; + size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset); /* If Content-Length is defined, check that we read the expected amount */ - if (has_content_length && (request->request_body.len < request->request_body.capacity)) { + if (has_content_length && (request->request_body.len < request_body_size)) { error_code = AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH; AWS_LOGF_ERROR( AWS_LS_S3_META_REQUEST, diff --git a/source/s3_buffer_pool.c b/source/s3_buffer_pool.c new file mode 100644 index 000000000..2706c3267 --- /dev/null +++ b/source/s3_buffer_pool.c @@ -0,0 +1,418 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include + +#include +#include +#include + +/* + * S3 Buffer Pool. + * Fairly trivial implementation of "arena" style allocator. + * Note: current implementation is not optimized and instead tries to be + * as straightforward as possible. Given that pool manages a small number + * of big allocations, performance impact is not that bad, but something we need + * to look into on the next iteration. + * + * Basic approach is to divide acquires into primary and secondary. + * User provides chunk size during construction. Acquires below 4 * chunks_size + * are done from primary and the rest are from secondary. + * + * Primary storage consists of blocks that are each s_chunks_per_block * + * chunk_size in size. blocks are created on demand as needed. + * Acquire operation from primary basically works by determining how many chunks + * are needed and then finding available space in existing blocks or creating a + * new block. Acquire will always take over the whole chunk, so some space is + * likely wasted. + * Ex. say chunk_size is 8mb and s_chunks_per_block is 16, which makes block size 128mb. + * acquires up to 32mb will be done from primary. So 1 block can hold 4 buffers + * of 32mb (4 chunks) or 16 buffers of 8mb (1 chunk). If requested buffer size + * is 12mb, 2 chunks are used for acquire and 4mb will be wasted. + * Secondary storage delegates directly to system allocator. + */ + +struct aws_s3_buffer_pool_ticket { + size_t size; + uint8_t *ptr; + size_t chunks_used; +}; + +/* Default size for blocks array. Note: this is just for meta info, blocks + * themselves are not preallocated. */ +static size_t s_block_list_initial_capacity = 5; + +/* Amount of mem reserved for use outside of buffer pool. + * This is an optimistic upper bound on mem used as we dont track it. + * Covers both usage outside of pool, i.e. all allocations done as part of s3 + * client as well as any allocations overruns due to memory waste in the pool. */ +static const size_t s_buffer_pool_reserved_mem = MB_TO_BYTES(128); + +static const size_t s_chunks_per_block = 16; + +struct aws_s3_buffer_pool { + struct aws_allocator *base_allocator; + struct aws_mutex mutex; + + size_t block_size; + size_t chunk_size; + /* size at which allocations should go to secondary */ + size_t primary_size_cutoff; + + size_t mem_limit; + + bool has_reservation_hold; + + size_t primary_allocated; + size_t primary_reserved; + size_t primary_used; + + size_t secondary_reserved; + size_t secondary_used; + + struct aws_array_list blocks; +}; + +struct s3_buffer_pool_block { + size_t block_size; + uint8_t *block_ptr; + uint16_t alloc_bit_mask; +}; + +/* + * Sets n bits at position starting with LSB. + * Note: n must be at most 8, but in practice will always be at most 4. + * position + n should at most be 16 + */ +static inline uint16_t s_set_bits(uint16_t num, size_t position, size_t n) { + AWS_PRECONDITION(n <= 8); + AWS_PRECONDITION(position + n <= 16); + uint16_t mask = ((uint16_t)0x00FF) >> (8 - n); + return num | (mask << position); +} + +/* + * Clears n bits at position starting with LSB. + * Note: n must be at most 8, but in practice will always be at most 4. + * position + n should at most be 16 + */ +static inline uint16_t s_clear_bits(uint16_t num, size_t position, size_t n) { + AWS_PRECONDITION(n <= 8); + AWS_PRECONDITION(position + n <= 16); + uint16_t mask = ((uint16_t)0x00FF) >> (8 - n); + return num & ~(mask << position); +} + +/* + * Checks whether n bits are set at position starting with LSB. + * Note: n must be at most 8, but in practice will always be at most 4. + * position + n should at most be 16 + */ +static inline bool s_check_bits(uint16_t num, size_t position, size_t n) { + AWS_PRECONDITION(n <= 8); + AWS_PRECONDITION(position + n <= 16); + uint16_t mask = ((uint16_t)0x00FF) >> (8 - n); + return (num >> position) & mask; +} + +struct aws_s3_buffer_pool *aws_s3_buffer_pool_new( + struct aws_allocator *allocator, + size_t chunk_size, + size_t mem_limit) { + + if (mem_limit < GB_TO_BYTES(1)) { + AWS_LOGF_ERROR( + AWS_LS_S3_CLIENT, "Failed to initialize buffer pool. Min supported value for Memory Limit is 1GB."); + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + if (!(chunk_size == 0 || (chunk_size > (1024) && chunk_size % 1024 == 0))) { + AWS_LOGF_ERROR( + AWS_LS_S3_CLIENT, + "Failed to initialize buffer pool. Chunk size must be either 0 or more than 1 KB and size must be 1 KB " + "aligned."); + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + size_t adjusted_mem_lim = mem_limit - s_buffer_pool_reserved_mem; + + if (chunk_size * s_chunks_per_block > adjusted_mem_lim) { + AWS_LOGF_ERROR( + AWS_LS_S3_CLIENT, + "Failed to initialize buffer pool. Chunk size is too large for the memory limit. " + "Consider adjusting memory limit or part size."); + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + return NULL; + } + + struct aws_s3_buffer_pool *buffer_pool = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_buffer_pool)); + + AWS_FATAL_ASSERT(buffer_pool != NULL); + + buffer_pool->base_allocator = allocator; + buffer_pool->chunk_size = chunk_size; + buffer_pool->block_size = s_chunks_per_block * chunk_size; + /* Somewhat arbitrary number. + * Tries to balance between how many allocations use buffer and buffer space + * being wasted. */ + buffer_pool->primary_size_cutoff = chunk_size * 4; + buffer_pool->mem_limit = adjusted_mem_lim; + int mutex_error = aws_mutex_init(&buffer_pool->mutex); + AWS_FATAL_ASSERT(mutex_error == AWS_OP_SUCCESS); + + aws_array_list_init_dynamic( + &buffer_pool->blocks, allocator, s_block_list_initial_capacity, sizeof(struct s3_buffer_pool_block)); + + return buffer_pool; +} + +void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool) { + if (buffer_pool == NULL) { + return; + } + + for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) { + struct s3_buffer_pool_block *block; + aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i); + + AWS_FATAL_ASSERT(block->alloc_bit_mask == 0 && "Allocator still has outstanding blocks"); + aws_mem_release(buffer_pool->base_allocator, block->block_ptr); + } + + aws_array_list_clean_up(&buffer_pool->blocks); + + aws_mutex_clean_up(&buffer_pool->mutex); + struct aws_allocator *base = buffer_pool->base_allocator; + aws_mem_release(base, buffer_pool); +} + +void s_buffer_pool_trim_synced(struct aws_s3_buffer_pool *buffer_pool) { + for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks);) { + struct s3_buffer_pool_block *block; + aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i); + + if (block->alloc_bit_mask == 0) { + aws_mem_release(buffer_pool->base_allocator, block->block_ptr); + aws_array_list_erase(&buffer_pool->blocks, i); + /* do not increment since we just released element */ + } else { + ++i; + } + } +} + +void aws_s3_buffer_pool_trim(struct aws_s3_buffer_pool *buffer_pool) { + aws_mutex_lock(&buffer_pool->mutex); + s_buffer_pool_trim_synced(buffer_pool); + aws_mutex_unlock(&buffer_pool->mutex); +} + +struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(struct aws_s3_buffer_pool *buffer_pool, size_t size) { + AWS_PRECONDITION(buffer_pool); + + if (buffer_pool->has_reservation_hold) { + return NULL; + } + + AWS_FATAL_ASSERT(size != 0); + AWS_FATAL_ASSERT(size <= buffer_pool->mem_limit); + + struct aws_s3_buffer_pool_ticket *ticket = NULL; + aws_mutex_lock(&buffer_pool->mutex); + + size_t overall_taken = buffer_pool->primary_used + buffer_pool->primary_reserved + buffer_pool->secondary_used + + buffer_pool->secondary_reserved; + + /* + * If we are allocating from secondary and there is unused space in + * primary, trim the primary in hopes we can free up enough memory. + * TODO: something smarter, like partial trim? + */ + if (size > buffer_pool->primary_size_cutoff && (size + overall_taken) > buffer_pool->mem_limit && + (buffer_pool->primary_allocated > + (buffer_pool->primary_used + buffer_pool->primary_reserved + buffer_pool->block_size))) { + s_buffer_pool_trim_synced(buffer_pool); + overall_taken = buffer_pool->primary_used + buffer_pool->primary_reserved + buffer_pool->secondary_used + + buffer_pool->secondary_reserved; + } + + if ((size + overall_taken) <= buffer_pool->mem_limit) { + ticket = aws_mem_calloc(buffer_pool->base_allocator, 1, sizeof(struct aws_s3_buffer_pool_ticket)); + ticket->size = size; + if (size <= buffer_pool->primary_size_cutoff) { + buffer_pool->primary_reserved += size; + } else { + buffer_pool->secondary_reserved += size; + } + } else { + buffer_pool->has_reservation_hold = true; + } + + aws_mutex_unlock(&buffer_pool->mutex); + + if (ticket == NULL) { + AWS_LOGF_TRACE( + AWS_LS_S3_CLIENT, + "Memory limit reached while trying to allocate buffer of size %zu. " + "Putting new buffer reservations on hold...", + size); + aws_raise_error(AWS_ERROR_S3_EXCEEDS_MEMORY_LIMIT); + } + return ticket; +} + +bool aws_s3_buffer_pool_has_reservation_hold(struct aws_s3_buffer_pool *buffer_pool) { + AWS_PRECONDITION(buffer_pool); + AWS_LOGF_TRACE(AWS_LS_S3_CLIENT, "Releasing buffer reservation hold."); + return buffer_pool->has_reservation_hold; +} + +void aws_s3_buffer_pool_remove_reservation_hold(struct aws_s3_buffer_pool *buffer_pool) { + AWS_PRECONDITION(buffer_pool); + buffer_pool->has_reservation_hold = false; +} + +static uint8_t *s_primary_acquire_synced(struct aws_s3_buffer_pool *buffer_pool, size_t size, size_t *out_chunks_used) { + uint8_t *alloc_ptr = NULL; + + size_t chunks_needed = size / buffer_pool->chunk_size; + if (size % buffer_pool->chunk_size != 0) { + ++chunks_needed; /* round up */ + } + *out_chunks_used = chunks_needed; + + /* Look for space in existing blocks */ + for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) { + struct s3_buffer_pool_block *block; + aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i); + + for (size_t chunk_i = 0; chunk_i < s_chunks_per_block - chunks_needed + 1; ++chunk_i) { + if (!s_check_bits(block->alloc_bit_mask, chunk_i, chunks_needed)) { + alloc_ptr = block->block_ptr + chunk_i * buffer_pool->chunk_size; + block->alloc_bit_mask = s_set_bits(block->alloc_bit_mask, chunk_i, chunks_needed); + goto on_allocated; + } + } + } + + /* No space available. Allocate new block. */ + struct s3_buffer_pool_block block; + block.alloc_bit_mask = s_set_bits(0, 0, chunks_needed); + block.block_ptr = aws_mem_acquire(buffer_pool->base_allocator, buffer_pool->block_size); + block.block_size = buffer_pool->block_size; + aws_array_list_push_back(&buffer_pool->blocks, &block); + alloc_ptr = block.block_ptr; + + buffer_pool->primary_allocated += buffer_pool->block_size; + +on_allocated: + buffer_pool->primary_reserved -= size; + buffer_pool->primary_used += size; + + return alloc_ptr; +} + +struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer( + struct aws_s3_buffer_pool *buffer_pool, + struct aws_s3_buffer_pool_ticket *ticket) { + AWS_PRECONDITION(buffer_pool); + AWS_PRECONDITION(ticket); + + if (ticket->ptr != NULL) { + return aws_byte_buf_from_empty_array(ticket->ptr, ticket->size); + } + + uint8_t *alloc_ptr = NULL; + + aws_mutex_lock(&buffer_pool->mutex); + + if (ticket->size <= buffer_pool->primary_size_cutoff) { + alloc_ptr = s_primary_acquire_synced(buffer_pool, ticket->size, &ticket->chunks_used); + } else { + alloc_ptr = aws_mem_acquire(buffer_pool->base_allocator, ticket->size); + buffer_pool->secondary_reserved -= ticket->size; + buffer_pool->secondary_used += ticket->size; + } + + aws_mutex_unlock(&buffer_pool->mutex); + ticket->ptr = alloc_ptr; + + return aws_byte_buf_from_empty_array(ticket->ptr, ticket->size); +} + +void aws_s3_buffer_pool_release_ticket( + struct aws_s3_buffer_pool *buffer_pool, + struct aws_s3_buffer_pool_ticket *ticket) { + + if (buffer_pool == NULL || ticket == NULL) { + return; + } + + if (ticket->ptr == NULL) { + /* Ticket was never used, make sure to clean up reserved count. */ + aws_mutex_lock(&buffer_pool->mutex); + if (ticket->size <= buffer_pool->primary_size_cutoff) { + buffer_pool->primary_reserved -= ticket->size; + } else { + buffer_pool->secondary_reserved -= ticket->size; + } + aws_mutex_unlock(&buffer_pool->mutex); + aws_mem_release(buffer_pool->base_allocator, ticket); + return; + } + + aws_mutex_lock(&buffer_pool->mutex); + if (ticket->size <= buffer_pool->primary_size_cutoff) { + + size_t chunks_used = ticket->size / buffer_pool->chunk_size; + if (ticket->size % buffer_pool->chunk_size != 0) { + ++chunks_used; /* round up */ + } + + bool found = false; + for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) { + struct s3_buffer_pool_block *block; + aws_array_list_get_at_ptr(&buffer_pool->blocks, (void **)&block, i); + + if (block->block_ptr <= ticket->ptr && block->block_ptr + block->block_size > ticket->ptr) { + size_t alloc_i = (ticket->ptr - block->block_ptr) / buffer_pool->chunk_size; + + block->alloc_bit_mask = s_clear_bits(block->alloc_bit_mask, alloc_i, chunks_used); + buffer_pool->primary_used -= ticket->size; + + found = true; + break; + } + } + + AWS_FATAL_ASSERT(found); + } else { + aws_mem_release(buffer_pool->base_allocator, ticket->ptr); + buffer_pool->secondary_used -= ticket->size; + } + + aws_mem_release(buffer_pool->base_allocator, ticket); + + aws_mutex_unlock(&buffer_pool->mutex); +} + +struct aws_s3_buffer_pool_usage_stats aws_s3_buffer_pool_get_usage(struct aws_s3_buffer_pool *buffer_pool) { + aws_mutex_lock(&buffer_pool->mutex); + + struct aws_s3_buffer_pool_usage_stats ret = (struct aws_s3_buffer_pool_usage_stats){ + .mem_limit = buffer_pool->mem_limit, + .primary_allocated = buffer_pool->primary_allocated, + .primary_used = buffer_pool->primary_used, + .primary_reserved = buffer_pool->primary_reserved, + .primary_num_blocks = aws_array_list_length(&buffer_pool->blocks), + .secondary_used = buffer_pool->secondary_used, + .secondary_reserved = buffer_pool->secondary_reserved, + }; + + aws_mutex_unlock(&buffer_pool->mutex); + return ret; +} diff --git a/source/s3_chunk_stream.c b/source/s3_chunk_stream.c index 14dea3664..40b9e80bf 100644 --- a/source/s3_chunk_stream.c +++ b/source/s3_chunk_stream.c @@ -70,12 +70,12 @@ static int s_set_post_chunk_stream(struct aws_chunk_stream *parent_stream) { struct aws_byte_cursor checksum_result_cursor = aws_byte_cursor_from_buf(&parent_stream->checksum_result); if (parent_stream->checksum_result_output && aws_byte_buf_init_copy_from_cursor( - parent_stream->checksum_result_output, aws_default_allocator(), checksum_result_cursor)) { + parent_stream->checksum_result_output, parent_stream->allocator, checksum_result_cursor)) { return AWS_OP_ERR; } if (aws_byte_buf_init( &parent_stream->post_chunk_buffer, - aws_default_allocator(), + parent_stream->allocator, final_chunk_cursor.len + parent_stream->checksum_header_name->len + colon_cursor.len + checksum_result_cursor.len + post_trailer_cursor.len)) { goto error; @@ -88,7 +88,7 @@ static int s_set_post_chunk_stream(struct aws_chunk_stream *parent_stream) { goto error; } struct aws_byte_cursor post_chunk_cursor = aws_byte_cursor_from_buf(&parent_stream->post_chunk_buffer); - parent_stream->current_stream = aws_input_stream_new_from_cursor(aws_default_allocator(), &post_chunk_cursor); + parent_stream->current_stream = aws_input_stream_new_from_cursor(parent_stream->allocator, &post_chunk_cursor); parent_stream->set_current_stream_fn = s_set_null_stream; return AWS_OP_SUCCESS; error: diff --git a/source/s3_client.c b/source/s3_client.c index 64c3f0138..0d071c212 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -5,6 +5,7 @@ #include "aws/s3/private/s3_auto_ranged_get.h" #include "aws/s3/private/s3_auto_ranged_put.h" +#include "aws/s3/private/s3_buffer_pool.h" #include "aws/s3/private/s3_client_impl.h" #include "aws/s3/private/s3_copy_object.h" #include "aws/s3/private/s3_default_meta_request.h" @@ -80,6 +81,9 @@ static size_t s_dns_host_address_ttl_seconds = 5 * 60; * 30 seconds mirrors the value currently used by the Java SDK. */ static const uint32_t s_default_throughput_failure_interval_seconds = 30; +/* Amount of time spent idling before trimming buffer. */ +static const size_t s_buffer_pool_trim_time_offset_in_s = 5; + /* Called when ref count is 0. */ static void s_s3_client_start_destroy(void *user_data); @@ -257,12 +261,58 @@ struct aws_s3_client *aws_s3_client_new( struct aws_s3_client *client = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_client)); client->allocator = allocator; + + size_t mem_limit = 0; + if (client_config->memory_limit_in_bytes == 0) { +#if SIZE_BITS == 32 + if (client_config->throughput_target_gbps > 25.0) { + mem_limit = GB_TO_BYTES(2); + } else { + mem_limit = GB_TO_BYTES(1); + } +#else + if (client_config->throughput_target_gbps > 75.0) { + mem_limit = GB_TO_BYTES(8); + } else if (client_config->throughput_target_gbps > 25.0) { + mem_limit = GB_TO_BYTES(4); + } else { + mem_limit = GB_TO_BYTES(2); + } +#endif + } else { + mem_limit = client_config->memory_limit_in_bytes; + } + + size_t part_size; + if (client_config->part_size != 0) { + part_size = (size_t)client_config->part_size; + } else { + part_size = s_default_part_size; + } + + client->buffer_pool = aws_s3_buffer_pool_new(allocator, part_size, mem_limit); + + if (client->buffer_pool == NULL) { + goto on_early_fail; + } + + struct aws_s3_buffer_pool_usage_stats pool_usage = aws_s3_buffer_pool_get_usage(client->buffer_pool); + + if (client_config->max_part_size > pool_usage.mem_limit) { + AWS_LOGF_ERROR( + AWS_LS_S3_CLIENT, + "Cannot create client from client_config; configured max part size should not exceed memory limit." + "size."); + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + goto on_early_fail; + } + client->vtable = &s_s3_client_default_vtable; aws_ref_count_init(&client->ref_count, client, (aws_simple_completion_callback *)s_s3_client_start_destroy); if (aws_mutex_init(&client->synced_data.lock) != AWS_OP_SUCCESS) { - goto lock_init_fail; + goto on_early_fail; } aws_linked_list_init(&client->synced_data.pending_meta_request_work); @@ -293,17 +343,18 @@ struct aws_s3_client *aws_s3_client_new( /* Make a copy of the region string. */ client->region = aws_string_new_from_array(allocator, client_config->region.ptr, client_config->region.len); - if (client_config->part_size != 0) { - *((size_t *)&client->part_size) = (size_t)client_config->part_size; - } else { - *((size_t *)&client->part_size) = s_default_part_size; - } + *((size_t *)&client->part_size) = part_size; if (client_config->max_part_size != 0) { *((uint64_t *)&client->max_part_size) = client_config->max_part_size; } else { *((uint64_t *)&client->max_part_size) = s_default_max_part_size; } + + if (client_config->max_part_size > pool_usage.mem_limit) { + *((uint64_t *)&client->max_part_size) = pool_usage.mem_limit; + } + if (client->max_part_size > SIZE_MAX) { /* For the 32bit max part size to be SIZE_MAX */ *((uint64_t *)&client->max_part_size) = SIZE_MAX; @@ -488,7 +539,7 @@ struct aws_s3_client *aws_s3_client_new( aws_event_loop_group_release(client->client_bootstrap->event_loop_group); aws_client_bootstrap_release(client->client_bootstrap); aws_mutex_clean_up(&client->synced_data.lock); -lock_init_fail: +on_early_fail: aws_mem_release(client->allocator, client); return NULL; } @@ -551,6 +602,10 @@ static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) { AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client finishing destruction.", (void *)client); + if (client->threaded_data.trim_buffer_pool_task_scheduled) { + aws_event_loop_cancel_task(client->process_work_event_loop, &client->synced_data.trim_buffer_pool_task); + } + aws_string_destroy(client->region); client->region = NULL; @@ -588,6 +643,7 @@ static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) { aws_s3_client_shutdown_complete_callback_fn *shutdown_callback = client->shutdown_callback; void *shutdown_user_data = client->shutdown_callback_user_data; + aws_s3_buffer_pool_destroy(client->buffer_pool); aws_mem_release(client->allocator, client); client = NULL; @@ -1130,6 +1186,57 @@ static void s_s3_client_schedule_process_work_synced_default(struct aws_s3_clien client->synced_data.process_work_task_scheduled = true; } +/* Task function for trying to find a request that can be processed. */ +static void s_s3_client_trim_buffer_pool_task(struct aws_task *task, void *arg, enum aws_task_status task_status) { + AWS_PRECONDITION(task); + (void)task; + (void)task_status; + + if (task_status != AWS_TASK_STATUS_RUN_READY) { + return; + } + + struct aws_s3_client *client = arg; + AWS_PRECONDITION(client); + + client->threaded_data.trim_buffer_pool_task_scheduled = false; + + uint32_t num_reqs_in_flight = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight); + + if (num_reqs_in_flight == 0) { + aws_s3_buffer_pool_trim(client->buffer_pool); + } +} + +static void s_s3_client_schedule_buffer_pool_trim_synced(struct aws_s3_client *client) { + ASSERT_SYNCED_DATA_LOCK_HELD(client); + + if (client->threaded_data.trim_buffer_pool_task_scheduled) { + return; + } + + uint32_t num_reqs_in_flight = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight); + if (num_reqs_in_flight > 0) { + return; + } + + aws_task_init( + &client->synced_data.trim_buffer_pool_task, + s_s3_client_trim_buffer_pool_task, + client, + "s3_client_buffer_pool_trim_task"); + + uint64_t trim_time = 0; + aws_event_loop_current_clock_time(client->process_work_event_loop, &trim_time); + trim_time += + aws_timestamp_convert(s_buffer_pool_trim_time_offset_in_s, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + + aws_event_loop_schedule_task_future( + client->process_work_event_loop, &client->synced_data.trim_buffer_pool_task, trim_time); + + client->threaded_data.trim_buffer_pool_task_scheduled = true; +} + void aws_s3_client_schedule_process_work(struct aws_s3_client *client) { AWS_PRECONDITION(client); @@ -1193,6 +1300,10 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) { client->synced_data.process_work_task_scheduled = false; client->synced_data.process_work_task_in_progress = true; + if (client->synced_data.active) { + s_s3_client_schedule_buffer_pool_trim_synced(client); + } + aws_linked_list_swap_contents(&meta_request_work_list, &client->synced_data.pending_meta_request_work); uint32_t num_requests_queued = @@ -1383,6 +1494,8 @@ void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) { const uint32_t num_passes = AWS_ARRAY_SIZE(pass_flags); + aws_s3_buffer_pool_remove_reservation_hold(client->buffer_pool); + for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) { /* While: @@ -1425,6 +1538,9 @@ void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) { struct aws_s3_request *request = NULL; /* Try to grab the next request from the meta request. */ + /* TODO: should we bail out if request fails to update due to mem or + * continue going and hopping that following reqs can fit into mem? + * check if avail space is at least part size? */ bool work_remaining = aws_s3_meta_request_update(meta_request, pass_flags[pass_index], &request); if (work_remaining) { @@ -1860,7 +1976,6 @@ void aws_s3_client_notify_connection_finished( } if (connection->request != NULL) { - connection->request = aws_s3_request_release(connection->request); } @@ -2057,7 +2172,7 @@ struct aws_byte_cursor aws_s3_meta_request_resume_token_upload_id( static uint64_t s_upload_timeout_threshold_ns = 5000000000; /* 5 Secs */ const size_t g_expect_timeout_offset_ms = - 700; /* 0.7 Secs. From experienments on c5n.18xlarge machine for 30 GiB upload, it gave us best performance. */ + 700; /* 0.7 Secs. From experiments on c5n.18xlarge machine for 30 GiB upload, it gave us best performance. */ /** * The upload timeout optimization: explained. diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index 44f7b15b7..12639aa31 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -102,8 +102,8 @@ static int s_meta_request_get_response_headers_checksum_callback( if (header_sum.len == encoded_len - 1) { /* encoded_len includes the nullptr length. -1 is the expected length. */ aws_byte_buf_init_copy_from_cursor( - &meta_request->meta_request_level_response_header_checksum, aws_default_allocator(), header_sum); - meta_request->meta_request_level_running_response_sum = aws_checksum_new(aws_default_allocator(), i); + &meta_request->meta_request_level_response_header_checksum, meta_request->allocator, header_sum); + meta_request->meta_request_level_running_response_sum = aws_checksum_new(meta_request->allocator, i); } break; } @@ -151,10 +151,10 @@ static void s_meta_request_get_response_finish_checksum_callback( /* what error should I raise for these? */ aws_base64_compute_encoded_len( meta_request->meta_request_level_running_response_sum->digest_size, &encoded_checksum_len); - aws_byte_buf_init(&encoded_response_body_sum, aws_default_allocator(), encoded_checksum_len); + aws_byte_buf_init(&encoded_response_body_sum, meta_request->allocator, encoded_checksum_len); aws_byte_buf_init( &response_body_sum, - aws_default_allocator(), + meta_request->allocator, meta_request->meta_request_level_running_response_sum->digest_size); aws_checksum_finalize(meta_request->meta_request_level_running_response_sum, &response_body_sum, 0); struct aws_byte_cursor response_body_sum_cursor = aws_byte_cursor_from_buf(&response_body_sum); @@ -962,8 +962,8 @@ static void s_get_part_response_headers_checksum_helper( aws_base64_compute_encoded_len(aws_get_digest_size_from_algorithm(i), &encoded_len); if (header_sum.len == encoded_len - 1) { aws_byte_buf_init_copy_from_cursor( - &connection->request->request_level_response_header_checksum, aws_default_allocator(), header_sum); - connection->request->request_level_running_response_sum = aws_checksum_new(aws_default_allocator(), i); + &connection->request->request_level_response_header_checksum, meta_request->allocator, header_sum); + connection->request->request_level_running_response_sum = aws_checksum_new(meta_request->allocator, i); } break; } @@ -990,9 +990,9 @@ static void s_get_response_part_finish_checksum_helper(struct aws_s3_connection size_t encoded_checksum_len = 0; request->did_validate = true; aws_base64_compute_encoded_len(request->request_level_running_response_sum->digest_size, &encoded_checksum_len); - aws_byte_buf_init(&encoded_response_body_sum, aws_default_allocator(), encoded_checksum_len); + aws_byte_buf_init(&encoded_response_body_sum, request->allocator, encoded_checksum_len); aws_byte_buf_init( - &response_body_sum, aws_default_allocator(), request->request_level_running_response_sum->digest_size); + &response_body_sum, request->allocator, request->request_level_running_response_sum->digest_size); aws_checksum_finalize(request->request_level_running_response_sum, &response_body_sum, 0); struct aws_byte_cursor response_body_sum_cursor = aws_byte_cursor_from_buf(&response_body_sum); aws_base64_encode(&response_body_sum_cursor, &encoded_response_body_sum); @@ -1084,6 +1084,14 @@ static int s_s3_meta_request_incoming_headers( return AWS_OP_SUCCESS; } +/* + * Small helper to either do a static or dynamic append. + * TODO: something like this would be useful in common. + */ +static int s_response_body_append(bool is_dynamic, struct aws_byte_buf *buf, const struct aws_byte_cursor *data) { + return is_dynamic ? aws_byte_buf_append_dynamic(buf, data) : aws_byte_buf_append(buf, data); +} + static int s_s3_meta_request_incoming_body( struct aws_http_stream *stream, const struct aws_byte_cursor *data, @@ -1117,17 +1125,19 @@ static int s_s3_meta_request_incoming_body( } if (request->send_data.response_body.capacity == 0) { - size_t buffer_size = s_dynamic_body_initial_buf_size; - - if (request->part_size_response_body) { - buffer_size = meta_request->part_size; + if (request->has_part_size_response_body) { + AWS_FATAL_ASSERT(request->ticket); + request->send_data.response_body = + aws_s3_buffer_pool_acquire_buffer(request->meta_request->client->buffer_pool, request->ticket); + } else { + size_t buffer_size = s_dynamic_body_initial_buf_size; + aws_byte_buf_init(&request->send_data.response_body, meta_request->allocator, buffer_size); } - - aws_byte_buf_init(&request->send_data.response_body, meta_request->allocator, buffer_size); } - if (aws_byte_buf_append_dynamic(&request->send_data.response_body, data)) { - + /* Note: not having part sized response body means the buffer is dynamic and + * can grow. */ + if (s_response_body_append(!request->has_part_size_response_body, &request->send_data.response_body, data)) { AWS_LOGF_ERROR( AWS_LS_S3_META_REQUEST, "id=%p: Request %p could not append to response body due to error %d (%s)", diff --git a/source/s3_platform_info.c b/source/s3_platform_info.c index efa192a98..84e87aa80 100644 --- a/source/s3_platform_info.c +++ b/source/s3_platform_info.c @@ -74,11 +74,15 @@ static struct aws_s3_platform_info s_c5n_9xlarge_platform_info = { /****** End c5n.9large *****/ /***** Begin p4d.24xlarge and p4de.24xlarge ****/ -static struct aws_byte_cursor s_p4d_socket1_array[] = {AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("eth0"), - AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("eth1")}; +static struct aws_byte_cursor s_p4d_socket1_array[] = { + AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("eth0"), + AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("eth1"), +}; -static struct aws_byte_cursor s_p4d_socket2_array[] = {AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("eth2"), - AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("eth3")}; +static struct aws_byte_cursor s_p4d_socket2_array[] = { + AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("eth2"), + AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("eth3"), +}; static struct aws_s3_cpu_group_info s_p4d_cpu_group_info_array[] = { { diff --git a/source/s3_request.c b/source/s3_request.c index 055f6e1b9..d1f91f218 100644 --- a/source/s3_request.c +++ b/source/s3_request.c @@ -31,7 +31,8 @@ struct aws_s3_request *aws_s3_request_new( request->request_tag = request_tag; request->part_number = part_number; request->record_response_headers = (flags & AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS) != 0; - request->part_size_response_body = (flags & AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY) != 0; + request->has_part_size_response_body = (flags & AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY) != 0; + request->has_part_size_request_body = (flags & AWS_S3_REQUEST_FLAG_PART_SIZE_REQUEST_BODY) != 0; request->always_send = (flags & AWS_S3_REQUEST_FLAG_ALWAYS_SEND) != 0; return request; @@ -122,6 +123,7 @@ static void s_s3_request_destroy(void *user_data) { aws_s3_request_clean_up_send_data(request); aws_byte_buf_clean_up(&request->request_body); + aws_s3_buffer_pool_release_ticket(request->meta_request->client->buffer_pool, request->ticket); aws_s3_meta_request_release(request->meta_request); aws_mem_release(request->allocator, request); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d2d73e25a..ec59e1285 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -294,6 +294,12 @@ endif() add_test_case(parallel_read_stream_from_file_sanity_test) add_test_case(parallel_read_stream_from_large_file_test) +add_test_case(test_s3_buffer_pool_threaded_allocs_and_frees) +add_test_case(test_s3_buffer_pool_limits) +add_test_case(test_s3_buffer_pool_trim) +add_test_case(test_s3_buffer_pool_reservation_hold) +add_net_test_case(test_s3_put_object_buffer_pool_trim) + add_test_case(client_update_upload_part_timeout) set(TEST_BINARY_NAME ${PROJECT_NAME}-tests) diff --git a/tests/s3_buffer_pool_tests.c b/tests/s3_buffer_pool_tests.c new file mode 100644 index 000000000..9b4cad64d --- /dev/null +++ b/tests/s3_buffer_pool_tests.c @@ -0,0 +1,188 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include + +#include +#include +#include + +#define NUM_TEST_ALLOCS 100 +#define NUM_TEST_THREADS 8 + +struct pool_thread_test_data { + struct aws_s3_buffer_pool *pool; + uint32_t thread_idx; +}; + +static void s_thread_test(struct aws_allocator *allocator, void (*thread_fn)(void *), struct aws_s3_buffer_pool *pool) { + const struct aws_thread_options *thread_options = aws_default_thread_options(); + struct aws_thread threads[NUM_TEST_THREADS]; + struct pool_thread_test_data thread_data[NUM_TEST_THREADS]; + AWS_ZERO_ARRAY(threads); + AWS_ZERO_ARRAY(thread_data); + for (size_t thread_idx = 0; thread_idx < AWS_ARRAY_SIZE(threads); ++thread_idx) { + struct aws_thread *thread = &threads[thread_idx]; + aws_thread_init(thread, allocator); + struct pool_thread_test_data *data = &thread_data[thread_idx]; + data->pool = pool; + data->thread_idx = (uint32_t)thread_idx; + aws_thread_launch(thread, thread_fn, data, thread_options); + } + + for (size_t thread_idx = 0; thread_idx < AWS_ARRAY_SIZE(threads); ++thread_idx) { + struct aws_thread *thread = &threads[thread_idx]; + aws_thread_join(thread); + } +} + +static void s_threaded_alloc_worker(void *user_data) { + struct aws_s3_buffer_pool *pool = ((struct pool_thread_test_data *)user_data)->pool; + + struct aws_s3_buffer_pool_ticket *tickets[NUM_TEST_ALLOCS]; + for (size_t count = 0; count < NUM_TEST_ALLOCS / NUM_TEST_THREADS; ++count) { + size_t size = 8 * 1024 * 1024; + struct aws_s3_buffer_pool_ticket *ticket = aws_s3_buffer_pool_reserve(pool, size); + AWS_FATAL_ASSERT(ticket); + + struct aws_byte_buf buf = aws_s3_buffer_pool_acquire_buffer(pool, ticket); + AWS_FATAL_ASSERT(buf.buffer); + memset(buf.buffer, 0, buf.capacity); + tickets[count] = ticket; + } + + for (size_t count = 0; count < NUM_TEST_ALLOCS / NUM_TEST_THREADS; ++count) { + aws_s3_buffer_pool_release_ticket(pool, tickets[count]); + } +} + +static int s_test_s3_buffer_pool_threaded_allocs_and_frees(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + (void)ctx; + + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(2)); + + s_thread_test(allocator, s_threaded_alloc_worker, buffer_pool); + + aws_s3_buffer_pool_destroy(buffer_pool); + + return 0; +} +AWS_TEST_CASE(test_s3_buffer_pool_threaded_allocs_and_frees, s_test_s3_buffer_pool_threaded_allocs_and_frees) + +static int s_test_s3_buffer_pool_limits(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + (void)ctx; + + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1)); + + struct aws_s3_buffer_pool_ticket *ticket1 = aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(64)); + ASSERT_NOT_NULL(ticket1); + struct aws_byte_buf buf1 = aws_s3_buffer_pool_acquire_buffer(buffer_pool, ticket1); + ASSERT_NOT_NULL(buf1.buffer); + + struct aws_s3_buffer_pool_ticket *tickets[6]; + for (size_t i = 0; i < 6; ++i) { + tickets[i] = aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(128)); + ASSERT_NOT_NULL(tickets[i]); + struct aws_byte_buf buf = aws_s3_buffer_pool_acquire_buffer(buffer_pool, tickets[i]); + ASSERT_NOT_NULL(buf.buffer); + } + + ASSERT_NULL(aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(128))); + ASSERT_NULL(aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(96))); + + aws_s3_buffer_pool_remove_reservation_hold(buffer_pool); + struct aws_s3_buffer_pool_ticket *ticket2 = aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(32)); + ASSERT_NOT_NULL(ticket2); + struct aws_byte_buf buf2 = aws_s3_buffer_pool_acquire_buffer(buffer_pool, ticket2); + ASSERT_NOT_NULL(buf2.buffer); + + for (size_t i = 0; i < 6; ++i) { + aws_s3_buffer_pool_release_ticket(buffer_pool, tickets[i]); + } + + aws_s3_buffer_pool_release_ticket(buffer_pool, ticket1); + aws_s3_buffer_pool_release_ticket(buffer_pool, ticket2); + + aws_s3_buffer_pool_destroy(buffer_pool); + + return 0; +} +AWS_TEST_CASE(test_s3_buffer_pool_limits, s_test_s3_buffer_pool_limits) + +static int s_test_s3_buffer_pool_trim(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + (void)ctx; + + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1)); + + struct aws_s3_buffer_pool_ticket *tickets[40]; + for (size_t i = 0; i < 40; ++i) { + tickets[i] = aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(8)); + ASSERT_NOT_NULL(tickets[i]); + struct aws_byte_buf buf = aws_s3_buffer_pool_acquire_buffer(buffer_pool, tickets[i]); + ASSERT_NOT_NULL(buf.buffer); + } + + struct aws_s3_buffer_pool_usage_stats stats_before = aws_s3_buffer_pool_get_usage(buffer_pool); + + for (size_t i = 0; i < 20; ++i) { + aws_s3_buffer_pool_release_ticket(buffer_pool, tickets[i]); + } + + aws_s3_buffer_pool_trim(buffer_pool); + + struct aws_s3_buffer_pool_usage_stats stats_after = aws_s3_buffer_pool_get_usage(buffer_pool); + + ASSERT_TRUE(stats_before.primary_num_blocks > stats_after.primary_num_blocks); + + for (size_t i = 20; i < 40; ++i) { + aws_s3_buffer_pool_release_ticket(buffer_pool, tickets[i]); + } + + aws_s3_buffer_pool_destroy(buffer_pool); + + return 0; +}; +AWS_TEST_CASE(test_s3_buffer_pool_trim, s_test_s3_buffer_pool_trim) + +static int s_test_s3_buffer_pool_reservation_hold(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + (void)ctx; + + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1)); + + struct aws_s3_buffer_pool_ticket *tickets[112]; + for (size_t i = 0; i < 112; ++i) { + tickets[i] = aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(8)); + ASSERT_NOT_NULL(tickets[i]); + struct aws_byte_buf buf = aws_s3_buffer_pool_acquire_buffer(buffer_pool, tickets[i]); + ASSERT_NOT_NULL(buf.buffer); + } + + ASSERT_NULL(aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(8))); + + ASSERT_TRUE(aws_s3_buffer_pool_has_reservation_hold(buffer_pool)); + + for (size_t i = 0; i < 112; ++i) { + aws_s3_buffer_pool_release_ticket(buffer_pool, tickets[i]); + } + + ASSERT_NULL(aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(8))); + + aws_s3_buffer_pool_remove_reservation_hold(buffer_pool); + + struct aws_s3_buffer_pool_ticket *ticket = aws_s3_buffer_pool_reserve(buffer_pool, MB_TO_BYTES(8)); + ASSERT_NOT_NULL(ticket); + + aws_s3_buffer_pool_release_ticket(buffer_pool, ticket); + + aws_s3_buffer_pool_destroy(buffer_pool); + + return 0; +}; +AWS_TEST_CASE(test_s3_buffer_pool_reservation_hold, s_test_s3_buffer_pool_reservation_hold) diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c index 9a64fdf6a..bcd03f293 100644 --- a/tests/s3_data_plane_tests.c +++ b/tests/s3_data_plane_tests.c @@ -377,6 +377,10 @@ static int s_test_s3_request_create_destroy(struct aws_allocator *allocator, voi struct aws_s3_meta_request *meta_request = aws_s3_tester_mock_meta_request_new(&tester); ASSERT_TRUE(meta_request != NULL); + struct aws_s3_client *client = aws_s3_tester_mock_client_new(&tester); + ASSERT_TRUE(client != NULL); + meta_request->client = aws_s3_client_acquire(client); + struct aws_http_message *request_message = aws_s3_tester_dummy_http_request_new(&tester); ASSERT_TRUE(request_message != NULL); @@ -407,6 +411,7 @@ static int s_test_s3_request_create_destroy(struct aws_allocator *allocator, voi aws_s3_request_release(request); aws_http_message_release(request_message); aws_s3_meta_request_release(meta_request); + aws_s3_client_release(client); aws_s3_tester_clean_up(&tester); @@ -600,6 +605,7 @@ static int s_test_s3_client_queue_requests(struct aws_allocator *allocator, void aws_linked_list_init(&mock_client->threaded_data.request_queue); struct aws_s3_meta_request *mock_meta_request = aws_s3_tester_mock_meta_request_new(&tester); + mock_meta_request->client = aws_s3_client_acquire(mock_client); struct aws_s3_request *pivot_request = aws_s3_request_new(mock_meta_request, 0, 0, 0); @@ -815,6 +821,7 @@ static int s_test_s3_update_meta_requests_trigger_prepare(struct aws_allocator * aws_linked_list_init(&mock_client->threaded_data.meta_requests); struct aws_s3_meta_request *mock_meta_request_without_work = aws_s3_tester_mock_meta_request_new(&tester); + mock_meta_request_without_work->client = aws_s3_client_acquire(mock_client); mock_meta_request_without_work->endpoint = aws_s3_tester_mock_endpoint_new(&tester); struct test_work_meta_request_update_user_data mock_meta_request_without_work_data = { @@ -836,6 +843,7 @@ static int s_test_s3_update_meta_requests_trigger_prepare(struct aws_allocator * aws_s3_meta_request_acquire(mock_meta_request_without_work); struct aws_s3_meta_request *mock_meta_request_with_work = aws_s3_tester_mock_meta_request_new(&tester); + mock_meta_request_with_work->client = aws_s3_client_acquire(mock_client); struct test_work_meta_request_update_user_data mock_meta_request_with_work_data = { .has_work_remaining = true, }; @@ -952,19 +960,6 @@ static int s_test_s3_client_update_connections_finish_result(struct aws_allocato struct aws_s3_tester tester; aws_s3_tester_init(allocator, &tester); - struct s3_test_update_connections_finish_result_user_data test_update_connections_finish_result_user_data; - AWS_ZERO_STRUCT(test_update_connections_finish_result_user_data); - - /* Put together a mock meta request that is finished. */ - struct aws_s3_meta_request *mock_meta_request = aws_s3_tester_mock_meta_request_new(&tester); - mock_meta_request->synced_data.finish_result_set = true; - mock_meta_request->user_data = &test_update_connections_finish_result_user_data; - mock_meta_request->endpoint = aws_s3_tester_mock_endpoint_new(&tester); - - struct aws_s3_meta_request_vtable *mock_meta_request_vtable = - aws_s3_tester_patch_meta_request_vtable(&tester, mock_meta_request, NULL); - mock_meta_request_vtable->finished_request = s_s3_test_meta_request_has_finish_result_finished_request; - struct aws_client_bootstrap mock_client_bootstrap; AWS_ZERO_STRUCT(mock_client_bootstrap); @@ -978,6 +973,20 @@ static int s_test_s3_client_update_connections_finish_result(struct aws_allocato aws_linked_list_init(&mock_client->threaded_data.request_queue); + struct s3_test_update_connections_finish_result_user_data test_update_connections_finish_result_user_data; + AWS_ZERO_STRUCT(test_update_connections_finish_result_user_data); + + /* Put together a mock meta request that is finished. */ + struct aws_s3_meta_request *mock_meta_request = aws_s3_tester_mock_meta_request_new(&tester); + mock_meta_request->client = aws_s3_client_acquire(mock_client); + mock_meta_request->synced_data.finish_result_set = true; + mock_meta_request->user_data = &test_update_connections_finish_result_user_data; + mock_meta_request->endpoint = aws_s3_tester_mock_endpoint_new(&tester); + + struct aws_s3_meta_request_vtable *mock_meta_request_vtable = + aws_s3_tester_patch_meta_request_vtable(&tester, mock_meta_request, NULL); + mock_meta_request_vtable->finished_request = s_s3_test_meta_request_has_finish_result_finished_request; + /* Verify that the request does not get sent because the meta request has finish-result. */ { struct aws_s3_request *request = aws_s3_request_new(mock_meta_request, 0, 0, 0); @@ -1844,6 +1853,53 @@ static int s_test_s3_put_object_less_than_part_size(struct aws_allocator *alloca return 0; } +AWS_TEST_CASE(test_s3_put_object_buffer_pool_trim, s_test_s3_put_object_buffer_pool_trim) +static int s_test_s3_put_object_buffer_pool_trim(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + + struct aws_s3_client_config client_config = { + .part_size = 8 * 1024 * 1024, + }; + + ASSERT_SUCCESS(aws_s3_tester_bind_client( + &tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING)); + + struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config); + + ASSERT_TRUE(client != NULL); + + struct aws_s3_tester_meta_request_options put_options = { + .allocator = allocator, + .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, + .client = client, + .put_options = + { + .object_size_mb = 32, + }, + }; + + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, NULL)); + + struct aws_s3_buffer_pool_usage_stats usage_before = aws_s3_buffer_pool_get_usage(client->buffer_pool); + + ASSERT_TRUE(0 != usage_before.primary_num_blocks); + + aws_thread_current_sleep(aws_timestamp_convert(6, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL)); + + struct aws_s3_buffer_pool_usage_stats usage_after = aws_s3_buffer_pool_get_usage(client->buffer_pool); + + ASSERT_INT_EQUALS(0, usage_after.primary_num_blocks); + + client = aws_s3_client_release(client); + + aws_s3_tester_clean_up(&tester); + + return 0; +} + AWS_TEST_CASE( test_s3_put_object_less_than_part_size_with_content_encoding, s_test_s3_put_object_less_than_part_size_with_content_encoding) @@ -2206,7 +2262,7 @@ AWS_TEST_CASE( test_s3_put_large_object_no_content_length_with_checksum, s_test_s3_put_large_object_no_content_length_with_checksum) static int s_test_s3_put_large_object_no_content_length_with_checksum(struct aws_allocator *allocator, void *ctx) { - ASSERT_SUCCESS(s3_no_content_length_test_helper(allocator, ctx, 1280, true)); + ASSERT_SUCCESS(s3_no_content_length_test_helper(allocator, ctx, 128, true)); return 0; } @@ -3898,10 +3954,6 @@ static int s_test_s3_meta_request_default(struct aws_allocator *allocator, void ASSERT_TRUE(tester.synced_data.finish_error_code == AWS_ERROR_SUCCESS); aws_s3_tester_unlock_synced_data(&tester); - /* Check the size of the metrics should be the same as the number of requests, which should be 1 */ - ASSERT_UINT_EQUALS(1, aws_array_list_length(&meta_request_test_results.synced_data.metrics)); - struct aws_s3_request_metrics *metrics = NULL; - aws_array_list_back(&meta_request_test_results.synced_data.metrics, (void **)&metrics); ASSERT_SUCCESS(aws_s3_tester_validate_get_object_results(&meta_request_test_results, 0)); @@ -3909,6 +3961,19 @@ static int s_test_s3_meta_request_default(struct aws_allocator *allocator, void aws_s3_tester_wait_for_meta_request_shutdown(&tester); + /* + * TODO: telemetry is sent from request destructor, http threads hold on to + * req for a little bit after on_req_finished callback and its possible that + * telemetry callback will be invoked after meta reqs on_finished callback. + * Moving the telemetry check to after meta req shutdown callback. Need to + * figure out whether current behavior can be improved. + */ + /* Check the size of the metrics should be the same as the number of + requests, which should be 1 */ + ASSERT_UINT_EQUALS(1, aws_array_list_length(&meta_request_test_results.synced_data.metrics)); + struct aws_s3_request_metrics *metrics = NULL; + aws_array_list_back(&meta_request_test_results.synced_data.metrics, (void **)&metrics); + aws_s3_meta_request_test_results_clean_up(&meta_request_test_results); aws_http_message_release(message); diff --git a/tests/s3_tester.c b/tests/s3_tester.c index 8410ea139..a3425cf5f 100644 --- a/tests/s3_tester.c +++ b/tests/s3_tester.c @@ -787,6 +787,7 @@ static void s_s3_mock_client_start_destroy(void *user_data) { struct aws_s3_client *client = user_data; AWS_ASSERT(client); + aws_s3_buffer_pool_destroy(client->buffer_pool); aws_mem_release(client->allocator, client); } @@ -795,6 +796,7 @@ struct aws_s3_client *aws_s3_tester_mock_client_new(struct aws_s3_tester *tester struct aws_s3_client *mock_client = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_client)); mock_client->allocator = allocator; + mock_client->buffer_pool = aws_s3_buffer_pool_new(allocator, MB_TO_BYTES(8), GB_TO_BYTES(1)); mock_client->vtable = &g_aws_s3_client_mock_vtable; aws_ref_count_init(