From 386ee9c23a95e672cd4806a6d9c5f20c7ce94f7b Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 28 Nov 2023 07:31:17 -0800 Subject: [PATCH] Performance optimization Updates (#531) --- awscrt/auth.py | 15 ++++---- awscrt/s3.py | 51 ++++++++++++++++++++++++---- crt/aws-c-auth | 2 +- crt/aws-c-s3 | 2 +- source/auth.h | 1 + source/auth_credentials.c | 9 +++++ source/auth_signing_config.c | 20 ++++++----- source/s3_client.c | 52 +++++++++++++++++----------- test/test_s3.py | 66 ++++++++++++++++++++++++++++++------ 9 files changed, 164 insertions(+), 54 deletions(-) diff --git a/awscrt/auth.py b/awscrt/auth.py index 10b007f13..a41648854 100644 --- a/awscrt/auth.py +++ b/awscrt/auth.py @@ -455,6 +455,9 @@ class AwsSigningAlgorithm(IntEnum): V4_ASYMMETRIC = 1 """Signature Version 4 - Asymmetric""" + V4_S3EXPRESS = 2 + """Signature Version 4 - S3 Express""" + class AwsSignatureType(IntEnum): """Which sort of signature should be computed from the signable.""" @@ -595,11 +598,11 @@ class AwsSigningConfig(NativeResource): ) def __init__(self, - algorithm, - signature_type, - credentials_provider, - region, - service, + algorithm=AwsSigningAlgorithm.V4, + signature_type=AwsSignatureType.HTTP_REQUEST_HEADERS, + credentials_provider=None, + region="", + service="", date=None, should_sign_header=None, use_double_uri_encode=True, @@ -612,7 +615,7 @@ def __init__(self, assert isinstance(algorithm, AwsSigningAlgorithm) assert isinstance(signature_type, AwsSignatureType) - assert isinstance(credentials_provider, AwsCredentialsProvider) + assert isinstance(credentials_provider, AwsCredentialsProvider) or credentials_provider is None assert isinstance(region, str) assert isinstance(service, str) assert callable(should_sign_header) or should_sign_header is None diff --git a/awscrt/s3.py b/awscrt/s3.py index 125a6d2c4..7c043581c 100644 --- a/awscrt/s3.py +++ b/awscrt/s3.py @@ -10,11 +10,12 @@ from awscrt import NativeResource from awscrt.http import HttpRequest from awscrt.io import ClientBootstrap, TlsConnectionOptions +from awscrt.auth import AwsCredentials, AwsCredentialsProvider, AwsSignatureType, AwsSignedBodyHeaderType, AwsSignedBodyValue, AwsSigningAlgorithm, AwsSigningConfig from awscrt.auth import AwsCredentialsProvider, AwsSignatureType, AwsSignedBodyHeaderType, AwsSignedBodyValue, \ AwsSigningAlgorithm, AwsSigningConfig import awscrt.exceptions -from dataclasses import dataclass import threading +from dataclasses import dataclass from typing import List, Optional, Tuple from enum import IntEnum @@ -156,9 +157,22 @@ class S3Client(NativeResource): If this is :attr:`S3RequestTlsMode.DISABLED`: No TLS options will be used, regardless of `tls_connection_options` value. - signing_config (Optional[AwsSigningConfig]): - Configuration for signing of the client. Use :func:`create_default_s3_signing_config()` to create the default config. - If None is provided, the request will not be signed. + signing_config (Optional[AwsSigningConfig]): Configuration for signing of the client. + Use :func:`create_default_s3_signing_config()` to create the default config. + + If not set, a default config will be used with anonymous credentials and skip signing the request. + + If set: + Credentials provider is required. Other configs are all optional, and will be default to what + needs to sign the request for S3, only overrides when Non-zero/Not-empty is set. + + S3 Client will derive the right config for signing process based on this. + + Notes: + + 1. For SIGV4_S3EXPRESS, S3 client will use the credentials in the config to derive the S3 Express + credentials that are used in the signing process. + 2. Client may make modifications to signing config before passing it on to signer. credential_provider (Optional[AwsCredentialsProvider]): Deprecated, prefer `signing_config` instead. Credentials providers source the :class:`~awscrt.auth.AwsCredentials` needed to sign an authenticated AWS request. @@ -181,6 +195,11 @@ class S3Client(NativeResource): You can also use `get_recommended_throughput_target_gbps()` to get recommended value for your system. 10.0 Gbps by default (may change in future) + enable_s3express (Optional[bool]): To enable S3 Express support for the client. + The typical usage for a S3 Express request is to set this to true and let the request to be + signed with `AwsSigningAlgorithm.V4_S3EXPRESS`, either from the client-level `signing_config` + or the request-level override. + memory_limit (Optional[int]): Memory limit, in bytes, of how much memory client can use for buffering data for requests. Default values scale with target throughput and are currently @@ -201,6 +220,7 @@ def __init__( part_size=None, multipart_upload_threshold=None, throughput_target_gbps=None, + enable_s3express=False, memory_limit=None): assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None assert isinstance(region, str) @@ -213,6 +233,7 @@ def __init__( int) or isinstance( throughput_target_gbps, float) or throughput_target_gbps is None + assert isinstance(enable_s3express, bool) or enable_s3express is None if credential_provider and signing_config: raise ValueError("'credential_provider' has been deprecated in favor of 'signing_config'. " @@ -231,7 +252,11 @@ def on_shutdown(): if not bootstrap: bootstrap = ClientBootstrap.get_or_create_static_default() - s3_client_core = _S3ClientCore(bootstrap, credential_provider, signing_config, tls_connection_options) + s3_client_core = _S3ClientCore( + bootstrap, + credential_provider, + signing_config, + tls_connection_options) # C layer uses 0 to indicate defaults if tls_mode is None: @@ -256,6 +281,7 @@ def on_shutdown(): part_size, multipart_upload_threshold, throughput_target_gbps, + enable_s3express, memory_limit, s3_client_core) @@ -300,10 +326,21 @@ def make_request( request's `body_stream` is ignored. This should give better performance than reading a file from a stream. - signing_config (Optional[AwsSigningConfig]): - Configuration for signing of the request to override the configuration from client. Use :func:`create_default_s3_signing_config()` to create the default config. + signing_config (Optional[AwsSigningConfig]): Configuration for signing of the request to override the configuration from client. + Use :func:`create_default_s3_signing_config()` to create the default config. + If None is provided, the client configuration will be used. + If set: + All fields are optional. The credentials will be resolve from client if not set. + S3 Client will derive the right config for signing process based on this. + + Notes: + + 1. For SIGV4_S3EXPRESS, S3 client will use the credentials in the config to derive the S3 Express + credentials that are used in the signing process. + 2. Client may make modifications to signing config before passing it on to signer. + credential_provider (Optional[AwsCredentialsProvider]): Deprecated, prefer `signing_config` instead. Credentials providers source the :class:`~awscrt.auth.AwsCredentials` needed to sign an authenticated AWS request, for this request only. If None is provided, the client configuration will be used. diff --git a/crt/aws-c-auth b/crt/aws-c-auth index 50f3e0d1c..baeffa791 160000 --- a/crt/aws-c-auth +++ b/crt/aws-c-auth @@ -1 +1 @@ -Subproject commit 50f3e0d1ce28e39d914818048d84e3f5a11afe43 +Subproject commit baeffa791d9d1cf61460662a6d9ac2186aaf05df diff --git a/crt/aws-c-s3 b/crt/aws-c-s3 index cc6ba346b..dc9001092 160000 --- a/crt/aws-c-s3 +++ b/crt/aws-c-s3 @@ -1 +1 @@ -Subproject commit cc6ba346b55ef012f7131d98dcc68e16acc16d95 +Subproject commit dc9001092b4e7dc3bb2b21631ecb928fe3f98ee4 diff --git a/source/auth.h b/source/auth.h index 3055298c3..57632032f 100644 --- a/source/auth.h +++ b/source/auth.h @@ -47,5 +47,6 @@ PyObject *aws_py_sign_request_aws(PyObject *self, PyObject *args); struct aws_credentials *aws_py_get_credentials(PyObject *credentials); struct aws_credentials_provider *aws_py_get_credentials_provider(PyObject *credentials_provider); struct aws_signing_config_aws *aws_py_get_signing_config(PyObject *signing_config); +PyObject *aws_py_credentials_new_request_from_native(struct aws_credentials *credentials); #endif // AWS_CRT_PYTHON_AUTH_H diff --git a/source/auth_credentials.c b/source/auth_credentials.c index 85f9a3a08..ec6e964f1 100644 --- a/source/auth_credentials.c +++ b/source/auth_credentials.c @@ -23,6 +23,15 @@ static void s_credentials_capsule_destructor(PyObject *capsule) { aws_credentials_release(credentials); } +PyObject *aws_py_credentials_new_request_from_native(struct aws_credentials *credentials) { + PyObject *capsule = PyCapsule_New(credentials, s_capsule_name_credentials, s_credentials_capsule_destructor); + if (!capsule) { + return NULL; + } + aws_credentials_acquire(credentials); + return capsule; +} + PyObject *aws_py_credentials_new(PyObject *self, PyObject *args) { (void)self; diff --git a/source/auth_signing_config.c b/source/auth_signing_config.c index 0b998f362..e9edc1cc8 100644 --- a/source/auth_signing_config.c +++ b/source/auth_signing_config.c @@ -72,7 +72,7 @@ PyObject *aws_py_signing_config_new(PyObject *self, PyObject *args) { int algorithm; int signature_type; - PyObject *py_credentials_provider; + PyObject *py_credentials_provider = NULL; struct aws_byte_cursor region; struct aws_byte_cursor service; PyObject *py_date; @@ -136,13 +136,14 @@ PyObject *aws_py_signing_config_new(PyObject *self, PyObject *args) { binding->native.flags.omit_session_token = PyObject_IsTrue(py_omit_session_token); /* credentials_provider */ - binding->native.credentials_provider = aws_py_get_credentials_provider(py_credentials_provider); - if (!binding->native.credentials_provider) { - goto error; + if (py_credentials_provider != Py_None) { + binding->native.credentials_provider = aws_py_get_credentials_provider(py_credentials_provider); + if (!binding->native.credentials_provider) { + goto error; + } + binding->py_credentials_provider = py_credentials_provider; + Py_INCREF(binding->py_credentials_provider); } - binding->py_credentials_provider = py_credentials_provider; - Py_INCREF(binding->py_credentials_provider); - /* backup strings */ if (aws_byte_buf_init_cache_and_update_cursors( &binding->string_storage, @@ -220,8 +221,9 @@ PyObject *aws_py_signing_config_get_credentials_provider(PyObject *self, PyObjec if (!binding) { return NULL; } - - Py_INCREF(binding->py_credentials_provider); + if (binding->py_credentials_provider) { + Py_INCREF(binding->py_credentials_provider); + } return binding->py_credentials_provider; } diff --git a/source/s3_client.c b/source/s3_client.c index af2e88d53..47d785698 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -6,7 +6,11 @@ #include "auth.h" #include "io.h" + +#include #include +#include +#include #include static const char *s_capsule_name_s3_client = "aws_s3_client"; @@ -158,6 +162,8 @@ PyObject *aws_py_s3_cross_process_lock_acquire(PyObject *self, PyObject *args) { PyObject *aws_py_s3_cross_process_lock_release(PyObject *self, PyObject *args) { (void)self; + (void)args; + PyObject *lock_capsule; /* O */ if (!PyArg_ParseTuple(args, "O", &lock_capsule)) { @@ -249,11 +255,12 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) { uint64_t part_size; /* K */ uint64_t multipart_upload_threshold; /* K */ double throughput_target_gbps; /* d */ + int enable_s3express; /* p */ uint64_t mem_limit; /* K */ PyObject *py_core; /* O */ if (!PyArg_ParseTuple( args, - "OOOOOs#iKKdKO", + "OOOOOs#iKKdpKO", &bootstrap_py, &signing_config_py, &credential_provider_py, @@ -265,6 +272,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) { &part_size, &multipart_upload_threshold, &throughput_target_gbps, + &enable_s3express, &mem_limit, &py_core)) { return NULL; @@ -282,20 +290,6 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) { return NULL; } } - struct aws_signing_config_aws *signing_config = NULL; - if (signing_config_py != Py_None) { - signing_config = aws_py_get_signing_config(signing_config_py); - if (!signing_config) { - return NULL; - } - } - struct aws_signing_config_aws signing_config_from_credentials_provider; - AWS_ZERO_STRUCT(signing_config_from_credentials_provider); - - if (credential_provider) { - aws_s3_init_default_signing_config(&signing_config_from_credentials_provider, region, credential_provider); - signing_config = &signing_config_from_credentials_provider; - } struct aws_tls_connection_options *tls_options = NULL; if (tls_options_py != Py_None) { @@ -305,15 +299,33 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) { } } - struct s3_client_binding *s3_client = aws_mem_calloc(allocator, 1, sizeof(struct s3_client_binding)); - if (!s3_client) { - return PyErr_AwsLastError(); + struct aws_signing_config_aws default_signing_config; + AWS_ZERO_STRUCT(default_signing_config); + + struct aws_signing_config_aws *signing_config = NULL; + struct aws_credentials *anonymous_credentials = NULL; + if (signing_config_py != Py_None) { + signing_config = aws_py_get_signing_config(signing_config_py); + if (!signing_config) { + return NULL; + } + } else if (credential_provider) { + aws_s3_init_default_signing_config(&default_signing_config, region, credential_provider); + signing_config = &default_signing_config; + } else { + /* Default to use a signing config with anonymous credentials */ + anonymous_credentials = aws_credentials_new_anonymous(allocator); + default_signing_config.credentials = anonymous_credentials; + signing_config = &default_signing_config; } + struct s3_client_binding *s3_client = aws_mem_calloc(allocator, 1, sizeof(struct s3_client_binding)); + /* From hereon, we need to clean up if errors occur */ PyObject *capsule = PyCapsule_New(s3_client, s_capsule_name_s3_client, s_s3_client_capsule_destructor); if (!capsule) { + aws_credentials_release(anonymous_credentials); aws_mem_release(allocator, s3_client); return NULL; } @@ -336,6 +348,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) { .throughput_target_gbps = throughput_target_gbps, .shutdown_callback = s_s3_client_shutdown, .shutdown_callback_user_data = s3_client, + .enable_s3express = enable_s3express, }; s3_client->native = aws_s3_client_new(allocator, &s3_config); @@ -343,10 +356,11 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) { PyErr_SetAwsLastError(); goto error; } - + aws_credentials_release(anonymous_credentials); return capsule; error: + aws_credentials_release(anonymous_credentials); Py_DECREF(capsule); return NULL; } diff --git a/test/test_s3.py b/test/test_s3.py index 4116477ec..5499bb1e6 100644 --- a/test/test_s3.py +++ b/test/test_s3.py @@ -14,6 +14,9 @@ from multiprocessing import Process from awscrt.http import HttpHeaders, HttpRequest +from awscrt.s3 import S3Client, S3RequestType, create_default_s3_signing_config +from awscrt.io import ClientBootstrap, ClientTlsContext, DefaultHostResolver, EventLoopGroup, TlsConnectionOptions, TlsContextOptions +from awscrt.auth import AwsCredentials, AwsCredentialsProvider, AwsSignatureType, AwsSignedBodyHeaderType, AwsSignedBodyValue, AwsSigningAlgorithm, AwsSigningConfig from awscrt.s3 import ( S3ChecksumAlgorithm, S3ChecksumConfig, @@ -45,6 +48,7 @@ MB = 1024 ** 2 GB = 1024 ** 3 +S3EXPRESS_ENDPOINT = "crts-east1--use1-az4--x-s3.s3express-use1-az4.us-east-1.amazonaws.com" cross_process_lock_name = "instance_lock_test" @@ -148,7 +152,13 @@ def full_path(self, filename): return os.path.join(self.rootdir, filename) -def s3_client_new(secure, region, part_size=0, is_cancel_test=False, mem_limit=None): +def s3_client_new( + secure, + region, + part_size=0, + is_cancel_test=False, + enable_s3express=False, + mem_limit=None): if is_cancel_test: # for cancellation tests, make things slow, so it's less likely that @@ -177,8 +187,9 @@ def s3_client_new(secure, region, part_size=0, is_cancel_test=False, mem_limit=N signing_config=signing_config, tls_connection_options=tls_option, part_size=part_size, - memory_limit=mem_limit, - throughput_target_gbps=throughput_target_gbps) + throughput_target_gbps=throughput_target_gbps, + enable_s3express=enable_s3express, + memory_limit=mem_limit) return s3_client @@ -252,22 +263,33 @@ def setUp(self): self.files = FileCreator() self.temp_put_obj_file_path = self.files.create_file_with_size("temp_put_obj_10mb", 10 * MB) + self.s3express_preload_cache = [('key_1', AwsCredentials("accesskey_1", "secretAccessKey", "sessionToken")), + ('key_2', AwsCredentials("accesskey_2", "secretAccessKey", "sessionToken"))] def tearDown(self): self.files.remove_all() + self.s3express_preload_cache = None super().tearDown() - def _build_endpoint_string(self, region, bucket_name): + def _build_endpoint_string(self, region, bucket_name, enable_s3express=False): + if enable_s3express: + return S3EXPRESS_ENDPOINT return bucket_name + ".s3." + region + ".amazonaws.com" - def _get_object_request(self, object_path): - headers = HttpHeaders([("host", self._build_endpoint_string(self.region, self.bucket_name))]) + def _get_object_request(self, object_path, enable_s3express=False): + headers = HttpHeaders([("host", self._build_endpoint_string(self.region, self.bucket_name, enable_s3express))]) request = HttpRequest("GET", object_path, headers) return request - def _put_object_request(self, input_stream, content_len, path=None, unknown_content_length=False): + def _put_object_request( + self, + input_stream, + content_len, + path=None, + unknown_content_length=False, + enable_s3express=False): # if send file path is set, the body_stream of http request will be ignored (using file handler from C instead) - headers = HttpHeaders([("host", self._build_endpoint_string(self.region, self.bucket_name)), + headers = HttpHeaders([("host", self._build_endpoint_string(self.region, self.bucket_name, enable_s3express)), ("Content-Type", "text/plain")]) if unknown_content_length is False: headers.add("Content-Length", str(content_len)) @@ -317,14 +339,25 @@ def _test_s3_put_get_object( request, request_type, exception_name=None, + enable_s3express=False, + region="us-west-2", mem_limit=None, - **kwargs, - ): + **kwargs): + s3_client = s3_client_new( + False, + region, + 5 * MB, + enable_s3express=enable_s3express, + mem_limit=mem_limit) + signing_config = None + if enable_s3express: + signing_config = AwsSigningConfig( + algorithm=AwsSigningAlgorithm.V4_S3EXPRESS) - s3_client = s3_client_new(False, self.region, 5 * MB, mem_limit=mem_limit) s3_request = s3_client.make_request( request=request, type=request_type, + signing_config=signing_config, on_headers=self._on_request_headers, on_body=self._on_request_body, on_done=self._on_request_done, @@ -379,6 +412,17 @@ def test_put_object_unknown_content_length_single_part(self): self._test_s3_put_get_object(request, S3RequestType.PUT_OBJECT) put_body_stream.close() + def test_get_object_s3express(self): + request = self._get_object_request("/crt-download-10MB", enable_s3express=True) + self._test_s3_put_get_object(request, S3RequestType.GET_OBJECT, enable_s3express=True, region="us-east-1") + + def test_put_object_s3express(self): + put_body_stream = open(self.temp_put_obj_file_path, "rb") + content_length = os.stat(self.temp_put_obj_file_path).st_size + request = self._put_object_request(put_body_stream, content_length, enable_s3express=True) + self._test_s3_put_get_object(request, S3RequestType.PUT_OBJECT, enable_s3express=True, region="us-east-1") + put_body_stream.close() + def test_put_object_multiple_times(self): s3_client = s3_client_new(False, self.region, 5 * MB) finished_futures = []