1from __future__ import absolute_import, division, print_function 2 3from tornado.concurrent import Future 4from tornado import gen 5from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str, to_basestring # noqa: E501 6from tornado.httpclient import HTTPClientError 7from tornado.httputil import format_timestamp 8from tornado.ioloop import IOLoop 9from tornado.iostream import IOStream 10from tornado import locale 11from tornado.locks import Event 12from tornado.log import app_log, gen_log 13from tornado.simple_httpclient import SimpleAsyncHTTPClient 14from tornado.template import DictLoader 15from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test 16from tornado.test.util import unittest, skipBefore35, exec_test, ignore_deprecation 17from tornado.util import ObjectDict, unicode_type, timedelta_to_seconds, PY3 18from tornado.web import ( 19 Application, RequestHandler, StaticFileHandler, RedirectHandler as WebRedirectHandler, 20 HTTPError, MissingArgumentError, ErrorHandler, authenticated, asynchronous, url, 21 _create_signature_v1, create_signed_value, decode_signed_value, get_signature_key_version, 22 UIModule, Finish, stream_request_body, removeslash, addslash, GZipContentEncoding, 23) 24 25import binascii 26import contextlib 27import copy 28import datetime 29import email.utils 30import gzip 31from io import BytesIO 32import itertools 33import logging 34import os 35import re 36import socket 37 38if PY3: 39 import urllib.parse as urllib_parse # py3 40else: 41 import urllib as urllib_parse # py2 42 43wsgi_safe_tests = [] 44 45 46def relpath(*a): 47 return os.path.join(os.path.dirname(__file__), *a) 48 49 50def wsgi_safe(cls): 51 wsgi_safe_tests.append(cls) 52 return cls 53 54 55class WebTestCase(AsyncHTTPTestCase): 56 """Base class for web tests that also supports WSGI mode. 57 58 Override get_handlers and get_app_kwargs instead of get_app. 59 Append to wsgi_safe to have it run in wsgi_test as well. 
60 """ 61 def get_app(self): 62 self.app = Application(self.get_handlers(), **self.get_app_kwargs()) 63 return self.app 64 65 def get_handlers(self): 66 raise NotImplementedError() 67 68 def get_app_kwargs(self): 69 return {} 70 71 72class SimpleHandlerTestCase(WebTestCase): 73 """Simplified base class for tests that work with a single handler class. 74 75 To use, define a nested class named ``Handler``. 76 """ 77 def get_handlers(self): 78 return [('/', self.Handler)] 79 80 81class HelloHandler(RequestHandler): 82 def get(self): 83 self.write('hello') 84 85 86class CookieTestRequestHandler(RequestHandler): 87 # stub out enough methods to make the secure_cookie functions work 88 def __init__(self, cookie_secret='0123456789', key_version=None): 89 # don't call super.__init__ 90 self._cookies = {} 91 if key_version is None: 92 self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret)) 93 else: 94 self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret, 95 key_version=key_version)) 96 97 def get_cookie(self, name): 98 return self._cookies.get(name) 99 100 def set_cookie(self, name, value, expires_days=None): 101 self._cookies[name] = value 102 103 104# See SignedValueTest below for more. 
class SecureCookieV1Test(unittest.TestCase):
    # Version-1 signed-cookie behaviour, driven through the stubbed
    # CookieTestRequestHandler.

    def test_round_trip(self):
        # A value written with version=1 reads back unchanged.
        handler = CookieTestRequestHandler()
        handler.set_secure_cookie('foo', b'bar', version=1)
        stored = handler.get_secure_cookie('foo', min_version=1)
        self.assertEqual(stored, b'bar')

    def test_cookie_tampering_future_timestamp(self):
        handler = CookieTestRequestHandler()
        # this string base64-encodes to '12345678'
        payload = binascii.a2b_hex(b'd76df8e7aefc')
        handler.set_secure_cookie('foo', payload, version=1)
        cookie = handler._cookies['foo']
        match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
        self.assertTrue(match)
        timestamp, sig = match.groups()
        secret = handler.application.settings["cookie_secret"]
        genuine = _create_signature_v1(secret, 'foo', '12345678', timestamp)
        self.assertEqual(genuine, sig)
        # shifting digits from payload to timestamp doesn't alter signature
        # (this is not desirable behavior, just confirming that that's how it
        # works)
        shifted = _create_signature_v1(secret, 'foo', '1234', b'5678' + timestamp)
        self.assertEqual(shifted, sig)
        # tamper with the cookie
        tampered = '1234|5678%s|%s' % (to_basestring(timestamp), to_basestring(sig))
        handler._cookies['foo'] = utf8(tampered)
        # it gets rejected
        with ExpectLog(gen_log, "Cookie timestamp in future"):
            result = handler.get_secure_cookie('foo', min_version=1)
            self.assertTrue(result is None)

    def test_arbitrary_bytes(self):
        # Secure cookies accept arbitrary data (which is base64 encoded).
        # Note that normal cookies accept only a subset of ascii.
        handler = CookieTestRequestHandler()
        handler.set_secure_cookie('foo', b'\xe9', version=1)
        stored = handler.get_secure_cookie('foo', min_version=1)
        self.assertEqual(stored, b'\xe9')


# See SignedValueTest below for more.
150class SecureCookieV2Test(unittest.TestCase): 151 KEY_VERSIONS = { 152 0: 'ajklasdf0ojaisdf', 153 1: 'aslkjasaolwkjsdf' 154 } 155 156 def test_round_trip(self): 157 handler = CookieTestRequestHandler() 158 handler.set_secure_cookie('foo', b'bar', version=2) 159 self.assertEqual(handler.get_secure_cookie('foo', min_version=2), b'bar') 160 161 def test_key_version_roundtrip(self): 162 handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS, 163 key_version=0) 164 handler.set_secure_cookie('foo', b'bar') 165 self.assertEqual(handler.get_secure_cookie('foo'), b'bar') 166 167 def test_key_version_roundtrip_differing_version(self): 168 handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS, 169 key_version=1) 170 handler.set_secure_cookie('foo', b'bar') 171 self.assertEqual(handler.get_secure_cookie('foo'), b'bar') 172 173 def test_key_version_increment_version(self): 174 handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS, 175 key_version=0) 176 handler.set_secure_cookie('foo', b'bar') 177 new_handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS, 178 key_version=1) 179 new_handler._cookies = handler._cookies 180 self.assertEqual(new_handler.get_secure_cookie('foo'), b'bar') 181 182 def test_key_version_invalidate_version(self): 183 handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS, 184 key_version=0) 185 handler.set_secure_cookie('foo', b'bar') 186 new_key_versions = self.KEY_VERSIONS.copy() 187 new_key_versions.pop(0) 188 new_handler = CookieTestRequestHandler(cookie_secret=new_key_versions, 189 key_version=1) 190 new_handler._cookies = handler._cookies 191 self.assertEqual(new_handler.get_secure_cookie('foo'), None) 192 193 194class FinalReturnTest(WebTestCase): 195 def get_handlers(self): 196 test = self 197 198 class FinishHandler(RequestHandler): 199 @gen.coroutine 200 def get(self): 201 test.final_return = self.finish() 202 yield test.final_return 203 204 class RenderHandler(RequestHandler): 
205 def create_template_loader(self, path): 206 return DictLoader({'foo.html': 'hi'}) 207 208 @gen.coroutine 209 def get(self): 210 test.final_return = self.render('foo.html') 211 212 return [("/finish", FinishHandler), 213 ("/render", RenderHandler)] 214 215 def get_app_kwargs(self): 216 return dict(template_path='FinalReturnTest') 217 218 def test_finish_method_return_future(self): 219 response = self.fetch(self.get_url('/finish')) 220 self.assertEqual(response.code, 200) 221 self.assertIsInstance(self.final_return, Future) 222 self.assertTrue(self.final_return.done()) 223 224 def test_render_method_return_future(self): 225 response = self.fetch(self.get_url('/render')) 226 self.assertEqual(response.code, 200) 227 self.assertIsInstance(self.final_return, Future) 228 229 230class CookieTest(WebTestCase): 231 def get_handlers(self): 232 class SetCookieHandler(RequestHandler): 233 def get(self): 234 # Try setting cookies with different argument types 235 # to ensure that everything gets encoded correctly 236 self.set_cookie("str", "asdf") 237 self.set_cookie("unicode", u"qwer") 238 self.set_cookie("bytes", b"zxcv") 239 240 class GetCookieHandler(RequestHandler): 241 def get(self): 242 self.write(self.get_cookie("foo", "default")) 243 244 class SetCookieDomainHandler(RequestHandler): 245 def get(self): 246 # unicode domain and path arguments shouldn't break things 247 # either (see bug #285) 248 self.set_cookie("unicode_args", "blah", domain=u"foo.com", 249 path=u"/foo") 250 251 class SetCookieSpecialCharHandler(RequestHandler): 252 def get(self): 253 self.set_cookie("equals", "a=b") 254 self.set_cookie("semicolon", "a;b") 255 self.set_cookie("quote", 'a"b') 256 257 class SetCookieOverwriteHandler(RequestHandler): 258 def get(self): 259 self.set_cookie("a", "b", domain="example.com") 260 self.set_cookie("c", "d", domain="example.com") 261 # A second call with the same name clobbers the first. 262 # Attributes from the first call are not carried over. 
263 self.set_cookie("a", "e") 264 265 class SetCookieMaxAgeHandler(RequestHandler): 266 def get(self): 267 self.set_cookie("foo", "bar", max_age=10) 268 269 class SetCookieExpiresDaysHandler(RequestHandler): 270 def get(self): 271 self.set_cookie("foo", "bar", expires_days=10) 272 273 class SetCookieFalsyFlags(RequestHandler): 274 def get(self): 275 self.set_cookie("a", "1", secure=True) 276 self.set_cookie("b", "1", secure=False) 277 self.set_cookie("c", "1", httponly=True) 278 self.set_cookie("d", "1", httponly=False) 279 280 return [("/set", SetCookieHandler), 281 ("/get", GetCookieHandler), 282 ("/set_domain", SetCookieDomainHandler), 283 ("/special_char", SetCookieSpecialCharHandler), 284 ("/set_overwrite", SetCookieOverwriteHandler), 285 ("/set_max_age", SetCookieMaxAgeHandler), 286 ("/set_expires_days", SetCookieExpiresDaysHandler), 287 ("/set_falsy_flags", SetCookieFalsyFlags) 288 ] 289 290 def test_set_cookie(self): 291 response = self.fetch("/set") 292 self.assertEqual(sorted(response.headers.get_list("Set-Cookie")), 293 ["bytes=zxcv; Path=/", 294 "str=asdf; Path=/", 295 "unicode=qwer; Path=/", 296 ]) 297 298 def test_get_cookie(self): 299 response = self.fetch("/get", headers={"Cookie": "foo=bar"}) 300 self.assertEqual(response.body, b"bar") 301 302 response = self.fetch("/get", headers={"Cookie": 'foo="bar"'}) 303 self.assertEqual(response.body, b"bar") 304 305 response = self.fetch("/get", headers={"Cookie": "/=exception;"}) 306 self.assertEqual(response.body, b"default") 307 308 def test_set_cookie_domain(self): 309 response = self.fetch("/set_domain") 310 self.assertEqual(response.headers.get_list("Set-Cookie"), 311 ["unicode_args=blah; Domain=foo.com; Path=/foo"]) 312 313 def test_cookie_special_char(self): 314 response = self.fetch("/special_char") 315 headers = sorted(response.headers.get_list("Set-Cookie")) 316 self.assertEqual(len(headers), 3) 317 self.assertEqual(headers[0], 'equals="a=b"; Path=/') 318 self.assertEqual(headers[1], 
'quote="a\\"b"; Path=/') 319 # python 2.7 octal-escapes the semicolon; older versions leave it alone 320 self.assertTrue(headers[2] in ('semicolon="a;b"; Path=/', 321 'semicolon="a\\073b"; Path=/'), 322 headers[2]) 323 324 data = [('foo=a=b', 'a=b'), 325 ('foo="a=b"', 'a=b'), 326 ('foo="a;b"', '"a'), # even quoted, ";" is a delimiter 327 ('foo=a\\073b', 'a\\073b'), # escapes only decoded in quotes 328 ('foo="a\\073b"', 'a;b'), 329 ('foo="a\\"b"', 'a"b'), 330 ] 331 for header, expected in data: 332 logging.debug("trying %r", header) 333 response = self.fetch("/get", headers={"Cookie": header}) 334 self.assertEqual(response.body, utf8(expected)) 335 336 def test_set_cookie_overwrite(self): 337 response = self.fetch("/set_overwrite") 338 headers = response.headers.get_list("Set-Cookie") 339 self.assertEqual(sorted(headers), 340 ["a=e; Path=/", "c=d; Domain=example.com; Path=/"]) 341 342 def test_set_cookie_max_age(self): 343 response = self.fetch("/set_max_age") 344 headers = response.headers.get_list("Set-Cookie") 345 self.assertEqual(sorted(headers), 346 ["foo=bar; Max-Age=10; Path=/"]) 347 348 def test_set_cookie_expires_days(self): 349 response = self.fetch("/set_expires_days") 350 header = response.headers.get("Set-Cookie") 351 match = re.match("foo=bar; expires=(?P<expires>.+); Path=/", header) 352 self.assertIsNotNone(match) 353 354 expires = datetime.datetime.utcnow() + datetime.timedelta(days=10) 355 header_expires = datetime.datetime( 356 *email.utils.parsedate(match.groupdict()["expires"])[:6]) 357 self.assertTrue(abs(timedelta_to_seconds(expires - header_expires)) < 10) 358 359 def test_set_cookie_false_flags(self): 360 response = self.fetch("/set_falsy_flags") 361 headers = sorted(response.headers.get_list("Set-Cookie")) 362 # The secure and httponly headers are capitalized in py35 and 363 # lowercase in older versions. 
364 self.assertEqual(headers[0].lower(), 'a=1; path=/; secure') 365 self.assertEqual(headers[1].lower(), 'b=1; path=/') 366 self.assertEqual(headers[2].lower(), 'c=1; httponly; path=/') 367 self.assertEqual(headers[3].lower(), 'd=1; path=/') 368 369 370class AuthRedirectRequestHandler(RequestHandler): 371 def initialize(self, login_url): 372 self.login_url = login_url 373 374 def get_login_url(self): 375 return self.login_url 376 377 @authenticated 378 def get(self): 379 # we'll never actually get here because the test doesn't follow redirects 380 self.send_error(500) 381 382 383class AuthRedirectTest(WebTestCase): 384 def get_handlers(self): 385 return [('/relative', AuthRedirectRequestHandler, 386 dict(login_url='/login')), 387 ('/absolute', AuthRedirectRequestHandler, 388 dict(login_url='http://example.com/login'))] 389 390 def test_relative_auth_redirect(self): 391 response = self.fetch(self.get_url('/relative'), 392 follow_redirects=False) 393 self.assertEqual(response.code, 302) 394 self.assertEqual(response.headers['Location'], '/login?next=%2Frelative') 395 396 def test_absolute_auth_redirect(self): 397 response = self.fetch(self.get_url('/absolute'), 398 follow_redirects=False) 399 self.assertEqual(response.code, 302) 400 self.assertTrue(re.match( 401 'http://example.com/login\?next=http%3A%2F%2F127.0.0.1%3A[0-9]+%2Fabsolute', 402 response.headers['Location']), response.headers['Location']) 403 404 405class ConnectionCloseHandler(RequestHandler): 406 def initialize(self, test): 407 self.test = test 408 409 @gen.coroutine 410 def get(self): 411 self.test.on_handler_waiting() 412 never_finish = Event() 413 yield never_finish.wait() 414 415 def on_connection_close(self): 416 self.test.on_connection_close() 417 418 419class ConnectionCloseTest(WebTestCase): 420 def get_handlers(self): 421 return [('/', ConnectionCloseHandler, dict(test=self))] 422 423 def test_connection_close(self): 424 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 425 
s.connect(("127.0.0.1", self.get_http_port())) 426 self.stream = IOStream(s) 427 self.stream.write(b"GET / HTTP/1.0\r\n\r\n") 428 self.wait() 429 430 def on_handler_waiting(self): 431 logging.debug('handler waiting') 432 self.stream.close() 433 434 def on_connection_close(self): 435 logging.debug('connection closed') 436 self.stop() 437 438 439class EchoHandler(RequestHandler): 440 def get(self, *path_args): 441 # Type checks: web.py interfaces convert argument values to 442 # unicode strings (by default, but see also decode_argument). 443 # In httpserver.py (i.e. self.request.arguments), they're left 444 # as bytes. Keys are always native strings. 445 for key in self.request.arguments: 446 if type(key) != str: 447 raise Exception("incorrect type for key: %r" % type(key)) 448 for value in self.request.arguments[key]: 449 if type(value) != bytes: 450 raise Exception("incorrect type for value: %r" % 451 type(value)) 452 for value in self.get_arguments(key): 453 if type(value) != unicode_type: 454 raise Exception("incorrect type for value: %r" % 455 type(value)) 456 for arg in path_args: 457 if type(arg) != unicode_type: 458 raise Exception("incorrect type for path arg: %r" % type(arg)) 459 self.write(dict(path=self.request.path, 460 path_args=path_args, 461 args=recursive_unicode(self.request.arguments))) 462 463 464class RequestEncodingTest(WebTestCase): 465 def get_handlers(self): 466 return [("/group/(.*)", EchoHandler), 467 ("/slashes/([^/]*)/([^/]*)", EchoHandler), 468 ] 469 470 def fetch_json(self, path): 471 return json_decode(self.fetch(path).body) 472 473 def test_group_question_mark(self): 474 # Ensure that url-encoded question marks are handled properly 475 self.assertEqual(self.fetch_json('/group/%3F'), 476 dict(path='/group/%3F', path_args=['?'], args={})) 477 self.assertEqual(self.fetch_json('/group/%3F?%3F=%3F'), 478 dict(path='/group/%3F', path_args=['?'], args={'?': ['?']})) 479 480 def test_group_encoding(self): 481 # Path components and query 
arguments should be decoded the same way 482 self.assertEqual(self.fetch_json('/group/%C3%A9?arg=%C3%A9'), 483 {u"path": u"/group/%C3%A9", 484 u"path_args": [u"\u00e9"], 485 u"args": {u"arg": [u"\u00e9"]}}) 486 487 def test_slashes(self): 488 # Slashes may be escaped to appear as a single "directory" in the path, 489 # but they are then unescaped when passed to the get() method. 490 self.assertEqual(self.fetch_json('/slashes/foo/bar'), 491 dict(path="/slashes/foo/bar", 492 path_args=["foo", "bar"], 493 args={})) 494 self.assertEqual(self.fetch_json('/slashes/a%2Fb/c%2Fd'), 495 dict(path="/slashes/a%2Fb/c%2Fd", 496 path_args=["a/b", "c/d"], 497 args={})) 498 499 def test_error(self): 500 # Percent signs (encoded as %25) should not mess up printf-style 501 # messages in logs 502 with ExpectLog(gen_log, ".*Invalid unicode"): 503 self.fetch("/group/?arg=%25%e9") 504 505 506class TypeCheckHandler(RequestHandler): 507 def prepare(self): 508 self.errors = {} 509 510 self.check_type('status', self.get_status(), int) 511 512 # get_argument is an exception from the general rule of using 513 # type str for non-body data mainly for historical reasons. 514 self.check_type('argument', self.get_argument('foo'), unicode_type) 515 self.check_type('cookie_key', list(self.cookies.keys())[0], str) 516 self.check_type('cookie_value', list(self.cookies.values())[0].value, str) 517 518 # Secure cookies return bytes because they can contain arbitrary 519 # data, but regular cookies are native strings. 
520 if list(self.cookies.keys()) != ['asdf']: 521 raise Exception("unexpected values for cookie keys: %r" % 522 self.cookies.keys()) 523 self.check_type('get_secure_cookie', self.get_secure_cookie('asdf'), bytes) 524 self.check_type('get_cookie', self.get_cookie('asdf'), str) 525 526 self.check_type('xsrf_token', self.xsrf_token, bytes) 527 self.check_type('xsrf_form_html', self.xsrf_form_html(), str) 528 529 self.check_type('reverse_url', self.reverse_url('typecheck', 'foo'), str) 530 531 self.check_type('request_summary', self._request_summary(), str) 532 533 def get(self, path_component): 534 # path_component uses type unicode instead of str for consistency 535 # with get_argument() 536 self.check_type('path_component', path_component, unicode_type) 537 self.write(self.errors) 538 539 def post(self, path_component): 540 self.check_type('path_component', path_component, unicode_type) 541 self.write(self.errors) 542 543 def check_type(self, name, obj, expected_type): 544 actual_type = type(obj) 545 if expected_type != actual_type: 546 self.errors[name] = "expected %s, got %s" % (expected_type, 547 actual_type) 548 549 550class DecodeArgHandler(RequestHandler): 551 def decode_argument(self, value, name=None): 552 if type(value) != bytes: 553 raise Exception("unexpected type for value: %r" % type(value)) 554 # use self.request.arguments directly to avoid recursion 555 if 'encoding' in self.request.arguments: 556 return value.decode(to_unicode(self.request.arguments['encoding'][0])) 557 else: 558 return value 559 560 def get(self, arg): 561 def describe(s): 562 if type(s) == bytes: 563 return ["bytes", native_str(binascii.b2a_hex(s))] 564 elif type(s) == unicode_type: 565 return ["unicode", s] 566 raise Exception("unknown type") 567 self.write({'path': describe(arg), 568 'query': describe(self.get_argument("foo")), 569 }) 570 571 572class LinkifyHandler(RequestHandler): 573 def get(self): 574 self.render("linkify.html", message="http://example.com") 575 576 577class 
UIModuleResourceHandler(RequestHandler): 578 def get(self): 579 self.render("page.html", entries=[1, 2]) 580 581 582class OptionalPathHandler(RequestHandler): 583 def get(self, path): 584 self.write({"path": path}) 585 586 587class FlowControlHandler(RequestHandler): 588 # These writes are too small to demonstrate real flow control, 589 # but at least it shows that the callbacks get run. 590 with ignore_deprecation(): 591 @asynchronous 592 def get(self): 593 self.write("1") 594 with ignore_deprecation(): 595 self.flush(callback=self.step2) 596 597 def step2(self): 598 self.write("2") 599 with ignore_deprecation(): 600 self.flush(callback=self.step3) 601 602 def step3(self): 603 self.write("3") 604 self.finish() 605 606 607class MultiHeaderHandler(RequestHandler): 608 def get(self): 609 self.set_header("x-overwrite", "1") 610 self.set_header("X-Overwrite", 2) 611 self.add_header("x-multi", 3) 612 self.add_header("X-Multi", "4") 613 614 615class RedirectHandler(RequestHandler): 616 def get(self): 617 if self.get_argument('permanent', None) is not None: 618 self.redirect('/', permanent=int(self.get_argument('permanent'))) 619 elif self.get_argument('status', None) is not None: 620 self.redirect('/', status=int(self.get_argument('status'))) 621 else: 622 raise Exception("didn't get permanent or status arguments") 623 624 625class EmptyFlushCallbackHandler(RequestHandler): 626 @gen.coroutine 627 def get(self): 628 # Ensure that the flush callback is run whether or not there 629 # was any output. The gen.Task and direct yield forms are 630 # equivalent. 
631 yield self.flush() # "empty" flush, but writes headers 632 yield self.flush() # empty flush 633 self.write("o") 634 yield self.flush() # flushes the "o" 635 yield self.flush() # empty flush 636 self.finish("k") 637 638 639class HeaderInjectionHandler(RequestHandler): 640 def get(self): 641 try: 642 self.set_header("X-Foo", "foo\r\nX-Bar: baz") 643 raise Exception("Didn't get expected exception") 644 except ValueError as e: 645 if "Unsafe header value" in str(e): 646 self.finish(b"ok") 647 else: 648 raise 649 650 651class GetArgumentHandler(RequestHandler): 652 def prepare(self): 653 if self.get_argument('source', None) == 'query': 654 method = self.get_query_argument 655 elif self.get_argument('source', None) == 'body': 656 method = self.get_body_argument 657 else: 658 method = self.get_argument 659 self.finish(method("foo", "default")) 660 661 662class GetArgumentsHandler(RequestHandler): 663 def prepare(self): 664 self.finish(dict(default=self.get_arguments("foo"), 665 query=self.get_query_arguments("foo"), 666 body=self.get_body_arguments("foo"))) 667 668 669# This test is shared with wsgi_test.py 670@wsgi_safe 671class WSGISafeWebTest(WebTestCase): 672 COOKIE_SECRET = "WebTest.COOKIE_SECRET" 673 674 def get_app_kwargs(self): 675 loader = DictLoader({ 676 "linkify.html": "{% module linkify(message) %}", 677 "page.html": """\ 678<html><head></head><body> 679{% for e in entries %} 680{% module Template("entry.html", entry=e) %} 681{% end %} 682</body></html>""", 683 "entry.html": """\ 684{{ set_resources(embedded_css=".entry { margin-bottom: 1em; }", 685 embedded_javascript="js_embed()", 686 css_files=["/base.css", "/foo.css"], 687 javascript_files="/common.js", 688 html_head="<meta>", 689 html_body='<script src="/analytics.js"/>') }} 690<div class="entry">...</div>""", 691 }) 692 return dict(template_loader=loader, 693 autoescape="xhtml_escape", 694 cookie_secret=self.COOKIE_SECRET) 695 696 def tearDown(self): 697 super(WSGISafeWebTest, self).tearDown() 698 
RequestHandler._template_loaders.clear() 699 700 def get_handlers(self): 701 urls = [ 702 url("/typecheck/(.*)", TypeCheckHandler, name='typecheck'), 703 url("/decode_arg/(.*)", DecodeArgHandler, name='decode_arg'), 704 url("/decode_arg_kw/(?P<arg>.*)", DecodeArgHandler), 705 url("/linkify", LinkifyHandler), 706 url("/uimodule_resources", UIModuleResourceHandler), 707 url("/optional_path/(.+)?", OptionalPathHandler), 708 url("/multi_header", MultiHeaderHandler), 709 url("/redirect", RedirectHandler), 710 url("/web_redirect_permanent", WebRedirectHandler, {"url": "/web_redirect_newpath"}), 711 url("/web_redirect", WebRedirectHandler, 712 {"url": "/web_redirect_newpath", "permanent": False}), 713 url("//web_redirect_double_slash", WebRedirectHandler, 714 {"url": '/web_redirect_newpath'}), 715 url("/header_injection", HeaderInjectionHandler), 716 url("/get_argument", GetArgumentHandler), 717 url("/get_arguments", GetArgumentsHandler), 718 ] 719 return urls 720 721 def fetch_json(self, *args, **kwargs): 722 response = self.fetch(*args, **kwargs) 723 response.rethrow() 724 return json_decode(response.body) 725 726 def test_types(self): 727 cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET, 728 "asdf", "qwer")) 729 response = self.fetch("/typecheck/asdf?foo=bar", 730 headers={"Cookie": "asdf=" + cookie_value}) 731 data = json_decode(response.body) 732 self.assertEqual(data, {}) 733 734 response = self.fetch("/typecheck/asdf?foo=bar", method="POST", 735 headers={"Cookie": "asdf=" + cookie_value}, 736 body="foo=bar") 737 738 def test_decode_argument(self): 739 # These urls all decode to the same thing 740 urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8", 741 "/decode_arg/%E9?foo=%E9&encoding=latin1", 742 "/decode_arg_kw/%E9?foo=%E9&encoding=latin1", 743 ] 744 for req_url in urls: 745 response = self.fetch(req_url) 746 response.rethrow() 747 data = json_decode(response.body) 748 self.assertEqual(data, {u'path': [u'unicode', u'\u00e9'], 749 u'query': 
[u'unicode', u'\u00e9'], 750 }) 751 752 response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9") 753 response.rethrow() 754 data = json_decode(response.body) 755 self.assertEqual(data, {u'path': [u'bytes', u'c3a9'], 756 u'query': [u'bytes', u'c3a9'], 757 }) 758 759 def test_decode_argument_invalid_unicode(self): 760 # test that invalid unicode in URLs causes 400, not 500 761 with ExpectLog(gen_log, ".*Invalid unicode.*"): 762 response = self.fetch("/typecheck/invalid%FF") 763 self.assertEqual(response.code, 400) 764 response = self.fetch("/typecheck/invalid?foo=%FF") 765 self.assertEqual(response.code, 400) 766 767 def test_decode_argument_plus(self): 768 # These urls are all equivalent. 769 urls = ["/decode_arg/1%20%2B%201?foo=1%20%2B%201&encoding=utf-8", 770 "/decode_arg/1%20+%201?foo=1+%2B+1&encoding=utf-8"] 771 for req_url in urls: 772 response = self.fetch(req_url) 773 response.rethrow() 774 data = json_decode(response.body) 775 self.assertEqual(data, {u'path': [u'unicode', u'1 + 1'], 776 u'query': [u'unicode', u'1 + 1'], 777 }) 778 779 def test_reverse_url(self): 780 self.assertEqual(self.app.reverse_url('decode_arg', 'foo'), 781 '/decode_arg/foo') 782 self.assertEqual(self.app.reverse_url('decode_arg', 42), 783 '/decode_arg/42') 784 self.assertEqual(self.app.reverse_url('decode_arg', b'\xe9'), 785 '/decode_arg/%E9') 786 self.assertEqual(self.app.reverse_url('decode_arg', u'\u00e9'), 787 '/decode_arg/%C3%A9') 788 self.assertEqual(self.app.reverse_url('decode_arg', '1 + 1'), 789 '/decode_arg/1%20%2B%201') 790 791 def test_uimodule_unescaped(self): 792 response = self.fetch("/linkify") 793 self.assertEqual(response.body, 794 b"<a href=\"http://example.com\">http://example.com</a>") 795 796 def test_uimodule_resources(self): 797 response = self.fetch("/uimodule_resources") 798 self.assertEqual(response.body, b"""\ 799<html><head><link href="/base.css" type="text/css" rel="stylesheet"/><link href="/foo.css" type="text/css" rel="stylesheet"/> 800<style 
type="text/css"> 801.entry { margin-bottom: 1em; } 802</style> 803<meta> 804</head><body> 805 806 807<div class="entry">...</div> 808 809 810<div class="entry">...</div> 811 812<script src="/common.js" type="text/javascript"></script> 813<script type="text/javascript"> 814//<![CDATA[ 815js_embed() 816//]]> 817</script> 818<script src="/analytics.js"/> 819</body></html>""") # noqa: E501 820 821 def test_optional_path(self): 822 self.assertEqual(self.fetch_json("/optional_path/foo"), 823 {u"path": u"foo"}) 824 self.assertEqual(self.fetch_json("/optional_path/"), 825 {u"path": None}) 826 827 def test_multi_header(self): 828 response = self.fetch("/multi_header") 829 self.assertEqual(response.headers["x-overwrite"], "2") 830 self.assertEqual(response.headers.get_list("x-multi"), ["3", "4"]) 831 832 def test_redirect(self): 833 response = self.fetch("/redirect?permanent=1", follow_redirects=False) 834 self.assertEqual(response.code, 301) 835 response = self.fetch("/redirect?permanent=0", follow_redirects=False) 836 self.assertEqual(response.code, 302) 837 response = self.fetch("/redirect?status=307", follow_redirects=False) 838 self.assertEqual(response.code, 307) 839 840 def test_web_redirect(self): 841 response = self.fetch("/web_redirect_permanent", follow_redirects=False) 842 self.assertEqual(response.code, 301) 843 self.assertEqual(response.headers['Location'], '/web_redirect_newpath') 844 response = self.fetch("/web_redirect", follow_redirects=False) 845 self.assertEqual(response.code, 302) 846 self.assertEqual(response.headers['Location'], '/web_redirect_newpath') 847 848 def test_web_redirect_double_slash(self): 849 response = self.fetch("//web_redirect_double_slash", follow_redirects=False) 850 self.assertEqual(response.code, 301) 851 self.assertEqual(response.headers['Location'], '/web_redirect_newpath') 852 853 def test_header_injection(self): 854 response = self.fetch("/header_injection") 855 self.assertEqual(response.body, b"ok") 856 857 def 
test_get_argument(self):
        response = self.fetch("/get_argument?foo=bar")
        self.assertEqual(response.body, b"bar")
        response = self.fetch("/get_argument?foo=")
        self.assertEqual(response.body, b"")
        response = self.fetch("/get_argument")
        self.assertEqual(response.body, b"default")

        # Test merging of query and body arguments.
        # In singular form, body arguments take precedence over query arguments.
        body = urllib_parse.urlencode(dict(foo="hello"))
        response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
        self.assertEqual(response.body, b"hello")
        # In plural methods they are merged.
        response = self.fetch("/get_arguments?foo=bar",
                              method="POST", body=body)
        self.assertEqual(json_decode(response.body),
                         dict(default=['bar', 'hello'],
                              query=['bar'],
                              body=['hello']))

    def test_get_query_arguments(self):
        # send as a post so we can ensure the separation between query
        # string and body arguments.
        body = urllib_parse.urlencode(dict(foo="hello"))
        # With source=query the query-string value is expected even though
        # the body also carries a "foo" field.
        response = self.fetch("/get_argument?source=query&foo=bar",
                              method="POST", body=body)
        self.assertEqual(response.body, b"bar")
        response = self.fetch("/get_argument?source=query&foo=",
                              method="POST", body=body)
        self.assertEqual(response.body, b"")
        response = self.fetch("/get_argument?source=query",
                              method="POST", body=body)
        self.assertEqual(response.body, b"default")

    def test_get_body_arguments(self):
        # Mirror image of test_get_query_arguments: with source=body the
        # body value is expected and the query-string "foo=hello" is ignored.
        body = urllib_parse.urlencode(dict(foo="bar"))
        response = self.fetch("/get_argument?source=body&foo=hello",
                              method="POST", body=body)
        self.assertEqual(response.body, b"bar")

        body = urllib_parse.urlencode(dict(foo=""))
        response = self.fetch("/get_argument?source=body&foo=hello",
                              method="POST", body=body)
        self.assertEqual(response.body, b"")

        body = urllib_parse.urlencode(dict())
        response = self.fetch("/get_argument?source=body&foo=hello",
                              method="POST", body=body)
        self.assertEqual(response.body, b"default")

    def test_no_gzip(self):
        # Neither a Vary: Accept-Encoding nor a gzip Content-Encoding header
        # is expected on this response.
        response = self.fetch('/get_argument')
        self.assertNotIn('Accept-Encoding', response.headers.get('Vary', ''))
        self.assertNotIn('gzip', response.headers.get('Content-Encoding', ''))


