Skip to content

Commit

Permalink
Update the binding to use features from aws-c-s3 (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Oct 12, 2023
1 parent 203ba9e commit 67a6eaf
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 240 deletions.
208 changes: 21 additions & 187 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ struct s3_meta_request_binding {
**/
FILE *recv_file;

struct aws_http_message *copied_message;

/* Batch up the transferred size in one sec. */
uint64_t size_transferred;
/* The time stamp when the progress reported */
Expand All @@ -47,9 +45,6 @@ static void s_destroy(struct s3_meta_request_binding *meta_request) {
if (meta_request->recv_file) {
fclose(meta_request->recv_file);
}
if (meta_request->copied_message) {
aws_http_message_release(meta_request->copied_message);
}
Py_XDECREF(meta_request->py_core);
aws_mem_release(aws_py_get_allocator(), meta_request);
}
Expand Down Expand Up @@ -122,6 +117,7 @@ static int s_s3_request_on_headers(
}
}

/* To avoid reporting progress to python too often. We cache it up and only report to python after at least 1 sec. */
static int s_record_progress(struct s3_meta_request_binding *request_binding, uint64_t length, bool *report_progress) {
if (aws_add_u64_checked(request_binding->size_transferred, length, &request_binding->size_transferred)) {
/* Wow */
Expand Down Expand Up @@ -151,10 +147,6 @@ static int s_s3_request_on_body(
(void)meta_request;
struct s3_meta_request_binding *request_binding = user_data;

bool report_progress;
if (s_record_progress(request_binding, (uint64_t)body->len, &report_progress)) {
return AWS_OP_ERR;
}
if (request_binding->recv_file) {
/* The callback will be invoked with the right order, so we don't need to seek first. */
if (fwrite((void *)body->ptr, body->len, 1, request_binding->recv_file) < 1) {
Expand All @@ -168,9 +160,7 @@ static int s_s3_request_on_body(
aws_error_name(aws_last_error()));
return AWS_OP_ERR;
}
if (!report_progress) {
return AWS_OP_SUCCESS;
}
return AWS_OP_SUCCESS;
}
bool error = true;
/*************** GIL ACQUIRE ***************/
Expand All @@ -179,32 +169,15 @@ static int s_s3_request_on_body(
if (aws_py_gilstate_ensure(&state)) {
return AWS_OP_ERR; /* Python has shut down. Nothing matters anymore, but don't crash */
}
if (!request_binding->recv_file) {
result = PyObject_CallMethod(
request_binding->py_core,
"_on_body",
"(y#K)",
(const char *)(body->ptr),
(Py_ssize_t)body->len,
range_start);

if (!result) {
PyErr_WriteUnraisable(request_binding->py_core);
goto done;
}
Py_DECREF(result);
}
if (report_progress) {
/* Hold the GIL before enterring here */
result =
PyObject_CallMethod(request_binding->py_core, "_on_progress", "(K)", request_binding->size_transferred);
if (!result) {
PyErr_WriteUnraisable(request_binding->py_core);
} else {
Py_DECREF(result);
}
request_binding->size_transferred = 0;
result = PyObject_CallMethod(
request_binding->py_core, "_on_body", "(y#K)", (const char *)(body->ptr), (Py_ssize_t)body->len, range_start);

if (!result) {
PyErr_WriteUnraisable(request_binding->py_core);
goto done;
}
Py_DECREF(result);
error = false;
done:
PyGILState_Release(state);
Expand Down Expand Up @@ -252,8 +225,6 @@ static void s_s3_request_on_finish(
PyObject *header_list = NULL;
PyObject *result = NULL;

request_binding->copied_message = aws_http_message_release(request_binding->copied_message);

if (request_binding->size_transferred && (error_code == 0)) {
/* report the remaining progress */
result =
Expand Down Expand Up @@ -343,39 +314,21 @@ static void s_s3_request_on_shutdown(void *user_data) {
/*************** GIL RELEASE ***************/
}

/*
* file-based python input stream for reporting the progress
*/
struct aws_input_py_stream_file_impl {
struct aws_input_stream base;
struct aws_input_stream *actual_stream;
struct s3_meta_request_binding *binding;
};

static int s_aws_input_stream_file_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
size_t pre_len = dest->len;

if (aws_input_stream_read(impl->actual_stream, dest)) {
return AWS_OP_ERR;
}
static void s_s3_request_on_progress(
struct aws_s3_meta_request *meta_request,
const struct aws_s3_meta_request_progress *progress,
void *user_data) {

size_t actually_read = 0;
if (aws_sub_size_checked(dest->len, pre_len, &actually_read)) {
return AWS_OP_ERR;
}
struct s3_meta_request_binding *request_binding = user_data;

bool report_progress;
struct s3_meta_request_binding *request_binding = impl->binding;
if (s_record_progress(request_binding, (uint64_t)actually_read, &report_progress)) {
return AWS_OP_ERR;
}
bool report_progress = false;
s_record_progress(request_binding, progress->bytes_transferred, &report_progress);

if (report_progress) {
/*************** GIL ACQUIRE ***************/
PyGILState_STATE state;
if (aws_py_gilstate_ensure(&state)) {
return AWS_OP_ERR; /* Python has shut down. Nothing matters anymore, but don't crash */
return; /* Python has shut down. Nothing matters anymore, but don't crash */
}
PyObject *result =
PyObject_CallMethod(request_binding->py_core, "_on_progress", "(K)", request_binding->size_transferred);
Expand All @@ -385,113 +338,7 @@ static int s_aws_input_stream_file_read(struct aws_input_stream *stream, struct
request_binding->size_transferred = 0;
PyGILState_Release(state);
/*************** GIL RELEASE ***************/
if (!result) {
return aws_py_raise_error();
}
}
return AWS_OP_SUCCESS;
}
static int s_aws_input_stream_file_seek(
struct aws_input_stream *stream,
int64_t offset,
enum aws_stream_seek_basis basis) {
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
return aws_input_stream_seek(impl->actual_stream, offset, basis);
}

static int s_aws_input_stream_file_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
return aws_input_stream_get_status(impl->actual_stream, status);
}

static int s_aws_input_stream_file_get_length(struct aws_input_stream *stream, int64_t *length) {
struct aws_input_py_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_py_stream_file_impl, base);
return aws_input_stream_get_length(impl->actual_stream, length);
}

static void s_aws_input_stream_file_destroy(struct aws_input_py_stream_file_impl *impl) {
struct aws_allocator *allocator = aws_py_get_allocator();
aws_input_stream_release(impl->actual_stream);
aws_mem_release(allocator, impl);
}

static struct aws_input_stream_vtable s_aws_input_stream_file_vtable = {
.seek = s_aws_input_stream_file_seek,
.read = s_aws_input_stream_file_read,
.get_status = s_aws_input_stream_file_get_status,
.get_length = s_aws_input_stream_file_get_length,
};

static struct aws_input_stream *s_input_stream_new_from_file(
struct aws_allocator *allocator,
const char *file_name,
struct s3_meta_request_binding *request_binding) {
struct aws_input_py_stream_file_impl *impl =
aws_mem_calloc(allocator, 1, sizeof(struct aws_input_py_stream_file_impl));

impl->base.vtable = &s_aws_input_stream_file_vtable;
aws_ref_count_init(&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_stream_file_destroy);

impl->actual_stream = aws_input_stream_new_from_file(allocator, file_name);
if (!impl->actual_stream) {
aws_mem_release(allocator, impl);
return NULL;
}
impl->binding = request_binding;

return &impl->base;
}

/* Copy an existing HTTP message without body. */
struct aws_http_message *s_copy_http_message(struct aws_allocator *allocator, struct aws_http_message *base_message) {
AWS_PRECONDITION(allocator);
AWS_PRECONDITION(base_message);

struct aws_http_message *message = aws_http_message_new_request(allocator);

if (message == NULL) {
return NULL;
}

struct aws_byte_cursor request_method;
if (aws_http_message_get_request_method(base_message, &request_method)) {
goto error_clean_up;
}

if (aws_http_message_set_request_method(message, request_method)) {
goto error_clean_up;
}

struct aws_byte_cursor request_path;
if (aws_http_message_get_request_path(base_message, &request_path)) {
goto error_clean_up;
}

if (aws_http_message_set_request_path(message, request_path)) {
goto error_clean_up;
}

size_t num_headers = aws_http_message_get_header_count(base_message);
for (size_t header_index = 0; header_index < num_headers; ++header_index) {
struct aws_http_header header;
if (aws_http_message_get_header(base_message, &header, header_index)) {
goto error_clean_up;
}
if (aws_http_message_add_header(message, header)) {
goto error_clean_up;
}
}

return message;

error_clean_up:

if (message != NULL) {
aws_http_message_release(message);
message = NULL;
}

return NULL;
}

PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
Expand Down Expand Up @@ -579,37 +426,24 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
Py_INCREF(meta_request->py_core);

if (recv_filepath) {
meta_request->recv_file = aws_fopen(recv_filepath, "wb+");
meta_request->recv_file = aws_fopen(recv_filepath, "wb");
if (!meta_request->recv_file) {
aws_translate_and_raise_io_error(errno);
PyErr_SetAwsLastError();
goto error;
}
}
if (send_filepath) {
if (type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
/* Copy the http request from python object and replace the old pointer with new pointer */
meta_request->copied_message = s_copy_http_message(allocator, http_request);
struct aws_input_stream *input_body = s_input_stream_new_from_file(allocator, send_filepath, meta_request);
if (!input_body) {
PyErr_SetAwsLastError();
goto error;
}
/* rewrite the input stream of the original request */
aws_http_message_set_body_stream(meta_request->copied_message, input_body);
/* Input body is owned by copied message */
aws_input_stream_release(input_body);
}
}

struct aws_s3_meta_request_options s3_meta_request_opt = {
.type = type,
.message = meta_request->copied_message ? meta_request->copied_message : http_request,
.message = http_request,
.signing_config = signing_config,
.send_filepath = aws_byte_cursor_from_c_str(send_filepath),
.headers_callback = s_s3_request_on_headers,
.body_callback = s_s3_request_on_body,
.finish_callback = s_s3_request_on_finish,
.shutdown_callback = s_s3_request_on_shutdown,
.progress_callback = s_s3_request_on_progress,
.user_data = meta_request,
};

Expand Down
Loading

0 comments on commit 67a6eaf

Please sign in to comment.