diff --git a/wayback/_client.py b/wayback/_client.py index 8ccb05c1..d553ec6a 100644 --- a/wayback/_client.py +++ b/wayback/_client.py @@ -18,6 +18,7 @@ from datetime import datetime, timezone import hashlib import logging +import queue import re import requests from requests.exceptions import (ChunkedEncodingError, @@ -26,9 +27,10 @@ ProxyError, RetryError, Timeout) +import threading import time import urllib.parse -from urllib3.connectionpool import HTTPConnectionPool +from urllib3 import HTTPConnectionPool, PoolManager from urllib3.exceptions import (ConnectTimeoutError, MaxRetryError, ReadTimeoutError) @@ -259,10 +261,19 @@ def from_httplib(cls, httplib_response, **response_kwargs): ##################################################################### +class PoolManagerAdapter(requests.adapters.HTTPAdapter): + def __init__(self, pool_manager): + self.poolmanager = pool_manager + super().__init__() + + def init_poolmanager(self, *args, **kwargs): + pass + + # TODO: make rate limiting configurable at the session level, rather than # arbitrarily set inside get_memento(). Idea: have a rate limit lock type and # pass an instance to the constructor here. -class WaybackSession(_utils.DisableAfterCloseSession, requests.Session): +class UnsafeWaybackSession(_utils.DisableAfterCloseSession, requests.Session): """ A custom session object that network pools connections and resources for requests to the Wayback Machine. @@ -299,11 +310,13 @@ class WaybackSession(_utils.DisableAfterCloseSession, requests.Session): # just the error type. See `should_retry_error()`. handleable_errors = (ConnectionError,) + retryable_errors - def __init__(self, retries=6, backoff=2, timeout=None, user_agent=None): + def __init__(self, retries=6, backoff=2, timeout=None, user_agent=None, + connection_manager=None): super().__init__() self.retries = retries self.backoff = backoff self.timeout = timeout + self.connection_manager = connection_manager self.headers = { 'User-Agent': (user_agent or f'wayback/{__version__} (+https://github.com/edgi-govdata-archiving/wayback)'), @@ -321,6 +334,7 @@ def __init__(self, retries=6, backoff=2, timeout=None, user_agent=None): # get more fancy with log filtering, since we *expect* lots of retries # with Wayback's APIs, but urllib3 logs a warning on every retry: # https://github.com/urllib3/urllib3/blob/5b047b645f5f93900d5e2fc31230848c25eb1f5f/src/urllib3/connectionpool.py#L730-L737 + self.reset() # Customize the built-in `send` functionality with retryability. # NOTE: worth considering whether we should push this logic to a custom @@ -387,8 +401,69 @@ def reset(self): self.close(disable=False) # Re-build the standard adapters. See: # https://github.com/kennethreitz/requests/blob/v2.22.0/requests/sessions.py#L415-L418 - self.mount('https://', requests.adapters.HTTPAdapter()) - self.mount('http://', requests.adapters.HTTPAdapter()) + if self.connection_manager: + self.mount('https://', PoolManagerAdapter(self.connection_manager)) + self.mount('http://', PoolManagerAdapter(self.connection_manager)) + else: + self.mount('https://', requests.adapters.HTTPAdapter()) + self.mount('http://', requests.adapters.HTTPAdapter()) + + +class WaybackSession(_utils.DisableAfterCloseSession): + def __init__(self, max_connections=None, **kwargs): + self.thread_data = threading.local() + self.all_sessions = queue.SimpleQueue() + self.session_options = kwargs + # Create a single urllib3 poolmanager (thread-safe) to share across all + # the actual concrete sessions (not thread-safe). + if max_connections: + manager = PoolManager(num_pools=10, maxsize=max_connections, + block=True, strict=True) + else: + # These defaults come from requests.adapters.HTTPAdapter. + manager = PoolManager(num_pools=10, maxsize=10, block=False, + strict=True) + self.session_options['connection_manager'] = manager + + def get_unsafe_session(self): + session = getattr(self.thread_data, 'unsafe_session', None) + if session is None: + session = UnsafeWaybackSession(**self.session_options) + self.thread_data.unsafe_session = session + self.all_sessions.put_nowait(session) + return session + + def close(self): + while True: + try: + session = self.all_sessions.get_nowait() + session.close() + except queue.Empty: + break + + def __enter__(self): + concrete_session = self.get_unsafe_session() + return concrete_session.__enter__() + + def __exit__(self, *args): + concrete_session = self.get_unsafe_session() + return concrete_session.__exit__(*args) + + def request(self, *args, **kwargs): + concrete_session = self.get_unsafe_session() + return concrete_session.request(*args, **kwargs) + + def resolve_redirects(self, *args, **kwargs): + concrete_session = self.get_unsafe_session() + return concrete_session.resolve_redirects(*args, **kwargs) + + def send(self, *args, **kwargs): + concrete_session = self.get_unsafe_session() + return concrete_session.send(*args, **kwargs) + + def should_strip_auth(self, *args, **kwargs): + concrete_session = self.get_unsafe_session() + return concrete_session.should_strip_auth(*args, **kwargs) # TODO: add retry, backoff, cross_thread_backoff, and rate_limit options that