Skip to content

Commit

Permalink
Bind out cross-process lock with unit tests. (#519)
Browse files Browse the repository at this point in the history
Co-authored-by: Nate Prewitt <[email protected]>
  • Loading branch information
JonathanHenson and nateprewitt authored Nov 7, 2023
1 parent fbf82f2 commit 16e6492
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 1 deletion.
31 changes: 31 additions & 0 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,37 @@
from enum import IntEnum


class CrossProcessLock(NativeResource):
"""
Class representing an exclusive cross-process lock, scoped by `lock_scope_name`
Recommended usage is to either explicitly call acquire() followed by release() when the lock is no longer required, or use this in a 'with' statement.
acquire() will throw a RuntimeError with AWS_MUTEX_CALLER_NOT_OWNER as the error code, if the lock could not be acquired.
If the lock has not been explicitly released when the process exits, it will be released by the operating system.
Keyword Args:
lock_scope_name (str): Unique string identifying the caller holding the lock.
"""

def __init__(self, lock_scope_name):
super().__init__()
self._binding = _awscrt.s3_cross_process_lock_new(lock_scope_name)

def acquire(self):
_awscrt.s3_cross_process_lock_acquire(self._binding)

def __enter__(self):
self.acquire()

def release(self):
_awscrt.s3_cross_process_lock_release(self._binding)

def __exit__(self, exc_type, exc_value, exc_tb):
self.release()


class S3RequestType(IntEnum):
"""The type of the AWS S3 request"""

Expand Down
3 changes: 3 additions & 0 deletions source/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,9 @@ static PyMethodDef s_module_methods[] = {
AWS_PY_METHOD_DEF(s3_meta_request_cancel, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_get_ec2_instance_type, METH_NOARGS),
AWS_PY_METHOD_DEF(s3_is_crt_s3_optimized_for_system, METH_NOARGS),
AWS_PY_METHOD_DEF(s3_cross_process_lock_new, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_cross_process_lock_acquire, METH_VARARGS),
AWS_PY_METHOD_DEF(s3_cross_process_lock_release, METH_VARARGS),

/* WebSocket */
AWS_PY_METHOD_DEF(websocket_client_connect, METH_VARARGS),
Expand Down
4 changes: 4 additions & 0 deletions source/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args);

PyObject *aws_py_s3_meta_request_cancel(PyObject *self, PyObject *args);

PyObject *aws_py_s3_cross_process_lock_new(PyObject *self, PyObject *args);
PyObject *aws_py_s3_cross_process_lock_acquire(PyObject *self, PyObject *args);
PyObject *aws_py_s3_cross_process_lock_release(PyObject *self, PyObject *args);

struct aws_s3_client *aws_py_get_s3_client(PyObject *s3_client);
struct aws_s3_meta_request *aws_py_get_s3_meta_request(PyObject *s3_client);

Expand Down
99 changes: 99 additions & 0 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

#include "auth.h"
#include "io.h"
#include <aws/common/cross_process_lock.h>
#include <aws/s3/s3_client.h>

static const char *s_capsule_name_s3_client = "aws_s3_client";
static const char *s_capsule_name_s3_instance_lock = "aws_cross_process_lock";

PyObject *aws_py_s3_get_ec2_instance_type(PyObject *self, PyObject *args) {
(void)self;
Expand Down Expand Up @@ -37,6 +39,103 @@ PyObject *aws_py_s3_is_crt_s3_optimized_for_system(PyObject *self, PyObject *arg
Py_RETURN_FALSE;
}

struct cross_process_lock_binding {
struct aws_cross_process_lock *lock;
struct aws_string *name;
};

/* Invoked when the python object gets cleaned up */
static void s_s3_cross_process_lock_destructor(PyObject *capsule) {
struct cross_process_lock_binding *lock_binding = PyCapsule_GetPointer(capsule, s_capsule_name_s3_instance_lock);

if (lock_binding->lock) {
aws_cross_process_lock_release(lock_binding->lock);
lock_binding->lock = NULL;
}

if (lock_binding->name) {
aws_string_destroy(lock_binding->name);
}

aws_mem_release(aws_py_get_allocator(), lock_binding);
}

PyObject *aws_py_s3_cross_process_lock_new(PyObject *self, PyObject *args) {
(void)self;

struct aws_allocator *allocator = aws_py_get_allocator();

struct aws_byte_cursor lock_name; /* s# */

if (!PyArg_ParseTuple(args, "s#", &lock_name.ptr, &lock_name.len)) {
return NULL;
}

struct cross_process_lock_binding *binding =
aws_mem_calloc(allocator, 1, sizeof(struct cross_process_lock_binding));
binding->name = aws_string_new_from_cursor(allocator, &lock_name);

PyObject *capsule = PyCapsule_New(binding, s_capsule_name_s3_instance_lock, s_s3_cross_process_lock_destructor);
if (!capsule) {
aws_string_destroy(binding->name);
aws_mem_release(allocator, binding);
return PyErr_AwsLastError();
}

return capsule;
}

PyObject *aws_py_s3_cross_process_lock_acquire(PyObject *self, PyObject *args) {
(void)self;

struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *lock_capsule; /* O */

if (!PyArg_ParseTuple(args, "O", &lock_capsule)) {
return NULL;
}

struct cross_process_lock_binding *lock_binding =
PyCapsule_GetPointer(lock_capsule, s_capsule_name_s3_instance_lock);
if (!lock_binding) {
return NULL;
}

if (!lock_binding->lock) {
struct aws_cross_process_lock *lock =
aws_cross_process_lock_try_acquire(allocator, aws_byte_cursor_from_string(lock_binding->name));

if (!lock) {
return PyErr_AwsLastError();
}
lock_binding->lock = lock;
}

Py_RETURN_NONE;
}

PyObject *aws_py_s3_cross_process_lock_release(PyObject *self, PyObject *args) {
PyObject *lock_capsule; /* O */

if (!PyArg_ParseTuple(args, "O", &lock_capsule)) {
return NULL;
}

struct cross_process_lock_binding *lock_binding =
PyCapsule_GetPointer(lock_capsule, s_capsule_name_s3_instance_lock);
if (!lock_binding) {
return NULL;
}

if (lock_binding->lock) {
aws_cross_process_lock_release(lock_binding->lock);
lock_binding->lock = NULL;
}

Py_RETURN_NONE;
}

struct s3_client_binding {
struct aws_s3_client *native;

Expand Down
54 changes: 54 additions & 0 deletions test/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import tempfile
import math
import shutil
import time
from test import NativeResourceTest
from concurrent.futures import Future
from multiprocessing import Process

from awscrt.http import HttpHeaders, HttpRequest
from awscrt.s3 import (
Expand All @@ -18,6 +20,7 @@
S3ChecksumLocation,
S3Client,
S3RequestType,
CrossProcessLock,
create_default_s3_signing_config,
)
from awscrt.io import (
Expand All @@ -41,6 +44,57 @@
MB = 1024 ** 2
GB = 1024 ** 3

cross_process_lock_name = "instance_lock_test"


def cross_proc_task():
try:
lock = CrossProcessLock(cross_process_lock_name)
lock.acquire()
lock.release()
exit(0)
except RuntimeError as e:
exit(-1)


class CrossProcessLockTest(NativeResourceTest):
def setUp(self):
self.nonce = time.time()
super().setUp()

def test_with_statement(self):
nonce_str = f'lock_a_{self.nonce}'
with CrossProcessLock(nonce_str) as lock:
try:
new_lock = CrossProcessLock(nonce_str)
new_lock.acquire()
self.fail("Acquiring a lock by the same nonce should fail when it's already held")
except RuntimeError as e:
unique_nonce_str = f'lock_b{self.nonce}'
new_lock = CrossProcessLock(unique_nonce_str)
new_lock.acquire()
new_lock.release()

lock_after_with_same_nonce = CrossProcessLock(nonce_str)
lock_after_with_same_nonce.acquire()
lock_after_with_same_nonce.release()

def test_cross_proc(self):
with CrossProcessLock(cross_process_lock_name) as lock:
process = Process(target=cross_proc_task)
process.start()
process.join()
# aquiring this lock in a sub-process should fail since we
# already hold the lock in this process.
self.assertNotEqual(0, process.exitcode)

# now that we've released the lock above, the same sub-process path
# should now succeed.
unlocked_process = Process(target=cross_proc_task)
unlocked_process.start()
unlocked_process.join()
self.assertEqual(0, unlocked_process.exitcode)


class FileCreator(object):
def __init__(self):
Expand Down

0 comments on commit 16e6492

Please sign in to comment.