1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2009 Christopher Lenz
5# All rights reserved.
6#
7# This software is licensed as described in the file COPYING, which
8# you should have received as part of this distribution.
9
"""Simple HTTP client implementation based on the ``httplib`` module
(``http.client`` on Python 3) in the standard library.
"""
13
14from base64 import b64encode
15from datetime import datetime
16import errno
17import socket
18import time
19import sys
20import ssl
21
22try:
23    from threading import Lock
24except ImportError:
25    from dummy_threading import Lock
26
27try:
28    from http.client import BadStatusLine, HTTPConnection, HTTPSConnection
29except ImportError:
30    from httplib import BadStatusLine, HTTPConnection, HTTPSConnection
31
32try:
33    from email.Utils import parsedate
34except ImportError:
35    from email.utils import parsedate
36
37from couchdb import json
38from couchdb import util
39
# Names exported by ``from couchdb.http import *``.
__all__ = [
    'HTTPError', 'PreconditionFailed', 'ResourceNotFound', 'ResourceConflict',
    'ServerError', 'Unauthorized', 'RedirectLimit', 'Session', 'Resource',
]
__docformat__ = 'restructuredtext en'
44
45
46if sys.version < '2.7':
47
48    from httplib import CannotSendHeader, _CS_REQ_STARTED, _CS_REQ_SENT
49
50    class NagleMixin:
51        """
52        Mixin to upgrade httplib connection types so headers and body can be
53        sent at the same time to avoid triggering Nagle's algorithm.
54
55        Based on code originally copied from Python 2.7's httplib module.
56        """
57
58        def endheaders(self, message_body=None):
59            if self.__dict__['_HTTPConnection__state'] == _CS_REQ_STARTED:
60                self.__dict__['_HTTPConnection__state'] = _CS_REQ_SENT
61            else:
62                raise CannotSendHeader()
63            self._send_output(message_body)
64
65        def _send_output(self, message_body=None):
66            self._buffer.extend(("", ""))
67            msg = "\r\n".join(self._buffer)
68            del self._buffer[:]
69            if isinstance(message_body, str):
70                msg += message_body
71                message_body = None
72            self.send(msg)
73            if message_body is not None:
74                self.send(message_body)
75
76    class HTTPConnection(NagleMixin, HTTPConnection):
77        pass
78
79    class HTTPSConnection(NagleMixin, HTTPSConnection):
80        pass
81
82
class HTTPError(Exception):
    """Common base for the exceptions raised for HTTP responses whose status
    code is 400 or greater."""
85
86
class PreconditionFailed(HTTPError):
    """Raised when a request is answered with HTTP 412 (Precondition Failed)."""
91
92
class ResourceNotFound(HTTPError):
    """Raised when a request is answered with HTTP 404 (Not Found)."""
97
98
class ResourceConflict(HTTPError):
    """Raised when a request is answered with HTTP 409 (Conflict)."""
103
104
class ServerError(HTTPError):
    """Raised for an HTTP error status that has no more specific exception
    class in this module."""
109
110
class Unauthorized(HTTPError):
    """Raised when the server demands authentication credentials and they
    were either missing or rejected."""
115
116
class RedirectLimit(Exception):
    """Raised when following a redirect would exceed the maximum number of
    redirections allowed for a request."""