class NonWSGIWebTests(WebTestCase):
    """Tests for the /flow_control and /empty_flush handlers.

    This class is not decorated with ``wsgi_safe``, so it is not added to
    ``wsgi_safe_tests``.
    """
    def get_handlers(self):
        return [("/flow_control", FlowControlHandler),
                ("/empty_flush", EmptyFlushCallbackHandler),
                ]

    def test_flow_control(self):
        self.assertEqual(self.fetch("/flow_control").body, b"123")

    def test_empty_flush(self):
        response = self.fetch("/empty_flush")
        self.assertEqual(response.body, b"ok")


@wsgi_safe
class ErrorResponseTest(WebTestCase):
    """Checks the status code and body of error responses.

    Covers three handlers: one using the default error page, one with a
    custom ``write_error``, and one whose ``write_error`` itself raises.
    """
    def get_handlers(self):
        class DefaultHandler(RequestHandler):
            def get(self):
                # ?status=NNN raises an HTTPError with that code; without it
                # the handler fails with a ZeroDivisionError.
                if self.get_argument("status", None):
                    raise HTTPError(int(self.get_argument("status")))
                1 / 0

        class WriteErrorHandler(RequestHandler):
            def get(self):
                if self.get_argument("status", None):
                    self.send_error(int(self.get_argument("status")))
                else:
                    1 / 0

            def write_error(self, status_code, **kwargs):
                # Plain-text body that names the exception type when
                # exc_info is supplied, or the status code otherwise.
                self.set_header("Content-Type", "text/plain")
                if "exc_info" in kwargs:
                    self.write("Exception: %s" % kwargs["exc_info"][0].__name__)
                else:
                    self.write("Status: %d" % status_code)

        class FailedWriteErrorHandler(RequestHandler):
            def get(self):
                1 / 0

            def write_error(self, status_code, **kwargs):
                raise Exception("exception in write_error")

        return [url("/default", DefaultHandler),
                url("/write_error", WriteErrorHandler),
                url("/failed_write_error", FailedWriteErrorHandler),
                ]

    def test_default(self):
        with ExpectLog(app_log, "Uncaught exception"):
            response = self.fetch("/default")
            self.assertEqual(response.code, 500)
            self.assertTrue(b"500: Internal Server Error" in response.body)

            response = self.fetch("/default?status=503")
            self.assertEqual(response.code, 503)
            self.assertTrue(b"503: Service Unavailable" in response.body)

            # 435 is reported with the reason "Unknown".
            response = self.fetch("/default?status=435")
            self.assertEqual(response.code, 435)
            self.assertTrue(b"435: Unknown" in response.body)

    def test_write_error(self):
        with ExpectLog(app_log, "Uncaught exception"):
            response = self.fetch("/write_error")
            self.assertEqual(response.code, 500)
            self.assertEqual(b"Exception: ZeroDivisionError", response.body)

            response = self.fetch("/write_error?status=503")
            self.assertEqual(response.code, 503)
            self.assertEqual(b"Status: 503", response.body)

    def test_failed_write_error(self):
        # When write_error raises, the client is expected to receive a 500
        # with an empty body.
        with ExpectLog(app_log, "Uncaught exception"):
            response = self.fetch("/failed_write_error")
            self.assertEqual(response.code, 500)
            self.assertEqual(b"", response.body)


