#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Non-blocking HTTP client implementation using pycurl."""

from __future__ import absolute_import, division, print_function

import collections
import functools
import logging
import pycurl  # type: ignore
import threading
import time
from io import BytesIO

from tornado import httputil
from tornado import ioloop
from tornado import stack_context

from tornado.escape import utf8, native_str
from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main

curl_log = logging.getLogger('tornado.curl_httpclient')


class CurlAsyncHTTPClient(AsyncHTTPClient):
    """``AsyncHTTPClient`` implementation driven by libcurl's
    "multi socket" API via pycurl.

    A fixed pool of ``max_clients`` reusable easy handles is created up
    front; queued requests wait in ``self._requests`` until a handle is
    free.  libcurl tells us which file descriptors and timeouts to watch
    through the M_SOCKETFUNCTION/M_TIMERFUNCTION callbacks, and we relay
    that to the tornado IOLoop.
    """

    def initialize(self, max_clients=10, defaults=None):
        """Set up the curl multi handle, the easy-handle pool, and the
        periodic libcurl-bug workaround timer.

        ``max_clients`` bounds the number of simultaneous in-flight
        requests; ``defaults`` is passed through to ``AsyncHTTPClient``.
        """
        super(CurlAsyncHTTPClient, self).initialize(defaults=defaults)
        self._multi = pycurl.CurlMulti()
        self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
        self._curls = [self._curl_create() for i in range(max_clients)]
        self._free_list = self._curls[:]
        # Pending (request, callback, queue_start_time) tuples awaiting a
        # free easy handle.
        self._requests = collections.deque()
        # fd -> IOLoop event mask currently registered for that fd.
        self._fds = {}
        self._timeout = None

        # libcurl has bugs that sometimes cause it to not report all
        # relevant file descriptors and timeouts to TIMERFUNCTION/
        # SOCKETFUNCTION.  Mitigate the effects of such bugs by
        # forcing a periodic scan of all active requests.
        self._force_timeout_callback = ioloop.PeriodicCallback(
            self._handle_force_timeout, 1000)
        self._force_timeout_callback.start()

        # Work around a bug in libcurl 7.29.0: Some fields in the curl
        # multi object are initialized lazily, and its destructor will
        # segfault if it is destroyed without having been used.  Add
        # and remove a dummy handle to make sure everything is
        # initialized.
        dummy_curl_handle = pycurl.Curl()
        self._multi.add_handle(dummy_curl_handle)
        self._multi.remove_handle(dummy_curl_handle)

    def close(self):
        """Destroy this client, stopping all scheduled callbacks and
        freeing every curl handle.
        """
        self._force_timeout_callback.stop()
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        for curl in self._curls:
            curl.close()
        self._multi.close()
        super(CurlAsyncHTTPClient, self).close()

        # Set below properties to None to reduce the reference count of current
        # instance, because those properties hold some methods of current
        # instance that will cause circular reference.
        self._force_timeout_callback = None
        self._multi = None

    def fetch_impl(self, request, callback):
        """Queue ``request`` and kick the multi handle so it starts as
        soon as an easy handle is free.  (Called by ``AsyncHTTPClient``.)
        """
        self._requests.append((request, callback, self.io_loop.time()))
        self._process_queue()
        self._set_timeout(0)

    def _handle_socket(self, event, fd, multi, data):
        """Called by libcurl when it wants to change the file descriptors
        it cares about.
        """
        event_map = {
            pycurl.POLL_NONE: ioloop.IOLoop.NONE,
            pycurl.POLL_IN: ioloop.IOLoop.READ,
            pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
            pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE
        }
        if event == pycurl.POLL_REMOVE:
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
                del self._fds[fd]
        else:
            ioloop_event = event_map[event]
            # libcurl sometimes closes a socket and then opens a new
            # one using the same FD without giving us a POLL_NONE in
            # between.  This is a problem with the epoll IOLoop,
            # because the kernel can tell when a socket is closed and
            # removes it from the epoll automatically, causing future
            # update_handler calls to fail.  Since we can't tell when
            # this has happened, always use remove and re-add
            # instead of update.
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
            self.io_loop.add_handler(fd, self._handle_events,
                                     ioloop_event)
            self._fds[fd] = ioloop_event

    def _set_timeout(self, msecs):
        """Called by libcurl to schedule a timeout."""
        # Only one curl-driven timeout is kept scheduled at a time;
        # replace any previous one.
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        self._timeout = self.io_loop.add_timeout(
            self.io_loop.time() + msecs / 1000.0, self._handle_timeout)

    def _handle_events(self, fd, events):
        """Called by IOLoop when there is activity on one of our
        file descriptors.
        """
        action = 0
        if events & ioloop.IOLoop.READ:
            action |= pycurl.CSELECT_IN
        if events & ioloop.IOLoop.WRITE:
            action |= pycurl.CSELECT_OUT
        while True:
            try:
                ret, num_handles = self._multi.socket_action(fd, action)
            except pycurl.error as e:
                ret = e.args[0]
            # libcurl asks to be re-invoked immediately with this
            # return code; loop until it reports something else.
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _handle_timeout(self):
        """Called by IOLoop when the requested timeout has passed."""
        with stack_context.NullContext():
            self._timeout = None
            while True:
                try:
                    ret, num_handles = self._multi.socket_action(
                        pycurl.SOCKET_TIMEOUT, 0)
                except pycurl.error as e:
                    ret = e.args[0]
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
            self._finish_pending_requests()

        # In theory, we shouldn't have to do this because curl will
        # call _set_timeout whenever the timeout changes.  However,
        # sometimes after _handle_timeout we will need to reschedule
        # immediately even though nothing has changed from curl's
        # perspective.  This is because when socket_action is
        # called with SOCKET_TIMEOUT, libcurl decides internally which
        # timeouts need to be processed by using a monotonic clock
        # (where available) while tornado uses python's time.time()
        # to decide when timeouts have occurred.  When those clocks
        # disagree on elapsed time (as they will whenever there is an
        # NTP adjustment), tornado might call _handle_timeout before
        # libcurl is ready.  After each timeout, resync the scheduled
        # timeout with libcurl's current state.
        new_timeout = self._multi.timeout()
        if new_timeout >= 0:
            self._set_timeout(new_timeout)

    def _handle_force_timeout(self):
        """Called by IOLoop periodically to ask libcurl to process any
        events it may have forgotten about.
        """
        with stack_context.NullContext():
            while True:
                try:
                    ret, num_handles = self._multi.socket_all()
                except pycurl.error as e:
                    ret = e.args[0]
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
            self._finish_pending_requests()

    def _finish_pending_requests(self):
        """Process any requests that were completed by the last
        call to multi.socket_action.
        """
        while True:
            num_q, ok_list, err_list = self._multi.info_read()
            for curl in ok_list:
                self._finish(curl)
            for curl, errnum, errmsg in err_list:
                self._finish(curl, errnum, errmsg)
            if num_q == 0:
                break
        # Completed handles were returned to the free list; start any
        # queued requests on them.
        self._process_queue()

    def _process_queue(self):
        """Start queued requests on free easy handles until one of the
        two lists is exhausted.
        """
        with stack_context.NullContext():
            while True:
                started = 0
                while self._free_list and self._requests:
                    started += 1
                    curl = self._free_list.pop()
                    (request, callback, queue_start_time) = self._requests.popleft()
                    # Per-request state is hung off the easy handle itself
                    # so _finish can recover it later.
                    curl.info = {
                        "headers": httputil.HTTPHeaders(),
                        "buffer": BytesIO(),
                        "request": request,
                        "callback": callback,
                        "queue_start_time": queue_start_time,
                        "curl_start_time": time.time(),
                        "curl_start_ioloop_time": self.io_loop.current().time(),
                    }
                    try:
                        self._curl_setup_request(
                            curl, request, curl.info["buffer"],
                            curl.info["headers"])
                    except Exception as e:
                        # If there was an error in setup, pass it on
                        # to the callback.  Note that allowing the
                        # error to escape here will appear to work
                        # most of the time since we are still in the
                        # caller's original stack frame, but when
                        # _process_queue() is called from
                        # _finish_pending_requests the exceptions have
                        # nowhere to go.
                        self._free_list.append(curl)
                        callback(HTTPResponse(
                            request=request,
                            code=599,
                            error=e))
                    else:
                        self._multi.add_handle(curl)

                if not started:
                    break

    def _finish(self, curl, curl_error=None, curl_message=None):
        """Complete the request attached to ``curl`` and invoke its
        callback with an ``HTTPResponse``.

        ``curl_error``/``curl_message`` are set when libcurl reported
        the transfer on its error list; otherwise the transfer
        succeeded at the curl level (the HTTP status may still be an
        error code).
        """
        info = curl.info
        curl.info = None
        self._multi.remove_handle(curl)
        # The handle is reusable again regardless of outcome.
        self._free_list.append(curl)
        buffer = info["buffer"]
        if curl_error:
            error = CurlError(curl_error, curl_message)
            code = error.code
            effective_url = None
            buffer.close()
            buffer = None
        else:
            error = None
            code = curl.getinfo(pycurl.HTTP_CODE)
            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
            # Rewind so the caller reads the body from the beginning.
            buffer.seek(0)
        # the various curl timings are documented at
        # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
        time_info = dict(
            queue=info["curl_start_ioloop_time"] - info["queue_start_time"],
            namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
            connect=curl.getinfo(pycurl.CONNECT_TIME),
            appconnect=curl.getinfo(pycurl.APPCONNECT_TIME),
            pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
            starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
            total=curl.getinfo(pycurl.TOTAL_TIME),
            redirect=curl.getinfo(pycurl.REDIRECT_TIME),
        )
        try:
            info["callback"](HTTPResponse(
                request=info["request"], code=code, headers=info["headers"],
                buffer=buffer, effective_url=effective_url, error=error,
                reason=info['headers'].get("X-Http-Reason", None),
                request_time=self.io_loop.time() - info["curl_start_ioloop_time"],
                start_time=info["curl_start_time"],
                time_info=time_info))
        except Exception:
            self.handle_callback_exception(info["callback"])

    def handle_callback_exception(self, callback):
        """Delegate exceptions raised by a response callback to the IOLoop."""
        self.io_loop.handle_callback_exception(callback)

    def _curl_create(self):
        """Create and configure a new pycurl easy handle for the pool."""
        curl = pycurl.Curl()
        if curl_log.isEnabledFor(logging.DEBUG):
            curl.setopt(pycurl.VERBOSE, 1)
            curl.setopt(pycurl.DEBUGFUNCTION, self._curl_debug)
        if hasattr(pycurl, 'PROTOCOLS'):  # PROTOCOLS first appeared in pycurl 7.19.5 (2014-07-12)
            # Restrict both initial requests and redirects to http/https.
            curl.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS)
            curl.setopt(pycurl.REDIR_PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS)
        return curl

    def _curl_setup_request(self, curl, request, buffer, headers):
        """Apply every option from ``request`` to the easy handle ``curl``.

        Response body bytes go to ``buffer`` (unless a streaming_callback
        is set) and response headers are parsed into ``headers``.
        Raises ``ValueError``/``KeyError`` for unsupported combinations;
        callers route those to the response callback as a 599.
        """
        curl.setopt(pycurl.URL, native_str(request.url))

        # libcurl's magic "Expect: 100-continue" behavior causes delays
        # with servers that don't support it (which include, among others,
        # Google's OpenID endpoint).  Additionally, this behavior has
        # a bug in conjunction with the curl_multi_socket_action API
        # (https://sourceforge.net/tracker/?func=detail&atid=100976&aid=3039744&group_id=976),
        # which increases the delays.  It's more trouble than it's worth,
        # so just turn off the feature (yes, setting Expect: to an empty
        # value is the official way to disable this)
        if "Expect" not in request.headers:
            request.headers["Expect"] = ""

        # libcurl adds Pragma: no-cache by default; disable that too
        if "Pragma" not in request.headers:
            request.headers["Pragma"] = ""

        curl.setopt(pycurl.HTTPHEADER,
                    ["%s: %s" % (native_str(k), native_str(v))
                     for k, v in request.headers.get_all()])

        curl.setopt(pycurl.HEADERFUNCTION,
                    functools.partial(self._curl_header_callback,
                                      headers, request.header_callback))
        if request.streaming_callback:
            # Deliver chunks through the IOLoop rather than from inside
            # the curl callback.
            def write_function(chunk):
                self.io_loop.add_callback(request.streaming_callback, chunk)
        else:
            write_function = buffer.write
        curl.setopt(pycurl.WRITEFUNCTION, write_function)
        curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
        curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
        curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout))
        curl.setopt(pycurl.TIMEOUT_MS, int(1000 * request.request_timeout))
        if request.user_agent:
            curl.setopt(pycurl.USERAGENT, native_str(request.user_agent))
        else:
            curl.setopt(pycurl.USERAGENT, "Mozilla/5.0 (compatible; pycurl)")
        if request.network_interface:
            curl.setopt(pycurl.INTERFACE, request.network_interface)
        if request.decompress_response:
            curl.setopt(pycurl.ENCODING, "gzip,deflate")
        else:
            curl.setopt(pycurl.ENCODING, "none")
        if request.proxy_host and request.proxy_port:
            curl.setopt(pycurl.PROXY, request.proxy_host)
            curl.setopt(pycurl.PROXYPORT, request.proxy_port)
            if request.proxy_username:
                credentials = httputil.encode_username_password(request.proxy_username,
                                                                request.proxy_password)
                curl.setopt(pycurl.PROXYUSERPWD, credentials)

                if (request.proxy_auth_mode is None or
                        request.proxy_auth_mode == "basic"):
                    curl.setopt(pycurl.PROXYAUTH, pycurl.HTTPAUTH_BASIC)
                elif request.proxy_auth_mode == "digest":
                    curl.setopt(pycurl.PROXYAUTH, pycurl.HTTPAUTH_DIGEST)
                else:
                    raise ValueError(
                        "Unsupported proxy_auth_mode %s" % request.proxy_auth_mode)
        else:
            # Clear any proxy settings left over from a previous request
            # on this reused handle.
            curl.setopt(pycurl.PROXY, '')
            curl.unsetopt(pycurl.PROXYUSERPWD)
        if request.validate_cert:
            curl.setopt(pycurl.SSL_VERIFYPEER, 1)
            curl.setopt(pycurl.SSL_VERIFYHOST, 2)
        else:
            curl.setopt(pycurl.SSL_VERIFYPEER, 0)
            curl.setopt(pycurl.SSL_VERIFYHOST, 0)
        if request.ca_certs is not None:
            curl.setopt(pycurl.CAINFO, request.ca_certs)
        else:
            # There is no way to restore pycurl.CAINFO to its default value
            # (Using unsetopt makes it reject all certificates).
            # I don't see any way to read the default value from python so it
            # can be restored later.  We'll have to just leave CAINFO untouched
            # if no ca_certs file was specified, and require that if any
            # request uses a custom ca_certs file, they all must.
            pass

        if request.allow_ipv6 is False:
            # Curl behaves reasonably when DNS resolution gives an ipv6 address
            # that we can't reach, so allow ipv6 unless the user asks to disable.
            curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
        else:
            curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)

        # Set the request method through curl's irritating interface which makes
        # up names for almost every single method
        curl_options = {
            "GET": pycurl.HTTPGET,
            "POST": pycurl.POST,
            "PUT": pycurl.UPLOAD,
            "HEAD": pycurl.NOBODY,
        }
        custom_methods = set(["DELETE", "OPTIONS", "PATCH"])
        # Reset all method flags first: the handle is reused across requests.
        for o in curl_options.values():
            curl.setopt(o, False)
        if request.method in curl_options:
            curl.unsetopt(pycurl.CUSTOMREQUEST)
            curl.setopt(curl_options[request.method], True)
        elif request.allow_nonstandard_methods or request.method in custom_methods:
            curl.setopt(pycurl.CUSTOMREQUEST, request.method)
        else:
            raise KeyError('unknown method ' + request.method)

        body_expected = request.method in ("POST", "PATCH", "PUT")
        body_present = request.body is not None
        if not request.allow_nonstandard_methods:
            # Some HTTP methods nearly always have bodies while others
            # almost never do.  Fail in this case unless the user has
            # opted out of sanity checks with allow_nonstandard_methods.
            if ((body_expected and not body_present) or
                    (body_present and not body_expected)):
                raise ValueError(
                    'Body must %sbe None for method %s (unless '
                    'allow_nonstandard_methods is true)' %
                    ('not ' if body_expected else '', request.method))

        if body_expected or body_present:
            if request.method == "GET":
                # Even with `allow_nonstandard_methods` we disallow
                # GET with a body (because libcurl doesn't allow it
                # unless we use CUSTOMREQUEST).  While the spec doesn't
                # forbid clients from sending a body, it arguably
                # disallows the server from doing anything with them.
                raise ValueError('Body must be None for GET request')
            request_buffer = BytesIO(utf8(request.body or ''))

            def ioctl(cmd):
                # libcurl asks us to rewind the body (e.g. on redirect
                # or auth retry).
                if cmd == curl.IOCMD_RESTARTREAD:
                    request_buffer.seek(0)
            curl.setopt(pycurl.READFUNCTION, request_buffer.read)
            curl.setopt(pycurl.IOCTLFUNCTION, ioctl)
            if request.method == "POST":
                curl.setopt(pycurl.POSTFIELDSIZE, len(request.body or ''))
            else:
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(request.body or ''))

        if request.auth_username is not None:
            if request.auth_mode is None or request.auth_mode == "basic":
                curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
            elif request.auth_mode == "digest":
                curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST)
            else:
                raise ValueError("Unsupported auth_mode %s" % request.auth_mode)

            userpwd = httputil.encode_username_password(request.auth_username,
                                                        request.auth_password)
            curl.setopt(pycurl.USERPWD, userpwd)
            curl_log.debug("%s %s (username: %r)", request.method, request.url,
                           request.auth_username)
        else:
            curl.unsetopt(pycurl.USERPWD)
            curl_log.debug("%s %s", request.method, request.url)

        if request.client_cert is not None:
            curl.setopt(pycurl.SSLCERT, request.client_cert)

        if request.client_key is not None:
            curl.setopt(pycurl.SSLKEY, request.client_key)

        if request.ssl_options is not None:
            raise ValueError("ssl_options not supported in curl_httpclient")

        if threading.activeCount() > 1:
            # libcurl/pycurl is not thread-safe by default.  When multiple threads
            # are used, signals should be disabled.  This has the side effect
            # of disabling DNS timeouts in some environments (when libcurl is
            # not linked against ares), so we don't do it when there is only one
            # thread.  Applications that use many short-lived threads may need
            # to set NOSIGNAL manually in a prepare_curl_callback since
            # there may not be any other threads running at the time we call
            # threading.activeCount.
            curl.setopt(pycurl.NOSIGNAL, 1)
        if request.prepare_curl_callback is not None:
            request.prepare_curl_callback(curl)

    def _curl_header_callback(self, headers, header_callback, header_line):
        """Called by libcurl once per raw response-header line; parses
        the line into ``headers`` and forwards it to ``header_callback``
        (via the IOLoop) if one was given.
        """
        header_line = native_str(header_line.decode('latin1'))
        if header_callback is not None:
            self.io_loop.add_callback(header_callback, header_line)
        # header_line as returned by curl includes the end-of-line characters.
        # whitespace at the start should be preserved to allow multi-line headers
        header_line = header_line.rstrip()
        if header_line.startswith("HTTP/"):
            # A new status line means a new set of headers (e.g. after a
            # redirect or 100-continue); smuggle the reason phrase through
            # as a pseudo-header for HTTPResponse.
            headers.clear()
            try:
                (__, __, reason) = httputil.parse_response_start_line(header_line)
                header_line = "X-Http-Reason: %s" % reason
            except httputil.HTTPInputError:
                return
        if not header_line:
            return
        headers.parse_line(header_line)

    def _curl_debug(self, debug_type, debug_msg):
        """DEBUGFUNCTION hook: route libcurl's verbose output (info text,
        header traffic, SSL data) to ``curl_log`` at DEBUG level.
        """
        debug_types = ('I', '<', '>', '<', '>')
        if debug_type == 0:
            # Informational text.
            debug_msg = native_str(debug_msg)
            curl_log.debug('%s', debug_msg.strip())
        elif debug_type in (1, 2):
            # Incoming (1) / outgoing (2) header data, one line per entry.
            debug_msg = native_str(debug_msg)
            for line in debug_msg.splitlines():
                curl_log.debug('%s %s', debug_types[debug_type], line)
        elif debug_type == 4:
            # SSL/raw data: may not be text, so log the repr.
            curl_log.debug('%s %r', debug_types[debug_type], debug_msg)


class CurlError(HTTPError):
    """An ``HTTPError`` (code 599) wrapping a libcurl error.

    The original libcurl error number is preserved as ``errno``.
    """

    def __init__(self, errno, message):
        HTTPError.__init__(self, 599, message)
        self.errno = errno


if __name__ == "__main__":
    AsyncHTTPClient.configure(CurlAsyncHTTPClient)
    main()