121
122
CHUNK_SIZE = 8 * 1024  # bytes per streamed read/upload chunk; also the buffering threshold
124
class ResponseBody(object):
    """Minimal file-like wrapper around a streamed HTTP response body.

    The pooled connection is handed back to ``conn_pool`` once the body has
    been fully read. For a chunked response that is dropped before it ends,
    the connection is closed instead (see ``__del__``).
    """

    def __init__(self, resp, conn_pool, url, conn):
        self.resp = resp
        self.chunked = self.resp.msg.get('transfer-encoding') == 'chunked'
        self.conn_pool = conn_pool
        self.url = url
        self.conn = conn

    def __del__(self):
        if not self.chunked:
            self.close()
        else:
            self.resp.close()
            if self.conn:
                # Since chunked responses can be infinite (i.e. for
                # feed=continuous), and we want to avoid leaking sockets
                # (even if just to prevent ResourceWarnings when running
                # the test suite on Python 3), we'll close this connection
                # eagerly. We can't get it into the clean state required to
                # put it back into the ConnectionPool (since we don't know
                # when it ends and we can only do blocking reads). Finding
                # out whether it might in fact end would be relatively onerous
                # and require a layering violation.
                self.conn.close()

    def read(self, size=None):
        """Read up to *size* bytes, or the whole remaining body if *size* is
        None.

        A read returning fewer than *size* bytes is taken as the end of the
        body: the response is drained and the connection released.
        """
        data = self.resp.read(size)
        if size is None or len(data) < size:
            self.close()
        return data

    def _release_conn(self):
        """Hand the connection back to the pool; a no-op once released."""
        if not self.conn:
            return
        self.conn_pool.release(self.url, self.conn)
        self.conn_pool, self.url, self.conn = None, None, None

    def close(self):
        """Drain any unread body data, then release the connection."""
        while not self.resp.isclosed():
            chunk = self.resp.read(CHUNK_SIZE)
            if not chunk:
                self.resp.close()
        if self.conn:
            self._release_conn()

    def iterchunks(self):
        """Yield newline-terminated lines from a chunked response body.

        The chunk framing is parsed directly from ``resp.fp``. A line split
        across chunks is reassembled before it is yielded. A line consisting
        of a single newline with nothing buffered stops processing of the
        rest of that chunk. A trailing line without a newline is never
        yielded.
        """
        assert self.chunked
        buffer = []
        while True:

            if self.resp.isclosed():
                break

            # Chunk-size line: hex size, optionally followed by
            # ";name=value" chunk extensions (RFC 7230, 4.1.1). Extensions
            # are ignored.
            size_line = self.resp.fp.readline()
            chunksz = int(size_line.split(b';', 1)[0].strip(), 16)
            if not chunksz:
                # Last chunk: consume the final CRLF (trailer fields are not
                # handled), then return the connection to the pool.
                self.resp.fp.read(2) #crlf
                self.resp.close()
                self._release_conn()
                break

            chunk = self.resp.fp.read(chunksz)
            for ln in chunk.splitlines(True):

                end = ln == b'\n' and not buffer # end of response
                if not ln or end:
                    break

                buffer.append(ln)
                if ln.endswith(b'\n'):
                    yield b''.join(buffer)
                    buffer = []

            self.resp.fp.read(2) #crlf