@wsgi_safe
class StaticFileTest(WebTestCase):
    """Tests static file serving and ``static_url`` generation.

    Most tests request ``robots.txt`` from the ``static`` directory next to
    this file; the assertions expect it to be 26 bytes long.
    """
    # The expected MD5 hash of robots.txt, used in tests that call
    # StaticFileHandler.get_version
    robots_txt_hash = b"f71d20196d4caf35b6a670db8c70b03d"
    static_dir = os.path.join(os.path.dirname(__file__), 'static')

    def get_handlers(self):
        class StaticUrlHandler(RequestHandler):
            def get(self, path):
                # ?include_version=0 turns the "?v=<hash>" suffix off.
                with_v = int(self.get_argument('include_version', 1))
                self.write(self.static_url(path, include_version=with_v))

        class AbsoluteStaticUrlHandler(StaticUrlHandler):
            include_host = True

        class OverrideStaticUrlHandler(RequestHandler):
            def get(self, path):
                # Set the handler-level include_host to the opposite of the
                # per-call argument, then check that the per-call value wins.
                do_include = bool(self.get_argument("include_host"))
                self.include_host = not do_include

                regular_url = self.static_url(path)
                override_url = self.static_url(path, include_host=do_include)
                if override_url == regular_url:
                    return self.write(str(False))

                # A URL "includes the host" when it starts with
                # "<protocol>://"; find() is limited to that prefix.
                protocol = self.request.protocol + "://"
                protocol_length = len(protocol)
                check_regular = regular_url.find(protocol, 0, protocol_length)
                check_override = override_url.find(protocol, 0, protocol_length)

                if do_include:
                    result = (check_override == 0 and check_regular == -1)
                else:
                    result = (check_override == -1 and check_regular == 0)
                self.write(str(result))

        return [('/static_url/(.*)', StaticUrlHandler),
                ('/abs_static_url/(.*)', AbsoluteStaticUrlHandler),
                ('/override_static_url/(.*)', OverrideStaticUrlHandler),
                ('/root_static/(.*)', StaticFileHandler, dict(path='/'))]

    def get_app_kwargs(self):
        return dict(static_path=relpath('static'))

    def test_static_files(self):
        response = self.fetch('/robots.txt')
        self.assertTrue(b"Disallow: /" in response.body)

        response = self.fetch('/static/robots.txt')
        self.assertTrue(b"Disallow: /" in response.body)
        self.assertEqual(response.headers.get("Content-Type"), "text/plain")

    def test_static_compressed_files(self):
        response = self.fetch("/static/sample.xml.gz")
        self.assertEqual(response.headers.get("Content-Type"),
                         "application/gzip")
        response = self.fetch("/static/sample.xml.bz2")
        self.assertEqual(response.headers.get("Content-Type"),
                         "application/octet-stream")
        # make sure the uncompressed file still has the correct type
        response = self.fetch("/static/sample.xml")
        self.assertTrue(response.headers.get("Content-Type")
                        in set(("text/xml", "application/xml")))

    def test_static_url(self):
        response = self.fetch("/static_url/robots.txt")
        self.assertEqual(response.body,
                         b"/static/robots.txt?v=" + self.robots_txt_hash)

    def test_absolute_static_url(self):
        response = self.fetch("/abs_static_url/robots.txt")
        self.assertEqual(response.body, (
            utf8(self.get_url("/")) +
            b"static/robots.txt?v=" +
            self.robots_txt_hash
        ))

    def test_relative_version_exclusion(self):
        response = self.fetch("/static_url/robots.txt?include_version=0")
        self.assertEqual(response.body, b"/static/robots.txt")

    def test_absolute_version_exclusion(self):
        response = self.fetch("/abs_static_url/robots.txt?include_version=0")
        self.assertEqual(response.body,
                         utf8(self.get_url("/") + "static/robots.txt"))

    def test_include_host_override(self):
        self._trigger_include_host_check(False)
        self._trigger_include_host_check(True)

    def _trigger_include_host_check(self, include_host):
        # The handler writes str(True) when the per-call include_host
        # argument overrode the handler attribute as expected.
        path = "/override_static_url/robots.txt?include_host=%s"
        response = self.fetch(path % int(include_host))
        self.assertEqual(response.body, utf8(str(True)))

    def get_and_head(self, *args, **kwargs):
        """Performs a GET and HEAD request and returns the GET response.

        Fails if any ``Content-*`` headers returned by the two requests
        differ.
        """
        head_response = self.fetch(*args, method="HEAD", **kwargs)
        get_response = self.fetch(*args, method="GET", **kwargs)
        # Collect the union of Content-* header names from both responses so
        # a header present in only one of them is also reported.
        content_headers = set()
        for h in itertools.chain(head_response.headers, get_response.headers):
            if h.startswith('Content-'):
                content_headers.add(h)
        for h in content_headers:
            self.assertEqual(head_response.headers.get(h),
                             get_response.headers.get(h),
                             "%s differs between GET (%s) and HEAD (%s)" %
                             (h, head_response.headers.get(h),
                              get_response.headers.get(h)))
        return get_response

    def test_static_304_if_modified_since(self):
        response1 = self.get_and_head("/static/robots.txt")
        response2 = self.get_and_head("/static/robots.txt", headers={
            'If-Modified-Since': response1.headers['Last-Modified']})
        self.assertEqual(response2.code, 304)
        self.assertTrue('Content-Length' not in response2.headers)
        self.assertTrue('Last-Modified' not in response2.headers)

    def test_static_304_if_none_match(self):
        response1 = self.get_and_head("/static/robots.txt")
        response2 = self.get_and_head("/static/robots.txt", headers={
            'If-None-Match': response1.headers['Etag']})
        self.assertEqual(response2.code, 304)

    def test_static_304_etag_modified_bug(self):
        # A mismatching If-None-Match must yield 200 even though the
        # If-Modified-Since value alone would have matched.
        response1 = self.get_and_head("/static/robots.txt")
        response2 = self.get_and_head("/static/robots.txt", headers={
            'If-None-Match': '"MISMATCH"',
            'If-Modified-Since': response1.headers['Last-Modified']})
        self.assertEqual(response2.code, 200)

    def test_static_if_modified_since_pre_epoch(self):
        # On windows, the functions that work with time_t do not accept
        # negative values, and at least one client (processing.js) seems
        # to use if-modified-since 1/1/1960 as a cache-busting technique.
        response = self.get_and_head("/static/robots.txt", headers={
            'If-Modified-Since': 'Fri, 01 Jan 1960 00:00:00 GMT'})
        self.assertEqual(response.code, 200)

    def test_static_if_modified_since_time_zone(self):
        # Instead of the value from Last-Modified, make requests with times
        # chosen just before and after the known modification time
        # of the file to ensure that the right time zone is being used
        # when parsing If-Modified-Since.
        stat = os.stat(relpath('static/robots.txt'))

        response = self.get_and_head('/static/robots.txt', headers={
            'If-Modified-Since': format_timestamp(stat.st_mtime - 1)})
        self.assertEqual(response.code, 200)
        response = self.get_and_head('/static/robots.txt', headers={
            'If-Modified-Since': format_timestamp(stat.st_mtime + 1)})
        self.assertEqual(response.code, 304)

    def test_static_etag(self):
        # The Etag is expected to be the quoted robots_txt_hash.
        response = self.get_and_head('/static/robots.txt')
        self.assertEqual(utf8(response.headers.get("Etag")),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_with_range(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=0-9'})
        self.assertEqual(response.code, 206)
        self.assertEqual(response.body, b"User-agent")
        self.assertEqual(utf8(response.headers.get("Etag")),
                         b'"' + self.robots_txt_hash + b'"')
        self.assertEqual(response.headers.get("Content-Length"), "10")
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes 0-9/26")

    def test_static_with_range_full_file(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=0-'})
        # Note: Chrome refuses to play audio if it gets an HTTP 206 in response
        # to ``Range: bytes=0-`` :(
        self.assertEqual(response.code, 200)
        robots_file_path = os.path.join(self.static_dir, "robots.txt")
        with open(robots_file_path) as f:
            self.assertEqual(response.body, utf8(f.read()))
        self.assertEqual(response.headers.get("Content-Length"), "26")
        self.assertEqual(response.headers.get("Content-Range"), None)

    def test_static_with_range_full_past_end(self):
        # A range starting at 0 whose end is beyond the file is expected to
        # be served as a plain 200 with the whole file.
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=0-10000000'})
        self.assertEqual(response.code, 200)
        robots_file_path = os.path.join(self.static_dir, "robots.txt")
        with open(robots_file_path) as f:
            self.assertEqual(response.body, utf8(f.read()))
        self.assertEqual(response.headers.get("Content-Length"), "26")
        self.assertEqual(response.headers.get("Content-Range"), None)

    def test_static_with_range_partial_past_end(self):
        # Starting at byte 1 makes the response partial (206); the
        # Content-Range end is expected to be the last byte of the file.
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=1-10000000'})
        self.assertEqual(response.code, 206)
        robots_file_path = os.path.join(self.static_dir, "robots.txt")
        with open(robots_file_path) as f:
            self.assertEqual(response.body, utf8(f.read()[1:]))
        self.assertEqual(response.headers.get("Content-Length"), "25")
        self.assertEqual(response.headers.get("Content-Range"), "bytes 1-25/26")

    def test_static_with_range_end_edge(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=22-'})
        self.assertEqual(response.body, b": /\n")
        self.assertEqual(response.headers.get("Content-Length"), "4")
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes 22-25/26")

    def test_static_with_range_neg_end(self):
        # The suffix form "bytes=-4" is expected to select the same final
        # four bytes as "bytes=22-" in the previous test.
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=-4'})
        self.assertEqual(response.body, b": /\n")
        self.assertEqual(response.headers.get("Content-Length"), "4")
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes 22-25/26")

    def test_static_invalid_range(self):
        # An unparseable Range header is expected to produce a normal 200.
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'asdf'})
        self.assertEqual(response.code, 200)

    def test_static_unsatisfiable_range_zero_suffix(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=-0'})
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes */26")
        self.assertEqual(response.code, 416)

    def test_static_unsatisfiable_range_invalid_start(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=26'})
        self.assertEqual(response.code, 416)
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes */26")

    def test_static_head(self):
        response = self.fetch('/static/robots.txt', method='HEAD')
        self.assertEqual(response.code, 200)
        # No body was returned, but we did get the right content length.
        self.assertEqual(response.body, b'')
        self.assertEqual(response.headers['Content-Length'], '26')
        self.assertEqual(utf8(response.headers['Etag']),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_head_range(self):
        response = self.fetch('/static/robots.txt', method='HEAD',
                              headers={'Range': 'bytes=1-4'})
        self.assertEqual(response.code, 206)
        self.assertEqual(response.body, b'')
        self.assertEqual(response.headers['Content-Length'], '4')
        self.assertEqual(utf8(response.headers['Etag']),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_range_if_none_match(self):
        # A matching If-None-Match is expected to produce 304 even when a
        # Range header is also sent.
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=1-4',
            'If-None-Match': b'"' + self.robots_txt_hash + b'"'})
        self.assertEqual(response.code, 304)
        self.assertEqual(response.body, b'')
        self.assertTrue('Content-Length' not in response.headers)
        self.assertEqual(utf8(response.headers['Etag']),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_404(self):
        response = self.get_and_head('/static/blarg')
        self.assertEqual(response.code, 404)

    def test_path_traversal_protection(self):
        # curl_httpclient processes ".." on the client side, so we
        # must test this with simple_httpclient.
        self.http_client.close()
        self.http_client = SimpleAsyncHTTPClient()
        with ExpectLog(gen_log, ".*not in root static directory"):
            response = self.get_and_head('/static/../static_foo.txt')
        # Attempted path traversal should result in 403, not 200
        # (which means the check failed and the file was served)
        # or 404 (which means that the file didn't exist and
        # is probably a packaging error).
        self.assertEqual(response.code, 403)

    @unittest.skipIf(os.name != 'posix', 'non-posix OS')
    def test_root_static_path(self):
        # Sometimes people set the StaticFileHandler's path to '/'
        # to disable Tornado's path validation (in conjunction with
        # their own validation in get_absolute_path). Make sure
        # that the stricter validation in 4.2.1 doesn't break them.
        path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                            'static/robots.txt')
        response = self.get_and_head('/root_static' + urllib_parse.quote(path))
        self.assertEqual(response.code, 200)


@wsgi_safe
class StaticDefaultFilenameTest(WebTestCase):
    """Tests the static handler's ``default_filename`` option for directories."""
    def get_app_kwargs(self):
        return dict(static_path=relpath('static'),
                    static_handler_args=dict(default_filename='index.html'))

    def get_handlers(self):
        return []

    def test_static_default_filename(self):
        response = self.fetch('/static/dir/', follow_redirects=False)
        self.assertEqual(response.code, 200)
        self.assertEqual(b'this is the index\n', response.body)

    def test_static_default_redirect(self):
        # Without the trailing slash a 301 to the slash-terminated URL is
        # expected.
        response = self.fetch('/static/dir', follow_redirects=False)
        self.assertEqual(response.code, 301)
        self.assertTrue(response.headers['Location'].endswith('/static/dir/'))


@wsgi_safe
class StaticFileWithPathTest(WebTestCase):
    """Tests a StaticFileHandler mounted at /foo/ with its own ``path``."""
    def get_app_kwargs(self):
        return dict(static_path=relpath('static'),
                    static_handler_args=dict(default_filename='index.html'))

    def get_handlers(self):
        return [("/foo/(.*)", StaticFileHandler, {
            "path": relpath("templates/"),
        })]

    def test_serve(self):
        response = self.fetch("/foo/utf8.html")
        self.assertEqual(response.body, b"H\xc3\xa9llo\n")


@wsgi_safe
class CustomStaticFileTest(WebTestCase):
    """Tests a StaticFileHandler subclass that overrides the handler's hooks.

    The subclass puts the version inside the file name
    (``foo.txt`` -> ``/static/foo.42.txt``) and serves content from
    hard-coded values rather than reading files.
    """
    def get_handlers(self):
        class MyStaticFileHandler(StaticFileHandler):
            @classmethod
            def make_static_url(cls, settings, path):
                # Insert the version between the base name and the extension.
                version_hash = cls.get_version(settings, path)
                extension_index = path.rindex('.')
                before_version = path[:extension_index]
                after_version = path[(extension_index + 1):]
                return '/static/%s.%s.%s' % (before_version, version_hash,
                                             after_version)

            def parse_url_path(self, url_path):
                # Inverse of make_static_url: drop the second-to-last
                # dot-separated component (the version).
                extension_index = url_path.rindex('.')
                version_index = url_path.rindex('.', 0, extension_index)
                return '%s%s' % (url_path[:version_index],
                                 url_path[extension_index:])

            @classmethod
            def get_absolute_path(cls, settings, path):
                return 'CustomStaticFileTest:' + path

            def validate_absolute_path(self, root, absolute_path):
                return absolute_path

            # NOTE(review): this is a classmethod, so the first parameter
            # (named ``self`` here) actually receives the class.
            @classmethod
            def get_content(self, path, start=None, end=None):
                assert start is None and end is None
                if path == 'CustomStaticFileTest:foo.txt':
                    return b'bar'
                raise Exception("unexpected path %r" % path)

            def get_content_size(self):
                if self.absolute_path == 'CustomStaticFileTest:foo.txt':
                    return 3
                raise Exception("unexpected path %r" % self.absolute_path)

            def get_modified_time(self):
                return None

            @classmethod
            def get_version(cls, settings, path):
                return "42"

        class StaticUrlHandler(RequestHandler):
            def get(self, path):
                self.write(self.static_url(path))

        # Stored on the test case so get_app_kwargs can pass it along.
        self.static_handler_class = MyStaticFileHandler

        return [("/static_url/(.*)", StaticUrlHandler)]

    def get_app_kwargs(self):
        return dict(static_path="dummy",
                    static_handler_class=self.static_handler_class)

    def test_serve(self):
        response = self.fetch("/static/foo.42.txt")
        self.assertEqual(response.body, b"bar")

    def test_static_url(self):
        with ExpectLog(gen_log, "Could not open static file", required=False):
            response = self.fetch("/static_url/foo.txt")
            self.assertEqual(response.body, b"/static/foo.42.txt")


@wsgi_safe
class HostMatchingTest(WebTestCase):
    """Tests routing of handlers registered per host via ``add_handlers``."""
    class Handler(RequestHandler):
        def initialize(self, reply):
            self.reply = reply

        def get(self):
            self.write(self.reply)

    def get_handlers(self):
        return [("/foo", HostMatchingTest.Handler, {"reply": "wildcard"})]

    def test_host_matching(self):
        self.app.add_handlers("www.example.com",
                              [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})])
        self.app.add_handlers(r"www\.example\.com",
                              [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})])
        self.app.add_handlers("www.example.com",
                              [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})])
        self.app.add_handlers("www.e.*e.com",
                              [("/baz", HostMatchingTest.Handler, {"reply": "[3]"})])

        # Without a Host header override only the handler from
        # get_handlers is expected to match.
        response = self.fetch("/foo")
        self.assertEqual(response.body, b"wildcard")
        response = self.fetch("/bar")
        self.assertEqual(response.code, 404)
        response = self.fetch("/baz")
        self.assertEqual(response.code, 404)

        response = self.fetch("/foo", headers={'Host': 'www.example.com'})
        self.assertEqual(response.body, b"[0]")
        response = self.fetch("/bar", headers={'Host': 'www.example.com'})
        self.assertEqual(response.body, b"[1]")
        response = self.fetch("/baz", headers={'Host': 'www.example.com'})
        self.assertEqual(response.body, b"[2]")
        response = self.fetch("/baz", headers={'Host': 'www.exe.com'})
        self.assertEqual(response.body, b"[3]")


