Skip to content

Commit

Permalink
S3 receive filepath (#449)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Sep 4, 2024
1 parent 76096ad commit 502cd62
Show file tree
Hide file tree
Showing 9 changed files with 503 additions and 35 deletions.
5 changes: 5 additions & 0 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ struct aws_s3_meta_request {

/* running checksum of all the parts of a default get, or ranged get meta request*/
struct aws_s3_checksum *meta_request_level_running_response_sum;

/* The receiving file handler */
FILE *recv_file;
struct aws_string *recv_filepath;
bool recv_file_delete_on_failure;
};

/* Info for each part, that we need to remember until we send CompleteMultipartUpload */
Expand Down
2 changes: 2 additions & 0 deletions include/aws/s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ enum aws_s3_errors {
AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED,
AWS_ERROR_S3_INTERNAL_PART_SIZE_MISMATCH_RETRYING_WITH_RANGE,
AWS_ERROR_S3_REQUEST_HAS_COMPLETED,
AWS_ERROR_S3_RECV_FILE_ALREADY_EXISTS,
AWS_ERROR_S3_RECV_FILE_NOT_FOUND,

AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID)
};
Expand Down
50 changes: 50 additions & 0 deletions include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,28 @@ enum aws_s3_checksum_location {
AWS_SCL_TRAILER,
};