197
198
# errno codes treated as transient network failures. A request failing with
# one of them is retried (this is Session's default ``retryable_errors``).
RETRYABLE_ERRORS = frozenset((
    errno.EPIPE,
    errno.ETIMEDOUT,
    errno.ECONNRESET,
    errno.ECONNREFUSED,
    errno.ECONNABORTED,
    errno.EHOSTDOWN,
    errno.EHOSTUNREACH,
    errno.ENETRESET,
    errno.ENETUNREACH,
    errno.ENETDOWN,
))
205
206
class Session(object):
    """HTTP client session.

    Keeps a pool of reusable connections and a cache of GET/HEAD responses
    revalidated by ETag. Also records the permanent (301) redirects seen so
    far and the retry policy applied to transient socket errors.
    """

    def __init__(self, cache=None, timeout=None, max_redirects=5,
                 retry_delays=(0,), retryable_errors=RETRYABLE_ERRORS):
        """Initialize an HTTP client session.

        :param cache: an instance with a dict-like interface or None to allow
                      Session to create a dict for caching.
        :param timeout: socket timeout in number of seconds, or `None` for no
                        timeout (the default)
        :param max_redirects: maximum number of redirects followed for one
                              request before `RedirectLimit` is raised.
        :param retry_delays: sequence of delays in seconds. A request failing
                             with a retryable socket error is retried once
                             per entry, after sleeping that long.
        :param retryable_errors: iterable of ``errno`` codes that trigger a
                                 retry.
        """
        from couchdb import __version__ as VERSION
        self.user_agent = 'CouchDB-Python/%s' % VERSION
        # XXX A caller-supplied `cache` mapping becomes the Cache's ``by_url``
        # dict. Check Cache._clean before relying on that mapping staying
        # shared with the caller. Do we remove the cache arg (does using a
        # shared Session instance cover the same use cases?)
        self.cache = Cache()
        if cache is not None:
            self.cache.by_url = cache
        self.max_redirects = max_redirects
        self.perm_redirects = {}

        self._disable_ssl_verification = False
        self._timeout = timeout
        self.connection_pool = ConnectionPool(
            self._timeout,
            disable_ssl_verification=self._disable_ssl_verification)

        # Copied so later changes to the caller's sequence don't affect us.
        self.retry_delays = list(retry_delays)
        self.retryable_errors = set(retryable_errors)

    def disable_ssl_verification(self):
        """Disable verification of SSL certificates and re-initialize the
        ConnectionPool. Only applicable on Python 2.7.9+ as previous versions
        of Python don't verify SSL certs."""
        self._disable_ssl_verification = True
        self.connection_pool = ConnectionPool(self._timeout,
            disable_ssl_verification=self._disable_ssl_verification)

    def request(self, method, url, body=None, headers=None, credentials=None,
                num_redirects=0):
        """Send an HTTP request and return ``(status, headers, data)``.

        :param method: HTTP method name (case-insensitive).
        :param url: absolute URL. A permanent redirect recorded earlier for
                    it is applied first.
        :param body: ``None``, a string (``util.strbase``), a file-like
                     object (sent with chunked transfer encoding), or any
                     other value, which is JSON-encoded.
        :param headers: dict of request headers. NOTE: it is modified in
                        place (Accept, User-Agent, content headers,
                        Authorization, If-None-Match).
        :param credentials: ``(username, password)`` for basic auth, or None.
        :param num_redirects: redirects already followed (internal).
        :return: ``(status, headers, data)``. ``data`` takes one of three
                 forms:

                 - ``None`` for empty responses;
                 - a ``util.StringIO`` for small or cached bodies;
                 - a `ResponseBody` for large or chunked ones.
        :raises Unauthorized, ResourceNotFound, ResourceConflict,
                PreconditionFailed, ServerError: for status codes >= 400.
        :raises RedirectLimit: when a redirect arrives after
                               ``max_redirects`` redirects were followed.
        :raises socket.error: when a network error persists after retries.
        """
        if url in self.perm_redirects:
            url = self.perm_redirects[url]
        method = method.upper()

        if headers is None:
            headers = {}
        headers.setdefault('Accept', 'application/json')
        headers['User-Agent'] = self.user_agent

        cached_resp = None
        if method in ('GET', 'HEAD'):
            cached_resp = self.cache.get(url)
            if cached_resp is not None:
                etag = cached_resp[1].get('etag')
                if etag:
                    headers['If-None-Match'] = etag

        if (body is not None and not isinstance(body, util.strbase) and
                not hasattr(body, 'read')):
            body = json.encode(body).encode('utf-8')
            headers.setdefault('Content-Type', 'application/json')

        # Encode text bodies up front so Content-Length counts the UTF-8
        # bytes actually sent, not the number of characters.
        if isinstance(body, util.utype):
            body = body.encode('utf-8')

        if body is None:
            headers.setdefault('Content-Length', '0')
        elif isinstance(body, util.strbase):
            headers.setdefault('Content-Length', str(len(body)))
        else:
            headers['Transfer-Encoding'] = 'chunked'

        authorization = basic_auth(credentials)
        if authorization:
            headers['Authorization'] = authorization

        path_query = util.urlunsplit(('', '') + util.urlsplit(url)[2:4] + ('',))
        conn = self.connection_pool.get(url)

        def _try_request_with_retries(retries):
            while True:
                try:
                    return _try_request()
                except socket.error as e:
                    ecode = e.args[0] if e.args else None
                    if ecode not in self.retryable_errors:
                        raise
                    # The connection is in an unknown state; close it before
                    # retrying or giving up.
                    conn.close()
                    delay = next(retries, None)
                    if delay is None:
                        # No more retries: re-raise the last socket error
                        # right away (no extra sleep).
                        raise
                    time.sleep(delay)

        def _try_request():
            # NOTE(review): a file-like body is not rewound before a retry.
            try:
                conn.putrequest(method, path_query, skip_accept_encoding=True)
                for header in headers:
                    conn.putheader(header, headers[header])
                if body is None:
                    conn.endheaders()
                elif isinstance(body, util.strbase):
                    conn.endheaders(body)
                else: # assume a file-like object and send in chunks
                    conn.endheaders()
                    while 1:
                        chunk = body.read(CHUNK_SIZE)
                        if not chunk:
                            break
                        if isinstance(chunk, util.utype):
                            chunk = chunk.encode('utf-8')
                        size_line = ('%x\r\n' % len(chunk)).encode('utf-8')
                        conn.send(size_line + chunk + b'\r\n')
                    conn.send(b'0\r\n\r\n')
                return conn.getresponse()
            except BadStatusLine as e:
                # httplib raises a BadStatusLine when it cannot read the status
                # line saying, "Presumably, the server closed the connection
                # before sending a valid response."
                # Raise as ECONNRESET to simplify retry logic.
                if e.line == '' or e.line == "''":
                    raise socket.error(errno.ECONNRESET)
                else:
                    raise

        resp = _try_request_with_retries(iter(self.retry_delays))
        status = resp.status

        # Handle conditional response: serve the cached copy on 304. A 304
        # without a cached entry (e.g. caller-supplied If-None-Match) is
        # returned as-is below.
        if status == 304 and method in ('GET', 'HEAD') and \
                cached_resp is not None:
            resp.read()
            self.connection_pool.release(url, conn)
            status, msg, data = cached_resp
            if data is not None:
                data = util.StringIO(data)
            return status, msg, data
        elif cached_resp is not None:
            self.cache.remove(url)

        # Handle redirects
        if status == 303 or \
                method in ('GET', 'HEAD') and status in (301, 302, 307):
            resp.read()
            self.connection_pool.release(url, conn)
            if num_redirects >= self.max_redirects:
                raise RedirectLimit('Redirection limit exceeded')
            location = resp.getheader('location')

            # in case of relative location: add scheme and host to the location
            location_split = util.urlsplit(location)

            if not location_split[0]:
                orig_url_split = util.urlsplit(url)
                location = util.urlunsplit(orig_url_split[:2] + location_split[2:])

            if status == 301:
                self.perm_redirects[url] = location
            elif status == 303:
                method = 'GET'
            return self.request(method, location, body, headers,
                                num_redirects=num_redirects + 1)

        data = None
        streamed = False

        # Read the full response for empty responses so that the connection is
        # in good state for the next request
        if method == 'HEAD' or resp.getheader('content-length') == '0' or \
                status < 200 or status in (204, 304):
            resp.read()
            self.connection_pool.release(url, conn)

        # Buffer small non-JSON response bodies
        elif int(resp.getheader('content-length', sys.maxsize)) < CHUNK_SIZE:
            data = resp.read()
            self.connection_pool.release(url, conn)

        # For large or chunked response bodies, do not buffer the full body,
        # and instead return a minimal file-like object
        else:
            data = ResponseBody(resp, self.connection_pool, url, conn)
            streamed = True

        # Handle errors
        if status >= 400:
            ctype = resp.getheader('content-type') or ''
            if streamed:
                # Buffer the whole error body. Reading a ResponseBody to the
                # end also returns its connection to the pool, so it must not
                # be released again here.
                data = data.read()
            if data is not None and 'application/json' in ctype:
                data = json.decode(data.decode('utf-8'))
                error = data.get('error'), data.get('reason')
            elif method != 'HEAD':
                # The body was already read (and the connection released)
                # above. Reading the response again would return nothing,
                # and releasing again would pool the connection twice.
                error = data if data is not None else b''
            else:
                error = ''
            if status == 401:
                raise Unauthorized(error)
            elif status == 404:
                raise ResourceNotFound(error)
            elif status == 409:
                raise ResourceConflict(error)
            elif status == 412:
                raise PreconditionFailed(error)
            else:
                raise ServerError((status, error))

        # Store cachable responses (never a bare 304, which has no body)
        if not streamed and method == 'GET' and status != 304 and \
                'etag' in resp.msg:
            self.cache.put(url, (status, resp.msg, data))

        if not streamed and data is not None:
            data = util.StringIO(data)

        return status, resp.msg, data