@wsgi_safe
class DefaultHostMatchingTest(WebTestCase):
    """Tests host-based routing when the ``default_host`` setting is used."""
    def get_handlers(self):
        return []

    def get_app_kwargs(self):
        return {'default_host': "www.example.com"}

    def test_default_host_matching(self):
        self.app.add_handlers("www.example.com",
                              [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})])
        self.app.add_handlers(r"www\.example\.com",
                              [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})])
        self.app.add_handlers("www.test.com",
                              [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})])

        response = self.fetch("/foo")
        self.assertEqual(response.body, b"[0]")
        response = self.fetch("/bar")
        self.assertEqual(response.body, b"[1]")
        response = self.fetch("/baz")
        self.assertEqual(response.code, 404)

        # With an X-Real-Ip header the same request is expected to 404.
        response = self.fetch("/foo", headers={"X-Real-Ip": "127.0.0.1"})
        self.assertEqual(response.code, 404)

        # Changing default_host on the live application takes effect for
        # the next request.
        self.app.default_host = "www.test.com"

        response = self.fetch("/baz")
        self.assertEqual(response.body, b"[2]")


@wsgi_safe
class NamedURLSpecGroupsTest(WebTestCase):
    """Tests named regex groups in URL patterns given as str and unicode."""
    def get_handlers(self):
        class EchoHandler(RequestHandler):
            def get(self, path):
                self.write(path)

        return [("/str/(?P<path>.*)", EchoHandler),
                (u"/unicode/(?P<path>.*)", EchoHandler)]

    def test_named_urlspec_groups(self):
        response = self.fetch("/str/foo")
        self.assertEqual(response.body, b"foo")

        response = self.fetch("/unicode/bar")
        self.assertEqual(response.body, b"bar")


