Skip to content

Commit

Permalink
Progress callbacks fire for all meta-request types (#344)
Browse files Browse the repository at this point in the history
**Description of changes:**
- progress_callback is now invoked for all meta-request types.
- Previously, it was only invoked for multi-part PUT and multi-part
COPY. It wasn't invoked when PUT or COPY was done as a single-part
operation, and wasn't invoked for any type of GET.
- progress_callbacks now fire sequentially, and do not overlap with the
meta-request's other callbacks.
- Previously, progress_callbacks could fire on different threads and
overlap with each other (and overlap the body_callback).
- TODO: telemetry_callback can still overlap with other callbacks. We
should give it the same treatment.

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
  • Loading branch information
graebm authored Aug 25, 2023
1 parent ea0b980 commit a0ee6a9
Show file tree
Hide file tree
Showing 17 changed files with 490 additions and 230 deletions.
4 changes: 3 additions & 1 deletion include/aws/s3/private/s3_auto_ranged_get.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ struct aws_s3_auto_ranged_get {
uint64_t object_range_start;

/* The last byte of the data that will be retrieved from the object.
* (ignore this if object_range_empty) */
* (ignore this if object_range_empty)
* Note this is inclusive: https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests
* So if begin=0 and end=0 then 1 byte is being downloaded. */
uint64_t object_range_end;

/* The total number of parts that are being used in downloading the object range. Note that "part" here
Expand Down
56 changes: 52 additions & 4 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,27 @@ struct aws_s3_prepare_request_payload {
void *user_data;
};

/* An event to be delivered on the meta-request's io_event_loop thread. */
struct aws_s3_meta_request_event {
enum aws_s3_meta_request_event_type {
AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY, /* body_callback */
AWS_S3_META_REQUEST_EVENT_PROGRESS, /* progress_callback */
/* TODO: AWS_S3_META_REQUEST_EVENT_TELEMETRY */
} type;

union {
/* data for AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY */
struct {
struct aws_s3_request *completed_request;
} response_body;

/* data for AWS_S3_META_REQUEST_EVENT_PROGRESS */
struct {
struct aws_s3_meta_request_progress info;
} progress;
} u;
};

struct aws_s3_meta_request_vtable {
/* Update the meta request. out_request is required to be non-null. Returns true if there is any work in
* progress, false if there is not. */
Expand Down Expand Up @@ -179,11 +200,19 @@ struct aws_s3_meta_request {
* failed.)*/
uint32_t num_parts_delivery_completed;

/* Number of parts that have been successfully delivered to the caller. */
uint32_t num_parts_delivery_succeeded;
/* Task for delivering events on the meta-request's io_event_loop thread.
* We do this to ensure a meta-request's callbacks are fired sequentially and non-overlapping.
* If `event_delivery_array` has items in it, then this task is scheduled.
* If `event_delivery_active` is true, then this task is actively running.
* Delivery is not 100% complete until `event_delivery_array` is empty AND `event_delivery_active` is false
* (use aws_s3_meta_request_are_events_out_for_delivery_synced() to check) */
struct aws_task event_delivery_task;

/* Number of parts that have failed while trying to be delivered to the caller. */
uint32_t num_parts_delivery_failed;
/* Array of `struct aws_s3_meta_request_event` to deliver when the `event_delivery_task` runs. */
struct aws_array_list event_delivery_array;

/* When true, events are actively being delivered to the user. */
bool event_delivery_active;

/* The end finish result of the meta request. */
struct aws_s3_meta_request_result finish_result;
Expand All @@ -205,6 +234,14 @@ struct aws_s3_meta_request {

} client_process_work_threaded_data;

/* Anything in this structure should only ever be accessed by the meta-request from its io_event_loop thread. */
struct {
/* When delivering events, we swap contents with `synced_data.event_delivery_array`.
* This is an optimization, we could have just copied the array when the task runs,
* but swapping two array-lists back and forth avoids an allocation. */
struct aws_array_list event_delivery_array;
} io_threaded_data;

const bool should_compute_content_md5;

/* deep copy of the checksum config. */
Expand Down Expand Up @@ -316,6 +353,17 @@ void aws_s3_meta_request_stream_response_body_synced(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request *request);

/* Add an event for delivery on the meta-request's io_event_loop thread.
* These events usually correspond to callbacks that must fire sequentially and non-overlapping,
* such as delivery of a part's response body. */
void aws_s3_meta_request_add_event_for_delivery_synced(
struct aws_s3_meta_request *meta_request,
const struct aws_s3_meta_request_event *event);

/* Returns whether any events are out for delivery.
* The meta-request's finish callback must not be invoked until this returns false. */
bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request);

/* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex,
* as reading from the stream could cause user code to call back into aws-c-s3.
* This will fill the buffer to capacity, unless end of stream is reached.
Expand Down
4 changes: 2 additions & 2 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,10 @@ AWS_S3_API
void aws_s3_request_clean_up_send_data(struct aws_s3_request *request);

AWS_S3_API
void aws_s3_request_acquire(struct aws_s3_request *request);
struct aws_s3_request *aws_s3_request_acquire(struct aws_s3_request *request);

AWS_S3_API
void aws_s3_request_release(struct aws_s3_request *request);
struct aws_s3_request *aws_s3_request_release(struct aws_s3_request *request);

AWS_S3_API
struct aws_s3_request_metrics *aws_s3_request_metrics_new(
Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/private/s3_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#endif
#define KB_TO_BYTES(kb) ((kb)*1024)
#define MB_TO_BYTES(mb) ((mb)*1024 * 1024)
#define GB_TO_BYTES(gb) ((gb)*1024 * 1024 * 1024)
#define GB_TO_BYTES(gb) ((gb)*1024 * 1024 * 1024ULL)

#define MS_TO_NS(ms) ((uint64_t)(ms)*1000000)
#define SEC_TO_NS(ms) ((uint64_t)(ms)*1000000000)
Expand Down
7 changes: 6 additions & 1 deletion include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ struct aws_s3_meta_request_progress {
};

/**
* Invoked to report progress of multi-part upload and copy object requests.
* Invoked to report progress of a meta-request.
* For PutObject, progress refers to bytes uploaded.
* For CopyObject, progress refers to bytes copied.
* For GetObject, progress refers to bytes downloaded.
* For anything else, progress refers to response body bytes received.
*/
typedef void(aws_s3_meta_request_progress_fn)(
struct aws_s3_meta_request *meta_request,
Expand Down Expand Up @@ -534,6 +538,7 @@ struct aws_s3_meta_request_options {

/**
* Invoked to report progress of the meta request execution.
* See `aws_s3_meta_request_progress_fn`.
*/
aws_s3_meta_request_progress_fn *progress_callback;

Expand Down
24 changes: 21 additions & 3 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static bool s_s3_auto_ranged_get_update(
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY);

request->part_range_start = 0;
request->part_range_end = meta_request->part_size - 1;
request->part_range_end = meta_request->part_size - 1; /* range-end is inclusive */
request->discovers_object_size = true;

++auto_ranged_get->synced_data.num_parts_requested;
Expand Down Expand Up @@ -317,6 +317,10 @@ static bool s_s3_auto_ranged_get_update(
}

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, s_s3_auto_ranged_get_success_status(meta_request));
Expand Down Expand Up @@ -494,7 +498,7 @@ static int s_discover_object_range_and_content_length(
* object range and total object size. Otherwise, the size and range should be equal to the
* total_content_length. */
if (!auto_ranged_get->initial_message_has_range_header) {
object_range_end = total_content_length - 1;
object_range_end = total_content_length - 1; /* range-end is inclusive */
} else if (aws_s3_parse_content_range_response_header(
meta_request->allocator,
request->send_data.response_headers,
Expand Down Expand Up @@ -553,7 +557,7 @@ static int s_discover_object_range_and_content_length(

/* When discovering the object size via first-part, the object range is the entire object. */
object_range_start = 0;
object_range_end = total_content_length - 1;
object_range_end = total_content_length - 1; /* range-end is inclusive */

result = AWS_OP_SUCCESS;
break;
Expand Down Expand Up @@ -695,6 +699,20 @@ static void s_s3_auto_ranged_get_request_finished(
}
++auto_ranged_get->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = request->send_data.response_body.len;
if (auto_ranged_get->synced_data.object_range_empty) {
event.u.progress.info.content_length = 0;
} else {
/* Note that range-end is inclusive */
event.u.progress.info.content_length = auto_ranged_get->synced_data.object_range_end + 1 -
auto_ranged_get->synced_data.object_range_start;
}
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

aws_s3_meta_request_stream_response_body_synced(meta_request, request);

AWS_LOGF_DEBUG(
Expand Down
31 changes: 24 additions & 7 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,10 @@ static bool s_s3_auto_ranged_put_update(
work_remaining = true;

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
Expand Down Expand Up @@ -1598,6 +1602,8 @@ static void s_s3_auto_ranged_put_request_finished(
AWS_LS_S3_META_REQUEST, "id=%p Failed to parse list parts response.", (void *)meta_request);
error_code = AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED;
} else if (!has_more_results) {
uint64_t bytes_previously_uploaded = 0;

for (size_t part_index = 0;
part_index < aws_array_list_length(&auto_ranged_put->synced_data.part_list);
part_index++) {
Expand All @@ -1607,6 +1613,8 @@ static void s_s3_auto_ranged_put_request_finished(
/* Update the number of parts sent/completed previously */
++auto_ranged_put->synced_data.num_parts_started;
++auto_ranged_put->synced_data.num_parts_completed;

bytes_previously_uploaded += part->size;
}
}

Expand All @@ -1616,6 +1624,14 @@ static void s_s3_auto_ranged_put_request_finished(
(void *)meta_request,
auto_ranged_put->synced_data.num_parts_completed,
auto_ranged_put->total_num_parts_from_content_length);

/* Deliver an initial progress_callback to report all previously uploaded parts. */
if (meta_request->progress_callback != NULL && bytes_previously_uploaded > 0) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = bytes_previously_uploaded;
event.u.progress.info.content_length = auto_ranged_put->content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}
}
}

Expand Down Expand Up @@ -1742,13 +1758,6 @@ static void s_s3_auto_ranged_put_request_finished(
etag = aws_strip_quotes(meta_request->allocator, etag_within_quotes);
}
}
if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_progress progress = {
.bytes_transferred = request->request_body.len,
.content_length = auto_ranged_put->content_length,
};
meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
}
}

/* BEGIN CRITICAL SECTION */
Expand Down Expand Up @@ -1782,6 +1791,14 @@ static void s_s3_auto_ranged_put_request_finished(

++auto_ranged_put->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = request->request_body.len;
event.u.progress.info.content_length = auto_ranged_put->content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

/* Store part's ETag */
struct aws_s3_mpu_part_info *part = NULL;
aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index);
Expand Down
10 changes: 4 additions & 6 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1481,8 +1481,7 @@ static void s_s3_client_prepare_callback_queue_request(
request_is_noop = request->is_noop != 0;
s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);

aws_s3_request_release(request);
request = NULL;
request = aws_s3_request_release(request);
}

/* BEGIN CRITICAL SECTION */
Expand Down Expand Up @@ -1522,8 +1521,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_S3_CANCELED);

aws_s3_request_release(request);
request = NULL;
request = aws_s3_request_release(request);
} else if (
s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) {
s_s3_client_create_connection_for_request(client, request);
Expand Down Expand Up @@ -1856,8 +1854,8 @@ void aws_s3_client_notify_connection_finished(
}

if (connection->request != NULL) {
aws_s3_request_release(connection->request);
connection->request = NULL;

connection->request = aws_s3_request_release(connection->request);
}

aws_retry_token_release(connection->retry_token);
Expand Down
24 changes: 20 additions & 4 deletions source/s3_copy_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ static bool s_s3_copy_object_update(
work_remaining = true;

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
Expand Down Expand Up @@ -439,6 +443,7 @@ static struct aws_future_void *s_s3_copy_object_prepare_request(struct aws_s3_re
case AWS_S3_COPY_OBJECT_REQUEST_TAG_MULTIPART_COPY: {
/* Create a new uploadPartCopy message to upload a part. */
/* compute sub-request range */
/* note that range-end is inclusive */
uint64_t range_start = (request->part_number - 1) * copy_object->synced_data.part_size;
uint64_t range_end = range_start + copy_object->synced_data.part_size - 1;
if (range_end >= copy_object->synced_data.content_length) {
Expand Down Expand Up @@ -640,6 +645,15 @@ static void s_s3_copy_object_request_finished(

/* Signals completion of the meta request */
if (error_code == AWS_ERROR_SUCCESS) {

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = copy_object->synced_data.content_length;
event.u.progress.info.content_length = copy_object->synced_data.content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

copy_object->synced_data.copy_request_bypass_completed = true;
} else {
/* Bypassed CopyObject request failed */
Expand Down Expand Up @@ -720,11 +734,13 @@ static void s_s3_copy_object_request_finished(
AWS_ASSERT(etag != NULL);

++copy_object->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread. */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_progress progress = {
.bytes_transferred = copy_object->synced_data.part_size,
.content_length = copy_object->synced_data.content_length};
meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = copy_object->synced_data.part_size;
event.u.progress.info.content_length = copy_object->synced_data.content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

struct aws_s3_mpu_part_info *part = NULL;
Expand Down
Loading

0 comments on commit a0ee6a9

Please sign in to comment.