431
432
def cache_sort(i):
    """Sort key for cache entries: the cached response's ``Date`` header.

    :param i: a ``(url, (status, headers, data))`` item from ``Cache.by_url``
    :return: a naive local ``datetime``. Entries whose ``Date`` header is
             missing or unparseable map to ``datetime.min``: they sort first
             and are evicted first, instead of making cache cleanup raise
             ``TypeError``.
    """
    date = i[1][1].get('Date')
    parsed = parsedate(date) if date else None
    if parsed is None:
        return datetime.min
    return datetime.fromtimestamp(time.mktime(parsed))
435
class Cache(object):
    """Content cache: maps a URL to its ``(status, headers, data)`` response.

    When more than ``max_size`` entries are stored, all but the
    ``keep_size`` most recent ones (by their ``Date`` header) are evicted.
    """

    # Some random values to limit memory use
    keep_size, max_size = 10, 75

    def __init__(self):
        self.by_url = {}

    def get(self, url):
        """Return the cached response for *url*, or None."""
        return self.by_url.get(url)

    def put(self, url, response):
        """Store *response* for *url*, trimming the cache if it grew too big."""
        self.by_url[url] = response
        if len(self.by_url) > self.max_size:
            self._clean()

    def remove(self, url):
        """Drop the entry for *url*, if any."""
        self.by_url.pop(url, None)

    def _clean(self):
        """Keep only the ``keep_size`` newest entries.

        The mapping is trimmed in place, so a caller-supplied ``by_url``
        object stays the one in use.
        """
        ls = sorted(self.by_url.items(), key=cache_sort)
        # Use an explicit start index: ``ls[-0:]`` would keep every entry
        # when keep_size is 0.
        newest = ls[max(len(ls) - self.keep_size, 0):]
        self.by_url.clear()
        self.by_url.update(newest)