@wsgi_safe
class ClearHeaderTest(SimpleHandlerTestCase):
    """Tests ``clear_header`` for a header that was set and one that was not."""
    class Handler(RequestHandler):
        def get(self):
            self.set_header("h1", "foo")
            self.set_header("h2", "bar")
            self.clear_header("h1")
            self.clear_header("nonexistent")

    def test_clear_header(self):
        response = self.fetch("/")
        self.assertTrue("h1" not in response.headers)
        self.assertEqual(response.headers["h2"], "bar")


class Header204Test(SimpleHandlerTestCase):
    """Tests which headers are present on a 204 response.

    This class is not decorated with ``wsgi_safe``.
    """
    class Handler(RequestHandler):
        def get(self):
            self.set_status(204)
            self.finish()

    def test_204_headers(self):
        response = self.fetch('/')
        self.assertEqual(response.code, 204)
        self.assertNotIn("Content-Length", response.headers)
        self.assertNotIn("Transfer-Encoding", response.headers)


@wsgi_safe
class Header304Test(SimpleHandlerTestCase):
    """Tests which headers are present on a 304 response."""
    class Handler(RequestHandler):
        def get(self):
            self.set_header("Content-Language", "en_US")
            self.write("hello")

    def test_304_headers(self):
        response1 = self.fetch('/')
        self.assertEqual(response1.headers["Content-Length"], "5")
        self.assertEqual(response1.headers["Content-Language"], "en_US")

        # Replaying the Etag as If-None-Match is expected to give a 304
        # without the Content-* headers seen on the first response.
        response2 = self.fetch('/', headers={
            'If-None-Match': response1.headers["Etag"]})
        self.assertEqual(response2.code, 304)
        self.assertTrue("Content-Length" not in response2.headers)
        self.assertTrue("Content-Language" not in response2.headers)
        # Not an entity header, but should not be added to 304s by chunking
        self.assertTrue("Transfer-Encoding" not in response2.headers)


@wsgi_safe
class StatusReasonTest(SimpleHandlerTestCase):
    """Tests the reason phrase produced by ``set_status``."""
    class Handler(RequestHandler):
        def get(self):
            # ?code=NNN is required; ?reason=... is optional and passed as
            # None when absent.
            reason = self.request.arguments.get('reason', [])
            self.set_status(int(self.get_argument('code')),
                            reason=reason[0] if reason else None)

    def get_http_client(self):
        # simple_httpclient only: curl doesn't expose the reason string
        return SimpleAsyncHTTPClient()

    def test_status(self):
        response = self.fetch("/?code=304")
        self.assertEqual(response.code, 304)
        self.assertEqual(response.reason, "Not Modified")
        response = self.fetch("/?code=304&reason=Foo")
        self.assertEqual(response.code, 304)
        self.assertEqual(response.reason, "Foo")
        response = self.fetch("/?code=682&reason=Bar")
        self.assertEqual(response.code, 682)
        self.assertEqual(response.reason, "Bar")
        # 682 without an explicit reason is reported as "Unknown".
        response = self.fetch("/?code=682")
        self.assertEqual(response.code, 682)
        self.assertEqual(response.reason, "Unknown")


@wsgi_safe
class DateHeaderTest(SimpleHandlerTestCase):
    """Tests that the ``Date`` response header is close to the current time."""
    class Handler(RequestHandler):
        def get(self):
            self.write("hello")

    def test_date_header(self):
        response = self.fetch('/')
        # parsedate returns a time tuple; its first six fields are
        # year..second, which is what the datetime constructor takes.
        header_date = datetime.datetime(
            *email.utils.parsedate(response.headers['Date'])[:6])
        self.assertTrue(header_date - datetime.datetime.utcnow() <
                        datetime.timedelta(seconds=2))


@wsgi_safe
class RaiseWithReasonTest(SimpleHandlerTestCase):
    """Tests ``HTTPError`` raised with an explicit ``reason``."""
    class Handler(RequestHandler):
        def get(self):
            raise HTTPError(682, reason="Foo")

    def get_http_client(self):
        # simple_httpclient only: curl doesn't expose the reason string
        return SimpleAsyncHTTPClient()

    def test_raise_with_reason(self):
        response = self.fetch("/")
        self.assertEqual(response.code, 682)
        self.assertEqual(response.reason, "Foo")
        self.assertIn(b'682: Foo', response.body)

    def test_httperror_str(self):
        self.assertEqual(str(HTTPError(682, reason="Foo")), "HTTP 682: Foo")

    def test_httperror_str_from_httputil(self):
        self.assertEqual(str(HTTPError(682)), "HTTP 682: Unknown")


@wsgi_safe
class ErrorHandlerXSRFTest(WebTestCase):
    """Tests error responses to POSTs when ``xsrf_cookies`` is enabled.

    Both POSTs are sent with an empty body and are expected to return the
    error status (417 or 404).
    """
    def get_handlers(self):
        # note that if the handlers list is empty we get the default_host
        # redirect fallback instead of a 404, so test with both an
        # explicitly defined error handler and an implicit 404.
        return [('/error', ErrorHandler, dict(status_code=417))]

    def get_app_kwargs(self):
        return dict(xsrf_cookies=True)

    def test_error_xsrf(self):
        response = self.fetch('/error', method='POST', body='')
        self.assertEqual(response.code, 417)

    def test_404_xsrf(self):
        response = self.fetch('/404', method='POST', body='')
        self.assertEqual(response.code, 404)


@wsgi_safe
class GzipTestCase(SimpleHandlerTestCase):
    """Tests response compression and ``Vary`` handling with ``gzip=True``."""
    class Handler(RequestHandler):
        def get(self):
            # Each ?vary=... argument becomes a separate Vary header.
            for v in self.get_arguments('vary'):
                self.add_header('Vary', v)
            # Must write at least MIN_LENGTH bytes to activate compression.
            self.write('hello world' + ('!' * GZipContentEncoding.MIN_LENGTH))

    def get_app_kwargs(self):
        return dict(
            gzip=True,
            static_path=os.path.join(os.path.dirname(__file__), 'static'))

    def assert_compressed(self, response):
        # simple_httpclient renames the content-encoding header;
        # curl_httpclient doesn't.
        self.assertEqual(
            response.headers.get(
                'Content-Encoding',
                response.headers.get('X-Consumed-Content-Encoding')),
            'gzip')

    def test_gzip(self):
        response = self.fetch('/')
        self.assert_compressed(response)
        self.assertEqual(response.headers['Vary'], 'Accept-Encoding')

    def test_gzip_static(self):
        # The streaming responses in StaticFileHandler have subtle
        # interactions with the gzip output so test this case separately.
        response = self.fetch('/robots.txt')
        self.assert_compressed(response)
        self.assertEqual(response.headers['Vary'], 'Accept-Encoding')

    def test_gzip_not_requested(self):
        # Vary is still expected even when the client did not ask for gzip.
        response = self.fetch('/', use_gzip=False)
        self.assertNotIn('Content-Encoding', response.headers)
        self.assertEqual(response.headers['Vary'], 'Accept-Encoding')

    def test_vary_already_present(self):
        response = self.fetch('/?vary=Accept-Language')
        self.assert_compressed(response)
        self.assertEqual([s.strip() for s in response.headers['Vary'].split(',')],
                         ['Accept-Language', 'Accept-Encoding'])

    def test_vary_already_present_multiple(self):
        # Regression test for https://github.com/tornadoweb/tornado/issues/1670
        response = self.fetch('/?vary=Accept-Language&vary=Cookie')
        self.assert_compressed(response)
        self.assertEqual([s.strip() for s in response.headers['Vary'].split(',')],
                         ['Accept-Language', 'Cookie', 'Accept-Encoding'])


@wsgi_safe
class PathArgsInPrepareTest(WebTestCase):
    """Tests that ``path_args``/``path_kwargs`` are readable in ``prepare``."""
    class Handler(RequestHandler):
        def prepare(self):
            self.write(dict(args=self.path_args, kwargs=self.path_kwargs))

        def get(self, path):
            assert path == 'foo'
            self.finish()

    def get_handlers(self):
        return [('/pos/(.*)', self.Handler),
                ('/kw/(?P<path>.*)', self.Handler)]

    def test_pos(self):
        response = self.fetch('/pos/foo')
        response.rethrow()
        data = json_decode(response.body)
        self.assertEqual(data, {'args': ['foo'], 'kwargs': {}})

    def test_kw(self):
        response = self.fetch('/kw/foo')
        response.rethrow()
        data = json_decode(response.body)
        self.assertEqual(data, {'args': [], 'kwargs': {'path': 'foo'}})


@wsgi_safe
class ClearAllCookiesTest(SimpleHandlerTestCase):
    """Tests that ``clear_all_cookies`` clears every cookie in the request."""
    class Handler(RequestHandler):
        def get(self):
            self.clear_all_cookies()
            self.write('ok')

    def test_clear_all_cookies(self):
        response = self.fetch('/', headers={'Cookie': 'foo=bar; baz=xyzzy'})
        # Sorted so that the "baz" header is at index 0 and "foo" at 1.
        set_cookies = sorted(response.headers.get_list('Set-Cookie'))
        # Python 3.5 sends 'baz="";'; older versions use 'baz=;'
        self.assertTrue(set_cookies[0].startswith('baz=;') or
                        set_cookies[0].startswith('baz="";'))
        self.assertTrue(set_cookies[1].startswith('foo=;') or
                        set_cookies[1].startswith('foo="";'))


# Application-level exception raised and handled in ExceptionHandlerTest.
# NOTE(review): on Python 3 this name is also a built-in exception, which
# this module-level definition shadows from here on.
class PermissionError(Exception):
    pass


@wsgi_safe
class ExceptionHandlerTest(SimpleHandlerTestCase):
    """Tests handling and logging of exceptions raised from a handler.

    The ``exc`` query argument selects what ``get`` raises: an HTTPError,
    a ZeroDivisionError, or the module's PermissionError.
    """
    class Handler(RequestHandler):
        def get(self):
            exc = self.get_argument('exc')
            if exc == 'http':
                raise HTTPError(410, "no longer here")
            elif exc == 'zero':
                1 / 0
            elif exc == 'permission':
                raise PermissionError('not allowed')

        def write_error(self, status_code, **kwargs):
            # PermissionError becomes a 403 with a fixed body; everything
            # else goes to the base implementation.
            if 'exc_info' in kwargs:
                typ, value, tb = kwargs['exc_info']
                if isinstance(value, PermissionError):
                    self.set_status(403)
                    self.write('PermissionError')
                    return
            RequestHandler.write_error(self, status_code, **kwargs)

        def log_exception(self, typ, value, tb):
            if isinstance(value, PermissionError):
                app_log.warning('custom logging for PermissionError: %s',
                                value.args[0])
            else:
                RequestHandler.log_exception(self, typ, value, tb)

    def test_http_error(self):
        # HTTPErrors are logged as warnings with no stack trace.
        # TODO: extend ExpectLog to test this more precisely
        with ExpectLog(gen_log, '.*no longer here'):
            response = self.fetch('/?exc=http')
            self.assertEqual(response.code, 410)

    def test_unknown_error(self):
        # Unknown errors are logged as errors with a stack trace.
        with ExpectLog(app_log, 'Uncaught exception'):
            response = self.fetch('/?exc=zero')
            self.assertEqual(response.code, 500)

    def test_known_error(self):
        # log_exception can override logging behavior, and write_error
        # can override the response.
        with ExpectLog(app_log,
                       'custom logging for PermissionError: not allowed'):
            response = self.fetch('/?exc=permission')
            self.assertEqual(response.code, 403)


@wsgi_safe
class BuggyLoggingTest(SimpleHandlerTestCase):
    """Tests a handler whose ``log_exception`` override itself raises."""
    class Handler(RequestHandler):
        def get(self):
            1 / 0

        def log_exception(self, typ, value, tb):
            1 / 0

    def test_buggy_log_exception(self):
        # Something gets logged even though the application's
        # logger is broken.
        with ExpectLog(app_log, '.*'):
            self.fetch('/')


@wsgi_safe
class UIMethodUIModuleTest(SimpleHandlerTestCase):
    """Test that UI methods and modules are created correctly and
    associated with the handler.
    """
    class Handler(RequestHandler):
        def get(self):
            self.render('foo.html')

        def value(self):
            return self.get_argument("value")

    def get_app_kwargs(self):
        def my_ui_method(handler, x):
            return "In my_ui_method(%s) with handler value %s." % (
                x, handler.value())

        class MyModule(UIModule):
            def render(self, x):
                return "In MyModule(%s) with handler value %s." % (
                    x, self.handler.value())

        # In-memory template that calls both the UI method and the module.
        loader = DictLoader({
            'foo.html': '{{ my_ui_method(42) }} {% module MyModule(123) %}',
        })
        return dict(template_loader=loader,
                    ui_methods={'my_ui_method': my_ui_method},
                    ui_modules={'MyModule': MyModule})

    def tearDown(self):
        super(UIMethodUIModuleTest, self).tearDown()
        # TODO: fix template loader caching so this isn't necessary.
        RequestHandler._template_loaders.clear()

    def test_ui_method(self):
        response = self.fetch('/?value=asdf')
        self.assertEqual(response.body,
                         b'In my_ui_method(42) with handler value asdf. '
                         b'In MyModule(123) with handler value asdf.')


@wsgi_safe
class GetArgumentErrorTest(SimpleHandlerTestCase):
    """Tests the attributes of ``MissingArgumentError``."""
    class Handler(RequestHandler):
        def get(self):
            try:
                self.get_argument('foo')
                self.write({})
            except MissingArgumentError as e:
                self.write({'arg_name': e.arg_name,
                            'log_message': e.log_message})

    def test_catch_error(self):
        response = self.fetch('/')
        self.assertEqual(json_decode(response.body),
                         {'arg_name': 'foo',
                          'log_message': 'Missing argument foo'})


