1"""Module implementing the Pool for :mod:``requests_toolbelt.threaded``."""
2import multiprocessing
3import requests
4
5from . import thread
6from .._compat import queue
7
8
9class Pool(object):
10    """Pool that manages the threads containing sessions.
11
12    :param queue:
13        The queue you're expected to use to which you should add items.
14    :type queue: queue.Queue
15    :param initializer:
16        Function used to initialize an instance of ``session``.
17    :type initializer: collections.Callable
18    :param auth_generator:
19        Function used to generate new auth credentials for the session.
20    :type auth_generator: collections.Callable
21    :param int num_process:
22        Number of threads to create.
23    :param session:
24    :type session: requests.Session
25    """
26
27    def __init__(self, job_queue, initializer=None, auth_generator=None,
28                 num_processes=None, session=requests.Session):
29        if num_processes is None:
30            num_processes = multiprocessing.cpu_count() or 1
31
32        if num_processes < 1:
33            raise ValueError("Number of processes should at least be 1.")
34
35        self._job_queue = job_queue
36        self._response_queue = queue.Queue()
37        self._exc_queue = queue.Queue()
38        self._processes = num_processes
39        self._initializer = initializer or _identity
40        self._auth = auth_generator or _identity
41        self._session = session
42        self._pool = [
43            thread.SessionThread(self._new_session(), self._job_queue,
44                                 self._response_queue, self._exc_queue)
45            for _ in range(self._processes)
46        ]
47
48    def _new_session(self):
49        return self._auth(self._initializer(self._session()))
50
51    @classmethod
52    def from_exceptions(cls, exceptions, **kwargs):
53        r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s.
54
55        Provided an iterable that provides :class:`~ThreadException` objects,
56        this classmethod will generate a new pool to retry the requests that
57        caused the exceptions.
58
59        :param exceptions:
60            Iterable that returns :class:`~ThreadException`
61        :type exceptions: iterable
62        :param kwargs:
63            Keyword arguments passed to the :class:`~Pool` initializer.
64        :returns: An initialized :class:`~Pool` object.
65        :rtype: :class:`~Pool`
66        """
67        job_queue = queue.Queue()
68        for exc in exceptions:
69            job_queue.put(exc.request_kwargs)
70
71        return cls(job_queue=job_queue, **kwargs)
72
73    @classmethod
74    def from_urls(cls, urls, request_kwargs=None, **kwargs):
75        """Create a :class:`~Pool` from an iterable of URLs.
76
77        :param urls:
78            Iterable that returns URLs with which we create a pool.
79        :type urls: iterable
80        :param dict request_kwargs:
81            Dictionary of other keyword arguments to provide to the request
82            method.
83        :param kwargs:
84            Keyword arguments passed to the :class:`~Pool` initializer.
85        :returns: An initialized :class:`~Pool` object.
86        :rtype: :class:`~Pool`
87        """
88        request_dict = {'method': 'GET'}
89        request_dict.update(request_kwargs or {})
90        job_queue = queue.Queue()
91        for url in urls:
92            job = request_dict.copy()
93            job.update({'url': url})
94            job_queue.put(job)
95
96        return cls(job_queue=job_queue, **kwargs)
97
98    def exceptions(self):
99        """Iterate over all the exceptions in the pool.
100
101        :returns: Generator of :class:`~ThreadException`
102        """
103        while True:
104            exc = self.get_exception()
105            if exc is None:
106                break
107            yield exc
108
109    def get_exception(self):
110        """Get an exception from the pool.
111
112        :rtype: :class:`~ThreadException`
113        """
114        try:
115            (request, exc) = self._exc_queue.get_nowait()
116        except queue.Empty:
117            return None
118        else:
119            return ThreadException(request, exc)
120
121    def get_response(self):
122        """Get a response from the pool.
123
124        :rtype: :class:`~ThreadResponse`
125        """
126        try:
127            (request, response) = self._response_queue.get_nowait()
128        except queue.Empty:
129            return None
130        else:
131            return ThreadResponse(request, response)
132
133    def responses(self):
134        """Iterate over all the responses in the pool.
135
136        :returns: Generator of :class:`~ThreadResponse`
137        """
138        while True:
139            resp = self.get_response()
140            if resp is None:
141                break
142            yield resp
143
144    def join_all(self):
145        """Join all the threads to the master thread."""
146        for session_thread in self._pool:
147            session_thread.join()
148
149
150class ThreadProxy(object):
151    proxied_attr = None
152
153    def __getattr__(self, attr):
154        """Proxy attribute accesses to the proxied object."""
155        get = object.__getattribute__
156        if attr not in self.attrs:
157            response = get(self, self.proxied_attr)
158            return getattr(response, attr)
159        else:
160            return get(self, attr)
161
162
163class ThreadResponse(ThreadProxy):
164    """A wrapper around a requests Response object.
165
166    This will proxy most attribute access actions to the Response object. For
167    example, if you wanted the parsed JSON from the response, you might do:
168
169    .. code-block:: python
170
171        thread_response = pool.get_response()
172        json = thread_response.json()
173
174    """
175    proxied_attr = 'response'
176    attrs = frozenset(['request_kwargs', 'response'])
177
178    def __init__(self, request_kwargs, response):
179        #: The original keyword arguments provided to the queue
180        self.request_kwargs = request_kwargs
181        #: The wrapped response
182        self.response = response
183
184
185class ThreadException(ThreadProxy):
186    """A wrapper around an exception raised during a request.
187
188    This will proxy most attribute access actions to the exception object. For
189    example, if you wanted the message from the exception, you might do:
190
191    .. code-block:: python
192
193        thread_exc = pool.get_exception()
194        msg = thread_exc.message
195
196    """
197    proxied_attr = 'exception'
198    attrs = frozenset(['request_kwargs', 'exception'])
199
200    def __init__(self, request_kwargs, exception):
201        #: The original keyword arguments provided to the queue
202        self.request_kwargs = request_kwargs
203        #: The captured and wrapped exception
204        self.exception = exception
205
206
207def _identity(session_obj):
208    return session_obj
209
210
211__all__ = ['ThreadException', 'ThreadResponse', 'Pool']
212