459
460
class InsecureHTTPSConnection(HTTPSConnection):
    """HTTPS connection that performs no certificate or hostname checks.

    This reproduces what Python < 2.7.9 did by default. On newer interpreters
    an unverified SSL context is injected into every instance. See:
    https://docs.python.org/2/library/httplib.html#httplib.HTTPSConnection"""

    if sys.version_info >= (2, 7, 9):
        def __init__(self, *args, **kwargs):
            # Any context passed by the caller is replaced.
            unverified = ssl._create_unverified_context()
            kwargs.update(context=unverified)
            HTTPSConnection.__init__(self, *args, **kwargs)
469
470
class ConnectionPool(object):
    """Pool of idle HTTP(S) connections keyed by ``(scheme, host)``.

    Access to the per-key lists in ``get`` and ``release`` is serialized by
    ``self.lock``.
    """

    def __init__(self, timeout, disable_ssl_verification=False):
        self.timeout = timeout
        self.disable_ssl_verification = disable_ssl_verification
        self.conns = {}  # (scheme, host) -> list of idle connections
        self.lock = Lock()

    def get(self, url):
        """Return an idle connection for *url*'s scheme and host, or open a
        new one.

        :raises ValueError: if the scheme is neither ``http`` nor ``https``.
        """
        key = tuple(util.urlsplit(url, 'http', False)[:2])
        scheme, host = key

        # Reuse the most recently released connection, if there is one.
        with self.lock:
            idle = self.conns.setdefault(key, [])
            conn = idle.pop() if idle else None

        if conn is not None:
            return conn

        if scheme == 'http':
            factory = HTTPConnection
        elif scheme == 'https':
            factory = (InsecureHTTPSConnection
                       if self.disable_ssl_verification else HTTPSConnection)
        else:
            raise ValueError('%s is not a supported scheme' % scheme)
        conn = factory(host, timeout=self.timeout)
        conn.connect()
        return conn

    def release(self, url, conn):
        """Make *conn* available again for requests to *url*'s scheme/host."""
        key = tuple(util.urlsplit(url, 'http', False)[:2])
        with self.lock:
            self.conns.setdefault(key, []).append(conn)

    def __del__(self):
        for idle in list(self.conns.values()):
            for conn in idle:
                conn.close()
