"""Module implementing the Pool for :mod:`requests_toolbelt.threaded`."""
import multiprocessing
import requests

from . import thread
from .._compat import queue


class Pool(object):
    """Pool that manages the threads containing sessions.

    One session is built per thread by calling ``session()`` and passing the
    result through ``initializer`` and then ``auth_generator``; each of those
    callables must therefore return the session it was given (or a
    replacement for it).

    :param job_queue:
        The queue you're expected to use to which you should add items.
    :type job_queue: queue.Queue
    :param initializer:
        Function used to initialize an instance of ``session``. It receives
        the new session and must return it.
    :type initializer: collections.Callable
    :param auth_generator:
        Function used to generate new auth credentials for the session. It
        receives the value returned by ``initializer`` and must return the
        session to use.
    :type auth_generator: collections.Callable
    :param int num_processes:
        Number of threads to create. Defaults to the CPU count reported by
        :func:`multiprocessing.cpu_count`, or 1 when that is unavailable.
    :param session:
        Callable invoked without arguments to create the session for each
        thread. Defaults to the :class:`requests.Session` class.
    :type session: requests.Session
    :raises ValueError:
        If ``num_processes`` is less than 1.
    """

    def __init__(self, job_queue, initializer=None, auth_generator=None,
                 num_processes=None, session=requests.Session):
        if num_processes is None:
            try:
                num_processes = multiprocessing.cpu_count() or 1
            except NotImplementedError:
                # cpu_count() is documented to raise when the number of
                # CPUs cannot be determined; fall back to a single thread.
                num_processes = 1

        if num_processes < 1:
            raise ValueError("Number of processes should at least be 1.")

        self._job_queue = job_queue
        self._response_queue = queue.Queue()
        self._exc_queue = queue.Queue()
        self._processes = num_processes
        self._initializer = initializer or _identity
        self._auth = auth_generator or _identity
        self._session = session
        # Every thread gets its own freshly built session but shares the
        # job, response and exception queues with the rest of the pool.
        self._pool = [
            thread.SessionThread(self._new_session(), self._job_queue,
                                 self._response_queue, self._exc_queue)
            for _ in range(self._processes)
        ]

    def _new_session(self):
        """Build a session and run it through the initializer and auth."""
        return self._auth(self._initializer(self._session()))

    @classmethod
    def from_exceptions(cls, exceptions, **kwargs):
        r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s.

        Provided an iterable that provides :class:`~ThreadException` objects,
        this classmethod will generate a new pool to retry the requests that
        caused the exceptions.

        :param exceptions:
            Iterable that returns :class:`~ThreadException`
        :type exceptions: iterable
        :param kwargs:
            Keyword arguments passed to the :class:`~Pool` initializer.
        :returns: An initialized :class:`~Pool` object.
        :rtype: :class:`~Pool`
        """
        job_queue = queue.Queue()
        for exc in exceptions:
            # Re-queue the keyword arguments of the request that failed.
            job_queue.put(exc.request_kwargs)

        return cls(job_queue=job_queue, **kwargs)

    @classmethod
    def from_urls(cls, urls, request_kwargs=None, **kwargs):
        """Create a :class:`~Pool` from an iterable of URLs.

        Each job defaults to ``{'method': 'GET'}``; entries in
        ``request_kwargs`` override or extend that default, and the URL is
        stored under the ``'url'`` key.

        :param urls:
            Iterable that returns URLs with which we create a pool.
        :type urls: iterable
        :param dict request_kwargs:
            Dictionary of other keyword arguments to provide to the request
            method.
        :param kwargs:
            Keyword arguments passed to the :class:`~Pool` initializer.
        :returns: An initialized :class:`~Pool` object.
        :rtype: :class:`~Pool`
        """
        request_dict = {'method': 'GET'}
        request_dict.update(request_kwargs or {})
        job_queue = queue.Queue()
        for url in urls:
            # Copy so that every job owns an independent dictionary.
            job = request_dict.copy()
            job.update({'url': url})
            job_queue.put(job)

        return cls(job_queue=job_queue, **kwargs)

    def exceptions(self):
        """Iterate over all the exceptions in the pool.

        Iteration stops as soon as the exception queue is found empty.

        :returns: Generator of :class:`~ThreadException`
        """
        while True:
            exc = self.get_exception()
            if exc is None:
                break
            yield exc

    def get_exception(self):
        """Get an exception from the pool.

        This does not block.

        :returns:
            The next captured exception, or ``None`` if the exception queue
            is currently empty.
        :rtype: :class:`~ThreadException`
        """
        try:
            (request, exc) = self._exc_queue.get_nowait()
        except queue.Empty:
            return None
        else:
            return ThreadException(request, exc)

    def get_response(self):
        """Get a response from the pool.

        This does not block.

        :returns:
            The next available response, or ``None`` if the response queue
            is currently empty.
        :rtype: :class:`~ThreadResponse`
        """
        try:
            (request, response) = self._response_queue.get_nowait()
        except queue.Empty:
            return None
        else:
            return ThreadResponse(request, response)

    def responses(self):
        """Iterate over all the responses in the pool.

        Iteration stops as soon as the response queue is found empty.

        :returns: Generator of :class:`~ThreadResponse`
        """
        while True:
            resp = self.get_response()
            if resp is None:
                break
            yield resp

    def join_all(self):
        """Join all the threads to the master thread."""
        for session_thread in self._pool:
            session_thread.join()


class ThreadProxy(object):
    """Base class that forwards unknown attribute lookups to a wrapped object.

    Subclasses set ``proxied_attr`` to the name of the instance attribute
    holding the wrapped object, and ``attrs`` to the names of the attributes
    that live on the wrapper itself and must never be proxied.
    """

    #: Name of the instance attribute that holds the wrapped object.
    proxied_attr = None
    #: Names of attributes stored on the wrapper itself. Defined on the base
    #: class so that looking it up can never re-enter ``__getattr__``.
    attrs = frozenset()

    def __getattr__(self, attr):
        """Proxy attribute accesses to the proxied object.

        Python only calls this after normal attribute lookup has failed.

        :raises AttributeError:
            If ``attr`` is one of the wrapper's own attributes and is not
            set, if no proxied attribute is configured, or if the wrapped
            object does not have ``attr``.
        """
        get = object.__getattribute__
        if attr in self.attrs or self.proxied_attr is None:
            # Nothing to proxy to: let the default lookup raise the usual
            # AttributeError instead of recursing or raising TypeError.
            return get(self, attr)
        proxied = get(self, self.proxied_attr)
        return getattr(proxied, attr)


class ThreadResponse(ThreadProxy):
    """A wrapper around a requests Response object.

    This will proxy most attribute access actions to the Response object. For
    example, if you wanted the parsed JSON from the response, you might do:

    .. code-block:: python

        thread_response = pool.get_response()
        json = thread_response.json()

    :param request_kwargs:
        The original keyword arguments provided to the queue.
    :param response:
        The response object to wrap.
    """
    proxied_attr = 'response'
    attrs = frozenset(['request_kwargs', 'response'])

    def __init__(self, request_kwargs, response):
        #: The original keyword arguments provided to the queue
        self.request_kwargs = request_kwargs
        #: The wrapped response
        self.response = response


class ThreadException(ThreadProxy):
    """A wrapper around an exception raised during a request.

    This will proxy most attribute access actions to the exception object. For
    example, if you wanted the message from the exception, you might do:

    .. code-block:: python

        thread_exc = pool.get_exception()
        msg = thread_exc.message

    :param request_kwargs:
        The original keyword arguments provided to the queue.
    :param exception:
        The exception object to wrap.
    """
    proxied_attr = 'exception'
    attrs = frozenset(['request_kwargs', 'exception'])

    def __init__(self, request_kwargs, exception):
        #: The original keyword arguments provided to the queue
        self.request_kwargs = request_kwargs
        #: The captured and wrapped exception
        self.exception = exception


def _identity(session_obj):
    """Return ``session_obj`` unchanged; the default initializer and auth."""
    return session_obj


__all__ = ['ThreadException', 'ThreadResponse', 'Pool']