enum aws_s3_recv_file_option {
/**
* Create a new file if it doesn't exist, otherwise replace the existing file.
*/
AWS_S3_RECV_FILE_CREATE_OR_REPLACE = 0,
/**
* Always create a new file. If the file already exists, AWS_ERROR_S3_RECV_FILE_ALREADY_EXISTS will be raised.
*/
AWS_S3_RECV_FILE_CREATE_NEW,
/**
* Create a new file if it doesn't exist, otherwise append to the existing file.
*/
AWS_S3_RECV_FILE_CREATE_OR_APPEND,

/**
* Write to an existing file at the specified position, defined by the `recv_file_position`.
* If the file does not exist, AWS_ERROR_S3_RECV_FILE_NOT_FOUND will be raised.
* If `recv_file_position` is not configured, start overwriting data at the beginning of the
* file (byte 0).
*/
AWS_S3_RECV_FILE_WRITE_TO_POSITION,
};
/**
* Info about a single part, for you to review before the upload completes.
*/
Expand Down Expand Up @@ -632,6 +654,34 @@ struct aws_s3_meta_request_options {
* Do not set the message's body-stream if the body is being passed by other means (see note above) */
struct aws_http_message *message;

/**
* Optional.
* If set, the received data will be written into this file.
* the `body_callback` will NOT be invoked.
* This gives a better performance when receiving data to write to a file.
* See `aws_s3_recv_file_option` for the configuration on the receive file.
*/
struct aws_byte_cursor recv_filepath;

/**
* Optional.
* Default to AWS_S3_RECV_FILE_CREATE_OR_REPLACE.
* This only works with recv_filepath set.
* See `aws_s3_recv_file_option`.
*/
enum aws_s3_recv_file_option recv_file_option;
/**
* Optional.
* The specified position to start writing at for the recv file when `recv_file_option` is set to
* AWS_S3_RECV_FILE_WRITE_TO_POSITION, ignored otherwise.
*/
uint64_t recv_file_position;
/**
* Set it to be true to delete the receive file on failure, otherwise, the file will be left as-is.
* This only works with recv_filepath set.
*/
bool recv_file_delete_on_failure;

/**
* Optional.
* If set, this file is sent as the request body.
Expand Down
2 changes: 2 additions & 0 deletions source/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ static struct aws_error_info s_errors[] = {
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED, "CreateSession call failed when signing with S3 Express."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INTERNAL_PART_SIZE_MISMATCH_RETRYING_WITH_RANGE, "part_size mismatch, possibly due to wrong object_size_hint. Retrying with Range instead of partNumber."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_REQUEST_HAS_COMPLETED, "Request has already completed, action cannot be performed."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_RECV_FILE_ALREADY_EXISTS, "File already exists, cannot create as new."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_RECV_FILE_NOT_FOUND, "The receive file doesn't exist, cannot create as configuration required."),
};
/* clang-format on */

Expand Down
122 changes: 110 additions & 12 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
#include <aws/auth/signing_result.h>
#include <aws/common/clock.h>
#include <aws/common/encoding.h>
#include <aws/common/file.h>
#include <aws/common/string.h>
#include <aws/common/system_info.h>
#include <aws/io/async_stream.h>
#include <aws/io/event_loop.h>
#include <aws/io/retry_strategy.h>
#include <aws/io/socket.h>
#include <aws/io/stream.h>
#include <errno.h>
#include <inttypes.h>

static const size_t s_dynamic_body_initial_buf_size = KB_TO_BYTES(1);
Expand Down Expand Up @@ -232,6 +234,57 @@ int aws_s3_meta_request_init_base(
/* Keep original message around, for headers, method, and synchronous body-stream (if any) */
meta_request->initial_request_message = aws_http_message_acquire(options->message);

if (options->recv_filepath.len > 0) {

meta_request->recv_filepath = aws_string_new_from_cursor(allocator, &options->recv_filepath);
switch (options->recv_file_option) {
case AWS_S3_RECV_FILE_CREATE_OR_REPLACE:
meta_request->recv_file = aws_fopen(aws_string_c_str(meta_request->recv_filepath), "wb");
break;

case AWS_S3_RECV_FILE_CREATE_NEW:
if (aws_path_exists(meta_request->recv_filepath)) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Cannot receive file via CREATE_NEW: file already exists",
(void *)meta_request);
aws_raise_error(AWS_ERROR_S3_RECV_FILE_ALREADY_EXISTS);
break;
} else {
meta_request->recv_file = aws_fopen(aws_string_c_str(meta_request->recv_filepath), "wb");
break;
}
case AWS_S3_RECV_FILE_CREATE_OR_APPEND:
meta_request->recv_file = aws_fopen(aws_string_c_str(meta_request->recv_filepath), "ab");
break;
case AWS_S3_RECV_FILE_WRITE_TO_POSITION:
if (!aws_path_exists(meta_request->recv_filepath)) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Cannot receive file via WRITE_TO_POSITION: file not found.",
(void *)meta_request);
aws_raise_error(AWS_ERROR_S3_RECV_FILE_NOT_FOUND);
break;
} else {
meta_request->recv_file = aws_fopen(aws_string_c_str(meta_request->recv_filepath), "r+");
if (meta_request->recv_file &&
aws_fseek(meta_request->recv_file, options->recv_file_position, SEEK_SET) != AWS_OP_SUCCESS) {
/* error out. */
goto error;
}
break;
}

default:
AWS_ASSERT(false);
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
break;
}
if (!meta_request->recv_file) {
goto error;
}
}

