Skip to content

Commit

Permalink
s3 follow up (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Feb 15, 2021
1 parent d020399 commit 5523913
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 41 deletions.
24 changes: 4 additions & 20 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,7 @@ class S3Request(NativeResource):
structures have all finished shutting down. Shutdown begins when the
S3Request object is destroyed.
"""
__slots__ = (
'_on_headers_cb',
'_on_body_cb',
'_on_done_cb',
'_on_progress_cb',
'_finished_future',
'_http_request',
'shutdown_event')
__slots__ = ('_finished_future', 'shutdown_event')

def __init__(
self,
Expand All @@ -274,17 +267,10 @@ def __init__(

super().__init__()

self._on_headers_cb = on_headers
self._on_body_cb = on_body
self._on_done_cb = on_done
self._on_progress_cb = on_progress

self._finished_future = Future()

self.shutdown_event = threading.Event()

s3_request_core = _S3RequestCore(
client,
request,
self._finished_future,
self.shutdown_event,
Expand Down Expand Up @@ -333,7 +319,6 @@ class _S3RequestCore:

def __init__(
self,
client,
request,
finish_future,
shutdown_event,
Expand All @@ -343,7 +328,6 @@ def __init__(
on_done=None,
on_progress=None):

self._client = client
self._request = request
self._credential_provider = credential_provider

Expand All @@ -355,9 +339,6 @@ def __init__(
self._finished_future = finish_future
self._shutdown_event = shutdown_event

def _on_shutdown(self):
self._shutdown_event.set()

def _on_headers(self, status_code, headers):
if self._on_headers_cb:
self._on_headers_cb(status_code=status_code, headers=headers)
Expand All @@ -366,6 +347,9 @@ def _on_body(self, chunk, offset):
if self._on_body_cb:
self._on_body_cb(chunk=chunk, offset=offset)

def _on_shutdown(self):
self._shutdown_event.set()

def _on_finish(self, error_code, error_headers, error_body):
error = None
if error_code:
Expand Down
5 changes: 5 additions & 0 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ static void s_s3_request_on_finish(
(void)meta_request;
struct s3_meta_request_binding *request_binding = user_data;

if (request_binding->recv_file) {
fclose(request_binding->recv_file);
request_binding->recv_file = NULL;
}
/*************** GIL ACQUIRE ***************/
PyGILState_STATE state;
if (aws_py_gilstate_ensure(&state)) {
Expand Down Expand Up @@ -280,6 +284,7 @@ static void s_s3_meta_request_capsule_destructor(PyObject *capsule) {

if (meta_request->recv_file) {
fclose(meta_request->recv_file);
meta_request->recv_file = NULL;
}
if (meta_request->native) {
aws_s3_meta_request_release(meta_request->native);
Expand Down
69 changes: 49 additions & 20 deletions test/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from awscrt.http import HttpHeaders, HttpRequest
from awscrt.s3 import S3Client, S3RequestType
from awscrt.io import ClientBootstrap, ClientTlsContext, DefaultHostResolver, EventLoopGroup, TlsConnectionOptions, TlsContextOptions, init_logging, LogLevel
from awscrt.io import ClientBootstrap, ClientTlsContext, DefaultHostResolver, EventLoopGroup, TlsConnectionOptions, TlsContextOptions
from awscrt.auth import AwsCredentialsProvider


Expand Down Expand Up @@ -38,7 +38,6 @@ def s3_client_new(secure, region, part_size=0):
class FakeReadStream(object):
def __init__(self, read_future):
self._future = read_future
pass

def read(self, length):
fake_string = "x" * length
Expand Down Expand Up @@ -97,13 +96,15 @@ def _get_object_request(self, object_path):
request = HttpRequest("GET", object_path, headers)
return request

def _put_object_request(self, file_name):
def _put_object_request(self, file_name, path=None):
self.put_body_stream = open(file_name, "r+b")
file_stats = os.stat(file_name)
self.data_len = file_stats.st_size
headers = HttpHeaders([("host", self._build_endpoint_string(self.region, self.bucket_name)),
("Content-Type", "text/plain"), ("Content-Length", str(self.data_len))])
request = HttpRequest("PUT", self.put_test_object_path, headers, self.put_body_stream)
if path is None:
path = self.put_test_object_path
request = HttpRequest("PUT", path, headers, self.put_body_stream)
return request

def _on_request_headers(self, status_code, headers, **kargs):
Expand All @@ -129,18 +130,23 @@ def _validate_successful_get_response(self, put_object):
self.received_body_len,
"Received body length does not match the Content-Length header")

def _test_s3_put_get_object(self, request, request_type):
def _test_s3_put_get_object(self, request, request_type, exception_name=None):
s3_client = s3_client_new(False, self.region, 5 * 1024 * 1024)
s3_request = s3_client.make_request(
request=request,
type=request_type,
on_headers=self._on_request_headers,
on_body=self._on_request_body)
finished_future = s3_request.finished_future
finished_future.result(self.timeout)
self._validate_successful_get_response(request_type is S3RequestType.PUT_OBJECT)
try:
finished_future.result(self.timeout)
except Exception as e:
self.assertEqual(e.name, exception_name)
else:
self._validate_successful_get_response(request_type is S3RequestType.PUT_OBJECT)

shutdown_event = s3_request.shutdown_event
del s3_request
s3_request = None
self.assertTrue(shutdown_event.wait(self.timeout))

def test_get_object(self):
Expand All @@ -152,6 +158,31 @@ def test_put_object(self):
self._test_s3_put_get_object(request, S3RequestType.PUT_OBJECT)
self.put_body_stream.close()

def test_put_object_multiple_times(self):
s3_client = s3_client_new(False, self.region, 5 * 1024 * 1024)
finished_futures = []
for i in range(3):
path = "/put_object_test_py_10MB_{}.txt".format(str(i))
request = self._put_object_request("test/resources/s3_put_object.txt", path)
s3_request = s3_client.make_request(
request=request,
type=S3RequestType.PUT_OBJECT,
send_filepath="test/resources/s3_put_object.txt",
on_headers=self._on_request_headers,
on_body=self._on_request_body)
finished_futures.append(s3_request.finished_future)
try:
for future in finished_futures:
future.result(self.timeout)
except Exception as e:
# failed
self.assertTrue(False)

client_shutdown_event = s3_client.shutdown_event
del s3_client
self.assertTrue(client_shutdown_event.wait(self.timeout))
self.put_body_stream.close()

def test_get_object_file_object(self):
request = self._get_object_request(self.get_test_object_path)
request_type = S3RequestType.GET_OBJECT
Expand Down Expand Up @@ -180,9 +211,6 @@ def test_get_object_file_object(self):
self.transferred_len,
"the transferred length reported does not match the content-length header")
self.assertEqual(self.response_status_code, 200, "status code is not 200")
shutdown_event = s3_request.shutdown_event
del s3_request
self.assertTrue(shutdown_event.wait(self.timeout))
# TODO verify the content of written file
os.remove(file.name)

Expand All @@ -207,9 +235,6 @@ def test_put_object_file_object(self):
self.transferred_len,
"the transferred length reported does not match body we sent")
self._validate_successful_get_response(request_type is S3RequestType.PUT_OBJECT)
shutdown_event = s3_request.shutdown_event
del s3_request
self.assertTrue(shutdown_event.wait(self.timeout))

def _on_progress_cancel_after_first_chunk(self, progress):
self.transferred_len += progress
Expand Down Expand Up @@ -244,7 +269,7 @@ def test_multipart_get_object_cancel(self):
# The on_finish callback may invoke the progress
self.assertLessEqual(self.progress_invoked, 2)
shutdown_event = self.s3_request.shutdown_event
del self.s3_request
self.s3_request = None
self.assertTrue(shutdown_event.wait(self.timeout))
os.remove(file.name)

Expand All @@ -266,9 +291,8 @@ def test_get_object_quick_cancel(self):
finished_future.result(self.timeout)
except Exception as e:
self.assertEqual(e.name, "AWS_ERROR_S3_CANCELED")

shutdown_event = s3_request.shutdown_event
del s3_request
s3_request = None
self.assertTrue(shutdown_event.wait(self.timeout))
os.remove(file.name)

Expand All @@ -287,17 +311,16 @@ def _put_object_cancel_helper(self, cancel_after_read):

if cancel_after_read:
read_futrue.result(self.timeout)

s3_request.cancel()
finished_future = s3_request.finished_future
try:
finished_future.result(self.timeout)
except Exception as e:
self.assertEqual(e.name, "AWS_ERROR_S3_CANCELED")

shutdown_event = s3_request.shutdown_event
del s3_request
s3_request = None
self.assertTrue(shutdown_event.wait(self.timeout))

# TODO If CLI installed, run the following command to ensure the cancel succeed.
# aws s3api list-multipart-uploads --bucket aws-crt-canary-bucket --prefix 'cancelled_request'
# Nothing should printout
Expand All @@ -308,6 +331,12 @@ def test_multipart_put_object_cancel(self):
def test_put_object_quick_cancel(self):
return self._put_object_cancel_helper(False)

def test_multipart_upload_with_invalid_request(self):
request = self._put_object_request("test/resources/s3_put_object.txt")
request.headers.set("Content-MD5", "something")
self._test_s3_put_get_object(request, S3RequestType.PUT_OBJECT, "AWS_ERROR_S3_INVALID_RESPONSE_STATUS")
self.put_body_stream.close()


if __name__ == '__main__':
unittest.main()

0 comments on commit 5523913

Please sign in to comment.