1#
2# Copyright 2009 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16"""Non-blocking HTTP client implementation using pycurl."""
17
18from __future__ import absolute_import, division, print_function
19
20import collections
21import functools
22import logging
23import pycurl  # type: ignore
24import threading
25import time
26from io import BytesIO
27
28from tornado import httputil
29from tornado import ioloop
30from tornado import stack_context
31
32from tornado.escape import utf8, native_str
33from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main
34
35curl_log = logging.getLogger('tornado.curl_httpclient')
36
37
class CurlAsyncHTTPClient(AsyncHTTPClient):
    """``AsyncHTTPClient`` implementation backed by libcurl's multi-socket API.

    A fixed pool of ``max_clients`` reusable ``pycurl.Curl`` easy handles is
    multiplexed over a single ``CurlMulti`` handle.  libcurl tells us which
    file descriptors and timeouts it cares about (via M_SOCKETFUNCTION and
    M_TIMERFUNCTION), and we feed IOLoop readiness events back to it through
    ``socket_action``.  Completed transfers are collected with ``info_read``
    and turned into ``HTTPResponse`` objects.
    """

    def initialize(self, max_clients=10, defaults=None):
        """Set up the curl multi handle, the easy-handle pool, and a
        periodic watchdog callback.

        :arg max_clients: size of the easy-handle pool, i.e. the maximum
            number of simultaneous in-flight requests.
        :arg defaults: default ``HTTPRequest`` attribute values, forwarded
            to ``AsyncHTTPClient.initialize``.
        """
        super(CurlAsyncHTTPClient, self).initialize(defaults=defaults)
        self._multi = pycurl.CurlMulti()
        self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
        self._curls = [self._curl_create() for i in range(max_clients)]
        # Handles not currently attached to a request; a handle moves from
        # _free_list to the multi handle in _process_queue and back in _finish.
        self._free_list = self._curls[:]
        # Queued (request, callback, enqueue-time) tuples awaiting a free handle.
        self._requests = collections.deque()
        # fd -> IOLoop event mask currently registered for that fd.
        self._fds = {}
        self._timeout = None

        # libcurl has bugs that sometimes cause it to not report all
        # relevant file descriptors and timeouts to TIMERFUNCTION/
        # SOCKETFUNCTION.  Mitigate the effects of such bugs by
        # forcing a periodic scan of all active requests.
        self._force_timeout_callback = ioloop.PeriodicCallback(
            self._handle_force_timeout, 1000)
        self._force_timeout_callback.start()

        # Work around a bug in libcurl 7.29.0: Some fields in the curl
        # multi object are initialized lazily, and its destructor will
        # segfault if it is destroyed without having been used.  Add
        # and remove a dummy handle to make sure everything is
        # initialized.
        dummy_curl_handle = pycurl.Curl()
        self._multi.add_handle(dummy_curl_handle)
        self._multi.remove_handle(dummy_curl_handle)

    def close(self):
        """Destroy this client, releasing curl handles and timers.

        No new requests may be issued after ``close``; in-flight requests
        are not gracefully completed.
        """
        self._force_timeout_callback.stop()
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        for curl in self._curls:
            curl.close()
        self._multi.close()
        super(CurlAsyncHTTPClient, self).close()

        # Set below properties to None to reduce the reference count of current
        # instance, because those properties hold some methods of current
        # instance that will cause circular references.
        self._force_timeout_callback = None
        self._multi = None

    def fetch_impl(self, request, callback):
        """Queue *request* and kick the processing machinery.

        ``callback`` will eventually be invoked with an ``HTTPResponse``
        (possibly a 599 error response); the timestamp records queueing
        delay for the response's ``time_info``.
        """
        self._requests.append((request, callback, self.io_loop.time()))
        self._process_queue()
        # Schedule an immediate timeout so the IOLoop drives curl forward
        # even if no socket events are pending yet.
        self._set_timeout(0)

    def _handle_socket(self, event, fd, multi, data):
        """Called by libcurl when it wants to change the file descriptors
        it cares about.
        """
        event_map = {
            pycurl.POLL_NONE: ioloop.IOLoop.NONE,
            pycurl.POLL_IN: ioloop.IOLoop.READ,
            pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
            pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE
        }
        if event == pycurl.POLL_REMOVE:
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
                del self._fds[fd]
        else:
            ioloop_event = event_map[event]
            # libcurl sometimes closes a socket and then opens a new
            # one using the same FD without giving us a POLL_NONE in
            # between.  This is a problem with the epoll IOLoop,
            # because the kernel can tell when a socket is closed and
            # removes it from the epoll automatically, causing future
            # update_handler calls to fail.  Since we can't tell when
            # this has happened, always use remove and re-add
            # instead of update.
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
            self.io_loop.add_handler(fd, self._handle_events,
                                     ioloop_event)
            self._fds[fd] = ioloop_event

    def _set_timeout(self, msecs):
        """Called by libcurl to schedule a timeout."""
        # Only one curl-driven timeout is kept pending at a time.
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        self._timeout = self.io_loop.add_timeout(
            self.io_loop.time() + msecs / 1000.0, self._handle_timeout)

    def _handle_events(self, fd, events):
        """Called by IOLoop when there is activity on one of our
        file descriptors.
        """
        action = 0
        if events & ioloop.IOLoop.READ:
            action |= pycurl.CSELECT_IN
        if events & ioloop.IOLoop.WRITE:
            action |= pycurl.CSELECT_OUT
        # Re-invoke socket_action as long as curl asks for another pass.
        while True:
            try:
                ret, num_handles = self._multi.socket_action(fd, action)
            except pycurl.error as e:
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _handle_timeout(self):
        """Called by IOLoop when the requested timeout has passed."""
        with stack_context.NullContext():
            self._timeout = None
            while True:
                try:
                    # SOCKET_TIMEOUT means "no fd activity; process timeouts".
                    ret, num_handles = self._multi.socket_action(
                        pycurl.SOCKET_TIMEOUT, 0)
                except pycurl.error as e:
                    ret = e.args[0]
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
            self._finish_pending_requests()

        # In theory, we shouldn't have to do this because curl will
        # call _set_timeout whenever the timeout changes.  However,
        # sometimes after _handle_timeout we will need to reschedule
        # immediately even though nothing has changed from curl's
        # perspective.  This is because when socket_action is
        # called with SOCKET_TIMEOUT, libcurl decides internally which
        # timeouts need to be processed by using a monotonic clock
        # (where available) while tornado uses python's time.time()
        # to decide when timeouts have occurred.  When those clocks
        # disagree on elapsed time (as they will whenever there is an
        # NTP adjustment), tornado might call _handle_timeout before
        # libcurl is ready.  After each timeout, resync the scheduled
        # timeout with libcurl's current state.
        new_timeout = self._multi.timeout()
        if new_timeout >= 0:
            self._set_timeout(new_timeout)

    def _handle_force_timeout(self):
        """Called by IOLoop periodically to ask libcurl to process any
        events it may have forgotten about.
        """
        with stack_context.NullContext():
            while True:
                try:
                    ret, num_handles = self._multi.socket_all()
                except pycurl.error as e:
                    ret = e.args[0]
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
            self._finish_pending_requests()

    def _finish_pending_requests(self):
        """Process any requests that were completed by the last
        call to multi.socket_action.
        """
        while True:
            num_q, ok_list, err_list = self._multi.info_read()
            for curl in ok_list:
                self._finish(curl)
            for curl, errnum, errmsg in err_list:
                self._finish(curl, errnum, errmsg)
            if num_q == 0:
                break
        # Finished transfers freed easy handles; start queued requests.
        self._process_queue()

    def _process_queue(self):
        """Start queued requests on free easy handles until one of the
        two pools (free handles, pending requests) is exhausted.
        """
        with stack_context.NullContext():
            while True:
                started = 0
                while self._free_list and self._requests:
                    started += 1
                    curl = self._free_list.pop()
                    (request, callback, queue_start_time) = self._requests.popleft()
                    # Per-request state is stashed on the easy handle itself;
                    # _finish reads it back when the transfer completes.
                    curl.info = {
                        "headers": httputil.HTTPHeaders(),
                        "buffer": BytesIO(),
                        "request": request,
                        "callback": callback,
                        "queue_start_time": queue_start_time,
                        "curl_start_time": time.time(),
                        "curl_start_ioloop_time": self.io_loop.current().time(),
                    }
                    try:
                        self._curl_setup_request(
                            curl, request, curl.info["buffer"],
                            curl.info["headers"])
                    except Exception as e:
                        # If there was an error in setup, pass it on
                        # to the callback. Note that allowing the
                        # error to escape here will appear to work
                        # most of the time since we are still in the
                        # caller's original stack frame, but when
                        # _process_queue() is called from
                        # _finish_pending_requests the exceptions have
                        # nowhere to go.
                        self._free_list.append(curl)
                        callback(HTTPResponse(
                            request=request,
                            code=599,
                            error=e))
                    else:
                        self._multi.add_handle(curl)

                if not started:
                    break

    def _finish(self, curl, curl_error=None, curl_message=None):
        """Turn a completed (or failed) curl transfer into an
        ``HTTPResponse`` and invoke the request's callback.

        :arg curl_error: libcurl error number when the transfer failed,
            else ``None``.
        :arg curl_message: libcurl error string accompanying ``curl_error``.
        """
        info = curl.info
        curl.info = None
        # Return the handle to the pool before running the callback so a
        # callback that issues a new fetch can reuse it.
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        buffer = info["buffer"]
        if curl_error:
            error = CurlError(curl_error, curl_message)
            code = error.code
            effective_url = None
            buffer.close()
            buffer = None
        else:
            error = None
            code = curl.getinfo(pycurl.HTTP_CODE)
            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
            buffer.seek(0)
        # the various curl timings are documented at
        # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
        time_info = dict(
            queue=info["curl_start_ioloop_time"] - info["queue_start_time"],
            namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
            connect=curl.getinfo(pycurl.CONNECT_TIME),
            appconnect=curl.getinfo(pycurl.APPCONNECT_TIME),
            pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
            starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
            total=curl.getinfo(pycurl.TOTAL_TIME),
            redirect=curl.getinfo(pycurl.REDIRECT_TIME),
        )
        try:
            info["callback"](HTTPResponse(
                request=info["request"], code=code, headers=info["headers"],
                buffer=buffer, effective_url=effective_url, error=error,
                reason=info['headers'].get("X-Http-Reason", None),
                request_time=self.io_loop.time() - info["curl_start_ioloop_time"],
                start_time=info["curl_start_time"],
                time_info=time_info))
        except Exception:
            self.handle_callback_exception(info["callback"])

    def handle_callback_exception(self, callback):
        """Delegate exceptions raised by a fetch callback to the IOLoop's
        standard exception handler.
        """
        self.io_loop.handle_callback_exception(callback)

    def _curl_create(self):
        """Create and return a new ``pycurl.Curl`` easy handle with the
        client-wide options applied (debug logging, protocol whitelist).
        """
        curl = pycurl.Curl()
        if curl_log.isEnabledFor(logging.DEBUG):
            curl.setopt(pycurl.VERBOSE, 1)
            curl.setopt(pycurl.DEBUGFUNCTION, self._curl_debug)
        if hasattr(pycurl, 'PROTOCOLS'):  # PROTOCOLS first appeared in pycurl 7.19.5 (2014-07-12)
            # Restrict both the initial request and any redirects to
            # http/https; prevents redirect-based access to file://, etc.
            curl.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS)
            curl.setopt(pycurl.REDIR_PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS)
        return curl

    def _curl_setup_request(self, curl, request, buffer, headers):
        """Translate an ``HTTPRequest`` into ``setopt`` calls on *curl*.

        Because easy handles are reused across requests, every option that
        any request might set must be explicitly set or unset here;
        otherwise state would leak from one request to the next.

        :arg buffer: ``BytesIO`` that receives the response body.
        :arg headers: ``HTTPHeaders`` populated via the header callback.
        """
        curl.setopt(pycurl.URL, native_str(request.url))

        # libcurl's magic "Expect: 100-continue" behavior causes delays
        # with servers that don't support it (which include, among others,
        # Google's OpenID endpoint).  Additionally, this behavior has
        # a bug in conjunction with the curl_multi_socket_action API
        # (https://sourceforge.net/tracker/?func=detail&atid=100976&aid=3039744&group_id=976),
        # which increases the delays.  It's more trouble than it's worth,
        # so just turn off the feature (yes, setting Expect: to an empty
        # value is the official way to disable this)
        if "Expect" not in request.headers:
            request.headers["Expect"] = ""

        # libcurl adds Pragma: no-cache by default; disable that too
        if "Pragma" not in request.headers:
            request.headers["Pragma"] = ""

        curl.setopt(pycurl.HTTPHEADER,
                    ["%s: %s" % (native_str(k), native_str(v))
                     for k, v in request.headers.get_all()])

        curl.setopt(pycurl.HEADERFUNCTION,
                    functools.partial(self._curl_header_callback,
                                      headers, request.header_callback))
        if request.streaming_callback:
            # Deliver body chunks on the IOLoop rather than from inside
            # libcurl's write callback.
            def write_function(chunk):
                self.io_loop.add_callback(request.streaming_callback, chunk)
        else:
            write_function = buffer.write
        curl.setopt(pycurl.WRITEFUNCTION, write_function)
        curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
        curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
        curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout))
        curl.setopt(pycurl.TIMEOUT_MS, int(1000 * request.request_timeout))
        if request.user_agent:
            curl.setopt(pycurl.USERAGENT, native_str(request.user_agent))
        else:
            curl.setopt(pycurl.USERAGENT, "Mozilla/5.0 (compatible; pycurl)")
        if request.network_interface:
            curl.setopt(pycurl.INTERFACE, request.network_interface)
        if request.decompress_response:
            curl.setopt(pycurl.ENCODING, "gzip,deflate")
        else:
            curl.setopt(pycurl.ENCODING, "none")
        if request.proxy_host and request.proxy_port:
            curl.setopt(pycurl.PROXY, request.proxy_host)
            curl.setopt(pycurl.PROXYPORT, request.proxy_port)
            if request.proxy_username:
                credentials = httputil.encode_username_password(request.proxy_username,
                                                                request.proxy_password)
                curl.setopt(pycurl.PROXYUSERPWD, credentials)

            if (request.proxy_auth_mode is None or
                    request.proxy_auth_mode == "basic"):
                curl.setopt(pycurl.PROXYAUTH, pycurl.HTTPAUTH_BASIC)
            elif request.proxy_auth_mode == "digest":
                curl.setopt(pycurl.PROXYAUTH, pycurl.HTTPAUTH_DIGEST)
            else:
                raise ValueError(
                    "Unsupported proxy_auth_mode %s" % request.proxy_auth_mode)
        else:
            # Clear any proxy configuration left over from a previous
            # request on this reused handle.
            curl.setopt(pycurl.PROXY, '')
            curl.unsetopt(pycurl.PROXYUSERPWD)
        if request.validate_cert:
            curl.setopt(pycurl.SSL_VERIFYPEER, 1)
            curl.setopt(pycurl.SSL_VERIFYHOST, 2)
        else:
            curl.setopt(pycurl.SSL_VERIFYPEER, 0)
            curl.setopt(pycurl.SSL_VERIFYHOST, 0)
        if request.ca_certs is not None:
            curl.setopt(pycurl.CAINFO, request.ca_certs)
        else:
            # There is no way to restore pycurl.CAINFO to its default value
            # (Using unsetopt makes it reject all certificates).
            # I don't see any way to read the default value from python so it
            # can be restored later.  We'll have to just leave CAINFO untouched
            # if no ca_certs file was specified, and require that if any
            # request uses a custom ca_certs file, they all must.
            pass

        if request.allow_ipv6 is False:
            # Curl behaves reasonably when DNS resolution gives an ipv6 address
            # that we can't reach, so allow ipv6 unless the user asks to disable.
            curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
        else:
            curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)

        # Set the request method through curl's irritating interface which makes
        # up names for almost every single method
        curl_options = {
            "GET": pycurl.HTTPGET,
            "POST": pycurl.POST,
            "PUT": pycurl.UPLOAD,
            "HEAD": pycurl.NOBODY,
        }
        custom_methods = set(["DELETE", "OPTIONS", "PATCH"])
        # Reset all method flags first; the handle may have been used for a
        # different method last time.
        for o in curl_options.values():
            curl.setopt(o, False)
        if request.method in curl_options:
            curl.unsetopt(pycurl.CUSTOMREQUEST)
            curl.setopt(curl_options[request.method], True)
        elif request.allow_nonstandard_methods or request.method in custom_methods:
            curl.setopt(pycurl.CUSTOMREQUEST, request.method)
        else:
            raise KeyError('unknown method ' + request.method)

        body_expected = request.method in ("POST", "PATCH", "PUT")
        body_present = request.body is not None
        if not request.allow_nonstandard_methods:
            # Some HTTP methods nearly always have bodies while others
            # almost never do. Fail in this case unless the user has
            # opted out of sanity checks with allow_nonstandard_methods.
            if ((body_expected and not body_present) or
                    (body_present and not body_expected)):
                raise ValueError(
                    'Body must %sbe None for method %s (unless '
                    'allow_nonstandard_methods is true)' %
                    ('not ' if body_expected else '', request.method))

        if body_expected or body_present:
            if request.method == "GET":
                # Even with `allow_nonstandard_methods` we disallow
                # GET with a body (because libcurl doesn't allow it
                # unless we use CUSTOMREQUEST). While the spec doesn't
                # forbid clients from sending a body, it arguably
                # disallows the server from doing anything with them.
                raise ValueError('Body must be None for GET request')
            request_buffer = BytesIO(utf8(request.body or ''))

            # libcurl may need to re-read the body, e.g. after an auth
            # challenge or redirect; rewind the buffer when asked.
            def ioctl(cmd):
                if cmd == curl.IOCMD_RESTARTREAD:
                    request_buffer.seek(0)
            curl.setopt(pycurl.READFUNCTION, request_buffer.read)
            curl.setopt(pycurl.IOCTLFUNCTION, ioctl)
            if request.method == "POST":
                curl.setopt(pycurl.POSTFIELDSIZE, len(request.body or ''))
            else:
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(request.body or ''))

        if request.auth_username is not None:
            if request.auth_mode is None or request.auth_mode == "basic":
                curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
            elif request.auth_mode == "digest":
                curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST)
            else:
                raise ValueError("Unsupported auth_mode %s" % request.auth_mode)

            userpwd = httputil.encode_username_password(request.auth_username,
                                                        request.auth_password)
            curl.setopt(pycurl.USERPWD, userpwd)
            curl_log.debug("%s %s (username: %r)", request.method, request.url,
                           request.auth_username)
        else:
            curl.unsetopt(pycurl.USERPWD)
            curl_log.debug("%s %s", request.method, request.url)

        if request.client_cert is not None:
            curl.setopt(pycurl.SSLCERT, request.client_cert)

        if request.client_key is not None:
            curl.setopt(pycurl.SSLKEY, request.client_key)

        if request.ssl_options is not None:
            raise ValueError("ssl_options not supported in curl_httpclient")

        if threading.activeCount() > 1:
            # libcurl/pycurl is not thread-safe by default.  When multiple threads
            # are used, signals should be disabled.  This has the side effect
            # of disabling DNS timeouts in some environments (when libcurl is
            # not linked against ares), so we don't do it when there is only one
            # thread.  Applications that use many short-lived threads may need
            # to set NOSIGNAL manually in a prepare_curl_callback since
            # there may not be any other threads running at the time we call
            # threading.activeCount.
            curl.setopt(pycurl.NOSIGNAL, 1)
        if request.prepare_curl_callback is not None:
            request.prepare_curl_callback(curl)

    def _curl_header_callback(self, headers, header_callback, header_line):
        """Called by libcurl once per response header line; parse the line
        into *headers* and forward the raw line to *header_callback*.

        Status lines ("HTTP/...") reset *headers* (so only the final
        response's headers survive redirects) and are rewritten into a
        synthetic ``X-Http-Reason`` header to preserve the reason phrase.
        """
        header_line = native_str(header_line.decode('latin1'))
        if header_callback is not None:
            self.io_loop.add_callback(header_callback, header_line)
        # header_line as returned by curl includes the end-of-line characters.
        # whitespace at the start should be preserved to allow multi-line headers
        header_line = header_line.rstrip()
        if header_line.startswith("HTTP/"):
            headers.clear()
            try:
                (__, __, reason) = httputil.parse_response_start_line(header_line)
                header_line = "X-Http-Reason: %s" % reason
            except httputil.HTTPInputError:
                return
        if not header_line:
            return
        headers.parse_line(header_line)

    def _curl_debug(self, debug_type, debug_msg):
        """Called by libcurl (DEBUGFUNCTION) when VERBOSE is enabled;
        route curl's trace output to the ``tornado.curl_httpclient`` logger.

        debug_type values: 0=informational text, 1=header in, 2=header out,
        4=data out; other types (response body data) are ignored.
        """
        debug_types = ('I', '<', '>', '<', '>')
        if debug_type == 0:
            debug_msg = native_str(debug_msg)
            curl_log.debug('%s', debug_msg.strip())
        elif debug_type in (1, 2):
            debug_msg = native_str(debug_msg)
            for line in debug_msg.splitlines():
                curl_log.debug('%s %s', debug_types[debug_type], line)
        elif debug_type == 4:
            curl_log.debug('%s %r', debug_types[debug_type], debug_msg)
504
505
class CurlError(HTTPError):
    """An ``HTTPError`` that also carries the originating libcurl error
    number as ``errno``.

    The HTTP code is always 599, Tornado's conventional code for errors
    raised before a complete HTTP response was received.
    """

    def __init__(self, errno, message):
        super(CurlError, self).__init__(599, message)
        self.errno = errno
510
511
if __name__ == "__main__":
    # When run as a script, use the curl-based client for the command-line
    # fetcher provided by tornado.httpclient.main.
    AsyncHTTPClient.configure(CurlAsyncHTTPClient)
    main()
515