class MultipleExceptionTest(SimpleHandlerTestCase):
    """Tests a handler that schedules two failing callbacks per request.

    This class is not decorated with ``wsgi_safe``.
    """
    class Handler(RequestHandler):
        # Class-level counter incremented by log_exception.
        exc_count = 0

        with ignore_deprecation():
            @asynchronous
            def get(self):
                IOLoop.current().add_callback(lambda: 1 / 0)
                IOLoop.current().add_callback(lambda: 1 / 0)

        def log_exception(self, typ, value, tb):
            MultipleExceptionTest.Handler.exc_count += 1

    def test_multi_exception(self):
        with ignore_deprecation():
            # This test verifies that multiple exceptions raised into the same
            # ExceptionStackContext do not generate extraneous log entries
            # due to "Cannot send error response after headers written".
            # log_exception is called, but it does not proceed to send_error.
            response = self.fetch('/')
            self.assertEqual(response.code, 500)
            response = self.fetch('/')
            self.assertEqual(response.code, 500)
            # Each of our two requests generated two exceptions, we should have
            # seen at least three of them by now (the fourth may still be
            # in the queue).
            self.assertGreater(MultipleExceptionTest.Handler.exc_count, 2)


@wsgi_safe
class SetLazyPropertiesTest(SimpleHandlerTestCase):
    """Tests assigning ``current_user`` and ``locale`` directly in ``prepare``."""
    class Handler(RequestHandler):
        def prepare(self):
            self.current_user = 'Ben'
            self.locale = locale.get('en_US')

        # Both getters raise, so the test can only pass if the values
        # assigned in prepare() are used instead of calling them.
        def get_user_locale(self):
            raise NotImplementedError()

        def get_current_user(self):
            raise NotImplementedError()

        def get(self):
            self.write('Hello %s (%s)' % (self.current_user, self.locale.code))

    def test_set_properties(self):
        # Ensure that current_user can be assigned to normally for apps
        # that want to forgo the lazy get_current_user property
        response = self.fetch('/')
        self.assertEqual(response.body, b'Hello Ben (en_US)')


@wsgi_safe
class GetCurrentUserTest(WebTestCase):
    def get_app_kwargs(self):
        class WithoutUserModule(UIModule):
            def render(self):
                return ''

        class WithUserModule(UIModule):
            def render(self):
                return str(self.current_user)

        loader = DictLoader({
            'without_user.html': '',
            'with_user.html': '{{ current_user }}',
            'without_user_module.html': '{% module WithoutUserModule() %}',
            'with_user_module.html': '{% module WithUserModule() %}',
        })
        return dict(template_loader=loader,
                    ui_modules={'WithUserModule': WithUserModule,
                                'WithoutUserModule': WithoutUserModule})

    def tearDown(self):
        super(GetCurrentUserTest, self).tearDown()
        RequestHandler._template_loaders.clear()

    def get_handlers(self):
        class CurrentUserHandler(RequestHandler):
            # Records whether get_current_user has been called for this
            # request.
            def prepare(self):
                self.has_loaded_current_user = False

            def get_current_user(self):
                self.has_loaded_current_user = True
                return ''

        class WithoutUserHandler(CurrentUserHandler):
            def get(self):
                self.render_string('without_user.html')
                self.finish(str(self.has_loaded_current_user))

        class WithUserHandler(CurrentUserHandler):
            def get(self):
                self.render_string('with_user.html')
                self.finish(str(self.has_loaded_current_user))

        class CurrentUserModuleHandler(CurrentUserHandler):
            def get_template_namespace(self):
                # If RequestHandler.get_template_namespace is called, then
                # get_current_user is evaluated. Until #820 is fixed, this
                # is a small hack to circumvent the issue.
                return self.ui

        class WithoutUserModuleHandler(CurrentUserModuleHandler):
            def get(self):
                self.render_string('without_user_module.html')
                self.finish(str(self.has_loaded_current_user))

        class WithUserModuleHandler(CurrentUserModuleHandler):
            def get(self):
                self.render_string('with_user_module.html')
                self.finish(str(self.has_loaded_current_user))

        return [('/without_user', WithoutUserHandler),
                ('/with_user', WithUserHandler),
                ('/without_user_module', WithoutUserModuleHandler),
                ('/with_user_module', WithUserModuleHandler)]

    @unittest.skip('needs fix')
    def test_get_current_user_is_lazy(self):
        # TODO: Make this test pass. See #820.
