diff --git a/awscrt/s3.py b/awscrt/s3.py index c3d847498..d8bfb621c 100644 --- a/awscrt/s3.py +++ b/awscrt/s3.py @@ -243,14 +243,7 @@ class S3Request(NativeResource): structures have all finished shutting down. Shutdown begins when the S3Request object is destroyed. """ - __slots__ = ( - '_on_headers_cb', - '_on_body_cb', - '_on_done_cb', - '_on_progress_cb', - '_finished_future', - '_http_request', - 'shutdown_event') + __slots__ = ('_finished_future', 'shutdown_event') def __init__( self, @@ -274,17 +267,10 @@ def __init__( super().__init__() - self._on_headers_cb = on_headers - self._on_body_cb = on_body - self._on_done_cb = on_done - self._on_progress_cb = on_progress - self._finished_future = Future() - self.shutdown_event = threading.Event() s3_request_core = _S3RequestCore( - client, request, self._finished_future, self.shutdown_event, @@ -333,7 +319,6 @@ class _S3RequestCore: def __init__( self, - client, request, finish_future, shutdown_event, @@ -343,7 +328,6 @@ def __init__( on_done=None, on_progress=None): - self._client = client self._request = request self._credential_provider = credential_provider @@ -355,9 +339,6 @@ def __init__( self._finished_future = finish_future self._shutdown_event = shutdown_event - def _on_shutdown(self): - self._shutdown_event.set() - def _on_headers(self, status_code, headers): if self._on_headers_cb: self._on_headers_cb(status_code=status_code, headers=headers) @@ -366,6 +347,9 @@ def _on_body(self, chunk, offset): if self._on_body_cb: self._on_body_cb(chunk=chunk, offset=offset) + def _on_shutdown(self): + self._shutdown_event.set() + def _on_finish(self, error_code, error_headers, error_body): error = None if error_code: diff --git a/crt/aws-c-s3 b/crt/aws-c-s3 index b5343e18a..ab2120fc7 160000 --- a/crt/aws-c-s3 +++ b/crt/aws-c-s3 @@ -1 +1 @@ -Subproject commit b5343e18a8721bbabd5f6f1b2259c1bae7741dd1 +Subproject commit ab2120fc708d201e780cdf20a621af8e20aef85d diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index a7d67be7c..e34b313b8 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -220,6 +220,10 @@ static void s_s3_request_on_finish( (void)meta_request; struct s3_meta_request_binding *request_binding = user_data; + if (request_binding->recv_file) { + fclose(request_binding->recv_file); + request_binding->recv_file = NULL; + } /*************** GIL ACQUIRE ***************/ PyGILState_STATE state; if (aws_py_gilstate_ensure(&state)) { @@ -280,6 +284,7 @@ static void s_s3_meta_request_capsule_destructor(PyObject *capsule) { if (meta_request->recv_file) { fclose(meta_request->recv_file); + meta_request->recv_file = NULL; } if (meta_request->native) { aws_s3_meta_request_release(meta_request->native); diff --git a/test/test_s3.py b/test/test_s3.py index c7dc9e5ff..33d5494ce 100644 --- a/test/test_s3.py +++ b/test/test_s3.py @@ -9,7 +9,7 @@ from awscrt.http import HttpHeaders, HttpRequest from awscrt.s3 import S3Client, S3RequestType -from awscrt.io import ClientBootstrap, ClientTlsContext, DefaultHostResolver, EventLoopGroup, TlsConnectionOptions, TlsContextOptions, init_logging, LogLevel +from awscrt.io import ClientBootstrap, ClientTlsContext, DefaultHostResolver, EventLoopGroup, TlsConnectionOptions, TlsContextOptions from awscrt.auth import AwsCredentialsProvider @@ -38,7 +38,6 @@ def s3_client_new(secure, region, part_size=0): class FakeReadStream(object): def __init__(self, read_future): self._future = read_future - pass def read(self, length): fake_string = "x" * length @@ -97,13 +96,15 @@ def _get_object_request(self, object_path): request = HttpRequest("GET", object_path, headers) return request - def _put_object_request(self, file_name): + def _put_object_request(self, file_name, path=None): self.put_body_stream = open(file_name, "r+b") file_stats = os.stat(file_name) self.data_len = file_stats.st_size headers = HttpHeaders([("host", self._build_endpoint_string(self.region, self.bucket_name)), ("Content-Type", "text/plain"), ("Content-Length", str(self.data_len))]) - request = HttpRequest("PUT", self.put_test_object_path, headers, self.put_body_stream) + if path is None: + path = self.put_test_object_path + request = HttpRequest("PUT", path, headers, self.put_body_stream) return request def _on_request_headers(self, status_code, headers, **kargs): @@ -129,7 +130,7 @@ def _validate_successful_get_response(self, put_object): self.received_body_len, "Received body length does not match the Content-Length header") - def _test_s3_put_get_object(self, request, request_type): + def _test_s3_put_get_object(self, request, request_type, exception_name=None): s3_client = s3_client_new(False, self.region, 5 * 1024 * 1024) s3_request = s3_client.make_request( request=request, @@ -137,10 +138,15 @@ def _test_s3_put_get_object(self, request, request_type): on_headers=self._on_request_headers, on_body=self._on_request_body) finished_future = s3_request.finished_future - finished_future.result(self.timeout) - self._validate_successful_get_response(request_type is S3RequestType.PUT_OBJECT) + try: + finished_future.result(self.timeout) + except Exception as e: + self.assertEqual(e.name, exception_name) + else: + self._validate_successful_get_response(request_type is S3RequestType.PUT_OBJECT) + shutdown_event = s3_request.shutdown_event - del s3_request + s3_request = None self.assertTrue(shutdown_event.wait(self.timeout)) def test_get_object(self): @@ -152,6 +158,31 @@ def test_put_object(self): self._test_s3_put_get_object(request, S3RequestType.PUT_OBJECT) self.put_body_stream.close() + def test_put_object_multiple_times(self): + s3_client = s3_client_new(False, self.region, 5 * 1024 * 1024) + finished_futures = [] + for i in range(3): + path = "/put_object_test_py_10MB_{}.txt".format(str(i)) + request = self._put_object_request("test/resources/s3_put_object.txt", path) + s3_request = s3_client.make_request( + request=request, + type=S3RequestType.PUT_OBJECT, + send_filepath="test/resources/s3_put_object.txt", + on_headers=self._on_request_headers, + on_body=self._on_request_body) + finished_futures.append(s3_request.finished_future) + try: + for future in finished_futures: + future.result(self.timeout) + except Exception as e: + # failed + self.assertTrue(False) + + client_shutdown_event = s3_client.shutdown_event + del s3_client + self.assertTrue(client_shutdown_event.wait(self.timeout)) + self.put_body_stream.close() + def test_get_object_file_object(self): request = self._get_object_request(self.get_test_object_path) request_type = S3RequestType.GET_OBJECT @@ -180,9 +211,6 @@ def test_get_object_file_object(self): self.transferred_len, "the transferred length reported does not match the content-length header") self.assertEqual(self.response_status_code, 200, "status code is not 200") - shutdown_event = s3_request.shutdown_event - del s3_request - self.assertTrue(shutdown_event.wait(self.timeout)) # TODO verify the content of written file os.remove(file.name) @@ -207,9 +235,6 @@ def test_put_object_file_object(self): self.transferred_len, "the transferred length reported does not match body we sent") self._validate_successful_get_response(request_type is S3RequestType.PUT_OBJECT) - shutdown_event = s3_request.shutdown_event - del s3_request - self.assertTrue(shutdown_event.wait(self.timeout)) def _on_progress_cancel_after_first_chunk(self, progress): self.transferred_len += progress @@ -244,7 +269,7 @@ def test_multipart_get_object_cancel(self): # The on_finish callback may invoke the progress self.assertLessEqual(self.progress_invoked, 2) shutdown_event = self.s3_request.shutdown_event - del self.s3_request + self.s3_request = None self.assertTrue(shutdown_event.wait(self.timeout)) os.remove(file.name) @@ -266,9 +291,8 @@ def test_get_object_quick_cancel(self): finished_future.result(self.timeout) except Exception as e: self.assertEqual(e.name, "AWS_ERROR_S3_CANCELED") - shutdown_event = s3_request.shutdown_event - del s3_request + s3_request = None self.assertTrue(shutdown_event.wait(self.timeout)) os.remove(file.name) @@ -287,17 +311,16 @@ def _put_object_cancel_helper(self, cancel_after_read): if cancel_after_read: read_futrue.result(self.timeout) - s3_request.cancel() finished_future = s3_request.finished_future try: finished_future.result(self.timeout) except Exception as e: self.assertEqual(e.name, "AWS_ERROR_S3_CANCELED") + shutdown_event = s3_request.shutdown_event - del s3_request + s3_request = None self.assertTrue(shutdown_event.wait(self.timeout)) - # TODO If CLI installed, run the following command to ensure the cancel succeed. # aws s3api list-multipart-uploads --bucket aws-crt-canary-bucket --prefix 'cancelled_request' # Nothing should printout @@ -308,6 +331,12 @@ def test_multipart_put_object_cancel(self): def test_put_object_quick_cancel(self): return self._put_object_cancel_helper(False) + def test_multipart_upload_with_invalid_request(self): + request = self._put_object_request("test/resources/s3_put_object.txt") + request.headers.set("Content-MD5", "something") + self._test_s3_put_get_object(request, S3RequestType.PUT_OBJECT, "AWS_ERROR_S3_INVALID_RESPONSE_STATUS") + self.put_body_stream.close() + if __name__ == '__main__': unittest.main()