/* If the request's body is being passed in some other way, set that up.
* (we checked earlier that the request body is not being passed multiple ways) */
if (options->send_filepath.len > 0) {
Expand Down Expand Up @@ -440,6 +493,15 @@ static void s_s3_meta_request_destroy(void *user_data) {
/* endpoint should have already been released and set NULL by the meta request finish call.
* But call release() again, just in case we're tearing down a half-initialized meta request */
aws_s3_endpoint_release(meta_request->endpoint);
if (meta_request->recv_file) {
fclose(meta_request->recv_file);
meta_request->recv_file = NULL;
if (meta_request->recv_file_delete_on_failure) {
/* If the meta request succeed, the file should be closed from finish call. So it must be failing. */
aws_file_delete(meta_request->recv_filepath);
}
}
aws_string_destroy(meta_request->recv_filepath);

/* Client may be NULL if meta request failed mid-creation (or this some weird testing mock with no client) */
if (meta_request->client != NULL) {
Expand Down Expand Up @@ -1779,19 +1841,47 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a

if (error_code == AWS_ERROR_SUCCESS && response_body.len > 0) {
if (meta_request->meta_request_level_running_response_sum) {
aws_checksum_update(meta_request->meta_request_level_running_response_sum, &response_body);
if (aws_checksum_update(
meta_request->meta_request_level_running_response_sum, &response_body)) {
error_code = aws_last_error();
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Failed to update checksum. last error:%s",
(void *)meta_request,
aws_error_name(error_code));
}
}
if (meta_request->body_callback != NULL &&
meta_request->body_callback(
meta_request, &response_body, request->part_range_start, meta_request->user_data)) {

error_code = aws_last_error_or_unknown();
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Response body callback raised error %d (%s).",
(void *)meta_request,
error_code,
aws_error_str(error_code));
if (error_code == AWS_ERROR_SUCCESS) {
if (meta_request->recv_file) {
/* Write the data directly to the file. No need to seek, since the event will always be
* delivered with the right order. */
if (fwrite((void *)response_body.ptr, response_body.len, 1, meta_request->recv_file) < 1) {
int errno_value = ferror(meta_request->recv_file) ? errno : 0; /* Always cache errno */
aws_translate_and_raise_io_error_or(errno_value, AWS_ERROR_FILE_WRITE_FAILURE);
error_code = aws_last_error();
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Failed writing to file. errno:%d. aws-error:%s",
(void *)meta_request,
errno_value,
aws_error_name(error_code));
}
if (meta_request->client->enable_read_backpressure) {
aws_s3_meta_request_increment_read_window(meta_request, response_body.len);
}
} else if (
meta_request->body_callback != NULL &&
meta_request->body_callback(
meta_request, &response_body, request->part_range_start, meta_request->user_data)) {

error_code = aws_last_error_or_unknown();
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Response body callback raised error %d (%s).",
(void *)meta_request,
error_code,
aws_error_str(error_code));
}
}
}
aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1);
Expand Down Expand Up @@ -1979,6 +2069,14 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request
pending_async_write_waker(pending_async_write_waker_user_data);
}

if (meta_request->recv_file) {
fclose(meta_request->recv_file);
meta_request->recv_file = NULL;
if (finish_result.error_code && meta_request->recv_file_delete_on_failure) {
aws_file_delete(meta_request->recv_filepath);
}
}

while (!aws_linked_list_empty(&release_request_list)) {
struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&release_request_list);
struct aws_s3_request *release_request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node);
Expand Down
7 changes: 7 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ add_net_test_case(test_s3_get_object_tls_disabled)
add_net_test_case(test_s3_get_object_tls_enabled)
add_net_test_case(test_s3_get_object_tls_default)
add_net_test_case(test_s3_get_object_less_than_part_size)
add_net_test_case(test_s3_get_object_file_path)
add_net_test_case(test_s3_get_object_file_path_create_new)
add_net_test_case(test_s3_get_object_file_path_append)
add_net_test_case(test_s3_get_object_file_path_to_position)
add_net_test_case(test_s3_get_object_empty_object)
add_net_test_case(test_s3_get_object_multiple)
add_net_test_case(test_s3_get_object_multiple_serial)
Expand All @@ -77,6 +81,9 @@ add_net_test_case(test_s3_default_get_object_looks_like_async_error_xml)
add_net_test_case(test_s3_get_object_backpressure_small_increments)
add_net_test_case(test_s3_get_object_backpressure_big_increments)
add_net_test_case(test_s3_get_object_backpressure_initial_size_zero)
add_net_test_case(test_s3_get_object_backpressure_small_increments_recv_filepath)
add_net_test_case(test_s3_get_object_backpressure_big_increments_recv_filepath)
add_net_test_case(test_s3_get_object_backpressure_initial_size_zero_recv_filepath)
add_net_test_case(test_s3_get_object_part)
add_net_test_case(test_s3_no_signing)
add_net_test_case(test_s3_signing_override)
Expand Down
Loading

0 comments on commit 502cd62

Please sign in to comment.