diff --git a/devlib/target.py b/devlib/target.py index baaf338c0..db4ed9adb 100644 --- a/devlib/target.py +++ b/devlib/target.py @@ -103,7 +103,8 @@ def call_conn(f): @functools.wraps(f) def wrapper(self, *args, **kwargs): - reentered = self.conn.is_in_use + conn = self.conn + reentered = conn.is_in_use disconnect = False try: # If the connection was already in use we need to use a different @@ -113,8 +114,9 @@ def wrapper(self, *args, **kwargs): if reentered: # Shallow copy so we can use another connection instance _self = copy.copy(self) - _self.conn = _self.get_connection() - assert self.conn is not _self.conn + new_conn = _self.get_connection() + assert conn is not new_conn + _self.conn = new_conn disconnect = True else: _self = self @@ -122,6 +124,13 @@ def wrapper(self, *args, **kwargs): finally: if disconnect: _self.disconnect() + elif not reentered: + # Return the connection to the pool, so if we end up exiting + # the thread the connection can then be reused by another + # thread. + del self.conn + with self._lock: + self._unused_conns.add(conn) return wrapper @@ -284,7 +293,8 @@ def is_running(self, comm): @tls_property def _conn(self): try: - return self._unused_conns.pop() + with self._lock: + return self._unused_conns.pop() except KeyError: return self.get_connection() @@ -311,6 +321,8 @@ def __init__(self, is_container=False, max_async=50, ): + + self._lock = threading.RLock() self._async_pool = None self._async_pool_size = None self._unused_conns = set() @@ -435,6 +447,7 @@ def __getstate__(self): ignored.update(( '_async_pool', '_unused_conns', + '_lock', )) return { k: v @@ -450,6 +463,7 @@ def __setstate__(self, dct): else: self._async_pool = ThreadPoolExecutor(pool_size) self._unused_conns = set() + self._lock = threading.RLock() # connection and initialization @@ -542,6 +556,13 @@ def disconnect(self): if pool is not None: pool.__exit__(None, None, None) + with self._lock: + connections = self._conn.get_all_values() + for conn in itertools.chain(connections, self._unused_conns): + conn.close() + if self._async_pool is not None: + self._async_pool.__exit__(None, None, None) + def __enter__(self): return self