Skip to content

Commit

Permalink
target: Allow reuse of a connection once the owning thread is terminated
Browse files Browse the repository at this point in the history
Once a thread exits, the connection instance it was using can be
returned to the pool so it can be reused by another thread.

Since there is no per-thread equivalent to atexit, this is achieved by
returning the connection to the pool after every top-level method call
that uses it directly, so the object the user can get by accessing
Target.conn can change after each call to Target method.
  • Loading branch information
douglas-raillard-arm authored and marcbonnici committed Sep 12, 2024
1 parent 1d6a007 commit 165b87f
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions devlib/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ def call_conn(f):

@functools.wraps(f)
def wrapper(self, *args, **kwargs):
reentered = self.conn.is_in_use
conn = self.conn
reentered = conn.is_in_use
disconnect = False
try:
# If the connection was already in use we need to use a different
Expand All @@ -113,15 +114,23 @@ def wrapper(self, *args, **kwargs):
if reentered:
# Shallow copy so we can use another connection instance
_self = copy.copy(self)
_self.conn = _self.get_connection()
assert self.conn is not _self.conn
new_conn = _self.get_connection()
assert conn is not new_conn
_self.conn = new_conn
disconnect = True
else:
_self = self
return f(_self, *args, **kwargs)
finally:
if disconnect:
_self.disconnect()
elif not reentered:
# Return the connection to the pool, so if we end up exiting
# the thread the connection can then be reused by another
# thread.
del self.conn
with self._lock:
self._unused_conns.add(conn)

return wrapper

Expand Down Expand Up @@ -284,7 +293,8 @@ def is_running(self, comm):
@tls_property
def _conn(self):
try:
return self._unused_conns.pop()
with self._lock:
return self._unused_conns.pop()
except KeyError:
return self.get_connection()

Expand All @@ -311,6 +321,8 @@ def __init__(self,
is_container=False,
max_async=50,
):

self._lock = threading.RLock()
self._async_pool = None
self._async_pool_size = None
self._unused_conns = set()
Expand Down Expand Up @@ -435,6 +447,7 @@ def __getstate__(self):
ignored.update((
'_async_pool',
'_unused_conns',
'_lock',
))
return {
k: v
Expand All @@ -450,6 +463,7 @@ def __setstate__(self, dct):
else:
self._async_pool = ThreadPoolExecutor(pool_size)
self._unused_conns = set()
self._lock = threading.RLock()

# connection and initialization

Expand Down Expand Up @@ -542,6 +556,13 @@ def disconnect(self):
if pool is not None:
pool.__exit__(None, None, None)

with self._lock:
connections = self._conn.get_all_values()
for conn in itertools.chain(connections, self._unused_conns):
conn.close()
if self._async_pool is not None:
self._async_pool.__exit__(None, None, None)

def __enter__(self):
return self

Expand Down

0 comments on commit 165b87f

Please sign in to comment.