diff --git a/awscrt/s3.py b/awscrt/s3.py index 9019a4c64..2278d8856 100644 --- a/awscrt/s3.py +++ b/awscrt/s3.py @@ -19,6 +19,37 @@ from enum import IntEnum +class CrossProcessLock(NativeResource): + """ + Class representing an exclusive cross-process lock, scoped by `lock_scope_name` + + Recommended usage is to either explicitly call acquire() followed by release() when the lock is no longer required, or use this in a 'with' statement. + + acquire() will throw a RuntimeError with AWS_MUTEX_CALLER_NOT_OWNER as the error code, if the lock could not be acquired. + + If the lock has not been explicitly released when the process exits, it will be released by the operating system. + + Keyword Args: + lock_scope_name (str): Unique string identifying the caller holding the lock. + """ + + def __init__(self, lock_scope_name): + super().__init__() + self._binding = _awscrt.s3_cross_process_lock_new(lock_scope_name) + + def acquire(self): + _awscrt.s3_cross_process_lock_acquire(self._binding) + + def __enter__(self): + self.acquire() + + def release(self): + _awscrt.s3_cross_process_lock_release(self._binding) + + def __exit__(self, exc_type, exc_value, exc_tb): + self.release() + + class S3RequestType(IntEnum): """The type of the AWS S3 request""" diff --git a/crt/aws-c-common b/crt/aws-c-common index e381a7bee..fb3182c54 160000 --- a/crt/aws-c-common +++ b/crt/aws-c-common @@ -1 +1 @@ -Subproject commit e381a7beeacb070f1816989dcb0e2c0ae6eccaea +Subproject commit fb3182c5411e4f5da2ee9372e0d66aa3f15a026d diff --git a/source/module.c b/source/module.c index 182b50860..20c6fadfe 100644 --- a/source/module.c +++ b/source/module.c @@ -801,6 +801,9 @@ static PyMethodDef s_module_methods[] = { AWS_PY_METHOD_DEF(s3_meta_request_cancel, METH_VARARGS), AWS_PY_METHOD_DEF(s3_get_ec2_instance_type, METH_NOARGS), AWS_PY_METHOD_DEF(s3_is_crt_s3_optimized_for_system, METH_NOARGS), + AWS_PY_METHOD_DEF(s3_cross_process_lock_new, METH_VARARGS), + AWS_PY_METHOD_DEF(s3_cross_process_lock_acquire, METH_VARARGS), + AWS_PY_METHOD_DEF(s3_cross_process_lock_release, METH_VARARGS), /* WebSocket */ AWS_PY_METHOD_DEF(websocket_client_connect, METH_VARARGS), diff --git a/source/s3.h b/source/s3.h index 85543adc0..48c847ea2 100644 --- a/source/s3.h +++ b/source/s3.h @@ -15,6 +15,10 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args); PyObject *aws_py_s3_meta_request_cancel(PyObject *self, PyObject *args); +PyObject *aws_py_s3_cross_process_lock_new(PyObject *self, PyObject *args); +PyObject *aws_py_s3_cross_process_lock_acquire(PyObject *self, PyObject *args); +PyObject *aws_py_s3_cross_process_lock_release(PyObject *self, PyObject *args); + struct aws_s3_client *aws_py_get_s3_client(PyObject *s3_client); struct aws_s3_meta_request *aws_py_get_s3_meta_request(PyObject *s3_client); diff --git a/source/s3_client.c b/source/s3_client.c index dc3454159..25966d756 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -6,9 +6,11 @@ #include "auth.h" #include "io.h" +#include #include static const char *s_capsule_name_s3_client = "aws_s3_client"; +static const char *s_capsule_name_s3_instance_lock = "aws_cross_process_lock"; PyObject *aws_py_s3_get_ec2_instance_type(PyObject *self, PyObject *args) { (void)self; @@ -37,6 +39,103 @@ PyObject *aws_py_s3_is_crt_s3_optimized_for_system(PyObject *self, PyObject *arg Py_RETURN_FALSE; } +struct cross_process_lock_binding { + struct aws_cross_process_lock *lock; + struct aws_string *name; +}; + +/* Invoked when the python object gets cleaned up */ +static void s_s3_cross_process_lock_destructor(PyObject *capsule) { + struct cross_process_lock_binding *lock_binding = PyCapsule_GetPointer(capsule, s_capsule_name_s3_instance_lock); + + if (lock_binding->lock) { + aws_cross_process_lock_release(lock_binding->lock); + lock_binding->lock = NULL; + } + + if (lock_binding->name) { + aws_string_destroy(lock_binding->name); + } + + aws_mem_release(aws_py_get_allocator(), lock_binding); +} + +PyObject *aws_py_s3_cross_process_lock_new(PyObject *self, PyObject *args) { + (void)self; + + struct aws_allocator *allocator = aws_py_get_allocator(); + + struct aws_byte_cursor lock_name; /* s# */ + + if (!PyArg_ParseTuple(args, "s#", &lock_name.ptr, &lock_name.len)) { + return NULL; + } + + struct cross_process_lock_binding *binding = + aws_mem_calloc(allocator, 1, sizeof(struct cross_process_lock_binding)); + binding->name = aws_string_new_from_cursor(allocator, &lock_name); + + PyObject *capsule = PyCapsule_New(binding, s_capsule_name_s3_instance_lock, s_s3_cross_process_lock_destructor); + if (!capsule) { + aws_string_destroy(binding->name); + aws_mem_release(allocator, binding); + return PyErr_AwsLastError(); + } + + return capsule; +} + +PyObject *aws_py_s3_cross_process_lock_acquire(PyObject *self, PyObject *args) { + (void)self; + + struct aws_allocator *allocator = aws_py_get_allocator(); + + PyObject *lock_capsule; /* O */ + + if (!PyArg_ParseTuple(args, "O", &lock_capsule)) { + return NULL; + } + + struct cross_process_lock_binding *lock_binding = + PyCapsule_GetPointer(lock_capsule, s_capsule_name_s3_instance_lock); + if (!lock_binding) { + return NULL; + } + + if (!lock_binding->lock) { + struct aws_cross_process_lock *lock = + aws_cross_process_lock_try_acquire(allocator, aws_byte_cursor_from_string(lock_binding->name)); + + if (!lock) { + return PyErr_AwsLastError(); + } + lock_binding->lock = lock; + } + + Py_RETURN_NONE; +} + +PyObject *aws_py_s3_cross_process_lock_release(PyObject *self, PyObject *args) { + PyObject *lock_capsule; /* O */ + + if (!PyArg_ParseTuple(args, "O", &lock_capsule)) { + return NULL; + } + + struct cross_process_lock_binding *lock_binding = + PyCapsule_GetPointer(lock_capsule, s_capsule_name_s3_instance_lock); + if (!lock_binding) { + return NULL; + } + + if (lock_binding->lock) { + aws_cross_process_lock_release(lock_binding->lock); + lock_binding->lock = NULL; + } + + Py_RETURN_NONE; +} + struct s3_client_binding { struct aws_s3_client *native; diff --git a/test/test_s3.py b/test/test_s3.py index 09aa252cf..8602a8a14 100644 --- a/test/test_s3.py +++ b/test/test_s3.py @@ -8,8 +8,10 @@ import tempfile import math import shutil +import time from test import NativeResourceTest from concurrent.futures import Future +from multiprocessing import Process from awscrt.http import HttpHeaders, HttpRequest from awscrt.s3 import ( @@ -18,6 +20,7 @@ S3ChecksumLocation, S3Client, S3RequestType, + CrossProcessLock, create_default_s3_signing_config, ) from awscrt.io import ( @@ -41,6 +44,57 @@ MB = 1024 ** 2 GB = 1024 ** 3 +cross_process_lock_name = "instance_lock_test" + + +def cross_proc_task(): + try: + lock = CrossProcessLock(cross_process_lock_name) + lock.acquire() + lock.release() + exit(0) + except RuntimeError as e: + exit(-1) + + +class CrossProcessLockTest(NativeResourceTest): + def setUp(self): + self.nonce = time.time() + super().setUp() + + def test_with_statement(self): + nonce_str = f'lock_a_{self.nonce}' + with CrossProcessLock(nonce_str) as lock: + try: + new_lock = CrossProcessLock(nonce_str) + new_lock.acquire() + self.fail("Acquiring a lock by the same nonce should fail when it's already held") + except RuntimeError as e: + unique_nonce_str = f'lock_b{self.nonce}' + new_lock = CrossProcessLock(unique_nonce_str) + new_lock.acquire() + new_lock.release() + + lock_after_with_same_nonce = CrossProcessLock(nonce_str) + lock_after_with_same_nonce.acquire() + lock_after_with_same_nonce.release() + + def test_cross_proc(self): + with CrossProcessLock(cross_process_lock_name) as lock: + process = Process(target=cross_proc_task) + process.start() + process.join() + # aquiring this lock in a sub-process should fail since we + # already hold the lock in this process. + self.assertNotEqual(0, process.exitcode) + + # now that we've released the lock above, the same sub-process path + # should now succeed. + unlocked_process = Process(target=cross_proc_task) + unlocked_process.start() + unlocked_process.join() + self.assertEqual(0, unlocked_process.exitcode) + class FileCreator(object): def __init__(self):