From f4a52c8f5f480fcdfe4126c00bea2c0a35eb380a Mon Sep 17 00:00:00 2001
From: Dengke Tang
Date: Thu, 28 Dec 2023 15:49:33 -0800
Subject: [PATCH] Support request level part size configuration (#539)

- Add support for request-level overrides of part size and upload threshold
- Add `did_validate_checksum` and `checksum_validation_algorithm` to the
  on_done callback
- Submodule updates for several S3-related improvements:
  - Fix Get-Object with partNumber
  - Skip HeaderRequest for ranged get
  - Cancel/pause of an S3 request is now faster (the ongoing HTTP requests
    are now cancelled)

Co-authored-by: Michael Graeb
---
 awscrt/s3.py             | 68 +++++++++++++++++++++++++++++++++++-----
 crt/aws-c-common         |  2 +-
 crt/aws-c-http           |  2 +-
 crt/aws-c-s3             |  2 +-
 crt/aws-c-sdkutils       |  2 +-
 crt/aws-lc               |  2 +-
 crt/s2n                  |  2 +-
 source/s3_meta_request.c | 14 +++++++--
 test/test_s3.py          | 54 ++++++++++++++++++++++++++++++-
 9 files changed, 130 insertions(+), 18 deletions(-)

diff --git a/awscrt/s3.py b/awscrt/s3.py
index 7c043581c..9a0aab525 100644
--- a/awscrt/s3.py
+++ b/awscrt/s3.py
@@ -182,13 +182,14 @@ class S3Client(NativeResource):
             for each connection, unless `tls_mode` is :attr:`S3RequestTlsMode.DISABLED`
 
         part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
-            Note: for :attr:`S3RequestType.PUT_OBJECT` request, S3 requires the part size greater than 5 MiB.
-            (8*1024*1024 by default)
+            Note: for a :attr:`S3RequestType.PUT_OBJECT` request, the client will adjust the part size to meet the service limits
+            (max number of parts per upload is 10,000, minimum upload part size is 5 MiB).
 
         multipart_upload_threshold (Optional[int]): The size threshold in bytes, for when to use multipart uploads.
+            This only affects :attr:`S3RequestType.PUT_OBJECT` requests.
             Uploads over this size will use the multipart upload strategy.
             Uploads this size or less will use a single request.
-            If not set, `part_size` is used as the threshold.
+            If not set, the maximum of `part_size` and 5 MiB will be used.
 
         throughput_target_gbps (Optional[float]): Throughput target in
             Gigabits per second (Gbps) that we are trying to reach.
@@ -296,6 +297,8 @@ def make_request(
             signing_config=None,
             credential_provider=None,
             checksum_config=None,
+            part_size=None,
+            multipart_upload_threshold=None,
             on_headers=None,
             on_body=None,
             on_done=None,
@@ -347,6 +350,20 @@ def make_request(
 
             checksum_config (Optional[S3ChecksumConfig]): Optional checksum settings.
 
+            part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
+                If not set, the part size configured for the client will be used.
+                Note: for a :attr:`S3RequestType.PUT_OBJECT` request, the client will adjust the part size to meet the service limits
+                (max number of parts per upload is 10,000, minimum upload part size is 5 MiB).
+
+            multipart_upload_threshold (Optional[int]): The size threshold in bytes, for when to use multipart uploads.
+                This only affects :attr:`S3RequestType.PUT_OBJECT` requests.
+                Uploads over this size will use the multipart upload strategy.
+                Uploads this size or less will use a single request.
+                If set, this should be at least `part_size`.
+                If not set, `part_size` (as adjusted by the client) will be used as the threshold.
+                If neither `part_size` nor `multipart_upload_threshold` is set,
+                the values from the client configuration (`aws_s3_client_config`) are used.
+
             on_headers: Optional callback invoked as the response is received.
                 Even if the API request has been split into multiple parts,
                 this callback will only be invoked once, since only one API request is made to S3.
@@ -401,6 +418,13 @@ def make_request(
                 this is the final response's status code.
                 If the operation failed for another reason, None is returned.
 
+            * `did_validate_checksum` (bool):
+                Whether the server-side checksum was compared against a checksum calculated from the response body.
+                This may be false even if :attr:`S3ChecksumConfig.validate_response` was set, because
+                the object was uploaded without a checksum, or was downloaded differently from how it was uploaded.
+
+            * `checksum_validation_algorithm` (Optional[S3ChecksumAlgorithm]): The checksum algorithm used to validate the response.
+
             * `**kwargs` (dict): Forward-compatibility kwargs.
 
         on_progress: Optional callback invoked when part of the transfer is done to report the progress.
@@ -423,6 +447,8 @@ def make_request(
             signing_config=signing_config,
             credential_provider=credential_provider,
             checksum_config=checksum_config,
+            part_size=part_size,
+            multipart_upload_threshold=multipart_upload_threshold,
             on_headers=on_headers,
             on_body=on_body,
             on_done=on_done,
@@ -458,6 +484,8 @@ def __init__(
             signing_config=None,
             credential_provider=None,
             checksum_config=None,
+            part_size=None,
+            multipart_upload_threshold=None,
             on_headers=None,
             on_body=None,
             on_done=None,
@@ -468,14 +496,21 @@ def __init__(
         assert callable(on_headers) or on_headers is None
         assert callable(on_body) or on_body is None
         assert callable(on_done) or on_done is None
+        assert isinstance(part_size, int) or part_size is None
+        assert isinstance(multipart_upload_threshold, int) or multipart_upload_threshold is None
 
         super().__init__()
 
         self._finished_future = Future()
         self.shutdown_event = threading.Event()
 
-        checksum_algorithm = 0  # 0 means NONE in C
-        checksum_location = 0  # 0 means NONE in C
+        # C layer uses 0 to indicate defaults
+        if part_size is None:
+            part_size = 0
+        if multipart_upload_threshold is None:
+            multipart_upload_threshold = 0
+        checksum_algorithm = 0
+        checksum_location = 0
         validate_response_checksum = False
         if checksum_config is not None:
             if checksum_config.algorithm is not None:
@@ -509,6 +544,8 @@ def __init__(
             checksum_algorithm,
             checksum_location,
             validate_response_checksum,
+            part_size,
+            multipart_upload_threshold,
             s3_request_core)
 
     @property
@@ -623,7 +660,15 @@ def _on_body(self, chunk, offset):
     def _on_shutdown(self):
         self._shutdown_event.set()
 
-    def _on_finish(self, error_code, status_code, error_headers, error_body, error_operation_name):
+    def _on_finish(
+            self,
+            error_code,
+            status_code,
+            error_headers,
+            error_body,
+            error_operation_name,
+            did_validate_checksum,
+            checksum_validation_algorithm):
         # If C layer gives status_code 0, that means "unknown"
         if status_code == 0:
             status_code = None
@@ -631,7 +676,6 @@ def _on_finish(self, error_code, status_code, error_headers, error_body, error_o
         error = None
         if error_code:
             error = awscrt.exceptions.from_code(error_code)
-
             if isinstance(error, awscrt.exceptions.AwsCrtError):
                 if (error.name == "AWS_ERROR_CRT_CALLBACK_EXCEPTION"
                         and self._python_callback_exception is not None):
@@ -651,13 +695,21 @@ def _on_finish(self, error_code, status_code, error_headers, error_body, error_o
             self._finished_future.set_exception(error)
         else:
             self._finished_future.set_result(None)
+
+        if checksum_validation_algorithm:
+            checksum_validation_algorithm = S3ChecksumAlgorithm(checksum_validation_algorithm)
+        else:
+            checksum_validation_algorithm = None
+
         if self._on_done_cb:
             self._on_done_cb(
                 error=error,
                 error_headers=error_headers,
                 error_body=error_body,
                 error_operation_name=error_operation_name,
-                status_code=status_code)
+                status_code=status_code,
+                did_validate_checksum=did_validate_checksum,
+                checksum_validation_algorithm=checksum_validation_algorithm)
 
     def _on_progress(self, progress):
         if self._on_progress_cb:
diff --git a/crt/aws-c-common b/crt/aws-c-common
index 80f21b3ca..b7e04cae9 160000
--- a/crt/aws-c-common
+++ b/crt/aws-c-common
@@ -1 +1 @@
-Subproject commit 80f21b3cac5ac51c6b8a62c7d2a5ef58a75195ee
+Subproject commit b7e04cae96df482c89be38eceb55f5eea6f13aea
diff --git a/crt/aws-c-http b/crt/aws-c-http
index a082f8a20..18352c8e0 160000
--- a/crt/aws-c-http
+++ b/crt/aws-c-http
@@ -1 +1 @@
-Subproject commit a082f8a2067e4a31db73f1d4ffd702a8dc0f7089
+Subproject commit 18352c8e065c29e3451fb6161ebc246364e12cf6
diff --git a/crt/aws-c-s3 b/crt/aws-c-s3
index de36fee8f..7dd72a99a 160000
--- a/crt/aws-c-s3
+++ b/crt/aws-c-s3
@@ -1 +1 @@
-Subproject commit de36fee8fe7ab02f10987877ae94a805bf440c1f
+Subproject commit 7dd72a99adc6594a48186f0eeb2f18d92b93e149
diff --git a/crt/aws-c-sdkutils b/crt/aws-c-sdkutils
index a6fd80cf7..fd8c0ba2e 160000
--- a/crt/aws-c-sdkutils
+++ b/crt/aws-c-sdkutils
@@ -1 +1 @@
-Subproject commit a6fd80cf7c163062d31abb28f309e47330fbfc17
+Subproject commit fd8c0ba2e233997eaaefe82fb818b8b444b956d3
diff --git a/crt/aws-lc b/crt/aws-lc
index 80f3f3324..dc4e28145 160000
--- a/crt/aws-lc
+++ b/crt/aws-lc
@@ -1 +1 @@
-Subproject commit 80f3f3324e75737d43af3052b270fd2ffa169d29
+Subproject commit dc4e28145ceb6d46b5475e833f2da8def6d583fe
diff --git a/crt/s2n b/crt/s2n
index 95753f0c5..a9a07a25f 160000
--- a/crt/s2n
+++ b/crt/s2n
@@ -1 +1 @@
-Subproject commit 95753f0c528b59025343e8799cb25d3e9df89e21
+Subproject commit a9a07a25fa0d897ba6ad2b161b4b8ea9e97b6abd
diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c
index 56b853ef0..7586537e5 100644
--- a/source/s3_meta_request.c
+++ b/source/s3_meta_request.c
@@ -263,13 +263,15 @@ static void s_s3_request_on_finish(
         result = PyObject_CallMethod(
             request_binding->py_core,
             "_on_finish",
-            "(iiOy#s)",
+            "(iiOy#sOi)",
             error_code,
             meta_request_result->response_status,
             header_list ? header_list : Py_None,
             (const char *)(error_body.buffer),
             (Py_ssize_t)error_body.len,
-            operation_name);
+            operation_name,
+            meta_request_result->did_validate ? Py_True : Py_False,
+            (int)meta_request_result->validation_algorithm);
 
         if (result) {
             Py_DECREF(result);
@@ -372,10 +374,12 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
     enum aws_s3_checksum_algorithm checksum_algorithm; /* i */
     enum aws_s3_checksum_location checksum_location;   /* i */
     int validate_response_checksum;                    /* p - boolean predicate */
+    uint64_t part_size;                                /* K */
+    uint64_t multipart_upload_threshold;               /* K */
     PyObject *py_core;                                 /* O */
 
     if (!PyArg_ParseTuple(
             args,
-            "OOOizOOzzs#iipO",
+            "OOOizOOzzs#iipKKO",
             &py_s3_request,
             &s3_client_py,
             &http_request_py,
@@ -390,6 +394,8 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
             &checksum_algorithm,
             &checksum_location,
             &validate_response_checksum,
+            &part_size,
+            &multipart_upload_threshold,
             &py_core)) {
         return NULL;
     }
@@ -470,6 +476,8 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
         .finish_callback = s_s3_request_on_finish,
         .shutdown_callback = s_s3_request_on_shutdown,
         .progress_callback = s_s3_request_on_progress,
+        .part_size = part_size,
+        .multipart_upload_threshold = multipart_upload_threshold,
         .user_data = meta_request,
     };
 
diff --git a/test/test_s3.py b/test/test_s3.py
index 5499bb1e6..c354e258f 100644
--- a/test/test_s3.py
+++ b/test/test_s3.py
@@ -260,6 +260,8 @@ def setUp(self):
         self.done_error_headers = None
         self.done_error_body = None
         self.done_error_operation_name = None
+        self.done_did_validate_checksum = None
+        self.done_checksum_validation_algorithm = None
 
         self.files = FileCreator()
         self.temp_put_obj_file_path = self.files.create_file_with_size("temp_put_obj_10mb", 10 * MB)
@@ -305,12 +307,23 @@ def _on_request_headers(self, status_code, headers, **kargs):
     def _on_request_body(self, chunk, offset, **kargs):
         self.received_body_len = self.received_body_len + len(chunk)
 
-    def _on_request_done(self, error, error_headers, error_body, error_operation_name, status_code, **kwargs):
+    def _on_request_done(
+            self,
+            error,
+            error_headers,
+            error_body,
+            error_operation_name,
+            status_code,
+            did_validate_checksum,
+            checksum_validation_algorithm,
+            **kwargs):
         self.done_error = error
         self.done_error_headers = error_headers
         self.done_error_body = error_body
         self.done_error_operation_name = error_operation_name
         self.done_status_code = status_code
+        self.done_did_validate_checksum = did_validate_checksum
+        self.done_checksum_validation_algorithm = checksum_validation_algorithm
 
     def _on_progress(self, progress):
         self.transferred_len += progress
@@ -323,6 +336,11 @@ def _validate_successful_response(self, is_put_object):
         self.assertIsNone(self.done_error_headers)
         self.assertIsNone(self.done_error_body)
         self.assertIsNone(self.done_error_operation_name)
+        self.assertIsInstance(self.done_did_validate_checksum, bool)
+        if self.done_did_validate_checksum:
+            self.assertIsInstance(self.done_checksum_validation_algorithm, S3ChecksumAlgorithm)
+        else:
+            self.assertIsNone(self.done_checksum_validation_algorithm)
         headers = HttpHeaders(self.response_headers)
         self.assertIsNone(headers.get("Content-Range"))
         body_length = headers.get("Content-Length")
@@ -452,6 +470,38 @@ def test_put_object_multiple_times(self):
         del s3_client
         self.assertTrue(client_shutdown_event.wait(self.timeout))
 
+    def test_put_object_request_override_part_size(self):
+        s3_client = s3_client_new(False, self.region, 5 * MB)
+
+        tempfile = self.files.create_file_with_size("temp_file_override", 10 * MB)
+        path = "/put_object_test_py_10MB_override.txt"
+        content_length = os.stat(tempfile).st_size
+        request = self._put_object_request(None, content_length, path=path)
+        # Override the threshold to 10 MB, which will result in a single-part upload
+        s3_request = s3_client.make_request(
+            request=request,
+            type=S3RequestType.PUT_OBJECT,
+            send_filepath=tempfile,
+            on_headers=self._on_request_headers,
+            on_body=self._on_request_body,
+            on_done=self._on_request_done,
+            multipart_upload_threshold=10 * MB)
+        try:
+            s3_request.finished_future.result(self.timeout)
+        except Exception as e:
+            # failed
+            self.assertTrue(False)
+
+        # The ETag of a multipart upload ends with `-[number of parts]`
+        etag = HttpHeaders(self.response_headers).get("Etag")
+        # make sure we uploaded as a single part, since we overrode the threshold
+        self.assertFalse("-" in etag)
+
+        del s3_request
+        client_shutdown_event = s3_client.shutdown_event
+        del s3_client
+        self.assertTrue(client_shutdown_event.wait(self.timeout))
+
     def test_get_object_filepath(self):
         request = self._get_object_request(self.get_test_object_path)
         request_type = S3RequestType.GET_OBJECT
@@ -559,6 +609,8 @@ def test_put_get_with_checksum(self):
         download_checksum_config = S3ChecksumConfig(validate_response=True)
         self._test_s3_put_get_object(download_request, S3RequestType.GET_OBJECT,
                                      checksum_config=download_checksum_config)
+        self.assertTrue(self.done_did_validate_checksum)
+        self.assertEqual(self.done_checksum_validation_algorithm, S3ChecksumAlgorithm.CRC32)
         self.assertEqual(HttpHeaders(self.response_headers).get('x-amz-checksum-crc32'), crc32_base64_str)
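
For reviewers, a minimal usage sketch (not part of the patch) of the request-level `part_size` / `multipart_upload_threshold` overrides and the new `did_validate_checksum` / `checksum_validation_algorithm` arguments to `on_done`. The bucket host, region, object key, and local file paths are placeholders, and the setup assumes the default credential chain:

    import os

    from awscrt.auth import AwsCredentialsProvider
    from awscrt.http import HttpHeaders, HttpRequest
    from awscrt.io import ClientBootstrap, DefaultHostResolver, EventLoopGroup
    from awscrt.s3 import (
        S3ChecksumAlgorithm,
        S3ChecksumConfig,
        S3ChecksumLocation,
        S3Client,
        S3RequestType,
        create_default_s3_signing_config,
    )

    MB = 1024 * 1024
    region = "us-west-2"                                   # placeholder
    bucket_host = "my-bucket.s3.us-west-2.amazonaws.com"   # placeholder

    event_loop_group = EventLoopGroup()
    host_resolver = DefaultHostResolver(event_loop_group)
    bootstrap = ClientBootstrap(event_loop_group, host_resolver)
    credentials = AwsCredentialsProvider.new_default_chain(bootstrap)
    signing_config = create_default_s3_signing_config(
        region=region, credential_provider=credentials)

    # Client-level default: 8 MiB parts.
    s3_client = S3Client(
        bootstrap=bootstrap,
        region=region,
        signing_config=signing_config,
        part_size=8 * MB)

    def on_done(error, did_validate_checksum, checksum_validation_algorithm, **kwargs):
        # New kwargs from this patch: whether the response checksum was validated,
        # and which algorithm (e.g. S3ChecksumAlgorithm.CRC32) was used.
        print(error, did_validate_checksum, checksum_validation_algorithm)

    upload_path = "/tmp/example_upload.bin"                # placeholder local file
    upload = HttpRequest(
        "PUT", "/example-object",
        HttpHeaders([("Host", bucket_host),
                     ("Content-Length", str(os.stat(upload_path).st_size))]))

    # Request-level override: 16 MiB parts for this upload only; the client may
    # still adjust the value to stay within the 10,000-part / 5 MiB service limits.
    s3_client.make_request(
        request=upload,
        type=S3RequestType.PUT_OBJECT,
        send_filepath=upload_path,
        checksum_config=S3ChecksumConfig(
            algorithm=S3ChecksumAlgorithm.CRC32,
            location=S3ChecksumLocation.TRAILER),
        part_size=16 * MB,
        multipart_upload_threshold=32 * MB,
        on_done=on_done).finished_future.result()

    # Download it back, asking the client to validate the response checksum;
    # on_done then reports did_validate_checksum and the algorithm used.
    download = HttpRequest("GET", "/example-object", HttpHeaders([("Host", bucket_host)]))
    s3_client.make_request(
        request=download,
        type=S3RequestType.GET_OBJECT,
        recv_filepath="/tmp/example_download.bin",         # placeholder
        checksum_config=S3ChecksumConfig(validate_response=True),
        on_done=on_done).finished_future.result()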