Skip to content

Commit

Permalink
Actually moving read outside of lock (#76)
Browse files Browse the repository at this point in the history
* It looks like stream reads in my previous PR got moved back inside of a mutex while other bug fixes were being made in the same branch.  Re-moving it back out of the mutex.
* Removing unused state, cleaned up a little bit of logic, leaving TODOs in some places for refactoring.
* Fixing cancel bug if the create was attempted to be sent, but failed to ever make it to the service, causing the cancel. 
 
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
* Changing return code
  • Loading branch information
rccarper authored Jan 15, 2021
1 parent ad7bb45 commit c54d280
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 66 deletions.
4 changes: 3 additions & 1 deletion include/aws/s3/private/s3_auto_ranged_put.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ enum aws_s3_auto_ranged_put_state {
AWS_S3_AUTO_RANGED_PUT_STATE_WAITING_FOR_CANCEL,
AWS_S3_AUTO_RANGED_PUT_STATE_SEND_COMPLETE,
AWS_S3_AUTO_RANGED_PUT_STATE_WAITING_FOR_COMPLETE,
AWS_S3_AUTO_RANGED_PUT_STATE_WAITING_FOR_SINGLE_REQUEST
};

enum aws_s3_auto_ranged_put_request_tag {
Expand Down Expand Up @@ -49,6 +48,9 @@ struct aws_s3_auto_ranged_put {

struct aws_http_headers *needed_response_headers;
struct aws_s3_meta_request_finish_options *cached_finish_options;

bool create_multipart_upload_successful;

} synced_data;
};

Expand Down
6 changes: 2 additions & 4 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ struct aws_s3_request {
/* When true, the response body buffer will be allocated in the size of a part. */
uint32_t part_size_response_body : 1;

uint32_t request_was_sent : 1;

/* Members of this structure will be repopulated each time the request is sent. For example, If the request fails,
* and needs to be retried, then the members of this structure will be cleaned up and re-populated on the next send.
*/
Expand All @@ -108,10 +110,6 @@ struct aws_s3_request {
int response_status;

} send_data;

struct {
bool in_flight;
} client_data;
};

/* Options for finishing the meta request. */
Expand Down
119 changes: 81 additions & 38 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -290,40 +290,43 @@ static int s_s3_auto_ranged_put_next_request(
struct aws_s3_request *request = NULL;
struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;

int result = AWS_OP_SUCCESS;
bool finish_canceling = false;

s_s3_auto_ranged_put_lock_synced_data(auto_ranged_put);
bool cancelling = meta_request->synced_data.state == AWS_S3_META_REQUEST_STATE_CANCELING;
bool canceling = meta_request->synced_data.state == AWS_S3_META_REQUEST_STATE_CANCELING;

switch (auto_ranged_put->synced_data.state) {
case AWS_S3_AUTO_RANGED_PUT_STATE_START: {

if (cancelling) {
s_s3_auto_ranged_put_unlock_synced_data(auto_ranged_put);
if (canceling) {
/* If we are canceling, then at this point, we haven't sent anything yet, so go ahead and finish
* canceling. */
finish_canceling = true;
} else {
/* Setup for a create-multipart upload */
request = aws_s3_request_new(
meta_request,
AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_CREATE_MULTIPART_UPLOAD,
0,
AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS);

s_s3_auto_ranged_put_cancel_finished(meta_request);
return result;
/* We'll need to wait for the initial create to get back so that we can get the upload-id. */
auto_ranged_put->synced_data.state = AWS_S3_AUTO_RANGED_PUT_STATE_WAITING_FOR_CREATE;
}

/* Setup for a create-multipart upload */
request = aws_s3_request_new(
meta_request,
AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_CREATE_MULTIPART_UPLOAD,
0,
AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS);

/* We'll need to wait for the initial create to get back so that we can get the upload-id. */
auto_ranged_put->synced_data.state = AWS_S3_AUTO_RANGED_PUT_STATE_WAITING_FOR_CREATE;
break;
}
case AWS_S3_AUTO_RANGED_PUT_STATE_WAITING_FOR_CREATE: {
break;
}
case AWS_S3_AUTO_RANGED_PUT_STATE_SENDING_PARTS: {

if (cancelling) {
if (canceling) {
if (!auto_ranged_put->synced_data.create_multipart_upload_successful) {
finish_canceling = true;
} else if (
auto_ranged_put->synced_data.num_parts_completed == auto_ranged_put->synced_data.num_parts_sent) {

if (auto_ranged_put->synced_data.num_parts_completed == auto_ranged_put->synced_data.num_parts_sent) {
request = aws_s3_request_new(
meta_request,
AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_ABORT_MULTIPART_UPLOAD,
Expand All @@ -344,15 +347,8 @@ static int s_s3_auto_ranged_put_next_request(

aws_byte_buf_init(&request->request_body, meta_request->allocator, meta_request->part_size);
request->part_number = auto_ranged_put->threaded_next_request_data.next_part_number;

++auto_ranged_put->threaded_next_request_data.next_part_number;
++auto_ranged_put->synced_data.num_parts_sent;
if (aws_s3_meta_request_read_body(meta_request, &request->request_body)) {
s_s3_auto_ranged_put_unlock_synced_data(auto_ranged_put);
aws_s3_request_release(request);
request = NULL;
result = AWS_OP_ERR;
goto after_unlock;
}
}

break;
Expand All @@ -361,7 +357,7 @@ static int s_s3_auto_ranged_put_next_request(
break;
}
case AWS_S3_AUTO_RANGED_PUT_STATE_SEND_COMPLETE: {
if (cancelling) {
if (canceling) {
request = aws_s3_request_new(
meta_request,
AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_ABORT_MULTIPART_UPLOAD,
Expand All @@ -387,9 +383,6 @@ static int s_s3_auto_ranged_put_next_request(
case AWS_S3_AUTO_RANGED_PUT_STATE_WAITING_FOR_COMPLETE: {
break;
}
case AWS_S3_AUTO_RANGED_PUT_STATE_WAITING_FOR_SINGLE_REQUEST: {
break;
}
case AWS_S3_AUTO_RANGED_PUT_STATE_WAITING_FOR_CANCEL:
break;

Expand All @@ -400,8 +393,37 @@ static int s_s3_auto_ranged_put_next_request(

s_s3_auto_ranged_put_unlock_synced_data(auto_ranged_put);

after_unlock:
if (request != NULL) {
if (finish_canceling) {
AWS_ASSERT(request == NULL);
s_s3_auto_ranged_put_cancel_finished(meta_request);
return AWS_OP_SUCCESS;
}

if (request != NULL && request->request_tag == AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART) {

if (aws_s3_meta_request_read_body(meta_request, &request->request_body)) {
aws_s3_request_release(request);
return AWS_OP_ERR;
}

bool no_longer_active = false;

/* Now we know that we're going to return the request, increment our counter that it has been sent.*/
/* TODO having to do this active state check here is awkward. Basically, we need to cover the case that failure
* happened in between reading the request and needing to increment the synced_data.num_parts_sent variable. */
s_s3_auto_ranged_put_lock_synced_data(auto_ranged_put);
if (meta_request->synced_data.state == AWS_S3_META_REQUEST_STATE_ACTIVE) {
++auto_ranged_put->synced_data.num_parts_sent;
} else {
no_longer_active = true;
}
s_s3_auto_ranged_put_unlock_synced_data(auto_ranged_put);

if (no_longer_active) {
aws_s3_request_release(request);
return AWS_OP_SUCCESS;
}

AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"id=%p: Returning request %p for part %d",
Expand All @@ -411,8 +433,7 @@ static int s_s3_auto_ranged_put_next_request(
}

*out_request = request;

return result;
return AWS_OP_SUCCESS;
}

/* Given a request, prepare it for sending based on its description. */
Expand Down Expand Up @@ -667,12 +688,11 @@ static int s_s3_auto_ranged_put_stream_complete(
/* Store the multipart upload id and set that we are ready for sending parts. */
auto_ranged_put->upload_id = upload_id;

/* Record success of the create multipart upload. Wait until the request cleans up entirely for advancing
* the state. */
s_s3_auto_ranged_put_lock_synced_data(auto_ranged_put);
auto_ranged_put->synced_data.state = AWS_S3_AUTO_RANGED_PUT_STATE_SENDING_PARTS;
auto_ranged_put->synced_data.create_multipart_upload_successful = true;
s_s3_auto_ranged_put_unlock_synced_data(auto_ranged_put);

/* Create Multipart Upload finished successfully, so now we should have parts to send. */
aws_s3_meta_request_push_to_client(meta_request);
break;
}
case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART: {
Expand Down Expand Up @@ -741,6 +761,8 @@ static int s_s3_auto_ranged_put_stream_complete(
return AWS_OP_SUCCESS;
}

/* TODO: make this callback into a notify_request_finished function, and move all stream complete logic (which currently
* only happens on success) into here. */
static void s_s3_auto_ranged_put_notify_request_destroyed(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request *request) {
Expand All @@ -751,13 +773,34 @@ static void s_s3_auto_ranged_put_notify_request_destroyed(

struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;

if (request->request_tag == AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART) {
if (!request->request_was_sent) {
return;
}

if (request->request_tag == AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_CREATE_MULTIPART_UPLOAD) {

/* Any time a create multipart upload request has finished, be it success or failure, advance to the sending
* parts state, which will immediately cancel if there has been failure with the create. */
/* TODO branch on the success/failure of the request here and go to a different state that makes this logic more
* clear.*/
s_s3_auto_ranged_put_lock_synced_data(auto_ranged_put);
auto_ranged_put->synced_data.state = AWS_S3_AUTO_RANGED_PUT_STATE_SENDING_PARTS;
s_s3_auto_ranged_put_unlock_synced_data(auto_ranged_put);

aws_s3_meta_request_push_to_client(meta_request);

} else if (request->request_tag == AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART) {

bool notify_work_available = false;

s_s3_auto_ranged_put_lock_synced_data(auto_ranged_put);
bool cancelling = meta_request->synced_data.state == AWS_S3_META_REQUEST_STATE_CANCELING;

/* TODO This part is confusing/unclear and should be slightly refactored. This function can get called on
* success OR failure of the request. This really only works because we're currently assuming that the meta
* request will fail entirely when a request completely fails (initiating a cancel), and that that logic will
* happen before this function is called. All of that is client detail, which makes this a bit hacky right
* now.*/
++auto_ranged_put->synced_data.num_parts_completed;

if (cancelling) {
Expand Down
4 changes: 2 additions & 2 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1402,7 +1402,7 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) {
/* At this point, the vip connection owns the only existing ref count to the request.*/
vip_connection->request = request;
vip_connection->is_retry = false;
request->client_data.in_flight = true;
request->request_was_sent = true;
++client->threaded_data.num_requests_in_flight;
s_s3_client_process_request(client, vip_connection);
}
Expand Down Expand Up @@ -1839,7 +1839,7 @@ void aws_s3_client_notify_request_destroyed(struct aws_s3_client *client, struct
AWS_PRECONDITION(client);
AWS_PRECONDITION(request);

if (request->client_data.in_flight) {
if (request->request_was_sent) {
aws_s3_client_lock_synced_data(client);
++client->synced_data.pending_request_count;
s_s3_client_schedule_process_work_synced(client);
Expand Down
33 changes: 12 additions & 21 deletions source/s3_default_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ static int s_s3_meta_request_default_next_request(

struct aws_s3_meta_request_default *meta_request_default = meta_request->impl;
struct aws_s3_request *request = NULL;
int result = AWS_OP_SUCCESS;

s_s3_meta_request_default_lock_synced_data(meta_request_default);

Expand All @@ -173,36 +172,27 @@ static int s_s3_meta_request_default_next_request(
}

const uint32_t part_number = 1;

request = aws_s3_request_new(meta_request, 0, part_number, request_flags);

if (meta_request_default->content_length > 0) {
aws_byte_buf_init(&request->request_body, meta_request->allocator, meta_request_default->content_length);

if (aws_s3_meta_request_read_body(meta_request, &request->request_body)) {
result = AWS_OP_ERR;
goto unlock;
}
}

meta_request_default->synced_data.state = AWS_S3_META_REQUEST_DEFAULT_WAITING_FOR_REQUEST;

AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST, "id=%p: Meta Request created request %p", (void *)meta_request, (void *)request);
}

unlock:
s_s3_meta_request_default_unlock_synced_data(meta_request_default);

if (result == AWS_OP_SUCCESS) {
*out_request = request;
} else {
aws_s3_request_release(request);
request = NULL;
*out_request = NULL;
if (request != NULL && meta_request_default->content_length > 0) {
aws_byte_buf_init(&request->request_body, meta_request->allocator, meta_request_default->content_length);

if (aws_s3_meta_request_read_body(meta_request, &request->request_body)) {
aws_s3_request_release(request);
request = NULL;
return AWS_OP_ERR;
}
}

return result;
*out_request = request;
return AWS_OP_SUCCESS;
}

/* Given a request, prepare it for sending based on its description. */
Expand Down Expand Up @@ -275,7 +265,8 @@ static void s_s3_meta_request_default_notify_request_destroyed(
AWS_PRECONDITION(meta_request_default);

aws_s3_meta_request_lock_synced_data(meta_request);
bool finish = meta_request_default->synced_data.state == AWS_S3_META_REQUEST_DEFAULT_WAITING_FOR_REQUEST;
bool finish = request->request_was_sent &&
meta_request_default->synced_data.state == AWS_S3_META_REQUEST_DEFAULT_WAITING_FOR_REQUEST;
aws_s3_meta_request_unlock_synced_data(meta_request);

if (finish) {
Expand Down

0 comments on commit c54d280

Please sign in to comment.