523
524
class Resource(object):
    """An HTTP resource: a URL plus the Session, credentials and default
    headers used for requests to it and to URLs below it."""

    def __init__(self, url, session, headers=None):
        if sys.version_info[0] == 2 and isinstance(url, util.utype):
            url = url.encode('utf-8') # kind of an ugly hack for issue 235
        # Credentials embedded in the URL (user:pass@host) are removed from
        # it and kept separately; they are passed to Session.request.
        self.url, self.credentials = extract_credentials(url)
        if session is None:
            session = Session()
        self.session = session
        self.headers = headers or {}

    def __call__(self, *path):
        """Return a child resource for ``self.url`` joined with *path*.

        The child shares this resource's session and credentials and gets a
        copy of its headers.
        """
        obj = type(self)(urljoin(self.url, *path), self.session)
        obj.credentials = self.credentials
        obj.headers = self.headers.copy()
        return obj

    def delete(self, path=None, headers=None, **params):
        return self._request('DELETE', path, headers=headers, **params)

    def get(self, path=None, headers=None, **params):
        return self._request('GET', path, headers=headers, **params)

    def head(self, path=None, headers=None, **params):
        return self._request('HEAD', path, headers=headers, **params)

    def post(self, path=None, body=None, headers=None, **params):
        return self._request('POST', path, body=body, headers=headers,
                             **params)

    def put(self, path=None, body=None, headers=None, **params):
        return self._request('PUT', path, body=body, headers=headers, **params)

    def delete_json(self, path=None, headers=None, **params):
        return self._request_json('DELETE', path, headers=headers, **params)

    def get_json(self, path=None, headers=None, **params):
        return self._request_json('GET', path, headers=headers, **params)

    def post_json(self, path=None, body=None, headers=None, **params):
        return self._request_json('POST', path, body=body, headers=headers,
                                  **params)

    def put_json(self, path=None, body=None, headers=None, **params):
        return self._request_json('PUT', path, body=body, headers=headers,
                                  **params)

    def _request(self, method, path=None, body=None, headers=None, **params):
        """Send *method* to this URL (or *path* below it) with *params* as
        the query string. Per-call *headers* override the default ones.

        :return: ``(status, headers, data)`` from `Session.request`.
        """
        all_headers = self.headers.copy()
        all_headers.update(headers or {})
        if path is not None:
            url = urljoin(self.url, path, **params)
        else:
            url = urljoin(self.url, **params)
        return self.session.request(method, url, body=body,
                                    headers=all_headers,
                                    credentials=self.credentials)

    def _request_json(self, method, path=None, body=None, headers=None, **params):
        """Like `_request`, but decode a JSON response body.

        :return: ``(status, headers, data)``. ``data`` is decoded when the
                 response Content-Type is JSON and a body is present.
        """
        status, headers, data = self._request(method, path, body=body,
                                              headers=headers, **params)
        # Session.request returns data=None for empty bodies (e.g.
        # Content-Length: 0), even when the Content-Type claims JSON.
        if data is not None and \
                'application/json' in headers.get('content-type', ''):
            data = json.decode(data.read().decode('utf-8'))
        return status, headers, data