1966 response = self.fetch('/without_user') 1967 self.assertEqual(response.body, b'False') 1968 1969 def test_get_current_user_works(self): 1970 response = self.fetch('/with_user') 1971 self.assertEqual(response.body, b'True') 1972 1973 def test_get_current_user_from_ui_module_is_lazy(self): 1974 response = self.fetch('/without_user_module') 1975 self.assertEqual(response.body, b'False') 1976 1977 def test_get_current_user_from_ui_module_works(self): 1978 response = self.fetch('/with_user_module') 1979 self.assertEqual(response.body, b'True') 1980 1981 1982@wsgi_safe 1983class UnimplementedHTTPMethodsTest(SimpleHandlerTestCase): 1984 class Handler(RequestHandler): 1985 pass 1986 1987 def test_unimplemented_standard_methods(self): 1988 for method in ['HEAD', 'GET', 'DELETE', 'OPTIONS']: 1989 response = self.fetch('/', method=method) 1990 self.assertEqual(response.code, 405) 1991 for method in ['POST', 'PUT']: 1992 response = self.fetch('/', method=method, body=b'') 1993 self.assertEqual(response.code, 405) 1994 1995 1996class UnimplementedNonStandardMethodsTest(SimpleHandlerTestCase): 1997 # wsgiref.validate complains about unknown methods in a way that makes 1998 # this test not wsgi_safe. 1999 class Handler(RequestHandler): 2000 def other(self): 2001 # Even though this method exists, it won't get called automatically 2002 # because it is not in SUPPORTED_METHODS. 2003 self.write('other') 2004 2005 def test_unimplemented_patch(self): 2006 # PATCH is recently standardized; Tornado supports it by default 2007 # but wsgiref.validate doesn't like it. 
2008 response = self.fetch('/', method='PATCH', body=b'') 2009 self.assertEqual(response.code, 405) 2010 2011 def test_unimplemented_other(self): 2012 response = self.fetch('/', method='OTHER', 2013 allow_nonstandard_methods=True) 2014 self.assertEqual(response.code, 405) 2015 2016 2017@wsgi_safe 2018class AllHTTPMethodsTest(SimpleHandlerTestCase): 2019 class Handler(RequestHandler): 2020 def method(self): 2021 self.write(self.request.method) 2022 2023 get = delete = options = post = put = method 2024 2025 def test_standard_methods(self): 2026 response = self.fetch('/', method='HEAD') 2027 self.assertEqual(response.body, b'') 2028 for method in ['GET', 'DELETE', 'OPTIONS']: 2029 response = self.fetch('/', method=method) 2030 self.assertEqual(response.body, utf8(method)) 2031 for method in ['POST', 'PUT']: 2032 response = self.fetch('/', method=method, body=b'') 2033 self.assertEqual(response.body, utf8(method)) 2034 2035 2036class PatchMethodTest(SimpleHandlerTestCase): 2037 class Handler(RequestHandler): 2038 SUPPORTED_METHODS = RequestHandler.SUPPORTED_METHODS + ('OTHER',) 2039 2040 def patch(self): 2041 self.write('patch') 2042 2043 def other(self): 2044 self.write('other') 2045 2046 def test_patch(self): 2047 response = self.fetch('/', method='PATCH', body=b'') 2048 self.assertEqual(response.body, b'patch') 2049 2050 def test_other(self): 2051 response = self.fetch('/', method='OTHER', 2052 allow_nonstandard_methods=True) 2053 self.assertEqual(response.body, b'other') 2054 2055 2056@wsgi_safe 2057class FinishInPrepareTest(SimpleHandlerTestCase): 2058 class Handler(RequestHandler): 2059 def prepare(self): 2060 self.finish('done') 2061 2062 def get(self): 2063 # It's difficult to assert for certain that a method did not 2064 # or will not be called in an asynchronous context, but this 2065 # will be logged noisily if it is reached. 
2066 raise Exception('should not reach this method') 2067 2068 def test_finish_in_prepare(self): 2069 response = self.fetch('/') 2070 self.assertEqual(response.body, b'done') 2071 2072 2073@wsgi_safe 2074class Default404Test(WebTestCase): 2075 def get_handlers(self): 2076 # If there are no handlers at all a default redirect handler gets added. 2077 return [('/foo', RequestHandler)] 2078 2079 def test_404(self): 2080 response = self.fetch('/') 2081 self.assertEqual(response.code, 404) 2082 self.assertEqual(response.body, 2083 b'<html><title>404: Not Found</title>' 2084 b'<body>404: Not Found</body></html>') 2085 2086 2087@wsgi_safe 2088class Custom404Test(WebTestCase): 2089 def get_handlers(self): 2090 return [('/foo', RequestHandler)] 2091 2092 def get_app_kwargs(self): 2093 class Custom404Handler(RequestHandler): 2094 def get(self): 2095 self.set_status(404) 2096 self.write('custom 404 response') 2097 2098 return dict(default_handler_class=Custom404Handler) 2099 2100 def test_404(self): 2101 response = self.fetch('/') 2102 self.assertEqual(response.code, 404) 2103 self.assertEqual(response.body, b'custom 404 response') 2104 2105 2106@wsgi_safe 2107class DefaultHandlerArgumentsTest(WebTestCase): 2108 def get_handlers(self): 2109 return [('/foo', RequestHandler)] 2110 2111 def get_app_kwargs(self): 2112 return dict(default_handler_class=ErrorHandler, 2113 default_handler_args=dict(status_code=403)) 2114 2115 def test_403(self): 2116 response = self.fetch('/') 2117 self.assertEqual(response.code, 403) 2118 2119 2120@wsgi_safe 2121class HandlerByNameTest(WebTestCase): 2122 def get_handlers(self): 2123 # All three are equivalent. 
2124 return [('/hello1', HelloHandler), 2125 ('/hello2', 'tornado.test.web_test.HelloHandler'), 2126 url('/hello3', 'tornado.test.web_test.HelloHandler'), 2127 ] 2128 2129 def test_handler_by_name(self): 2130 resp = self.fetch('/hello1') 2131 self.assertEqual(resp.body, b'hello') 2132 resp = self.fetch('/hello2') 2133 self.assertEqual(resp.body, b'hello') 2134 resp = self.fetch('/hello3') 2135 self.assertEqual(resp.body, b'hello') 2136 2137 2138class StreamingRequestBodyTest(WebTestCase): 2139 def get_handlers(self): 2140 @stream_request_body 2141 class StreamingBodyHandler(RequestHandler): 2142 def initialize(self, test): 2143 self.test = test 2144 2145 def prepare(self): 2146 self.test.prepared.set_result(None) 2147 2148 def data_received(self, data): 2149 self.test.data.set_result(data) 2150 2151 def get(self): 2152 self.test.finished.set_result(None) 2153 self.write({}) 2154 2155 @stream_request_body 2156 class EarlyReturnHandler(RequestHandler): 2157 def prepare(self): 2158 # If we finish the response in prepare, it won't continue to 2159 # the (non-existent) data_received. 2160 raise HTTPError(401) 2161 2162 @stream_request_body 2163 class CloseDetectionHandler(RequestHandler): 2164 def initialize(self, test): 2165 self.test = test 2166 2167 def on_connection_close(self): 2168 super(CloseDetectionHandler, self).on_connection_close() 2169 self.test.close_future.set_result(None) 2170 2171 return [('/stream_body', StreamingBodyHandler, dict(test=self)), 2172 ('/early_return', EarlyReturnHandler), 2173 ('/close_detection', CloseDetectionHandler, dict(test=self))] 2174 2175 def connect(self, url, connection_close): 2176 # Use a raw connection so we can control the sending of data. 
2177 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 2178 s.connect(("127.0.0.1", self.get_http_port())) 2179 stream = IOStream(s) 2180 stream.write(b"GET " + url + b" HTTP/1.1\r\n") 2181 if connection_close: 2182 stream.write(b"Connection: close\r\n") 2183 stream.write(b"Transfer-Encoding: chunked\r\n\r\n") 2184 return stream 2185 2186 @gen_test 2187 def test_streaming_body(self): 2188 self.prepared = Future() 2189 self.data = Future() 2190 self.finished = Future() 2191 2192 stream = self.connect(b"/stream_body", connection_close=True) 2193 yield self.prepared 2194 stream.write(b"4\r\nasdf\r\n") 2195 # Ensure the first chunk is received before we send the second. 2196 data = yield self.data 2197 self.assertEqual(data, b"asdf") 2198 self.data = Future() 2199 stream.write(b"4\r\nqwer\r\n") 2200 data = yield self.data 2201 self.assertEquals(data, b"qwer") 2202 stream.write(b"0\r\n\r\n") 2203 yield self.finished 2204 data = yield stream.read_until_close() 2205 # This would ideally use an HTTP1Connection to read the response. 2206 self.assertTrue(data.endswith(b"{}")) 2207 stream.close() 2208 2209 @gen_test 2210 def test_early_return(self): 2211 stream = self.connect(b"/early_return", connection_close=False) 2212 data = yield stream.read_until_close() 2213 self.assertTrue(data.startswith(b"HTTP/1.1 401")) 2214 2215 @gen_test 2216 def test_early_return_with_data(self): 2217 stream = self.connect(b"/early_return", connection_close=False) 2218 stream.write(b"4\r\nasdf\r\n") 2219 data = yield stream.read_until_close() 2220 self.assertTrue(data.startswith(b"HTTP/1.1 401")) 2221 2222 @gen_test 2223 def test_close_during_upload(self): 2224 self.close_future = Future() 2225 stream = self.connect(b"/close_detection", connection_close=False) 2226 stream.close() 2227 yield self.close_future 2228 2229 2230# Each method in this handler returns a yieldable object and yields to the 2231# IOLoop so the future is not immediately ready. 
Ensure that the 2232# yieldables are respected and no method is called before the previous 2233# one has completed. 2234@stream_request_body 2235class BaseFlowControlHandler(RequestHandler): 2236 def initialize(self, test): 2237 self.test = test 2238 self.method = None 2239 self.methods = [] 2240 2241 @contextlib.contextmanager 2242 def in_method(self, method): 2243 if self.method is not None: 2244 self.test.fail("entered method %s while in %s" % 2245 (method, self.method)) 2246 self.method = method 2247 self.methods.append(method) 2248 try: 2249 yield 2250 finally: 2251 self.method = None 2252 2253 @gen.coroutine 2254 def prepare(self): 2255 # Note that asynchronous prepare() does not block data_received, 2256 # so we don't use in_method here. 2257 self.methods.append('prepare') 2258 yield gen.moment 2259 2260 @gen.coroutine 2261 def post(self): 2262 with self.in_method('post'): 2263 yield gen.moment 2264 self.write(dict(methods=self.methods)) 2265 2266 2267class BaseStreamingRequestFlowControlTest(object): 2268 def get_httpserver_options(self): 2269 # Use a small chunk size so flow control is relevant even though 2270 # all the data arrives at once. 2271 return dict(chunk_size=10, decompress_request=True) 2272 2273 def get_http_client(self): 2274 # simple_httpclient only: curl doesn't support body_producer. 2275 return SimpleAsyncHTTPClient() 2276 2277 # Test all the slightly different code paths for fixed, chunked, etc bodies. 
2278 def test_flow_control_fixed_body(self): 2279 response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz', 2280 method='POST') 2281 response.rethrow() 2282 self.assertEqual(json_decode(response.body), 2283 dict(methods=['prepare', 'data_received', 2284 'data_received', 'data_received', 2285 'post'])) 2286 2287 def test_flow_control_chunked_body(self): 2288 chunks = [b'abcd', b'efgh', b'ijkl'] 2289 2290 @gen.coroutine 2291 def body_producer(write): 2292 for i in chunks: 2293 yield write(i) 2294 response = self.fetch('/', body_producer=body_producer, method='POST') 2295 response.rethrow() 2296 self.assertEqual(json_decode(response.body), 2297 dict(methods=['prepare', 'data_received', 2298 'data_received', 'data_received', 2299 'post'])) 2300 2301 def test_flow_control_compressed_body(self): 2302 bytesio = BytesIO() 2303 gzip_file = gzip.GzipFile(mode='w', fileobj=bytesio) 2304 gzip_file.write(b'abcdefghijklmnopqrstuvwxyz') 2305 gzip_file.close() 2306 compressed_body = bytesio.getvalue() 2307 response = self.fetch('/', body=compressed_body, method='POST', 2308 headers={'Content-Encoding': 'gzip'}) 2309 response.rethrow() 2310 self.assertEqual(json_decode(response.body), 2311 dict(methods=['prepare', 'data_received', 2312 'data_received', 'data_received', 2313 'post'])) 2314 2315 2316class DecoratedStreamingRequestFlowControlTest( 2317 BaseStreamingRequestFlowControlTest, 2318 WebTestCase): 2319 def get_handlers(self): 2320 class DecoratedFlowControlHandler(BaseFlowControlHandler): 2321 @gen.coroutine 2322 def data_received(self, data): 2323 with self.in_method('data_received'): 2324 yield gen.moment 2325 return [('/', DecoratedFlowControlHandler, dict(test=self))] 2326 2327 2328@skipBefore35 2329class NativeStreamingRequestFlowControlTest( 2330 BaseStreamingRequestFlowControlTest, 2331 WebTestCase): 2332 def get_handlers(self): 2333 class NativeFlowControlHandler(BaseFlowControlHandler): 2334 data_received = exec_test(globals(), locals(), """ 2335 async def 
data_received(self, data): 2336 with self.in_method('data_received'): 2337 import asyncio 2338 await asyncio.sleep(0) 2339 """)["data_received"] 2340 return [('/', NativeFlowControlHandler, dict(test=self))] 2341 2342 2343@wsgi_safe 2344class IncorrectContentLengthTest(SimpleHandlerTestCase): 2345 def get_handlers(self): 2346 test = self 2347 self.server_error = None 2348 2349 # Manually set a content-length that doesn't match the actual content. 2350 class TooHigh(RequestHandler): 2351 def get(self): 2352 self.set_header("Content-Length", "42") 2353 try: 2354 self.finish("ok") 2355 except Exception as e: 2356 test.server_error = e 2357 raise 2358 2359 class TooLow(RequestHandler): 2360 def get(self): 2361 self.set_header("Content-Length", "2") 2362 try: 2363 self.finish("hello") 2364 except Exception as e: 2365 test.server_error = e 2366 raise 2367 2368 return [('/high', TooHigh), 2369 ('/low', TooLow)] 2370 2371 def test_content_length_too_high(self): 2372 # When the content-length is too high, the connection is simply 2373 # closed without completing the response. An error is logged on 2374 # the server. 2375 with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"): 2376 with ExpectLog(gen_log, 2377 "(Cannot send error response after headers written" 2378 "|Failed to flush partial response)"): 2379 with self.assertRaises(HTTPClientError): 2380 self.fetch("/high", raise_error=True) 2381 self.assertEqual(str(self.server_error), 2382 "Tried to write 40 bytes less than Content-Length") 2383 2384 def test_content_length_too_low(self): 2385 # When the content-length is too low, the connection is closed 2386 # without writing the last chunk, so the client never sees the request 2387 # complete (which would be a framing error). 
2388 with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"): 2389 with ExpectLog(gen_log, 2390 "(Cannot send error response after headers written" 2391 "|Failed to flush partial response)"): 2392 with self.assertRaises(HTTPClientError): 2393 self.fetch("/low", raise_error=True) 2394 self.assertEqual(str(self.server_error), 2395 "Tried to write more data than Content-Length") 2396 2397 2398class ClientCloseTest(SimpleHandlerTestCase): 2399 class Handler(RequestHandler): 2400 def get(self): 2401 if self.request.version.startswith('HTTP/1'): 2402 # Simulate a connection closed by the client during 2403 # request processing. The client will see an error, but the 2404 # server should respond gracefully (without logging errors 2405 # because we were unable to write out as many bytes as 2406 # Content-Length said we would) 2407 self.request.connection.stream.close() 2408 self.write('hello') 2409 else: 2410 # TODO: add a HTTP2-compatible version of this test. 2411 self.write('requires HTTP/1.x') 2412 2413 def test_client_close(self): 2414 with self.assertRaises((HTTPClientError, unittest.SkipTest)): 2415 response = self.fetch('/', raise_error=True) 2416 if response.body == b'requires HTTP/1.x': 2417 self.skipTest('requires HTTP/1.x') 2418 self.assertEqual(response.code, 599) 2419 2420 2421class SignedValueTest(unittest.TestCase): 2422 SECRET = "It's a secret to everybody" 2423 SECRET_DICT = {0: "asdfbasdf", 1: "12312312", 2: "2342342"} 2424 2425 def past(self): 2426 return self.present() - 86400 * 32 2427 2428 def present(self): 2429 return 1300000000 2430 2431 def test_known_values(self): 2432 signed_v1 = create_signed_value(SignedValueTest.SECRET, "key", "value", 2433 version=1, clock=self.present) 2434 self.assertEqual( 2435 signed_v1, 2436 b"dmFsdWU=|1300000000|31c934969f53e48164c50768b40cbd7e2daaaa4f") 2437 2438 signed_v2 = create_signed_value(SignedValueTest.SECRET, "key", "value", 2439 version=2, clock=self.present) 2440 self.assertEqual( 2441 
signed_v2, 2442 b"2|1:0|10:1300000000|3:key|8:dmFsdWU=|" 2443 b"3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152") 2444 2445 signed_default = create_signed_value(SignedValueTest.SECRET, 2446 "key", "value", clock=self.present) 2447 self.assertEqual(signed_default, signed_v2) 2448 2449 decoded_v1 = decode_signed_value(SignedValueTest.SECRET, "key", 2450 signed_v1, min_version=1, 2451 clock=self.present) 2452 self.assertEqual(decoded_v1, b"value") 2453 2454 decoded_v2 = decode_signed_value(SignedValueTest.SECRET, "key", 2455 signed_v2, min_version=2, 2456 clock=self.present) 2457 self.assertEqual(decoded_v2, b"value") 2458 2459 def test_name_swap(self): 2460 signed1 = create_signed_value(SignedValueTest.SECRET, "key1", "value", 2461 clock=self.present) 2462 signed2 = create_signed_value(SignedValueTest.SECRET, "key2", "value", 2463 clock=self.present) 2464 # Try decoding each string with the other's "name" 2465 decoded1 = decode_signed_value(SignedValueTest.SECRET, "key2", signed1, 2466 clock=self.present) 2467 self.assertIs(decoded1, None) 2468 decoded2 = decode_signed_value(SignedValueTest.SECRET, "key1", signed2, 2469 clock=self.present) 2470 self.assertIs(decoded2, None) 2471 2472 def test_expired(self): 2473 signed = create_signed_value(SignedValueTest.SECRET, "key1", "value", 2474 clock=self.past) 2475 decoded_past = decode_signed_value(SignedValueTest.SECRET, "key1", 2476 signed, clock=self.past) 2477 self.assertEqual(decoded_past, b"value") 2478 decoded_present = decode_signed_value(SignedValueTest.SECRET, "key1", 2479 signed, clock=self.present) 2480 self.assertIs(decoded_present, None) 2481 2482 def test_payload_tampering(self): 2483 # These cookies are variants of the one in test_known_values. 
2484 sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152" 2485 2486 def validate(prefix): 2487 return (b'value' == 2488 decode_signed_value(SignedValueTest.SECRET, "key", 2489 prefix + sig, clock=self.present)) 2490 self.assertTrue(validate("2|1:0|10:1300000000|3:key|8:dmFsdWU=|")) 2491 # Change key version 2492 self.assertFalse(validate("2|1:1|10:1300000000|3:key|8:dmFsdWU=|")) 2493 # length mismatch (field too short) 2494 self.assertFalse(validate("2|1:0|10:130000000|3:key|8:dmFsdWU=|")) 2495 # length mismatch (field too long) 2496 self.assertFalse(validate("2|1:0|10:1300000000|3:keey|8:dmFsdWU=|")) 2497 2498 def test_signature_tampering(self): 2499 prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|" 2500 2501 def validate(sig): 2502 return (b'value' == 2503 decode_signed_value(SignedValueTest.SECRET, "key", 2504 prefix + sig, clock=self.present)) 2505 self.assertTrue(validate( 2506 "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")) 2507 # All zeros 2508 self.assertFalse(validate("0" * 32)) 2509 # Change one character 2510 self.assertFalse(validate( 2511 "4d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")) 2512 # Change another character 2513 self.assertFalse(validate( 2514 "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e153")) 2515 # Truncate 2516 self.assertFalse(validate( 2517 "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e15")) 2518 # Lengthen 2519 self.assertFalse(validate( 2520 "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e1538")) 2521 2522 def test_non_ascii(self): 2523 value = b"\xe9" 2524 signed = create_signed_value(SignedValueTest.SECRET, "key", value, 2525 clock=self.present) 2526 decoded = decode_signed_value(SignedValueTest.SECRET, "key", signed, 2527 clock=self.present) 2528 self.assertEqual(value, decoded) 2529 2530 def test_key_versioning_read_write_default_key(self): 2531 value = b"\xe9" 2532 signed = create_signed_value(SignedValueTest.SECRET_DICT, 
2533 "key", value, clock=self.present, 2534 key_version=0) 2535 decoded = decode_signed_value(SignedValueTest.SECRET_DICT, 2536 "key", signed, clock=self.present) 2537 self.assertEqual(value, decoded) 2538 2539 def test_key_versioning_read_write_non_default_key(self): 2540 value = b"\xe9" 2541 signed = create_signed_value(SignedValueTest.SECRET_DICT, 2542 "key", value, clock=self.present, 2543 key_version=1) 2544 decoded = decode_signed_value(SignedValueTest.SECRET_DICT, 2545 "key", signed, clock=self.present) 2546 self.assertEqual(value, decoded) 2547 2548 def test_key_versioning_invalid_key(self): 2549 value = b"\xe9" 2550 signed = create_signed_value(SignedValueTest.SECRET_DICT, 2551 "key", value, clock=self.present, 2552 key_version=0) 2553 newkeys = SignedValueTest.SECRET_DICT.copy() 2554 newkeys.pop(0) 2555 decoded = decode_signed_value(newkeys, 2556 "key", signed, clock=self.present) 2557 self.assertEqual(None, decoded) 2558 2559 def test_key_version_retrieval(self): 2560 value = b"\xe9" 2561 signed = create_signed_value(SignedValueTest.SECRET_DICT, 2562 "key", value, clock=self.present, 2563 key_version=1) 2564 key_version = get_signature_key_version(signed) 2565 self.assertEqual(1, key_version) 2566 2567 2568@wsgi_safe 2569class XSRFTest(SimpleHandlerTestCase): 2570 class Handler(RequestHandler): 2571 def get(self): 2572 version = int(self.get_argument("version", "2")) 2573 # This would be a bad idea in a real app, but in this test 2574 # it's fine. 
2575 self.settings["xsrf_cookie_version"] = version 2576 self.write(self.xsrf_token) 2577 2578 def post(self): 2579 self.write("ok") 2580 2581 def get_app_kwargs(self): 2582 return dict(xsrf_cookies=True) 2583 2584 def setUp(self): 2585 super(XSRFTest, self).setUp() 2586 self.xsrf_token = self.get_token() 2587 2588 def get_token(self, old_token=None, version=None): 2589 if old_token is not None: 2590 headers = self.cookie_headers(old_token) 2591 else: 2592 headers = None 2593 response = self.fetch( 2594 "/" if version is None else ("/?version=%d" % version), 2595 headers=headers) 2596 response.rethrow() 2597 return native_str(response.body) 2598 2599 def cookie_headers(self, token=None): 2600 if token is None: 2601 token = self.xsrf_token 2602 return {"Cookie": "_xsrf=" + token} 2603 2604 def test_xsrf_fail_no_token(self): 2605 with ExpectLog(gen_log, ".*'_xsrf' argument missing"): 2606 response = self.fetch("/", method="POST", body=b"") 2607 self.assertEqual(response.code, 403) 2608 2609 def test_xsrf_fail_body_no_cookie(self): 2610 with ExpectLog(gen_log, ".*XSRF cookie does not match POST"): 2611 response = self.fetch( 2612 "/", method="POST", 2613 body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token))) 2614 self.assertEqual(response.code, 403) 2615 2616 def test_xsrf_fail_argument_invalid_format(self): 2617 with ExpectLog(gen_log, ".*'_xsrf' argument has invalid format"): 2618 response = self.fetch( 2619 "/", method="POST", 2620 headers=self.cookie_headers(), 2621 body=urllib_parse.urlencode(dict(_xsrf='3|'))) 2622 self.assertEqual(response.code, 403) 2623 2624 def test_xsrf_fail_cookie_invalid_format(self): 2625 with ExpectLog(gen_log, ".*XSRF cookie does not match POST"): 2626 response = self.fetch( 2627 "/", method="POST", 2628 headers=self.cookie_headers(token='3|'), 2629 body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token))) 2630 self.assertEqual(response.code, 403) 2631 2632 def test_xsrf_fail_cookie_no_body(self): 2633 with ExpectLog(gen_log, 
".*'_xsrf' argument missing"): 2634 response = self.fetch( 2635 "/", method="POST", body=b"", 2636 headers=self.cookie_headers()) 2637 self.assertEqual(response.code, 403) 2638 2639 def test_xsrf_success_short_token(self): 2640 response = self.fetch( 2641 "/", method="POST", 2642 body=urllib_parse.urlencode(dict(_xsrf='deadbeef')), 2643 headers=self.cookie_headers(token='deadbeef')) 2644 self.assertEqual(response.code, 200) 2645 2646 def test_xsrf_success_non_hex_token(self): 2647 response = self.fetch( 2648 "/", method="POST", 2649 body=urllib_parse.urlencode(dict(_xsrf='xoxo')), 2650 headers=self.cookie_headers(token='xoxo')) 2651 self.assertEqual(response.code, 200) 2652 2653 def test_xsrf_success_post_body(self): 2654 response = self.fetch( 2655 "/", method="POST", 2656 body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)), 2657 headers=self.cookie_headers()) 2658 self.assertEqual(response.code, 200) 2659 2660 def test_xsrf_success_query_string(self): 2661 response = self.fetch( 2662 "/?" + urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)), 2663 method="POST", body=b"", 2664 headers=self.cookie_headers()) 2665 self.assertEqual(response.code, 200) 2666 2667 def test_xsrf_success_header(self): 2668 response = self.fetch("/", method="POST", body=b"", 2669 headers=dict({"X-Xsrftoken": self.xsrf_token}, # type: ignore 2670 **self.cookie_headers())) 2671 self.assertEqual(response.code, 200) 2672 2673 def test_distinct_tokens(self): 2674 # Every request gets a distinct token. 2675 NUM_TOKENS = 10 2676 tokens = set() 2677 for i in range(NUM_TOKENS): 2678 tokens.add(self.get_token()) 2679 self.assertEqual(len(tokens), NUM_TOKENS) 2680 2681 def test_cross_user(self): 2682 token2 = self.get_token() 2683 # Each token can be used to authenticate its own request. 
2684 for token in (self.xsrf_token, token2): 2685 response = self.fetch( 2686 "/", method="POST", 2687 body=urllib_parse.urlencode(dict(_xsrf=token)), 2688 headers=self.cookie_headers(token)) 2689 self.assertEqual(response.code, 200) 2690 # Sending one in the cookie and the other in the body is not allowed. 2691 for cookie_token, body_token in ((self.xsrf_token, token2), 2692 (token2, self.xsrf_token)): 2693 with ExpectLog(gen_log, '.*XSRF cookie does not match POST'): 2694 response = self.fetch( 2695 "/", method="POST", 2696 body=urllib_parse.urlencode(dict(_xsrf=body_token)), 2697 headers=self.cookie_headers(cookie_token)) 2698 self.assertEqual(response.code, 403) 2699 2700 def test_refresh_token(self): 2701 token = self.xsrf_token 2702 tokens_seen = set([token]) 2703 # A user's token is stable over time. Refreshing the page in one tab 2704 # might update the cookie while an older tab still has the old cookie 2705 # in its DOM. Simulate this scenario by passing a constant token 2706 # in the body and re-querying for the token. 2707 for i in range(5): 2708 token = self.get_token(token) 2709 # Tokens are encoded uniquely each time 2710 tokens_seen.add(token) 2711 response = self.fetch( 2712 "/", method="POST", 2713 body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)), 2714 headers=self.cookie_headers(token)) 2715 self.assertEqual(response.code, 200) 2716 self.assertEqual(len(tokens_seen), 6) 2717 2718 def test_versioning(self): 2719 # Version 1 still produces distinct tokens per request. 2720 self.assertNotEqual(self.get_token(version=1), 2721 self.get_token(version=1)) 2722 2723 # Refreshed v1 tokens are all identical. 2724 v1_token = self.get_token(version=1) 2725 for i in range(5): 2726 self.assertEqual(self.get_token(v1_token, version=1), v1_token) 2727 2728 # Upgrade to a v2 version of the same token 2729 v2_token = self.get_token(v1_token) 2730 self.assertNotEqual(v1_token, v2_token) 2731 # Each v1 token can map to many v2 tokens. 
2732 self.assertNotEqual(v2_token, self.get_token(v1_token)) 2733 2734 # The tokens are cross-compatible. 2735 for cookie_token, body_token in ((v1_token, v2_token), 2736 (v2_token, v1_token)): 2737 response = self.fetch( 2738 "/", method="POST", 2739 body=urllib_parse.urlencode(dict(_xsrf=body_token)), 2740 headers=self.cookie_headers(cookie_token)) 2741 self.assertEqual(response.code, 200) 2742 2743 2744@wsgi_safe 2745class XSRFCookieKwargsTest(SimpleHandlerTestCase): 2746 class Handler(RequestHandler): 2747 def get(self): 2748 self.write(self.xsrf_token) 2749 2750 def get_app_kwargs(self): 2751 return dict(xsrf_cookies=True, 2752 xsrf_cookie_kwargs=dict(httponly=True)) 2753 2754 def test_xsrf_httponly(self): 2755 response = self.fetch("/") 2756 self.assertIn('httponly;', response.headers['Set-Cookie'].lower()) 2757 2758 2759@wsgi_safe 2760class FinishExceptionTest(SimpleHandlerTestCase): 2761 class Handler(RequestHandler): 2762 def get(self): 2763 self.set_status(401) 2764 self.set_header('WWW-Authenticate', 'Basic realm="something"') 2765 if self.get_argument('finish_value', ''): 2766 raise Finish('authentication required') 2767 else: 2768 self.write('authentication required') 2769 raise Finish() 2770 2771 def test_finish_exception(self): 2772 for u in ['/', '/?finish_value=1']: 2773 response = self.fetch(u) 2774 self.assertEqual(response.code, 401) 2775 self.assertEqual('Basic realm="something"', 2776 response.headers.get('WWW-Authenticate')) 2777 self.assertEqual(b'authentication required', response.body) 2778 2779 2780@wsgi_safe 2781class DecoratorTest(WebTestCase): 2782 def get_handlers(self): 2783 class RemoveSlashHandler(RequestHandler): 2784 @removeslash 2785 def get(self): 2786 pass 2787 2788 class AddSlashHandler(RequestHandler): 2789 @addslash 2790 def get(self): 2791 pass 2792 2793 return [("/removeslash/", RemoveSlashHandler), 2794 ("/addslash", AddSlashHandler), 2795 ] 2796 2797 def test_removeslash(self): 2798 response = 
self.fetch("/removeslash/", follow_redirects=False) 2799 self.assertEqual(response.code, 301) 2800 self.assertEqual(response.headers['Location'], "/removeslash") 2801 2802 response = self.fetch("/removeslash/?foo=bar", follow_redirects=False) 2803 self.assertEqual(response.code, 301) 2804 self.assertEqual(response.headers['Location'], "/removeslash?foo=bar") 2805 2806 def test_addslash(self): 2807 response = self.fetch("/addslash", follow_redirects=False) 2808 self.assertEqual(response.code, 301) 2809 self.assertEqual(response.headers['Location'], "/addslash/") 2810 2811 response = self.fetch("/addslash?foo=bar", follow_redirects=False) 2812 self.assertEqual(response.code, 301) 2813 self.assertEqual(response.headers['Location'], "/addslash/?foo=bar") 2814 2815 2816@wsgi_safe 2817class CacheTest(WebTestCase): 2818 def get_handlers(self): 2819 class EtagHandler(RequestHandler): 2820 def get(self, computed_etag): 2821 self.write(computed_etag) 2822 2823 def compute_etag(self): 2824 return self._write_buffer[0] 2825 2826 return [ 2827 ('/etag/(.*)', EtagHandler) 2828 ] 2829 2830 def test_wildcard_etag(self): 2831 computed_etag = '"xyzzy"' 2832 etags = '*' 2833 self._test_etag(computed_etag, etags, 304) 2834 2835 def test_strong_etag_match(self): 2836 computed_etag = '"xyzzy"' 2837 etags = '"xyzzy"' 2838 self._test_etag(computed_etag, etags, 304) 2839 2840 def test_multiple_strong_etag_match(self): 2841 computed_etag = '"xyzzy1"' 2842 etags = '"xyzzy1", "xyzzy2"' 2843 self._test_etag(computed_etag, etags, 304) 2844 2845 def test_strong_etag_not_match(self): 2846 computed_etag = '"xyzzy"' 2847 etags = '"xyzzy1"' 2848 self._test_etag(computed_etag, etags, 200) 2849 2850 def test_multiple_strong_etag_not_match(self): 2851 computed_etag = '"xyzzy"' 2852 etags = '"xyzzy1", "xyzzy2"' 2853 self._test_etag(computed_etag, etags, 200) 2854 2855 def test_weak_etag_match(self): 2856 computed_etag = '"xyzzy1"' 2857 etags = 'W/"xyzzy1"' 2858 self._test_etag(computed_etag, etags, 304) 
2859 2860 def test_multiple_weak_etag_match(self): 2861 computed_etag = '"xyzzy2"' 2862 etags = 'W/"xyzzy1", W/"xyzzy2"' 2863 self._test_etag(computed_etag, etags, 304) 2864 2865 def test_weak_etag_not_match(self): 2866 computed_etag = '"xyzzy2"' 2867 etags = 'W/"xyzzy1"' 2868 self._test_etag(computed_etag, etags, 200) 2869 2870 def test_multiple_weak_etag_not_match(self): 2871 computed_etag = '"xyzzy3"' 2872 etags = 'W/"xyzzy1", W/"xyzzy2"' 2873 self._test_etag(computed_etag, etags, 200) 2874 2875 def _test_etag(self, computed_etag, etags, status_code): 2876 response = self.fetch( 2877 '/etag/' + computed_etag, 2878 headers={'If-None-Match': etags} 2879 ) 2880 self.assertEqual(response.code, status_code) 2881 2882 2883@wsgi_safe 2884class RequestSummaryTest(SimpleHandlerTestCase): 2885 class Handler(RequestHandler): 2886 def get(self): 2887 # remote_ip is optional, although it's set by 2888 # both HTTPServer and WSGIAdapter. 2889 # Clobber it to make sure it doesn't break logging. 2890 self.request.remote_ip = None 2891 self.finish(self._request_summary()) 2892 2893 def test_missing_remote_ip(self): 2894 resp = self.fetch("/") 2895 self.assertEqual(resp.body, b"GET / (None)") 2896 2897 2898class HTTPErrorTest(unittest.TestCase): 2899 def test_copy(self): 2900 e = HTTPError(403, reason="Go away") 2901 e2 = copy.copy(e) 2902 self.assertIsNot(e, e2) 2903 self.assertEqual(e.status_code, e2.status_code) 2904 self.assertEqual(e.reason, e2.reason) 2905 2906 2907class ApplicationTest(AsyncTestCase): 2908 def test_listen(self): 2909 app = Application([]) 2910 server = app.listen(0, address='127.0.0.1') 2911 server.stop() 2912 2913 2914class URLSpecReverseTest(unittest.TestCase): 2915 def test_reverse(self): 2916 self.assertEqual('/favicon.ico', url(r'/favicon\.ico', None).reverse()) 2917 self.assertEqual('/favicon.ico', url(r'^/favicon\.ico$', None).reverse()) 2918 2919 def test_non_reversible(self): 2920 # URLSpecs are non-reversible if they include non-constant 2921 # 
regex features outside capturing groups. Currently, this is 2922 # only strictly enforced for backslash-escaped character 2923 # classes. 2924 paths = [ 2925 r'^/api/v\d+/foo/(\w+)$', 2926 ] 2927 for path in paths: 2928 # A URLSpec can still be created even if it cannot be reversed. 2929 url_spec = url(path, None) 2930 try: 2931 result = url_spec.reverse() 2932 self.fail("did not get expected exception when reversing %s. " 2933 "result: %s" % (path, result)) 2934 except ValueError: 2935 pass 2936 2937 def test_reverse_arguments(self): 2938 self.assertEqual('/api/v1/foo/bar', 2939 url(r'^/api/v1/foo/(\w+)$', None).reverse('bar')) 2940 2941 2942class RedirectHandlerTest(WebTestCase): 2943 def get_handlers(self): 2944 return [ 2945 ('/src', WebRedirectHandler, {'url': '/dst'}), 2946 ('/src2', WebRedirectHandler, {'url': '/dst2?foo=bar'}), 2947 (r'/(.*?)/(.*?)/(.*)', WebRedirectHandler, {'url': '/{1}/{0}/{2}'})] 2948 2949 def test_basic_redirect(self): 2950 response = self.fetch('/src', follow_redirects=False) 2951 self.assertEqual(response.code, 301) 2952 self.assertEqual(response.headers['Location'], '/dst') 2953 2954 def test_redirect_with_argument(self): 2955 response = self.fetch('/src?foo=bar', follow_redirects=False) 2956 self.assertEqual(response.code, 301) 2957 self.assertEqual(response.headers['Location'], '/dst?foo=bar') 2958 2959 def test_redirect_with_appending_argument(self): 2960 response = self.fetch('/src2?foo2=bar2', follow_redirects=False) 2961 self.assertEqual(response.code, 301) 2962 self.assertEqual(response.headers['Location'], '/dst2?foo=bar&foo2=bar2') 2963 2964 def test_redirect_pattern(self): 2965 response = self.fetch('/a/b/c', follow_redirects=False) 2966 self.assertEqual(response.code, 301) 2967 self.assertEqual(response.headers['Location'], '/b/a/c') 2968