589
590
591
def extract_credentials(url):
    """Extract authentication (user name and password) credentials from the
    given URL.

    The credentials are split off at the last '@' of the network location.
    The user name is separated from the password at the first ':'. This way
    an unescaped '@' or ':' inside the password is preserved instead of
    raising or producing a malformed tuple.

    >>> extract_credentials('http://localhost:5984/_config/')
    ('http://localhost:5984/_config/', None)
    >>> extract_credentials('http://joe:secret@localhost:5984/_config/')
    ('http://localhost:5984/_config/', ('joe', 'secret'))
    >>> extract_credentials('http://joe%40example.com:secret@localhost:5984/_config/')
    ('http://localhost:5984/_config/', ('joe@example.com', 'secret'))
    >>> extract_credentials('http://joe:p@ss:w@localhost:5984/_config/')
    ('http://localhost:5984/_config/', ('joe', 'p@ss:w'))
    """
    parts = util.urlsplit(url)
    netloc = parts[1]
    if '@' in netloc:
        creds, netloc = netloc.rsplit('@', 1)
        credentials = tuple(util.urlunquote(i) for i in creds.split(':', 1))
        parts = list(parts)
        parts[1] = netloc
    else:
        credentials = None
    return util.urlunsplit(parts), credentials
613
614
def basic_auth(credentials):
    """Generates authorization header value for given credentials.

    ``user:password`` is encoded as Latin-1 (the historical behaviour). If it
    contains characters outside Latin-1, UTF-8 (RFC 7617) is used instead of
    raising ``UnicodeEncodeError``.

    :param credentials: a ``(username, password)`` tuple, or None/empty.
    :return: the header value as bytes, or None when no credentials are set.

    >>> basic_auth(('root', 'relax'))
    b'Basic cm9vdDpyZWxheA=='
    >>> basic_auth(None)
    >>> basic_auth(())
    """
    if not credentials:
        return None
    userpass = '%s:%s' % credentials
    try:
        raw = userpass.encode('latin1')
    except UnicodeEncodeError:
        raw = userpass.encode('utf-8')
    token = b64encode(raw)
    return ('Basic %s' % token.strip().decode('latin1')).encode('ascii')
625
626
def quote(string, safe=''):
    """Percent-quote *string* with ``util.urlquote``.

    Text (``util.utype``) is encoded to UTF-8 bytes first. By default
    (``safe=''``) no character, not even '/', is exempt from quoting.
    """
    encoded = string.encode('utf-8') if isinstance(string, util.utype) else string
    return util.urlquote(encoded, safe)
631
632
def urlencode(data):
    """Build a query string from a dict or an iterable of (name, value)
    pairs, UTF-8 encoding text values before ``util.urlencode``."""
    pairs = data.items() if isinstance(data, dict) else data
    return util.urlencode([
        (name, value.encode('utf-8') if isinstance(value, util.utype) else value)
        for name, value in pairs
    ])
642
643
def urljoin(base, *path, **query):
    """Assemble a uri based on a base, any number of path segments, and query
    string parameters.

    >>> urljoin('http://example.org', '_all_dbs')
    'http://example.org/_all_dbs'

    A trailing slash on the uri base is handled gracefully:

    >>> urljoin('http://example.org/', '_all_dbs')
    'http://example.org/_all_dbs'

    And multiple positional arguments become path parts:

    >>> urljoin('http://example.org/', 'foo', 'bar')
    'http://example.org/foo/bar'

    All slashes within a path part are escaped:

    >>> urljoin('http://example.org/', 'foo/bar')
    'http://example.org/foo%2Fbar'
    >>> urljoin('http://example.org/', 'foo', '/bar/')
    'http://example.org/foo/%2Fbar%2F'

    >>> urljoin('http://example.org/', None) #doctest:+IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
        ...
    TypeError: argument 2 to map() must support iteration
    """
    if base and base.endswith('/'):
        base = base[:-1]
    pieces = [base]

    # Each segment is quoted on its own (slashes included). The leading ''
    # yields the '/' separating the base from the first segment.
    joined_path = '/'.join([''] + [quote(segment) for segment in path])
    if joined_path:
        pieces.append(joined_path)

    # Query parameters: None values are skipped, list/tuple values repeat the
    # name once per non-None item, and booleans become 'true'/'false'.
    params = []
    for name, value in query.items():
        if type(value) in (list, tuple):
            params.extend((name, item) for item in value if item is not None)
            continue
        if value is None:
            continue
        if value is True:
            value = 'true'
        elif value is False:
            value = 'false'
        params.append((name, value))

    if params:
        pieces += ['?', urlencode(params)]

    return ''.join(pieces)
697
698