1from __future__ import absolute_import, division, print_function
2
3from tornado.concurrent import Future
4from tornado import gen
5from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str, to_basestring  # noqa: E501
6from tornado.httpclient import HTTPClientError
7from tornado.httputil import format_timestamp
8from tornado.ioloop import IOLoop
9from tornado.iostream import IOStream
10from tornado import locale
11from tornado.locks import Event
12from tornado.log import app_log, gen_log
13from tornado.simple_httpclient import SimpleAsyncHTTPClient
14from tornado.template import DictLoader
15from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
16from tornado.test.util import unittest, skipBefore35, exec_test, ignore_deprecation
17from tornado.util import ObjectDict, unicode_type, timedelta_to_seconds, PY3
18from tornado.web import (
19    Application, RequestHandler, StaticFileHandler, RedirectHandler as WebRedirectHandler,
20    HTTPError, MissingArgumentError, ErrorHandler, authenticated, asynchronous, url,
21    _create_signature_v1, create_signed_value, decode_signed_value, get_signature_key_version,
22    UIModule, Finish, stream_request_body, removeslash, addslash, GZipContentEncoding,
23)
24
25import binascii
26import contextlib
27import copy
28import datetime
29import email.utils
30import gzip
31from io import BytesIO
32import itertools
33import logging
34import os
35import re
36import socket
37
38if PY3:
39    import urllib.parse as urllib_parse  # py3
40else:
41    import urllib as urllib_parse  # py2
42
# Registry of test case classes collected by the ``wsgi_safe`` decorator
# defined below; each decorated class is appended here.
wsgi_safe_tests = []
44
45
def relpath(*a):
    """Join the path components ``a`` onto the directory holding this file."""
    here = os.path.dirname(__file__)
    return os.path.join(here, *a)
48
49
def wsgi_safe(cls):
    """Class decorator that records ``cls`` in ``wsgi_safe_tests``.

    The class is returned unchanged, so the decorator can be stacked
    directly on top of a test case definition.
    """
    wsgi_safe_tests.extend((cls,))
    return cls
53
54
class WebTestCase(AsyncHTTPTestCase):
    """Common base for the web tests, usable in WSGI mode too.

    Subclasses customize the application through ``get_handlers`` and
    ``get_app_kwargs`` rather than by overriding ``get_app``.
    Register a subclass with ``wsgi_safe`` to have it run in wsgi_test
    as well.
    """
    def get_app(self):
        # Build the Application from the subclass hooks and keep a
        # reference on the test case for later inspection.
        handlers = self.get_handlers()
        settings = self.get_app_kwargs()
        self.app = Application(handlers, **settings)
        return self.app

    def get_handlers(self):
        # Subclasses must supply the routing table.
        raise NotImplementedError()

    def get_app_kwargs(self):
        # No extra Application settings unless a subclass provides them.
        return dict()
70
71
class SimpleHandlerTestCase(WebTestCase):
    """Convenience base for tests that exercise exactly one handler.

    Subclasses define a nested class named ``Handler``; it is mounted
    at ``/``.
    """
    def get_handlers(self):
        root_route = ('/', self.Handler)
        return [root_route]
79
80
class HelloHandler(RequestHandler):
    """Trivial handler whose GET response body is ``hello``."""
    def get(self):
        greeting = 'hello'
        self.write(greeting)
84
85
class CookieTestRequestHandler(RequestHandler):
    """Minimal RequestHandler stand-in for the secure cookie tests.

    Only what the secure_cookie functions rely on is stubbed out: an
    in-memory cookie dict and an ``application`` exposing ``settings``.
    """
    def __init__(self, cookie_secret='0123456789', key_version=None):
        # Deliberately does not call RequestHandler.__init__.
        self._cookies = {}
        settings = dict(cookie_secret=cookie_secret)
        if key_version is not None:
            # Only advertise a key version when one was requested.
            settings['key_version'] = key_version
        self.application = ObjectDict(settings=settings)

    def get_cookie(self, name):
        # Unknown cookies come back as None.
        return self._cookies.get(name)

    def set_cookie(self, name, value, expires_days=None):
        # expires_days is accepted but not stored anywhere.
        self._cookies[name] = value
102
103
104# See SignedValueTest below for more.
class SecureCookieV1Test(unittest.TestCase):
    """Exercises version 1 signed cookies via CookieTestRequestHandler."""

    def test_round_trip(self):
        handler = CookieTestRequestHandler()
        handler.set_secure_cookie('foo', b'bar', version=1)
        decoded = handler.get_secure_cookie('foo', min_version=1)
        self.assertEqual(decoded, b'bar')

    def test_cookie_tampering_future_timestamp(self):
        handler = CookieTestRequestHandler()
        # these bytes base64-encode to '12345678'
        payload = binascii.a2b_hex(b'd76df8e7aefc')
        handler.set_secure_cookie('foo', payload, version=1)
        cookie = handler._cookies['foo']
        match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
        self.assertTrue(match)
        timestamp = match.group(1)
        sig = match.group(2)
        secret = handler.application.settings["cookie_secret"]
        genuine = _create_signature_v1(secret, 'foo', '12345678', timestamp)
        self.assertEqual(genuine, sig)
        # Moving digits from the payload into the timestamp leaves the
        # signature unchanged.  This is not desirable behavior; the test
        # merely confirms that it is how v1 works.
        shifted = _create_signature_v1(secret, 'foo', '1234',
                                       b'5678' + timestamp)
        self.assertEqual(shifted, sig)
        # tamper with the cookie
        tampered = '1234|5678%s|%s' % (to_basestring(timestamp),
                                       to_basestring(sig))
        handler._cookies['foo'] = utf8(tampered)
        # the tampered cookie is rejected
        with ExpectLog(gen_log, "Cookie timestamp in future"):
            self.assertIsNone(
                handler.get_secure_cookie('foo', min_version=1))

    def test_arbitrary_bytes(self):
        # Secure cookies are base64 encoded and so accept arbitrary data,
        # whereas normal cookies accept only a subset of ascii.
        raw = b'\xe9'
        handler = CookieTestRequestHandler()
        handler.set_secure_cookie('foo', raw, version=1)
        decoded = handler.get_secure_cookie('foo', min_version=1)
        self.assertEqual(decoded, b'\xe9')
147
148
149# See SignedValueTest below for more.
class SecureCookieV2Test(unittest.TestCase):
    """Exercises version 2 signed cookies, including key versioning."""
    KEY_VERSIONS = {
        0: 'ajklasdf0ojaisdf',
        1: 'aslkjasaolwkjsdf'
    }

    def _versioned_handler(self, key_version, secrets=None):
        # Build a handler whose cookie_secret is a dict of versioned keys.
        if secrets is None:
            secrets = self.KEY_VERSIONS
        return CookieTestRequestHandler(cookie_secret=secrets,
                                        key_version=key_version)

    def test_round_trip(self):
        handler = CookieTestRequestHandler()
        handler.set_secure_cookie('foo', b'bar', version=2)
        decoded = handler.get_secure_cookie('foo', min_version=2)
        self.assertEqual(decoded, b'bar')

    def test_key_version_roundtrip(self):
        handler = self._versioned_handler(0)
        handler.set_secure_cookie('foo', b'bar')
        self.assertEqual(handler.get_secure_cookie('foo'), b'bar')

    def test_key_version_roundtrip_differing_version(self):
        handler = self._versioned_handler(1)
        handler.set_secure_cookie('foo', b'bar')
        self.assertEqual(handler.get_secure_cookie('foo'), b'bar')

    def test_key_version_increment_version(self):
        # A cookie signed with key 0 stays readable once the current
        # key version moves to 1, as long as key 0 is still known.
        signer = self._versioned_handler(0)
        signer.set_secure_cookie('foo', b'bar')
        reader = self._versioned_handler(1)
        reader._cookies = signer._cookies
        self.assertEqual(reader.get_secure_cookie('foo'), b'bar')

    def test_key_version_invalidate_version(self):
        # Dropping key 0 from the secrets invalidates cookies signed
        # with it.
        signer = self._versioned_handler(0)
        signer.set_secure_cookie('foo', b'bar')
        remaining_keys = dict(self.KEY_VERSIONS)
        del remaining_keys[0]
        reader = self._versioned_handler(1, secrets=remaining_keys)
        reader._cookies = signer._cookies
        self.assertIsNone(reader.get_secure_cookie('foo'))
192
193
class FinalReturnTest(WebTestCase):
    """Checks the objects returned by ``finish()`` and ``render()``."""
    def get_handlers(self):
        # The handlers stash their result on the test case so that the
        # test methods can inspect it after the fetch completes.
        test = self

        class FinishHandler(RequestHandler):
            @gen.coroutine
            def get(self):
                finished = self.finish()
                test.final_return = finished
                yield finished

        class RenderHandler(RequestHandler):
            def create_template_loader(self, path):
                templates = {'foo.html': 'hi'}
                return DictLoader(templates)

            @gen.coroutine
            def get(self):
                rendered = self.render('foo.html')
                test.final_return = rendered

        routes = [
            ("/finish", FinishHandler),
            ("/render", RenderHandler),
        ]
        return routes

    def get_app_kwargs(self):
        return {'template_path': 'FinalReturnTest'}

    def test_finish_method_return_future(self):
        url_to_fetch = self.get_url('/finish')
        response = self.fetch(url_to_fetch)
        self.assertEqual(response.code, 200)
        result = self.final_return
        self.assertIsInstance(result, Future)
        self.assertTrue(result.done())

    def test_render_method_return_future(self):
        url_to_fetch = self.get_url('/render')
        response = self.fetch(url_to_fetch)
        self.assertEqual(response.code, 200)
        self.assertIsInstance(self.final_return, Future)
228
229
class CookieTest(WebTestCase):
    """End-to-end checks of ``set_cookie`` and ``get_cookie`` over HTTP."""
    def get_handlers(self):
        class SetCookieHandler(RequestHandler):
            def get(self):
                # Use native str, unicode and bytes values so that the
                # encoding of every argument type is verified.
                for name, value in (("str", "asdf"),
                                    ("unicode", u"qwer"),
                                    ("bytes", b"zxcv")):
                    self.set_cookie(name, value)

        class GetCookieHandler(RequestHandler):
            def get(self):
                value = self.get_cookie("foo", "default")
                self.write(value)

        class SetCookieDomainHandler(RequestHandler):
            def get(self):
                # domain and path passed as unicode must not break
                # anything either (see bug #285)
                self.set_cookie("unicode_args", "blah",
                                domain=u"foo.com", path=u"/foo")

        class SetCookieSpecialCharHandler(RequestHandler):
            def get(self):
                for name, value in (("equals", "a=b"),
                                    ("semicolon", "a;b"),
                                    ("quote", 'a"b')):
                    self.set_cookie(name, value)

        class SetCookieOverwriteHandler(RequestHandler):
            def get(self):
                self.set_cookie("a", "b", domain="example.com")
                self.set_cookie("c", "d", domain="example.com")
                # Setting "a" again replaces the earlier cookie, and the
                # earlier call's attributes are not inherited.
                self.set_cookie("a", "e")

        class SetCookieMaxAgeHandler(RequestHandler):
            def get(self):
                self.set_cookie("foo", "bar", max_age=10)

        class SetCookieExpiresDaysHandler(RequestHandler):
            def get(self):
                self.set_cookie("foo", "bar", expires_days=10)

        class SetCookieFalsyFlags(RequestHandler):
            def get(self):
                # Each flag is set once truthy and once falsy.
                self.set_cookie("a", "1", secure=True)
                self.set_cookie("b", "1", secure=False)
                self.set_cookie("c", "1", httponly=True)
                self.set_cookie("d", "1", httponly=False)

        routes = [
            ("/set", SetCookieHandler),
            ("/get", GetCookieHandler),
            ("/set_domain", SetCookieDomainHandler),
            ("/special_char", SetCookieSpecialCharHandler),
            ("/set_overwrite", SetCookieOverwriteHandler),
            ("/set_max_age", SetCookieMaxAgeHandler),
            ("/set_expires_days", SetCookieExpiresDaysHandler),
            ("/set_falsy_flags", SetCookieFalsyFlags),
        ]
        return routes

    def test_set_cookie(self):
        response = self.fetch("/set")
        cookies = sorted(response.headers.get_list("Set-Cookie"))
        expected = [
            "bytes=zxcv; Path=/",
            "str=asdf; Path=/",
            "unicode=qwer; Path=/",
        ]
        self.assertEqual(cookies, expected)

    def test_get_cookie(self):
        cases = [
            ("foo=bar", b"bar"),
            ('foo="bar"', b"bar"),
            ("/=exception;", b"default"),
        ]
        for cookie_header, expected_body in cases:
            response = self.fetch("/get", headers={"Cookie": cookie_header})
            self.assertEqual(response.body, expected_body)

    def test_set_cookie_domain(self):
        response = self.fetch("/set_domain")
        cookies = response.headers.get_list("Set-Cookie")
        self.assertEqual(
            cookies, ["unicode_args=blah; Domain=foo.com; Path=/foo"])

    def test_cookie_special_char(self):
        response = self.fetch("/special_char")
        headers = sorted(response.headers.get_list("Set-Cookie"))
        self.assertEqual(len(headers), 3)
        equals_header, quote_header, semicolon_header = headers
        self.assertEqual(equals_header, 'equals="a=b"; Path=/')
        self.assertEqual(quote_header, 'quote="a\\"b"; Path=/')
        # python 2.7 octal-escapes the semicolon; older versions leave it alone
        accepted = ('semicolon="a;b"; Path=/',
                    'semicolon="a\\073b"; Path=/')
        self.assertIn(semicolon_header, accepted, semicolon_header)

        data = [
            ('foo=a=b', 'a=b'),
            ('foo="a=b"', 'a=b'),
            ('foo="a;b"', '"a'),  # even quoted, ";" is a delimiter
            ('foo=a\\073b', 'a\\073b'),  # escapes only decoded in quotes
            ('foo="a\\073b"', 'a;b'),
            ('foo="a\\"b"', 'a"b'),
        ]
        for header, expected in data:
            logging.debug("trying %r", header)
            response = self.fetch("/get", headers={"Cookie": header})
            self.assertEqual(response.body, utf8(expected))

    def test_set_cookie_overwrite(self):
        response = self.fetch("/set_overwrite")
        cookies = sorted(response.headers.get_list("Set-Cookie"))
        self.assertEqual(
            cookies, ["a=e; Path=/", "c=d; Domain=example.com; Path=/"])

    def test_set_cookie_max_age(self):
        response = self.fetch("/set_max_age")
        cookies = sorted(response.headers.get_list("Set-Cookie"))
        self.assertEqual(cookies, ["foo=bar; Max-Age=10; Path=/"])

    def test_set_cookie_expires_days(self):
        response = self.fetch("/set_expires_days")
        header = response.headers.get("Set-Cookie")
        match = re.match("foo=bar; expires=(?P<expires>.+); Path=/", header)
        self.assertIsNotNone(match)

        # The expiry in the header should be ten days from now, give or
        # take a few seconds of test run time.
        expected_expiry = (datetime.datetime.utcnow() +
                           datetime.timedelta(days=10))
        parsed = email.utils.parsedate(match.group("expires"))
        actual_expiry = datetime.datetime(*parsed[:6])
        drift = abs(timedelta_to_seconds(expected_expiry - actual_expiry))
        self.assertLess(drift, 10)

    def test_set_cookie_false_flags(self):
        response = self.fetch("/set_falsy_flags")
        headers = sorted(response.headers.get_list("Set-Cookie"))
        # The secure and httponly headers are capitalized in py35 and
        # lowercase in older versions, so compare case-insensitively.
        expected = [
            'a=1; path=/; secure',
            'b=1; path=/',
            'c=1; httponly; path=/',
            'd=1; path=/',
        ]
        for index, wanted in enumerate(expected):
            self.assertEqual(headers[index].lower(), wanted)
368
369
class AuthRedirectRequestHandler(RequestHandler):
    """Handler guarded by ``@authenticated`` with a per-route login URL."""
    def initialize(self, login_url):
        # login_url comes from the route's initialization dict.
        self.login_url = login_url

    def get_login_url(self):
        return self.login_url

    @authenticated
    def get(self):
        # Never actually reached: the tests do not follow the redirect.
        self.send_error(status_code=500)
381
382
class AuthRedirectTest(WebTestCase):
    """Checks the login redirect issued by ``@authenticated``.

    Two routes share one handler class and differ only in whether the
    configured login URL is relative or absolute.
    """
    def get_handlers(self):
        return [('/relative', AuthRedirectRequestHandler,
                 dict(login_url='/login')),
                ('/absolute', AuthRedirectRequestHandler,
                 dict(login_url='http://example.com/login'))]

    def test_relative_auth_redirect(self):
        response = self.fetch(self.get_url('/relative'),
                              follow_redirects=False)
        self.assertEqual(response.code, 302)
        self.assertEqual(response.headers['Location'], '/login?next=%2Frelative')

    def test_absolute_auth_redirect(self):
        response = self.fetch(self.get_url('/absolute'),
                              follow_redirects=False)
        self.assertEqual(response.code, 302)
        # Raw strings keep ``\?`` and ``\.`` as regex escapes instead of
        # (invalid) string escapes; the dots are escaped so that they only
        # match literal dots.  The port is assigned per test run.
        location_pattern = (
            r'http://example\.com/login\?next='
            r'http%3A%2F%2F127\.0\.0\.1%3A[0-9]+%2Fabsolute')
        location = response.headers['Location']
        self.assertTrue(re.match(location_pattern, location), location)
403
404
class ConnectionCloseHandler(RequestHandler):
    """Handler that never responds and reports events back to the test."""
    def initialize(self, test):
        # The owning test case receives the notifications below.
        self.test = test

    @gen.coroutine
    def get(self):
        self.test.on_handler_waiting()
        # Wait on an Event that nothing ever sets.
        blocker = Event()
        yield blocker.wait()

    def on_connection_close(self):
        self.test.on_connection_close()
417
418
class ConnectionCloseTest(WebTestCase):
    """Closes the client connection while the handler is waiting."""
    def get_handlers(self):
        handler_kwargs = dict(test=self)
        return [('/', ConnectionCloseHandler, handler_kwargs)]

    def test_connection_close(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        server_address = ("127.0.0.1", self.get_http_port())
        sock.connect(server_address)
        self.stream = IOStream(sock)
        self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
        # Resumes once on_connection_close calls stop().
        self.wait()

    def on_handler_waiting(self):
        # Called by the handler once it has started processing the request.
        logging.debug('handler waiting')
        self.stream.close()

    def on_connection_close(self):
        # Called by the handler when it notices the closed connection.
        logging.debug('connection closed')
        self.stop()
437
438
class EchoHandler(RequestHandler):
    """Echoes the request path, path arguments and query arguments as JSON."""
    def get(self, *path_args):
        # Type checks: the web.py interfaces convert argument values to
        # unicode strings (by default; see also decode_argument), while
        # self.request.arguments (from httpserver.py) keeps them as bytes.
        # Keys are always native strings.
        arguments = self.request.arguments
        for key in arguments:
            key_type = type(key)
            if key_type is not str:
                raise Exception("incorrect type for key: %r" % key_type)
            for raw_value in arguments[key]:
                if type(raw_value) is not bytes:
                    raise Exception("incorrect type for value: %r" %
                                    type(raw_value))
            for decoded_value in self.get_arguments(key):
                if type(decoded_value) is not unicode_type:
                    raise Exception("incorrect type for value: %r" %
                                    type(decoded_value))
        for arg in path_args:
            if type(arg) is not unicode_type:
                raise Exception("incorrect type for path arg: %r" % type(arg))
        payload = {
            'path': self.request.path,
            'path_args': path_args,
            'args': recursive_unicode(self.request.arguments),
        }
        self.write(payload)
462
463
class RequestEncodingTest(WebTestCase):
    """Checks decoding of percent-encoded paths and query arguments."""
    def get_handlers(self):
        routes = [
            ("/group/(.*)", EchoHandler),
            ("/slashes/([^/]*)/([^/]*)", EchoHandler),
        ]
        return routes

    def fetch_json(self, path):
        response = self.fetch(path)
        return json_decode(response.body)

    def test_group_question_mark(self):
        # Url-encoded question marks must be handled properly
        without_query = self.fetch_json('/group/%3F')
        self.assertEqual(without_query,
                         {'path': '/group/%3F', 'path_args': ['?'], 'args': {}})
        with_query = self.fetch_json('/group/%3F?%3F=%3F')
        self.assertEqual(with_query,
                         {'path': '/group/%3F', 'path_args': ['?'],
                          'args': {'?': ['?']}})

    def test_group_encoding(self):
        # Path components and query arguments should decode identically
        echoed = self.fetch_json('/group/%C3%A9?arg=%C3%A9')
        expected = {
            u"path": u"/group/%C3%A9",
            u"path_args": [u"\u00e9"],
            u"args": {u"arg": [u"\u00e9"]},
        }
        self.assertEqual(echoed, expected)

    def test_slashes(self):
        # An escaped slash stays inside a single "directory" of the path
        # for routing, but reaches get() unescaped.
        plain = self.fetch_json('/slashes/foo/bar')
        self.assertEqual(plain,
                         {'path': "/slashes/foo/bar",
                          'path_args': ["foo", "bar"],
                          'args': {}})
        escaped = self.fetch_json('/slashes/a%2Fb/c%2Fd')
        self.assertEqual(escaped,
                         {'path': "/slashes/a%2Fb/c%2Fd",
                          'path_args': ["a/b", "c/d"],
                          'args': {}})

    def test_error(self):
        # Percent signs (sent as %25) must not confuse the printf-style
        # formatting of log messages
        bad_url = "/group/?arg=%25%e9"
        with ExpectLog(gen_log, ".*Invalid unicode"):
            self.fetch(bad_url)
504
505
class TypeCheckHandler(RequestHandler):
    """Collects type mismatches of RequestHandler APIs and writes them out.

    ``prepare`` fills ``self.errors`` (check name -> message); ``get`` and
    ``post`` add a check of the path component and write that dict as the
    response body.
    """
    def prepare(self):
        self.errors = {}

        status = self.get_status()
        self.check_type('status', status, int)

        # For mainly historical reasons get_argument is an exception to
        # the general rule that non-body data uses type str.
        argument = self.get_argument('foo')
        self.check_type('argument', argument, unicode_type)
        first_cookie_key = list(self.cookies.keys())[0]
        self.check_type('cookie_key', first_cookie_key, str)
        first_morsel = list(self.cookies.values())[0]
        self.check_type('cookie_value', first_morsel.value, str)

        # Regular cookies are native strings, but secure cookies can hold
        # arbitrary data and therefore come back as bytes.
        if list(self.cookies.keys()) != ['asdf']:
            raise Exception("unexpected values for cookie keys: %r" %
                            self.cookies.keys())
        secure_value = self.get_secure_cookie('asdf')
        self.check_type('get_secure_cookie', secure_value, bytes)
        plain_value = self.get_cookie('asdf')
        self.check_type('get_cookie', plain_value, str)

        token = self.xsrf_token
        self.check_type('xsrf_token', token, bytes)
        form_html = self.xsrf_form_html()
        self.check_type('xsrf_form_html', form_html, str)

        reversed_url = self.reverse_url('typecheck', 'foo')
        self.check_type('reverse_url', reversed_url, str)

        summary = self._request_summary()
        self.check_type('request_summary', summary, str)

    def _check_path_and_write(self, path_component):
        # path_component is unicode rather than str, for consistency
        # with get_argument()
        self.check_type('path_component', path_component, unicode_type)
        self.write(self.errors)

    def get(self, path_component):
        self._check_path_and_write(path_component)

    def post(self, path_component):
        self._check_path_and_write(path_component)

    def check_type(self, name, obj, expected_type):
        # Exact type comparison: a subclass instance counts as a mismatch.
        actual_type = type(obj)
        if actual_type != expected_type:
            message = "expected %s, got %s" % (expected_type, actual_type)
            self.errors[name] = message
548
549
class DecodeArgHandler(RequestHandler):
    """Decodes arguments using the charset named by the ``encoding`` arg."""
    def decode_argument(self, value, name=None):
        if type(value) != bytes:
            raise Exception("unexpected type for value: %r" % type(value))
        # Read self.request.arguments directly; going through
        # get_argument here would recurse into this method.
        raw_arguments = self.request.arguments
        if 'encoding' not in raw_arguments:
            # Without a declared encoding the bytes are passed through.
            return value
        charset = to_unicode(raw_arguments['encoding'][0])
        return value.decode(charset)

    def get(self, arg):
        def describe(s):
            # Report the type name plus a JSON-friendly form of the value.
            kind = type(s)
            if kind == bytes:
                return ["bytes", native_str(binascii.b2a_hex(s))]
            if kind == unicode_type:
                return ["unicode", s]
            raise Exception("unknown type")

        path_description = describe(arg)
        query_description = describe(self.get_argument("foo"))
        self.write(dict(path=path_description, query=query_description))
570
571
class LinkifyHandler(RequestHandler):
    """Renders ``linkify.html`` with a fixed URL as the message."""
    def get(self):
        message = "http://example.com"
        self.render("linkify.html", message=message)
575
576
class UIModuleResourceHandler(RequestHandler):
    """Renders ``page.html`` with two entries."""
    def get(self):
        entries = [1, 2]
        self.render("page.html", entries=entries)
580
581
class OptionalPathHandler(RequestHandler):
    """Writes back the captured path argument as a JSON object."""
    def get(self, path):
        self.write(dict(path=path))
585
586
class FlowControlHandler(RequestHandler):
    """Writes "1", "2" and "3" in three steps chained by flush callbacks."""
    # These writes are too small to demonstrate real flow control,
    # but at least it shows that the callbacks get run.
    # NOTE(review): ignore_deprecation() presumably silences the
    # DeprecationWarning raised when @asynchronous is applied at class
    # definition time -- confirm against tornado.test.util.
    with ignore_deprecation():
        @asynchronous
        def get(self):
            self.write("1")
            # The inner context manager covers the flush(callback=...)
            # call itself, which runs at request time rather than at
            # class definition time.
            with ignore_deprecation():
                self.flush(callback=self.step2)

    def step2(self):
        # Runs as the callback of the first flush.
        self.write("2")
        with ignore_deprecation():
            self.flush(callback=self.step3)

    def step3(self):
        # Runs as the callback of the second flush and ends the request.
        self.write("3")
        self.finish()
605
606
class MultiHeaderHandler(RequestHandler):
    """Sets one header twice and adds another twice, mixing case and types."""
    def get(self):
        # set_header twice under differently-cased names.
        for name, value in (("x-overwrite", "1"), ("X-Overwrite", 2)):
            self.set_header(name, value)
        # add_header twice under differently-cased names.
        for name, value in (("x-multi", 3), ("X-Multi", "4")):
            self.add_header(name, value)
613
614
class RedirectHandler(RequestHandler):
    """Redirects to ``/`` using the ``permanent`` or ``status`` argument."""
    def get(self):
        # ``permanent`` takes precedence when both arguments are present.
        permanent = self.get_argument('permanent', None)
        if permanent is not None:
            self.redirect('/', permanent=int(permanent))
            return
        status = self.get_argument('status', None)
        if status is not None:
            self.redirect('/', status=int(status))
            return
        raise Exception("didn't get permanent or status arguments")
623
624
class EmptyFlushCallbackHandler(RequestHandler):
    """Flushes repeatedly, with and without pending output."""
    @gen.coroutine
    def get(self):
        # Every flush must complete whether or not there was any output
        # to send.  The first one is "empty" but still writes the
        # headers; the second one has nothing at all to send.
        for _ in range(2):
            yield self.flush()
        self.write("o")
        # The first of these flushes the "o"; the second is empty again.
        for _ in range(2):
            yield self.flush()
        self.finish("k")
637
638
class HeaderInjectionHandler(RequestHandler):
    """Responds ``ok`` when a header value containing CRLF is rejected."""
    def get(self):
        try:
            self.set_header("X-Foo", "foo\r\nX-Bar: baz")
        except ValueError as e:
            # Only the "unsafe header" rejection counts as success;
            # any other ValueError propagates.
            if "Unsafe header value" not in str(e):
                raise
            self.finish(b"ok")
        else:
            raise Exception("Didn't get expected exception")
649
650
class GetArgumentHandler(RequestHandler):
    """Returns argument ``foo`` read from the place named by ``source``."""
    def prepare(self):
        # ``source`` selects the lookup method; anything other than
        # "query" or "body" falls back to the combined get_argument.
        source = self.get_argument('source', None)
        lookups = {
            'query': self.get_query_argument,
            'body': self.get_body_argument,
        }
        method = lookups.get(source, self.get_argument)
        self.finish(method("foo", "default"))
660
661
class GetArgumentsHandler(RequestHandler):
    """Reports the values of ``foo`` from all three argument lookups."""
    def prepare(self):
        combined = self.get_arguments("foo")
        from_query = self.get_query_arguments("foo")
        from_body = self.get_body_arguments("foo")
        self.finish({
            'default': combined,
            'query': from_query,
            'body': from_body,
        })
667
668
669# This test is shared with wsgi_test.py
670@wsgi_safe
671class WSGISafeWebTest(WebTestCase):
672    COOKIE_SECRET = "WebTest.COOKIE_SECRET"
673
674    def get_app_kwargs(self):
675        loader = DictLoader({
676            "linkify.html": "{% module linkify(message) %}",
677            "page.html": """\
678<html><head></head><body>
679{% for e in entries %}
680{% module Template("entry.html", entry=e) %}
681{% end %}
682</body></html>""",
683            "entry.html": """\
684{{ set_resources(embedded_css=".entry { margin-bottom: 1em; }",
685                 embedded_javascript="js_embed()",
686                 css_files=["/base.css", "/foo.css"],
687                 javascript_files="/common.js",
688                 html_head="<meta>",
689                 html_body='<script src="/analytics.js"/>') }}
690<div class="entry">...</div>""",
691        })
692        return dict(template_loader=loader,
693                    autoescape="xhtml_escape",
694                    cookie_secret=self.COOKIE_SECRET)
695
696    def tearDown(self):
697        super(WSGISafeWebTest, self).tearDown()
698        RequestHandler._template_loaders.clear()
699
700    def get_handlers(self):
701        urls = [
702            url("/typecheck/(.*)", TypeCheckHandler, name='typecheck'),
703            url("/decode_arg/(.*)", DecodeArgHandler, name='decode_arg'),
704            url("/decode_arg_kw/(?P<arg>.*)", DecodeArgHandler),
705            url("/linkify", LinkifyHandler),
706            url("/uimodule_resources", UIModuleResourceHandler),
707            url("/optional_path/(.+)?", OptionalPathHandler),
708            url("/multi_header", MultiHeaderHandler),
709            url("/redirect", RedirectHandler),
710            url("/web_redirect_permanent", WebRedirectHandler, {"url": "/web_redirect_newpath"}),
711            url("/web_redirect", WebRedirectHandler,
712                {"url": "/web_redirect_newpath", "permanent": False}),
713            url("//web_redirect_double_slash", WebRedirectHandler,
714                {"url": '/web_redirect_newpath'}),
715            url("/header_injection", HeaderInjectionHandler),
716            url("/get_argument", GetArgumentHandler),
717            url("/get_arguments", GetArgumentsHandler),
718        ]
719        return urls
720
721    def fetch_json(self, *args, **kwargs):
722        response = self.fetch(*args, **kwargs)
723        response.rethrow()
724        return json_decode(response.body)
725
726    def test_types(self):
727        cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET,
728                                                      "asdf", "qwer"))
729        response = self.fetch("/typecheck/asdf?foo=bar",
730                              headers={"Cookie": "asdf=" + cookie_value})
731        data = json_decode(response.body)
732        self.assertEqual(data, {})
733
734        response = self.fetch("/typecheck/asdf?foo=bar", method="POST",
735                              headers={"Cookie": "asdf=" + cookie_value},
736                              body="foo=bar")
737
738    def test_decode_argument(self):
739        # These urls all decode to the same thing
740        urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
741                "/decode_arg/%E9?foo=%E9&encoding=latin1",
742                "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
743                ]
744        for req_url in urls:
745            response = self.fetch(req_url)
746            response.rethrow()
747            data = json_decode(response.body)
748            self.assertEqual(data, {u'path': [u'unicode', u'\u00e9'],
749                                    u'query': [u'unicode', u'\u00e9'],
750                                    })
751
752        response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
753        response.rethrow()
754        data = json_decode(response.body)
755        self.assertEqual(data, {u'path': [u'bytes', u'c3a9'],
756                                u'query': [u'bytes', u'c3a9'],
757                                })
758
759    def test_decode_argument_invalid_unicode(self):
760        # test that invalid unicode in URLs causes 400, not 500
761        with ExpectLog(gen_log, ".*Invalid unicode.*"):
762            response = self.fetch("/typecheck/invalid%FF")
763            self.assertEqual(response.code, 400)
764            response = self.fetch("/typecheck/invalid?foo=%FF")
765            self.assertEqual(response.code, 400)
766
767    def test_decode_argument_plus(self):
768        # These urls are all equivalent.
769        urls = ["/decode_arg/1%20%2B%201?foo=1%20%2B%201&encoding=utf-8",
770                "/decode_arg/1%20+%201?foo=1+%2B+1&encoding=utf-8"]
771        for req_url in urls:
772            response = self.fetch(req_url)
773            response.rethrow()
774            data = json_decode(response.body)
775            self.assertEqual(data, {u'path': [u'unicode', u'1 + 1'],
776                                    u'query': [u'unicode', u'1 + 1'],
777                                    })
778
779    def test_reverse_url(self):
780        self.assertEqual(self.app.reverse_url('decode_arg', 'foo'),
781                         '/decode_arg/foo')
782        self.assertEqual(self.app.reverse_url('decode_arg', 42),
783                         '/decode_arg/42')
784        self.assertEqual(self.app.reverse_url('decode_arg', b'\xe9'),
785                         '/decode_arg/%E9')
786        self.assertEqual(self.app.reverse_url('decode_arg', u'\u00e9'),
787                         '/decode_arg/%C3%A9')
788        self.assertEqual(self.app.reverse_url('decode_arg', '1 + 1'),
789                         '/decode_arg/1%20%2B%201')
790
791    def test_uimodule_unescaped(self):
792        response = self.fetch("/linkify")
793        self.assertEqual(response.body,
794                         b"<a href=\"http://example.com\">http://example.com</a>")
795
796    def test_uimodule_resources(self):
797        response = self.fetch("/uimodule_resources")
798        self.assertEqual(response.body, b"""\
799<html><head><link href="/base.css" type="text/css" rel="stylesheet"/><link href="/foo.css" type="text/css" rel="stylesheet"/>
800<style type="text/css">
801.entry { margin-bottom: 1em; }
802</style>
803<meta>
804</head><body>
805
806
807<div class="entry">...</div>
808
809
810<div class="entry">...</div>
811
812<script src="/common.js" type="text/javascript"></script>
813<script type="text/javascript">
814//<![CDATA[
815js_embed()
816//]]>
817</script>
818<script src="/analytics.js"/>
819</body></html>""")  # noqa: E501
820
821    def test_optional_path(self):
822        self.assertEqual(self.fetch_json("/optional_path/foo"),
823                         {u"path": u"foo"})
824        self.assertEqual(self.fetch_json("/optional_path/"),
825                         {u"path": None})
826
827    def test_multi_header(self):
828        response = self.fetch("/multi_header")
829        self.assertEqual(response.headers["x-overwrite"], "2")
830        self.assertEqual(response.headers.get_list("x-multi"), ["3", "4"])
831
832    def test_redirect(self):
833        response = self.fetch("/redirect?permanent=1", follow_redirects=False)
834        self.assertEqual(response.code, 301)
835        response = self.fetch("/redirect?permanent=0", follow_redirects=False)
836        self.assertEqual(response.code, 302)
837        response = self.fetch("/redirect?status=307", follow_redirects=False)
838        self.assertEqual(response.code, 307)
839
840    def test_web_redirect(self):
841        response = self.fetch("/web_redirect_permanent", follow_redirects=False)
842        self.assertEqual(response.code, 301)
843        self.assertEqual(response.headers['Location'], '/web_redirect_newpath')
844        response = self.fetch("/web_redirect", follow_redirects=False)
845        self.assertEqual(response.code, 302)
846        self.assertEqual(response.headers['Location'], '/web_redirect_newpath')
847
848    def test_web_redirect_double_slash(self):
849        response = self.fetch("//web_redirect_double_slash", follow_redirects=False)
850        self.assertEqual(response.code, 301)
851        self.assertEqual(response.headers['Location'], '/web_redirect_newpath')
852
853    def test_header_injection(self):
854        response = self.fetch("/header_injection")
855        self.assertEqual(response.body, b"ok")
856
857    def test_get_argument(self):
858        response = self.fetch("/get_argument?foo=bar")
859        self.assertEqual(response.body, b"bar")
860        response = self.fetch("/get_argument?foo=")
861        self.assertEqual(response.body, b"")
862        response = self.fetch("/get_argument")
863        self.assertEqual(response.body, b"default")
864
865        # Test merging of query and body arguments.
866        # In singular form, body arguments take precedence over query arguments.
867        body = urllib_parse.urlencode(dict(foo="hello"))
868        response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
869        self.assertEqual(response.body, b"hello")
870        # In plural methods they are merged.
871        response = self.fetch("/get_arguments?foo=bar",
872                              method="POST", body=body)
873        self.assertEqual(json_decode(response.body),
874                         dict(default=['bar', 'hello'],
875                              query=['bar'],
876                              body=['hello']))
877
878    def test_get_query_arguments(self):
879        # send as a post so we can ensure the separation between query
880        # string and body arguments.
881        body = urllib_parse.urlencode(dict(foo="hello"))
882        response = self.fetch("/get_argument?source=query&foo=bar",
883                              method="POST", body=body)
884        self.assertEqual(response.body, b"bar")
885        response = self.fetch("/get_argument?source=query&foo=",
886                              method="POST", body=body)
887        self.assertEqual(response.body, b"")
888        response = self.fetch("/get_argument?source=query",
889                              method="POST", body=body)
890        self.assertEqual(response.body, b"default")
891
892    def test_get_body_arguments(self):
893        body = urllib_parse.urlencode(dict(foo="bar"))
894        response = self.fetch("/get_argument?source=body&foo=hello",
895                              method="POST", body=body)
896        self.assertEqual(response.body, b"bar")
897
898        body = urllib_parse.urlencode(dict(foo=""))
899        response = self.fetch("/get_argument?source=body&foo=hello",
900                              method="POST", body=body)
901        self.assertEqual(response.body, b"")
902
903        body = urllib_parse.urlencode(dict())
904        response = self.fetch("/get_argument?source=body&foo=hello",
905                              method="POST", body=body)
906        self.assertEqual(response.body, b"default")
907
908    def test_no_gzip(self):
909        response = self.fetch('/get_argument')
910        self.assertNotIn('Accept-Encoding', response.headers.get('Vary', ''))
911        self.assertNotIn('gzip', response.headers.get('Content-Encoding', ''))
912
913
class NonWSGIWebTests(WebTestCase):
    """Tests for the flow-control and empty-flush handlers.

    This class is not registered with ``wsgi_safe``.
    """

    def get_handlers(self):
        routes = [
            ("/flow_control", FlowControlHandler),
            ("/empty_flush", EmptyFlushCallbackHandler),
        ]
        return routes

    def test_flow_control(self):
        response = self.fetch("/flow_control")
        self.assertEqual(response.body, b"123")

    def test_empty_flush(self):
        self.assertEqual(self.fetch("/empty_flush").body, b"ok")
926
927
@wsgi_safe
class ErrorResponseTest(WebTestCase):
    """Tests of the error responses produced for failing handlers.

    Three handlers are registered: one using the default error page, one
    overriding ``write_error``, and one whose ``write_error`` itself
    raises.
    """

    def get_handlers(self):
        class DefaultHandler(RequestHandler):
            def get(self):
                # With a ``status`` argument raise that HTTPError;
                # otherwise fail with a ZeroDivisionError.
                status = self.get_argument("status", None)
                if status:
                    raise HTTPError(int(status))
                1 / 0

        class WriteErrorHandler(RequestHandler):
            def get(self):
                status = self.get_argument("status", None)
                if not status:
                    1 / 0
                else:
                    self.send_error(int(status))

            def write_error(self, status_code, **kwargs):
                self.set_header("Content-Type", "text/plain")
                if "exc_info" not in kwargs:
                    message = "Status: %d" % status_code
                else:
                    message = "Exception: %s" % kwargs["exc_info"][0].__name__
                self.write(message)

        class FailedWriteErrorHandler(RequestHandler):
            def get(self):
                1 / 0

            def write_error(self, status_code, **kwargs):
                raise Exception("exception in write_error")

        routes = [
            url("/default", DefaultHandler),
            url("/write_error", WriteErrorHandler),
            url("/failed_write_error", FailedWriteErrorHandler),
        ]
        return routes

    def test_default(self):
        # Each triple is (request URL, expected status, expected text).
        cases = (("/default", 500, b"500: Internal Server Error"),
                 ("/default?status=503", 503, b"503: Service Unavailable"),
                 ("/default?status=435", 435, b"435: Unknown"))
        with ExpectLog(app_log, "Uncaught exception"):
            for req_url, expected_code, expected_text in cases:
                response = self.fetch(req_url)
                self.assertEqual(response.code, expected_code)
                self.assertIn(expected_text, response.body)

    def test_write_error(self):
        cases = (("/write_error", 500, b"Exception: ZeroDivisionError"),
                 ("/write_error?status=503", 503, b"Status: 503"))
        with ExpectLog(app_log, "Uncaught exception"):
            for req_url, expected_code, expected_body in cases:
                response = self.fetch(req_url)
                self.assertEqual(response.code, expected_code)
                self.assertEqual(expected_body, response.body)

    def test_failed_write_error(self):
        # When write_error itself raises, the reply is a 500 with an
        # empty body.
        with ExpectLog(app_log, "Uncaught exception"):
            failed = self.fetch("/failed_write_error")
            self.assertEqual(failed.code, 500)
            self.assertEqual(b"", failed.body)
992
993
@wsgi_safe
class StaticFileTest(WebTestCase):
    """Tests of static file serving and ``static_url`` generation.

    Most tests request ``static/robots.txt``; the assertions below treat
    it as a 26-byte file whose MD5 hash is ``robots_txt_hash``.
    """
    # The expected MD5 hash of robots.txt, used in tests that call
    # StaticFileHandler.get_version
    robots_txt_hash = b"f71d20196d4caf35b6a670db8c70b03d"
    static_dir = os.path.join(os.path.dirname(__file__), 'static')

    def get_handlers(self):
        class StaticUrlHandler(RequestHandler):
            def get(self, path):
                with_v = int(self.get_argument('include_version', 1))
                self.write(self.static_url(path, include_version=with_v))

        class AbsoluteStaticUrlHandler(StaticUrlHandler):
            include_host = True

        class OverrideStaticUrlHandler(RequestHandler):
            def get(self, path):
                # The argument arrives as the string "0" or "1".  bool()
                # of any non-empty string is True, so it has to go
                # through int() first; otherwise the "0" case is never
                # actually exercised.
                do_include = bool(int(self.get_argument("include_host")))
                # Set the handler-level default to the opposite value so
                # that the explicit keyword argument must override it.
                self.include_host = not do_include

                regular_url = self.static_url(path)
                override_url = self.static_url(path, include_host=do_include)
                if override_url == regular_url:
                    return self.write(str(False))

                # find() restricted to the URL's prefix returns 0 when
                # the URL starts with the protocol and -1 otherwise.
                protocol = self.request.protocol + "://"
                protocol_length = len(protocol)
                check_regular = regular_url.find(protocol, 0, protocol_length)
                check_override = override_url.find(protocol, 0, protocol_length)

                if do_include:
                    result = (check_override == 0 and check_regular == -1)
                else:
                    result = (check_override == -1 and check_regular == 0)
                self.write(str(result))

        return [('/static_url/(.*)', StaticUrlHandler),
                ('/abs_static_url/(.*)', AbsoluteStaticUrlHandler),
                ('/override_static_url/(.*)', OverrideStaticUrlHandler),
                ('/root_static/(.*)', StaticFileHandler, dict(path='/'))]

    def get_app_kwargs(self):
        return dict(static_path=relpath('static'))

    def test_static_files(self):
        response = self.fetch('/robots.txt')
        self.assertIn(b"Disallow: /", response.body)

        response = self.fetch('/static/robots.txt')
        self.assertIn(b"Disallow: /", response.body)
        self.assertEqual(response.headers.get("Content-Type"), "text/plain")

    def test_static_compressed_files(self):
        response = self.fetch("/static/sample.xml.gz")
        self.assertEqual(response.headers.get("Content-Type"),
                         "application/gzip")
        response = self.fetch("/static/sample.xml.bz2")
        self.assertEqual(response.headers.get("Content-Type"),
                         "application/octet-stream")
        # make sure the uncompressed file still has the correct type
        response = self.fetch("/static/sample.xml")
        self.assertIn(response.headers.get("Content-Type"),
                      ("text/xml", "application/xml"))

    def test_static_url(self):
        response = self.fetch("/static_url/robots.txt")
        self.assertEqual(response.body,
                         b"/static/robots.txt?v=" + self.robots_txt_hash)

    def test_absolute_static_url(self):
        response = self.fetch("/abs_static_url/robots.txt")
        self.assertEqual(response.body, (
            utf8(self.get_url("/")) +
            b"static/robots.txt?v=" +
            self.robots_txt_hash
        ))

    def test_relative_version_exclusion(self):
        response = self.fetch("/static_url/robots.txt?include_version=0")
        self.assertEqual(response.body, b"/static/robots.txt")

    def test_absolute_version_exclusion(self):
        response = self.fetch("/abs_static_url/robots.txt?include_version=0")
        self.assertEqual(response.body,
                         utf8(self.get_url("/") + "static/robots.txt"))

    def test_include_host_override(self):
        self._trigger_include_host_check(False)
        self._trigger_include_host_check(True)

    def _trigger_include_host_check(self, include_host):
        """Request the override handler and require that it answers "True".

        ``include_host`` is sent as ``0`` or ``1`` in the query string.
        """
        path = "/override_static_url/robots.txt?include_host=%s"
        response = self.fetch(path % int(include_host))
        self.assertEqual(response.body, utf8(str(True)))

    def get_and_head(self, *args, **kwargs):
        """Performs a GET and HEAD request and returns the GET response.

        Fails if any ``Content-*`` headers returned by the two requests
        differ.
        """
        head_response = self.fetch(*args, method="HEAD", **kwargs)
        get_response = self.fetch(*args, method="GET", **kwargs)
        content_headers = set()
        for h in itertools.chain(head_response.headers, get_response.headers):
            if h.startswith('Content-'):
                content_headers.add(h)
        for h in content_headers:
            head_value = head_response.headers.get(h)
            get_value = get_response.headers.get(h)
            # The values are passed in the same order as the labels in
            # the message: GET first, then HEAD.
            self.assertEqual(head_value, get_value,
                             "%s differs between GET (%s) and HEAD (%s)" %
                             (h, get_value, head_value))
        return get_response

    def test_static_304_if_modified_since(self):
        response1 = self.get_and_head("/static/robots.txt")
        response2 = self.get_and_head("/static/robots.txt", headers={
            'If-Modified-Since': response1.headers['Last-Modified']})
        self.assertEqual(response2.code, 304)
        self.assertNotIn('Content-Length', response2.headers)
        self.assertNotIn('Last-Modified', response2.headers)

    def test_static_304_if_none_match(self):
        response1 = self.get_and_head("/static/robots.txt")
        response2 = self.get_and_head("/static/robots.txt", headers={
            'If-None-Match': response1.headers['Etag']})
        self.assertEqual(response2.code, 304)

    def test_static_304_etag_modified_bug(self):
        # A mismatched If-None-Match must yield 200 even when
        # If-Modified-Since alone would have matched.
        response1 = self.get_and_head("/static/robots.txt")
        response2 = self.get_and_head("/static/robots.txt", headers={
            'If-None-Match': '"MISMATCH"',
            'If-Modified-Since': response1.headers['Last-Modified']})
        self.assertEqual(response2.code, 200)

    def test_static_if_modified_since_pre_epoch(self):
        # On windows, the functions that work with time_t do not accept
        # negative values, and at least one client (processing.js) seems
        # to use if-modified-since 1/1/1960 as a cache-busting technique.
        response = self.get_and_head("/static/robots.txt", headers={
            'If-Modified-Since': 'Fri, 01 Jan 1960 00:00:00 GMT'})
        self.assertEqual(response.code, 200)

    def test_static_if_modified_since_time_zone(self):
        # Instead of the value from Last-Modified, make requests with times
        # chosen just before and after the known modification time
        # of the file to ensure that the right time zone is being used
        # when parsing If-Modified-Since.
        stat = os.stat(relpath('static/robots.txt'))

        response = self.get_and_head('/static/robots.txt', headers={
            'If-Modified-Since': format_timestamp(stat.st_mtime - 1)})
        self.assertEqual(response.code, 200)
        response = self.get_and_head('/static/robots.txt', headers={
            'If-Modified-Since': format_timestamp(stat.st_mtime + 1)})
        self.assertEqual(response.code, 304)

    def test_static_etag(self):
        response = self.get_and_head('/static/robots.txt')
        self.assertEqual(utf8(response.headers.get("Etag")),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_with_range(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=0-9'})
        self.assertEqual(response.code, 206)
        self.assertEqual(response.body, b"User-agent")
        self.assertEqual(utf8(response.headers.get("Etag")),
                         b'"' + self.robots_txt_hash + b'"')
        self.assertEqual(response.headers.get("Content-Length"), "10")
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes 0-9/26")

    def test_static_with_range_full_file(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=0-'})
        # Note: Chrome refuses to play audio if it gets an HTTP 206 in response
        # to ``Range: bytes=0-`` :(
        self.assertEqual(response.code, 200)
        robots_file_path = os.path.join(self.static_dir, "robots.txt")
        # Read in binary mode so the raw bytes on disk are compared,
        # with no newline translation or locale-dependent decoding.
        with open(robots_file_path, "rb") as f:
            self.assertEqual(response.body, f.read())
        self.assertEqual(response.headers.get("Content-Length"), "26")
        self.assertEqual(response.headers.get("Content-Range"), None)

    def test_static_with_range_full_past_end(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=0-10000000'})
        self.assertEqual(response.code, 200)
        robots_file_path = os.path.join(self.static_dir, "robots.txt")
        # Binary mode: compare against the raw bytes on disk.
        with open(robots_file_path, "rb") as f:
            self.assertEqual(response.body, f.read())
        self.assertEqual(response.headers.get("Content-Length"), "26")
        self.assertEqual(response.headers.get("Content-Range"), None)

    def test_static_with_range_partial_past_end(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=1-10000000'})
        self.assertEqual(response.code, 206)
        robots_file_path = os.path.join(self.static_dir, "robots.txt")
        # Binary mode: compare against the raw bytes on disk.
        with open(robots_file_path, "rb") as f:
            self.assertEqual(response.body, f.read()[1:])
        self.assertEqual(response.headers.get("Content-Length"), "25")
        self.assertEqual(response.headers.get("Content-Range"), "bytes 1-25/26")

    def test_static_with_range_end_edge(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=22-'})
        self.assertEqual(response.body, b": /\n")
        self.assertEqual(response.headers.get("Content-Length"), "4")
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes 22-25/26")

    def test_static_with_range_neg_end(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=-4'})
        self.assertEqual(response.body, b": /\n")
        self.assertEqual(response.headers.get("Content-Length"), "4")
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes 22-25/26")

    def test_static_invalid_range(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'asdf'})
        self.assertEqual(response.code, 200)

    def test_static_unsatisfiable_range_zero_suffix(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=-0'})
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes */26")
        self.assertEqual(response.code, 416)

    def test_static_unsatisfiable_range_invalid_start(self):
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=26'})
        self.assertEqual(response.code, 416)
        self.assertEqual(response.headers.get("Content-Range"),
                         "bytes */26")

    def test_static_head(self):
        response = self.fetch('/static/robots.txt', method='HEAD')
        self.assertEqual(response.code, 200)
        # No body was returned, but we did get the right content length.
        self.assertEqual(response.body, b'')
        self.assertEqual(response.headers['Content-Length'], '26')
        self.assertEqual(utf8(response.headers['Etag']),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_head_range(self):
        response = self.fetch('/static/robots.txt', method='HEAD',
                              headers={'Range': 'bytes=1-4'})
        self.assertEqual(response.code, 206)
        self.assertEqual(response.body, b'')
        self.assertEqual(response.headers['Content-Length'], '4')
        self.assertEqual(utf8(response.headers['Etag']),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_range_if_none_match(self):
        # A matching If-None-Match wins over the Range header: 304 with
        # no body rather than 206.
        response = self.get_and_head('/static/robots.txt', headers={
            'Range': 'bytes=1-4',
            'If-None-Match': b'"' + self.robots_txt_hash + b'"'})
        self.assertEqual(response.code, 304)
        self.assertEqual(response.body, b'')
        self.assertNotIn('Content-Length', response.headers)
        self.assertEqual(utf8(response.headers['Etag']),
                         b'"' + self.robots_txt_hash + b'"')

    def test_static_404(self):
        response = self.get_and_head('/static/blarg')
        self.assertEqual(response.code, 404)

    def test_path_traversal_protection(self):
        # curl_httpclient processes ".." on the client side, so we
        # must test this with simple_httpclient.
        self.http_client.close()
        self.http_client = SimpleAsyncHTTPClient()
        with ExpectLog(gen_log, ".*not in root static directory"):
            response = self.get_and_head('/static/../static_foo.txt')
        # Attempted path traversal should result in 403, not 200
        # (which means the check failed and the file was served)
        # or 404 (which means that the file didn't exist and
        # is probably a packaging error).
        self.assertEqual(response.code, 403)

    @unittest.skipIf(os.name != 'posix', 'non-posix OS')
    def test_root_static_path(self):
        # Sometimes people set the StaticFileHandler's path to '/'
        # to disable Tornado's path validation (in conjunction with
        # their own validation in get_absolute_path). Make sure
        # that the stricter validation in 4.2.1 doesn't break them.
        path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                            'static/robots.txt')
        response = self.get_and_head('/root_static' + urllib_parse.quote(path))
        self.assertEqual(response.code, 200)
1291
1292
@wsgi_safe
class StaticDefaultFilenameTest(WebTestCase):
    """Tests of static serving with ``default_filename='index.html'``."""

    def get_app_kwargs(self):
        handler_args = {'default_filename': 'index.html'}
        return {'static_path': relpath('static'),
                'static_handler_args': handler_args}

    def get_handlers(self):
        # No extra routes; only the application's static handling is used.
        return []

    def test_static_default_filename(self):
        # A directory URL with a trailing slash serves the index file.
        index = self.fetch('/static/dir/', follow_redirects=False)
        self.assertEqual(index.code, 200)
        self.assertEqual(b'this is the index\n', index.body)

    def test_static_default_redirect(self):
        # Without the trailing slash the reply is a redirect to the
        # slash-terminated URL.
        redirect = self.fetch('/static/dir', follow_redirects=False)
        self.assertEqual(redirect.code, 301)
        location = redirect.headers['Location']
        self.assertTrue(location.endswith('/static/dir/'))
1311
1312
@wsgi_safe
class StaticFileWithPathTest(WebTestCase):
    """Tests a ``StaticFileHandler`` route given an explicit ``path``."""

    def get_app_kwargs(self):
        handler_args = {'default_filename': 'index.html'}
        return {'static_path': relpath('static'),
                'static_handler_args': handler_args}

    def get_handlers(self):
        handler_kwargs = {"path": relpath("templates/")}
        return [("/foo/(.*)", StaticFileHandler, handler_kwargs)]

    def test_serve(self):
        # The body is compared as raw UTF-8 bytes.
        self.assertEqual(self.fetch("/foo/utf8.html").body,
                         b"H\xc3\xa9llo\n")
1327
1328
@wsgi_safe
class CustomStaticFileTest(WebTestCase):
    """Tests a ``StaticFileHandler`` subclass that overrides every hook.

    The custom handler serves a single in-memory "file" (``foo.txt`` with
    content ``b'bar'``) and embeds the version in the file name
    (``foo.42.txt``) instead of in a query parameter.
    """

    def get_handlers(self):
        class MyStaticFileHandler(StaticFileHandler):
            @classmethod
            def make_static_url(cls, settings, path):
                # Insert the version before the extension:
                # "foo.txt" -> "/static/foo.42.txt".
                version_hash = cls.get_version(settings, path)
                extension_index = path.rindex('.')
                before_version = path[:extension_index]
                after_version = path[(extension_index + 1):]
                return '/static/%s.%s.%s' % (before_version, version_hash,
                                             after_version)

            def parse_url_path(self, url_path):
                # Inverse of make_static_url: drop the version component,
                # "foo.42.txt" -> "foo.txt".
                extension_index = url_path.rindex('.')
                version_index = url_path.rindex('.', 0, extension_index)
                return '%s%s' % (url_path[:version_index],
                                 url_path[extension_index:])

            @classmethod
            def get_absolute_path(cls, settings, path):
                return 'CustomStaticFileTest:' + path

            def validate_absolute_path(self, root, absolute_path):
                return absolute_path

            @classmethod
            def get_content(cls, path, start=None, end=None):
                # First parameter is named ``cls`` (it was ``self``)
                # because this is a classmethod.
                assert start is None and end is None
                if path == 'CustomStaticFileTest:foo.txt':
                    return b'bar'
                raise Exception("unexpected path %r" % path)

            def get_content_size(self):
                if self.absolute_path == 'CustomStaticFileTest:foo.txt':
                    return 3
                raise Exception("unexpected path %r" % self.absolute_path)

            def get_modified_time(self):
                return None

            @classmethod
            def get_version(cls, settings, path):
                return "42"

        class StaticUrlHandler(RequestHandler):
            def get(self, path):
                self.write(self.static_url(path))

        # get_app_kwargs reads this attribute, so it is set here, where
        # the handler class is defined.
        self.static_handler_class = MyStaticFileHandler

        return [("/static_url/(.*)", StaticUrlHandler)]

    def get_app_kwargs(self):
        return dict(static_path="dummy",
                    static_handler_class=self.static_handler_class)

    def test_serve(self):
        response = self.fetch("/static/foo.42.txt")
        self.assertEqual(response.body, b"bar")

    def test_static_url(self):
        with ExpectLog(gen_log, "Could not open static file", required=False):
            response = self.fetch("/static_url/foo.txt")
            self.assertEqual(response.body, b"/static/foo.42.txt")
1394
1395
@wsgi_safe
class HostMatchingTest(WebTestCase):
    """Tests routing of requests according to their ``Host`` header."""

    class Handler(RequestHandler):
        """Replies with the fixed string it was configured with."""

        def initialize(self, reply):
            self.reply = reply

        def get(self):
            self.write(self.reply)

    def get_handlers(self):
        return [("/foo", HostMatchingTest.Handler, {"reply": "wildcard"})]

    def test_host_matching(self):
        handler = HostMatchingTest.Handler
        # Each triple is (host pattern, path, reply); registered in order.
        host_rules = (
            ("www.example.com", "/foo", "[0]"),
            (r"www\.example\.com", "/bar", "[1]"),
            ("www.example.com", "/baz", "[2]"),
            ("www.e.*e.com", "/baz", "[3]"),
        )
        for host_pattern, path, reply in host_rules:
            self.app.add_handlers(host_pattern,
                                  [(path, handler, {"reply": reply})])

        # Requests without a Host header only reach the wildcard rule.
        self.assertEqual(self.fetch("/foo").body, b"wildcard")
        for path in ("/bar", "/baz"):
            self.assertEqual(self.fetch(path).code, 404)

        # Each triple is (path, Host header, expected body).
        host_cases = (
            ("/foo", 'www.example.com', b"[0]"),
            ("/bar", 'www.example.com', b"[1]"),
            ("/baz", 'www.example.com', b"[2]"),
            ("/baz", 'www.exe.com', b"[3]"),
        )
        for path, host, expected_body in host_cases:
            response = self.fetch(path, headers={'Host': host})
            self.assertEqual(response.body, expected_body)
1433
1434
@wsgi_safe
class DefaultHostMatchingTest(WebTestCase):
    """Tests host-based routing with the ``default_host`` setting."""

    def get_handlers(self):
        # All rules are added inside the test via add_handlers.
        return []

    def get_app_kwargs(self):
        return dict(default_host="www.example.com")

    def test_default_host_matching(self):
        handler = HostMatchingTest.Handler
        # Each triple is (host pattern, path, reply); registered in order.
        host_rules = (
            ("www.example.com", "/foo", "[0]"),
            (r"www\.example\.com", "/bar", "[1]"),
            ("www.test.com", "/baz", "[2]"),
        )
        for host_pattern, path, reply in host_rules:
            self.app.add_handlers(host_pattern,
                                  [(path, handler, {"reply": reply})])

        # With default_host "www.example.com", the first two rules match.
        for path, expected_body in (("/foo", b"[0]"), ("/bar", b"[1]")):
            self.assertEqual(self.fetch(path).body, expected_body)
        self.assertEqual(self.fetch("/baz").code, 404)

        # A request carrying an X-Real-Ip header must not match.
        with_real_ip = self.fetch("/foo", headers={"X-Real-Ip": "127.0.0.1"})
        self.assertEqual(with_real_ip.code, 404)

        # Changing the default host makes the third rule reachable.
        self.app.default_host = "www.test.com"

        self.assertEqual(self.fetch("/baz").body, b"[2]")
1465
1466
@wsgi_safe
class NamedURLSpecGroupsTest(WebTestCase):
    """Named regex groups are passed to handlers for str and unicode patterns."""

    def get_handlers(self):
        class EchoHandler(RequestHandler):
            """Write the captured ``path`` group back to the client."""

            def get(self, path):
                self.write(path)

        str_spec = ("/str/(?P<path>.*)", EchoHandler)
        unicode_spec = (u"/unicode/(?P<path>.*)", EchoHandler)
        return [str_spec, unicode_spec]

    def test_named_urlspec_groups(self):
        cases = (("/str/foo", b"foo"), ("/unicode/bar", b"bar"))
        for request_path, expected_body in cases:
            response = self.fetch(request_path)
            self.assertEqual(response.body, expected_body)
1483
1484
@wsgi_safe
class ClearHeaderTest(SimpleHandlerTestCase):
    """``clear_header`` drops a previously set header from the response."""

    class Handler(RequestHandler):
        def get(self):
            for name, value in (("h1", "foo"), ("h2", "bar")):
                self.set_header(name, value)
            # Clearing a header that was never set is exercised here as well.
            for name in ("h1", "nonexistent"):
                self.clear_header(name)

    def test_clear_header(self):
        headers = self.fetch("/").headers
        self.assertNotIn("h1", headers)
        self.assertEqual(headers["h2"], "bar")
1498
1499
class Header204Test(SimpleHandlerTestCase):
    """A 204 response must carry neither Content-Length nor Transfer-Encoding."""

    class Handler(RequestHandler):
        def get(self):
            self.set_status(204)
            self.finish()

    def test_204_headers(self):
        response = self.fetch('/')
        self.assertEqual(response.code, 204)
        for header_name in ("Content-Length", "Transfer-Encoding"):
            self.assertNotIn(header_name, response.headers)
1511
1512
@wsgi_safe
class Header304Test(SimpleHandlerTestCase):
    """Entity headers present on a 200 are absent from the matching 304."""

    class Handler(RequestHandler):
        def get(self):
            self.set_header("Content-Language", "en_US")
            self.write("hello")

    def test_304_headers(self):
        first = self.fetch('/')
        for name, value in (("Content-Length", "5"),
                            ("Content-Language", "en_US")):
            self.assertEqual(first.headers[name], value)

        # Repeat the request conditionally, using the Etag from the first one.
        etag = first.headers["Etag"]
        conditional = self.fetch('/', headers={'If-None-Match': etag})
        self.assertEqual(conditional.code, 304)
        # Transfer-Encoding is not an entity header, but it should not be
        # added to 304s by chunking either.
        for name in ("Content-Length", "Content-Language",
                     "Transfer-Encoding"):
            self.assertNotIn(name, conditional.headers)
1532
1533
@wsgi_safe
class StatusReasonTest(SimpleHandlerTestCase):
    """``set_status`` with and without an explicit reason phrase."""

    class Handler(RequestHandler):
        def get(self):
            # ``reason`` is optional in the query string; ``code`` is required.
            reasons = self.request.arguments.get('reason', [])
            status = int(self.get_argument('code'))
            if reasons:
                self.set_status(status, reason=reasons[0])
            else:
                self.set_status(status, reason=None)

    def get_http_client(self):
        # simple_httpclient only: curl doesn't expose the reason string
        return SimpleAsyncHTTPClient()

    def test_status(self):
        # (request path, expected status code, expected reason phrase)
        cases = (
            ("/?code=304", 304, "Not Modified"),
            ("/?code=304&reason=Foo", 304, "Foo"),
            ("/?code=682&reason=Bar", 682, "Bar"),
            ("/?code=682", 682, "Unknown"),
        )
        for request_path, code, reason in cases:
            response = self.fetch(request_path)
            self.assertEqual(response.code, code)
            self.assertEqual(response.reason, reason)
1559
1560
@wsgi_safe
class DateHeaderTest(SimpleHandlerTestCase):
    """The ``Date`` response header is close to the current UTC time."""

    class Handler(RequestHandler):
        def get(self):
            self.write("hello")

    def test_date_header(self):
        response = self.fetch('/')
        parsed = email.utils.parsedate(response.headers['Date'])
        # parsedate returns None for an unparseable value; fail with a clear
        # assertion instead of a TypeError when slicing None.
        self.assertIsNotNone(parsed)
        header_date = datetime.datetime(*parsed[:6])
        # Compare the absolute skew. A one-sided "header - now < 2s" check
        # passes for any date in the past, however stale.
        skew = abs(header_date - datetime.datetime.utcnow())
        self.assertLess(skew, datetime.timedelta(seconds=2))
1573
1574
@wsgi_safe
class RaiseWithReasonTest(SimpleHandlerTestCase):
    """``HTTPError`` raised with a custom reason for a non-standard status."""

    class Handler(RequestHandler):
        def get(self):
            raise HTTPError(682, reason="Foo")

    def get_http_client(self):
        # simple_httpclient only: curl doesn't expose the reason string
        return SimpleAsyncHTTPClient()

    def test_raise_with_reason(self):
        response = self.fetch("/")
        self.assertEqual(response.code, 682)
        self.assertEqual(response.reason, "Foo")
        self.assertIn(b'682: Foo', response.body)

    def test_httperror_str(self):
        error = HTTPError(682, reason="Foo")
        self.assertEqual(str(error), "HTTP 682: Foo")

    def test_httperror_str_from_httputil(self):
        # No explicit reason and an unregistered code: expect "Unknown".
        error = HTTPError(682)
        self.assertEqual(str(error), "HTTP 682: Unknown")
1596
1597
@wsgi_safe
class ErrorHandlerXSRFTest(WebTestCase):
    """Error responses to POSTs keep their status when xsrf_cookies is on."""

    def get_handlers(self):
        # note that if the handlers list is empty we get the default_host
        # redirect fallback instead of a 404, so test with both an
        # explicitly defined error handler and an implicit 404.
        return [('/error', ErrorHandler, {'status_code': 417})]

    def get_app_kwargs(self):
        return {'xsrf_cookies': True}

    def test_error_xsrf(self):
        status = self.fetch('/error', method='POST', body='').code
        self.assertEqual(status, 417)

    def test_404_xsrf(self):
        status = self.fetch('/404', method='POST', body='').code
        self.assertEqual(status, 404)
1616
1617
@wsgi_safe
class GzipTestCase(SimpleHandlerTestCase):
    """Response compression and its interaction with the ``Vary`` header."""

    class Handler(RequestHandler):
        def get(self):
            # Each ``vary`` query argument becomes its own Vary header.
            for vary_value in self.get_arguments('vary'):
                self.add_header('Vary', vary_value)
            # Must write at least MIN_LENGTH bytes to activate compression.
            padding = '!' * GZipContentEncoding.MIN_LENGTH
            self.write('hello world' + padding)

    def get_app_kwargs(self):
        static_dir = os.path.join(os.path.dirname(__file__), 'static')
        return {'gzip': True, 'static_path': static_dir}

    def assert_compressed(self, response):
        # simple_httpclient renames the content-encoding header;
        # curl_httpclient doesn't.
        headers = response.headers
        renamed = headers.get('X-Consumed-Content-Encoding')
        encoding = headers.get('Content-Encoding', renamed)
        self.assertEqual(encoding, 'gzip')

    def _vary_values(self, response):
        # Split the Vary header into its comma-separated, stripped values.
        return [part.strip() for part in response.headers['Vary'].split(',')]

    def test_gzip(self):
        response = self.fetch('/')
        self.assert_compressed(response)
        self.assertEqual(response.headers['Vary'], 'Accept-Encoding')

    def test_gzip_static(self):
        # The streaming responses in StaticFileHandler have subtle
        # interactions with the gzip output so test this case separately.
        response = self.fetch('/robots.txt')
        self.assert_compressed(response)
        self.assertEqual(response.headers['Vary'], 'Accept-Encoding')

    def test_gzip_not_requested(self):
        response = self.fetch('/', use_gzip=False)
        self.assertNotIn('Content-Encoding', response.headers)
        self.assertEqual(response.headers['Vary'], 'Accept-Encoding')

    def test_vary_already_present(self):
        response = self.fetch('/?vary=Accept-Language')
        self.assert_compressed(response)
        self.assertEqual(self._vary_values(response),
                         ['Accept-Language', 'Accept-Encoding'])

    def test_vary_already_present_multiple(self):
        # Regression test for https://github.com/tornadoweb/tornado/issues/1670
        response = self.fetch('/?vary=Accept-Language&vary=Cookie')
        self.assert_compressed(response)
        self.assertEqual(self._vary_values(response),
                         ['Accept-Language', 'Cookie', 'Accept-Encoding'])
1670
1671
@wsgi_safe
class PathArgsInPrepareTest(WebTestCase):
    """``path_args`` and ``path_kwargs`` are already populated in prepare()."""

    class Handler(RequestHandler):
        def prepare(self):
            captured = dict(args=self.path_args, kwargs=self.path_kwargs)
            self.write(captured)

        def get(self, path):
            assert path == 'foo'
            self.finish()

    def get_handlers(self):
        positional = ('/pos/(.*)', self.Handler)
        keyword = ('/kw/(?P<path>.*)', self.Handler)
        return [positional, keyword]

    def _fetch_json(self, path):
        # Fetch ``path``, raise on an error response, and decode the JSON body.
        response = self.fetch(path)
        response.rethrow()
        return json_decode(response.body)

    def test_pos(self):
        self.assertEqual(self._fetch_json('/pos/foo'),
                         {'args': ['foo'], 'kwargs': {}})

    def test_kw(self):
        self.assertEqual(self._fetch_json('/kw/foo'),
                         {'args': [], 'kwargs': {'path': 'foo'}})
1697
1698
@wsgi_safe
class ClearAllCookiesTest(SimpleHandlerTestCase):
    """``clear_all_cookies`` emits a clearing Set-Cookie per request cookie."""

    class Handler(RequestHandler):
        def get(self):
            self.clear_all_cookies()
            self.write('ok')

    def test_clear_all_cookies(self):
        response = self.fetch('/', headers={'Cookie': 'foo=bar; baz=xyzzy'})
        set_cookies = sorted(response.headers.get_list('Set-Cookie'))
        # Python 3.5 sends 'baz="";'; older versions use 'baz=;'
        self.assertTrue(set_cookies[0].startswith(('baz=;', 'baz="";')))
        self.assertTrue(set_cookies[1].startswith(('foo=;', 'foo="";')))
1714
1715
class PermissionError(Exception):
    """Application-level error that ExceptionHandlerTest maps to a 403."""
1718
1719
@wsgi_safe
class ExceptionHandlerTest(SimpleHandlerTestCase):
    """Overriding ``write_error`` and ``log_exception`` per exception type."""

    class Handler(RequestHandler):
        def get(self):
            # The ``exc`` query argument selects which failure to trigger.
            kind = self.get_argument('exc')
            if kind == 'http':
                raise HTTPError(410, "no longer here")
            if kind == 'zero':
                1 / 0
            if kind == 'permission':
                raise PermissionError('not allowed')

        def write_error(self, status_code, **kwargs):
            if 'exc_info' in kwargs:
                _, error, _ = kwargs['exc_info']
                if isinstance(error, PermissionError):
                    # Custom response for PermissionError; skip the default.
                    self.set_status(403)
                    self.write('PermissionError')
                    return
            RequestHandler.write_error(self, status_code, **kwargs)

        def log_exception(self, typ, value, tb):
            if not isinstance(value, PermissionError):
                RequestHandler.log_exception(self, typ, value, tb)
                return
            app_log.warning('custom logging for PermissionError: %s',
                            value.args[0])

    def test_http_error(self):
        # HTTPErrors are logged as warnings with no stack trace.
        # TODO: extend ExpectLog to test this more precisely
        with ExpectLog(gen_log, '.*no longer here'):
            status = self.fetch('/?exc=http').code
            self.assertEqual(status, 410)

    def test_unknown_error(self):
        # Unknown errors are logged as errors with a stack trace.
        with ExpectLog(app_log, 'Uncaught exception'):
            status = self.fetch('/?exc=zero').code
            self.assertEqual(status, 500)

    def test_known_error(self):
        # log_exception can override logging behavior, and write_error
        # can override the response.
        expected_log = 'custom logging for PermissionError: not allowed'
        with ExpectLog(app_log, expected_log):
            status = self.fetch('/?exc=permission').code
            self.assertEqual(status, 403)
1768
1769
@wsgi_safe
class BuggyLoggingTest(SimpleHandlerTestCase):
    """A handler whose ``log_exception`` itself raises still produces a log."""

    class Handler(RequestHandler):
        def get(self):
            # Fail the request...
            1 / 0

        def log_exception(self, typ, value, tb):
            # ...and fail again while trying to log that failure.
            1 / 0

    def test_buggy_log_exception(self):
        # Something gets logged even though the application's
        # logger is broken.
        any_message = '.*'
        with ExpectLog(app_log, any_message):
            self.fetch('/')
1784
1785
@wsgi_safe
class UIMethodUIModuleTest(SimpleHandlerTestCase):
    """Test that UI methods and modules are created correctly and
    associated with the handler.
    """
    class Handler(RequestHandler):
        def get(self):
            self.render('foo.html')

        def value(self):
            # Called from both the UI method and the UI module below.
            return self.get_argument("value")

    def get_app_kwargs(self):
        def my_ui_method(handler, x):
            message = "In my_ui_method(%s) with handler value %s."
            return message % (x, handler.value())

        class MyModule(UIModule):
            def render(self, x):
                message = "In MyModule(%s) with handler value %s."
                return message % (x, self.handler.value())

        templates = {
            'foo.html': '{{ my_ui_method(42) }} {% module MyModule(123) %}',
        }
        return {
            'template_loader': DictLoader(templates),
            'ui_methods': {'my_ui_method': my_ui_method},
            'ui_modules': {'MyModule': MyModule},
        }

    def tearDown(self):
        super(UIMethodUIModuleTest, self).tearDown()
        # TODO: fix template loader caching so this isn't necessary.
        RequestHandler._template_loaders.clear()

    def test_ui_method(self):
        expected = (b'In my_ui_method(42) with handler value asdf. '
                    b'In MyModule(123) with handler value asdf.')
        response = self.fetch('/?value=asdf')
        self.assertEqual(response.body, expected)
1825
1826
@wsgi_safe
class GetArgumentErrorTest(SimpleHandlerTestCase):
    """``MissingArgumentError`` can be caught and exposes its details."""

    class Handler(RequestHandler):
        def get(self):
            try:
                self.get_argument('foo')
            except MissingArgumentError as error:
                # Report what the exception carries so the test can check it.
                details = {'arg_name': error.arg_name,
                           'log_message': error.log_message}
                self.write(details)
            else:
                self.write({})

    def test_catch_error(self):
        payload = json_decode(self.fetch('/').body)
        self.assertEqual(payload,
                         {'arg_name': 'foo',
                          'log_message': 'Missing argument foo'})
1843
1844
class MultipleExceptionTest(SimpleHandlerTestCase):
    """Several exceptions raised from callbacks of one asynchronous request."""

    class Handler(RequestHandler):
        # Number of log_exception calls, counted across all requests.
        exc_count = 0

        with ignore_deprecation():
            @asynchronous
            def get(self):
                # Schedule two callbacks that both raise ZeroDivisionError.
                io_loop = IOLoop.current()
                for _ in range(2):
                    io_loop.add_callback(lambda: 1 / 0)

        def log_exception(self, typ, value, tb):
            MultipleExceptionTest.Handler.exc_count += 1

    def test_multi_exception(self):
        with ignore_deprecation():
            # This test verifies that multiple exceptions raised into the same
            # ExceptionStackContext do not generate extraneous log entries
            # due to "Cannot send error response after headers written".
            # log_exception is called, but it does not proceed to send_error.
            for _ in range(2):
                response = self.fetch('/')
                self.assertEqual(response.code, 500)
            # Each of our two requests generated two exceptions, we should have
            # seen at least three of them by now (the fourth may still be
            # in the queue).
            seen = MultipleExceptionTest.Handler.exc_count
            self.assertGreater(seen, 2)
1872
1873
@wsgi_safe
class SetLazyPropertiesTest(SimpleHandlerTestCase):
    """``current_user`` and ``locale`` can be assigned directly in prepare()."""

    class Handler(RequestHandler):
        def get_current_user(self):
            # Must not be called: prepare() assigns current_user itself.
            raise NotImplementedError()

        def get_user_locale(self):
            # Must not be called: prepare() assigns locale itself.
            raise NotImplementedError()

        def prepare(self):
            self.current_user = 'Ben'
            self.locale = locale.get('en_US')

        def get(self):
            greeting = 'Hello %s (%s)' % (self.current_user, self.locale.code)
            self.write(greeting)

    def test_set_properties(self):
        # Ensure that current_user can be assigned to normally for apps
        # that want to forgo the lazy get_current_user property
        body = self.fetch('/').body
        self.assertEqual(body, b'Hello Ben (en_US)')
1895
1896
@wsgi_safe
class GetCurrentUserTest(WebTestCase):
    """Check when ``get_current_user`` is evaluated while rendering templates."""

    def get_app_kwargs(self):
        class WithoutUserModule(UIModule):
            """UI module that never touches ``current_user``."""

            def render(self):
                return ''

        class WithUserModule(UIModule):
            """UI module that reads ``current_user``."""

            def render(self):
                return str(self.current_user)

        templates = {
            'without_user.html': '',
            'with_user.html': '{{ current_user }}',
            'without_user_module.html': '{% module WithoutUserModule() %}',
            'with_user_module.html': '{% module WithUserModule() %}',
        }
        modules = {'WithUserModule': WithUserModule,
                   'WithoutUserModule': WithoutUserModule}
        return {'template_loader': DictLoader(templates),
                'ui_modules': modules}

    def tearDown(self):
        super(GetCurrentUserTest, self).tearDown()
        RequestHandler._template_loaders.clear()

    def get_handlers(self):
        class CurrentUserHandler(RequestHandler):
            """Render one template, then report whether the user was loaded."""

            # Template rendered by get(); set by each routed subclass.
            template_under_test = None

            def prepare(self):
                self.has_loaded_current_user = False

            def get_current_user(self):
                self.has_loaded_current_user = True
                return ''

            def get(self):
                # The rendered output is discarded; only the side effect on
                # has_loaded_current_user is reported to the client.
                self.render_string(self.template_under_test)
                self.finish(str(self.has_loaded_current_user))

        class WithoutUserHandler(CurrentUserHandler):
            template_under_test = 'without_user.html'

        class WithUserHandler(CurrentUserHandler):
            template_under_test = 'with_user.html'

        class CurrentUserModuleHandler(CurrentUserHandler):
            def get_template_namespace(self):
                # If RequestHandler.get_template_namespace is called, then
                # get_current_user is evaluated. Until #820 is fixed, this
                # is a small hack to circumvent the issue.
                return self.ui

        class WithoutUserModuleHandler(CurrentUserModuleHandler):
            template_under_test = 'without_user_module.html'

        class WithUserModuleHandler(CurrentUserModuleHandler):
            template_under_test = 'with_user_module.html'

        return [('/without_user', WithoutUserHandler),
                ('/with_user', WithUserHandler),
                ('/without_user_module', WithoutUserModuleHandler),
                ('/with_user_module', WithUserModuleHandler)]

    @unittest.skip('needs fix')
    def test_get_current_user_is_lazy(self):
        # TODO: Make this test pass. See #820.
        body = self.fetch('/without_user').body
        self.assertEqual(body, b'False')

    def test_get_current_user_works(self):
        body = self.fetch('/with_user').body
        self.assertEqual(body, b'True')

    def test_get_current_user_from_ui_module_is_lazy(self):
        body = self.fetch('/without_user_module').body
        self.assertEqual(body, b'False')

    def test_get_current_user_from_ui_module_works(self):
        body = self.fetch('/with_user_module').body
        self.assertEqual(body, b'True')
1980
1981
@wsgi_safe
class UnimplementedHTTPMethodsTest(SimpleHandlerTestCase):
    """A handler that defines no methods answers 405 to the standard verbs."""

    class Handler(RequestHandler):
        pass

    def test_unimplemented_standard_methods(self):
        # POST and PUT are sent with an (empty) body; the others without one.
        requests = [(verb, {}) for verb in ('HEAD', 'GET', 'DELETE', 'OPTIONS')]
        requests += [(verb, {'body': b''}) for verb in ('POST', 'PUT')]
        for verb, extra in requests:
            response = self.fetch('/', method=verb, **extra)
            self.assertEqual(response.code, 405)
1994
1995
class UnimplementedNonStandardMethodsTest(SimpleHandlerTestCase):
    """PATCH and a made-up verb both get a 405 from this handler."""

    # wsgiref.validate complains about unknown methods in a way that makes
    # this test not wsgi_safe.
    class Handler(RequestHandler):
        def other(self):
            # Even though this method exists, it won't get called automatically
            # because it is not in SUPPORTED_METHODS.
            self.write('other')

    def test_unimplemented_patch(self):
        # PATCH is recently standardized; Tornado supports it by default
        # but wsgiref.validate doesn't like it.
        status = self.fetch('/', method='PATCH', body=b'').code
        self.assertEqual(status, 405)

    def test_unimplemented_other(self):
        status = self.fetch('/', method='OTHER',
                            allow_nonstandard_methods=True).code
        self.assertEqual(status, 405)
2015
2016
@wsgi_safe
class AllHTTPMethodsTest(SimpleHandlerTestCase):
    """One implementation bound to several verbs echoes the request method."""

    class Handler(RequestHandler):
        def method(self):
            self.write(self.request.method)

        # Bind the same implementation to each of these verbs.
        get = delete = options = post = put = method

    def test_standard_methods(self):
        head_response = self.fetch('/', method='HEAD')
        self.assertEqual(head_response.body, b'')

        # POST and PUT are sent with an (empty) body; the others without one.
        requests = [(verb, {}) for verb in ('GET', 'DELETE', 'OPTIONS')]
        requests += [(verb, {'body': b''}) for verb in ('POST', 'PUT')]
        for verb, extra in requests:
            response = self.fetch('/', method=verb, **extra)
            self.assertEqual(response.body, utf8(verb))
2034
2035
class PatchMethodTest(SimpleHandlerTestCase):
    """PATCH and a verb added to SUPPORTED_METHODS are both dispatched."""

    class Handler(RequestHandler):
        # Extend the inherited tuple so the non-standard verb is dispatched.
        SUPPORTED_METHODS = RequestHandler.SUPPORTED_METHODS + ('OTHER',)

        def patch(self):
            self.write('patch')

        def other(self):
            self.write('other')

    def test_patch(self):
        body = self.fetch('/', method='PATCH', body=b'').body
        self.assertEqual(body, b'patch')

    def test_other(self):
        body = self.fetch('/', method='OTHER',
                          allow_nonstandard_methods=True).body
        self.assertEqual(body, b'other')
2054
2055
@wsgi_safe
class FinishInPrepareTest(SimpleHandlerTestCase):
    """A response finished in prepare() is what the client receives."""

    class Handler(RequestHandler):
        def prepare(self):
            self.finish('done')

        def get(self):
            # It's difficult to assert for certain that a method did not
            # or will not be called in an asynchronous context, but this
            # will be logged noisily if it is reached.
            raise Exception('should not reach this method')

    def test_finish_in_prepare(self):
        body = self.fetch('/').body
        self.assertEqual(body, b'done')
2071
2072
@wsgi_safe
class Default404Test(WebTestCase):
    """The built-in 404 page is served for a path with no handler."""

    def get_handlers(self):
        # If there are no handlers at all a default redirect handler gets added.
        return [('/foo', RequestHandler)]

    def test_404(self):
        expected_body = (b'<html><title>404: Not Found</title>'
                         b'<body>404: Not Found</body></html>')
        response = self.fetch('/')
        self.assertEqual(response.code, 404)
        self.assertEqual(response.body, expected_body)
2085
2086
@wsgi_safe
class Custom404Test(WebTestCase):
    """``default_handler_class`` serves paths that no handler matches."""

    def get_handlers(self):
        return [('/foo', RequestHandler)]

    def get_app_kwargs(self):
        class Custom404Handler(RequestHandler):
            """Reply with a 404 status and a custom body."""

            def get(self):
                self.set_status(404)
                self.write('custom 404 response')

        return {'default_handler_class': Custom404Handler}

    def test_404(self):
        response = self.fetch('/')
        self.assertEqual(response.code, 404)
        self.assertEqual(response.body, b'custom 404 response')
2104
2105
@wsgi_safe
class DefaultHandlerArgumentsTest(WebTestCase):
    """``default_handler_args`` are passed to the default handler class."""

    def get_handlers(self):
        return [('/foo', RequestHandler)]

    def get_app_kwargs(self):
        return {
            'default_handler_class': ErrorHandler,
            'default_handler_args': {'status_code': 403},
        }

    def test_403(self):
        status = self.fetch('/').code
        self.assertEqual(status, 403)
2118
2119
@wsgi_safe
class HandlerByNameTest(WebTestCase):
    """Handlers may be given as a class or as a dotted-name string."""

    def get_handlers(self):
        dotted_name = 'tornado.test.web_test.HelloHandler'
        # All three are equivalent.
        return [
            ('/hello1', HelloHandler),
            ('/hello2', dotted_name),
            url('/hello3', dotted_name),
        ]

    def test_handler_by_name(self):
        for path in ('/hello1', '/hello2', '/hello3'):
            resp = self.fetch(path)
            self.assertEqual(resp.body, b'hello')
2136
2137
class StreamingRequestBodyTest(WebTestCase):
    """Tests for handlers decorated with ``@stream_request_body``.

    The tests talk to the server over a raw ``IOStream`` so they control
    exactly when each chunk of the request body is sent. Handlers signal
    progress back to the test through Futures stored on the test instance.
    """

    def get_handlers(self):
        @stream_request_body
        class StreamingBodyHandler(RequestHandler):
            """Resolve the test's futures as each handler stage is reached."""

            def initialize(self, test):
                self.test = test

            def prepare(self):
                self.test.prepared.set_result(None)

            def data_received(self, data):
                # The test replaces ``self.test.data`` with a fresh Future
                # before sending each further chunk.
                self.test.data.set_result(data)

            def get(self):
                self.test.finished.set_result(None)
                self.write({})

        @stream_request_body
        class EarlyReturnHandler(RequestHandler):
            def prepare(self):
                # If we finish the response in prepare, it won't continue to
                # the (non-existent) data_received.
                raise HTTPError(401)

        @stream_request_body
        class CloseDetectionHandler(RequestHandler):
            """Notify the test when the client connection is closed."""

            def initialize(self, test):
                self.test = test

            def on_connection_close(self):
                super(CloseDetectionHandler, self).on_connection_close()
                self.test.close_future.set_result(None)

        return [('/stream_body', StreamingBodyHandler, dict(test=self)),
                ('/early_return', EarlyReturnHandler),
                ('/close_detection', CloseDetectionHandler, dict(test=self))]

    def connect(self, url, connection_close):
        """Open a raw connection and send the headers of a chunked GET.

        ``url`` is the request path as bytes. When ``connection_close`` is
        true a ``Connection: close`` header is sent as well. Returns the
        ``IOStream``; the caller writes the chunked body itself.
        """
        # Use a raw connection so we can control the sending of data.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        s.connect(("127.0.0.1", self.get_http_port()))
        stream = IOStream(s)
        stream.write(b"GET " + url + b" HTTP/1.1\r\n")
        if connection_close:
            stream.write(b"Connection: close\r\n")
        stream.write(b"Transfer-Encoding: chunked\r\n\r\n")
        return stream

    @gen_test
    def test_streaming_body(self):
        self.prepared = Future()
        self.data = Future()
        self.finished = Future()

        stream = self.connect(b"/stream_body", connection_close=True)
        yield self.prepared
        stream.write(b"4\r\nasdf\r\n")
        # Ensure the first chunk is received before we send the second.
        data = yield self.data
        self.assertEqual(data, b"asdf")
        self.data = Future()
        stream.write(b"4\r\nqwer\r\n")
        data = yield self.data
        # assertEqual, not the deprecated assertEquals alias.
        self.assertEqual(data, b"qwer")
        # A zero-length chunk terminates the chunked body.
        stream.write(b"0\r\n\r\n")
        yield self.finished
        data = yield stream.read_until_close()
        # This would ideally use an HTTP1Connection to read the response.
        self.assertTrue(data.endswith(b"{}"))
        stream.close()

    @gen_test
    def test_early_return(self):
        stream = self.connect(b"/early_return", connection_close=False)
        data = yield stream.read_until_close()
        self.assertTrue(data.startswith(b"HTTP/1.1 401"))

    @gen_test
    def test_early_return_with_data(self):
        stream = self.connect(b"/early_return", connection_close=False)
        stream.write(b"4\r\nasdf\r\n")
        data = yield stream.read_until_close()
        self.assertTrue(data.startswith(b"HTTP/1.1 401"))

    @gen_test
    def test_close_during_upload(self):
        self.close_future = Future()
        stream = self.connect(b"/close_detection", connection_close=False)
        # Close before sending any body; the handler's on_connection_close
        # resolves close_future.
        stream.close()
        yield self.close_future
2228
2229
2230# Each method in this handler returns a yieldable object and yields to the
2231# IOLoop so the future is not immediately ready.  Ensure that the
2232# yieldables are respected and no method is called before the previous
2233# one has completed.
@stream_request_body
class BaseFlowControlHandler(RequestHandler):
    """Streaming handler base that records the order its methods run in.

    ``in_method`` fails the owning test if a tracked method is entered while
    another tracked method is still active.
    """

    def initialize(self, test):
        self.test = test
        # Name of the tracked method currently running, or None.
        self.method = None
        # Every method name recorded so far, in call order.
        self.methods = []

    @contextlib.contextmanager
    def in_method(self, method):
        active = self.method
        if active is not None:
            self.test.fail("entered method %s while in %s" %
                           (method, active))
        self.method = method
        self.methods.append(method)
        try:
            yield
        finally:
            # Always mark the method as left, even if the body raised.
            self.method = None

    @gen.coroutine
    def prepare(self):
        # Note that asynchronous prepare() does not block data_received,
        # so we don't use in_method here.
        self.methods.append('prepare')
        yield gen.moment

    @gen.coroutine
    def post(self):
        with self.in_method('post'):
            yield gen.moment
        # Report the recorded call order back to the client.
        recorded = dict(methods=self.methods)
        self.write(recorded)
2265
2266
class BaseStreamingRequestFlowControlTest(object):
    """Flow-control tests shared by the concrete test classes in this file.

    The concrete classes mix this in with ``WebTestCase`` and route ``/``
    to a handler that reports the order of its method calls.
    """

    def get_httpserver_options(self):
        # Use a small chunk size so flow control is relevant even though
        # all the data arrives at once.
        return {'chunk_size': 10, 'decompress_request': True}

    def get_http_client(self):
        # simple_httpclient only: curl doesn't support body_producer.
        return SimpleAsyncHTTPClient()

    def _assert_three_data_chunks(self, response):
        # Every test below expects prepare, three data_received calls,
        # and finally post -- in that order.
        response.rethrow()
        call_order = ['prepare'] + ['data_received'] * 3 + ['post']
        self.assertEqual(json_decode(response.body),
                         {'methods': call_order})

    # Test all the slightly different code paths for fixed, chunked, etc bodies.
    def test_flow_control_fixed_body(self):
        payload = 'abcdefghijklmnopqrstuvwxyz'
        response = self.fetch('/', body=payload, method='POST')
        self._assert_three_data_chunks(response)

    def test_flow_control_chunked_body(self):
        pieces = [b'abcd', b'efgh', b'ijkl']

        @gen.coroutine
        def body_producer(write):
            # Wait for each write to complete before sending the next piece.
            for piece in pieces:
                yield write(piece)

        response = self.fetch('/', body_producer=body_producer, method='POST')
        self._assert_three_data_chunks(response)

    def test_flow_control_compressed_body(self):
        buf = BytesIO()
        with gzip.GzipFile(mode='w', fileobj=buf) as gz:
            gz.write(b'abcdefghijklmnopqrstuvwxyz')
        response = self.fetch('/', body=buf.getvalue(), method='POST',
                              headers={'Content-Encoding': 'gzip'})
        self._assert_three_data_chunks(response)
2314
2315
class DecoratedStreamingRequestFlowControlTest(
        BaseStreamingRequestFlowControlTest, WebTestCase):
    """Runs the shared flow-control tests with a ``gen.coroutine`` handler."""

    def get_handlers(self):
        class DecoratedFlowControlHandler(BaseFlowControlHandler):
            @gen.coroutine
            def data_received(self, data):
                # Stay marked as "in data_received" across one yield so
                # any overlapping call is caught by in_method.
                with self.in_method('data_received'):
                    yield gen.moment

        handler_kwargs = {'test': self}
        return [('/', DecoratedFlowControlHandler, handler_kwargs)]
2326
2327
# Runs the shared flow-control tests against a handler whose data_received
# is a native ``async def`` coroutine.  The decorator name suggests the class
# is skipped on Python < 3.5 -- confirm in tornado.test.util.
@skipBefore35
class NativeStreamingRequestFlowControlTest(
        BaseStreamingRequestFlowControlTest,
        WebTestCase):
    def get_handlers(self):
        class NativeFlowControlHandler(BaseFlowControlHandler):
            # The method is defined from a source string via exec_test and
            # looked up by name in the result.  Presumably this keeps the
            # ``async def`` syntax out of the module's own source so the
            # file still parses on interpreters without it -- TODO confirm.
            # The string is runtime data: do not reformat it.
            data_received = exec_test(globals(), locals(), """
            async def data_received(self, data):
                with self.in_method('data_received'):
                    import asyncio
                    await asyncio.sleep(0)
            """)["data_received"]
        # Route "/" to the handler, passing this test case as ``test``.
        return [('/', NativeFlowControlHandler, dict(test=self))]
2341
2342
@wsgi_safe
class IncorrectContentLengthTest(SimpleHandlerTestCase):
    """Checks what happens when Content-Length disagrees with the body."""

    def get_handlers(self):
        testcase = self
        self.server_error = None

        # Both handlers declare a Content-Length that doesn't match what
        # they write, stash the exception raised by finish() on the test
        # case, and then re-raise it.
        class TooHigh(RequestHandler):
            def get(self):
                self.set_header("Content-Length", "42")
                try:
                    self.finish("ok")
                except Exception as exc:
                    testcase.server_error = exc
                    raise

        class TooLow(RequestHandler):
            def get(self):
                self.set_header("Content-Length", "2")
                try:
                    self.finish("hello")
                except Exception as exc:
                    testcase.server_error = exc
                    raise

        routes = [('/high', TooHigh), ('/low', TooLow)]
        return routes

    def _fetch_expecting_abort(self, path):
        # The request must fail on the client side while the server logs
        # both an application-level and a general-level error.
        with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
            with ExpectLog(gen_log,
                           "(Cannot send error response after headers written"
                           "|Failed to flush partial response)"):
                with self.assertRaises(HTTPClientError):
                    self.fetch(path, raise_error=True)

    def test_content_length_too_high(self):
        # When the content-length is too high, the connection is simply
        # closed without completing the response.  An error is logged on
        # the server.
        self._fetch_expecting_abort("/high")
        self.assertEqual(str(self.server_error),
                         "Tried to write 40 bytes less than Content-Length")

    def test_content_length_too_low(self):
        # When the content-length is too low, the connection is closed
        # without writing the last chunk, so the client never sees the
        # exchange complete (which would be a framing error).
        self._fetch_expecting_abort("/low")
        self.assertEqual(str(self.server_error),
                         "Tried to write more data than Content-Length")
2396
2397
class ClientCloseTest(SimpleHandlerTestCase):
    """Checks that a connection closed mid-request surfaces as a client error."""

    class Handler(RequestHandler):
        def get(self):
            if self.request.version.startswith('HTTP/1'):
                # Simulate a connection closed by the client during
                # request processing.  The client will see an error, but the
                # server should respond gracefully (without logging errors
                # because we were unable to write out as many bytes as
                # Content-Length said we would)
                self.request.connection.stream.close()
                self.write('hello')
            else:
                # TODO: add a HTTP2-compatible version of this test.
                self.write('requires HTTP/1.x')

    def test_client_close(self):
        # Only HTTPClientError is expected here.  SkipTest must NOT be
        # listed: assertRaises swallows every exception type it is told to
        # expect, which would turn the skip below into a silent pass.
        with self.assertRaises(HTTPClientError):
            response = self.fetch('/', raise_error=True)
            # Reached only when the fetch did not raise.
            if response.body == b'requires HTTP/1.x':
                # Propagates through assertRaises and marks the test skipped.
                self.skipTest('requires HTTP/1.x')
            self.assertEqual(response.code, 599)
2419
2420
2421class SignedValueTest(unittest.TestCase):
2422    SECRET = "It's a secret to everybody"
2423    SECRET_DICT = {0: "asdfbasdf", 1: "12312312", 2: "2342342"}
2424
2425    def past(self):
2426        return self.present() - 86400 * 32
2427
2428    def present(self):
2429        return 1300000000
2430
2431    def test_known_values(self):
2432        signed_v1 = create_signed_value(SignedValueTest.SECRET, "key", "value",
2433                                        version=1, clock=self.present)
2434        self.assertEqual(
2435            signed_v1,
2436            b"dmFsdWU=|1300000000|31c934969f53e48164c50768b40cbd7e2daaaa4f")
2437
2438        signed_v2 = create_signed_value(SignedValueTest.SECRET, "key", "value",
2439                                        version=2, clock=self.present)
2440        self.assertEqual(
2441            signed_v2,
2442            b"2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
2443            b"3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")
2444
2445        signed_default = create_signed_value(SignedValueTest.SECRET,
2446                                             "key", "value", clock=self.present)
2447        self.assertEqual(signed_default, signed_v2)
2448
2449        decoded_v1 = decode_signed_value(SignedValueTest.SECRET, "key",
2450                                         signed_v1, min_version=1,
2451                                         clock=self.present)
2452        self.assertEqual(decoded_v1, b"value")
2453
2454        decoded_v2 = decode_signed_value(SignedValueTest.SECRET, "key",
2455                                         signed_v2, min_version=2,
2456                                         clock=self.present)
2457        self.assertEqual(decoded_v2, b"value")
2458
2459    def test_name_swap(self):
2460        signed1 = create_signed_value(SignedValueTest.SECRET, "key1", "value",
2461                                      clock=self.present)
2462        signed2 = create_signed_value(SignedValueTest.SECRET, "key2", "value",
2463                                      clock=self.present)
2464        # Try decoding each string with the other's "name"
2465        decoded1 = decode_signed_value(SignedValueTest.SECRET, "key2", signed1,
2466                                       clock=self.present)
2467        self.assertIs(decoded1, None)
2468        decoded2 = decode_signed_value(SignedValueTest.SECRET, "key1", signed2,
2469                                       clock=self.present)
2470        self.assertIs(decoded2, None)
2471
2472    def test_expired(self):
2473        signed = create_signed_value(SignedValueTest.SECRET, "key1", "value",
2474                                     clock=self.past)
2475        decoded_past = decode_signed_value(SignedValueTest.SECRET, "key1",
2476                                           signed, clock=self.past)
2477        self.assertEqual(decoded_past, b"value")
2478        decoded_present = decode_signed_value(SignedValueTest.SECRET, "key1",
2479                                              signed, clock=self.present)
2480        self.assertIs(decoded_present, None)
2481
2482    def test_payload_tampering(self):
2483        # These cookies are variants of the one in test_known_values.
2484        sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"
2485
2486        def validate(prefix):
2487            return (b'value' ==
2488                    decode_signed_value(SignedValueTest.SECRET, "key",
2489                                        prefix + sig, clock=self.present))
2490        self.assertTrue(validate("2|1:0|10:1300000000|3:key|8:dmFsdWU=|"))
2491        # Change key version
2492        self.assertFalse(validate("2|1:1|10:1300000000|3:key|8:dmFsdWU=|"))
2493        # length mismatch (field too short)
2494        self.assertFalse(validate("2|1:0|10:130000000|3:key|8:dmFsdWU=|"))
2495        # length mismatch (field too long)
2496        self.assertFalse(validate("2|1:0|10:1300000000|3:keey|8:dmFsdWU=|"))
2497
2498    def test_signature_tampering(self):
2499        prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
2500
2501        def validate(sig):
2502            return (b'value' ==
2503                    decode_signed_value(SignedValueTest.SECRET, "key",
2504                                        prefix + sig, clock=self.present))
2505        self.assertTrue(validate(
2506            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
2507        # All zeros
2508        self.assertFalse(validate("0" * 32))
2509        # Change one character
2510        self.assertFalse(validate(
2511            "4d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
2512        # Change another character
2513        self.assertFalse(validate(
2514            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e153"))
2515        # Truncate
2516        self.assertFalse(validate(
2517            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e15"))
2518        # Lengthen
2519        self.assertFalse(validate(
2520            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e1538"))
2521
2522    def test_non_ascii(self):
2523        value = b"\xe9"
2524        signed = create_signed_value(SignedValueTest.SECRET, "key", value,
2525                                     clock=self.present)
2526        decoded = decode_signed_value(SignedValueTest.SECRET, "key", signed,
2527                                      clock=self.present)
2528        self.assertEqual(value, decoded)
2529
2530    def test_key_versioning_read_write_default_key(self):
2531        value = b"\xe9"
2532        signed = create_signed_value(SignedValueTest.SECRET_DICT,
2533                                     "key", value, clock=self.present,
2534                                     key_version=0)
2535        decoded = decode_signed_value(SignedValueTest.SECRET_DICT,
2536                                      "key", signed, clock=self.present)
2537        self.assertEqual(value, decoded)
2538
2539    def test_key_versioning_read_write_non_default_key(self):
2540        value = b"\xe9"
2541        signed = create_signed_value(SignedValueTest.SECRET_DICT,
2542                                     "key", value, clock=self.present,
2543                                     key_version=1)
2544        decoded = decode_signed_value(SignedValueTest.SECRET_DICT,
2545                                      "key", signed, clock=self.present)
2546        self.assertEqual(value, decoded)
2547
2548    def test_key_versioning_invalid_key(self):
2549        value = b"\xe9"
2550        signed = create_signed_value(SignedValueTest.SECRET_DICT,
2551                                     "key", value, clock=self.present,
2552                                     key_version=0)
2553        newkeys = SignedValueTest.SECRET_DICT.copy()
2554        newkeys.pop(0)
2555        decoded = decode_signed_value(newkeys,
2556                                      "key", signed, clock=self.present)
2557        self.assertEqual(None, decoded)
2558
2559    def test_key_version_retrieval(self):
2560        value = b"\xe9"
2561        signed = create_signed_value(SignedValueTest.SECRET_DICT,
2562                                     "key", value, clock=self.present,
2563                                     key_version=1)
2564        key_version = get_signature_key_version(signed)
2565        self.assertEqual(1, key_version)
2566
2567
@wsgi_safe
class XSRFTest(SimpleHandlerTestCase):
    """End-to-end checks of XSRF cookie/token validation on POST requests."""

    class Handler(RequestHandler):
        def get(self):
            # Overriding a setting per request would be a bad idea in a
            # real app, but here it lets a test choose the token version
            # through the ``version`` query argument.
            self.settings["xsrf_cookie_version"] = int(
                self.get_argument("version", "2"))
            self.write(self.xsrf_token)

        def post(self):
            self.write("ok")

    def get_app_kwargs(self):
        return {"xsrf_cookies": True}

    def setUp(self):
        super(XSRFTest, self).setUp()
        # Default token used by cookie_headers() and most tests.
        self.xsrf_token = self.get_token()

    def get_token(self, old_token=None, version=None):
        # Fetch a token from the GET handler, optionally presenting an
        # existing token as the cookie and/or requesting a token version.
        cookie = None if old_token is None else self.cookie_headers(old_token)
        path = "/" if version is None else ("/?version=%d" % version)
        response = self.fetch(path, headers=cookie)
        response.rethrow()
        return native_str(response.body)

    def cookie_headers(self, token=None):
        # Request headers carrying ``token`` (default: self.xsrf_token)
        # as the _xsrf cookie.
        value = self.xsrf_token if token is None else token
        return {"Cookie": "_xsrf=" + value}

    def _xsrf_form(self, token):
        # URL-encoded form body carrying ``token`` as the _xsrf argument.
        return urllib_parse.urlencode({"_xsrf": token})

    def test_xsrf_fail_no_token(self):
        with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
            reply = self.fetch("/", method="POST", body=b"")
        self.assertEqual(reply.code, 403)

    def test_xsrf_fail_body_no_cookie(self):
        with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
            reply = self.fetch("/", method="POST",
                               body=self._xsrf_form(self.xsrf_token))
        self.assertEqual(reply.code, 403)

    def test_xsrf_fail_argument_invalid_format(self):
        with ExpectLog(gen_log, ".*'_xsrf' argument has invalid format"):
            reply = self.fetch("/", method="POST",
                               headers=self.cookie_headers(),
                               body=self._xsrf_form('3|'))
        self.assertEqual(reply.code, 403)

    def test_xsrf_fail_cookie_invalid_format(self):
        with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
            reply = self.fetch("/", method="POST",
                               headers=self.cookie_headers(token='3|'),
                               body=self._xsrf_form(self.xsrf_token))
        self.assertEqual(reply.code, 403)

    def test_xsrf_fail_cookie_no_body(self):
        with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
            reply = self.fetch("/", method="POST", body=b"",
                               headers=self.cookie_headers())
        self.assertEqual(reply.code, 403)

    def test_xsrf_success_short_token(self):
        reply = self.fetch("/", method="POST",
                           body=self._xsrf_form('deadbeef'),
                           headers=self.cookie_headers(token='deadbeef'))
        self.assertEqual(reply.code, 200)

    def test_xsrf_success_non_hex_token(self):
        reply = self.fetch("/", method="POST",
                           body=self._xsrf_form('xoxo'),
                           headers=self.cookie_headers(token='xoxo'))
        self.assertEqual(reply.code, 200)

    def test_xsrf_success_post_body(self):
        reply = self.fetch("/", method="POST",
                           body=self._xsrf_form(self.xsrf_token),
                           headers=self.cookie_headers())
        self.assertEqual(reply.code, 200)

    def test_xsrf_success_query_string(self):
        # The token travels in the query string; the body stays empty.
        path = "/?" + self._xsrf_form(self.xsrf_token)
        reply = self.fetch(path, method="POST", body=b"",
                           headers=self.cookie_headers())
        self.assertEqual(reply.code, 200)

    def test_xsrf_success_header(self):
        # The token travels in the X-Xsrftoken header next to the cookie.
        request_headers = {"X-Xsrftoken": self.xsrf_token}
        request_headers.update(self.cookie_headers())
        reply = self.fetch("/", method="POST", body=b"",
                           headers=request_headers)
        self.assertEqual(reply.code, 200)

    def test_distinct_tokens(self):
        # Every request gets a distinct token.
        NUM_TOKENS = 10
        tokens = {self.get_token() for _ in range(NUM_TOKENS)}
        self.assertEqual(len(tokens), NUM_TOKENS)

    def test_cross_user(self):
        other_token = self.get_token()
        # Each token can be used to authenticate its own request.
        for own in (self.xsrf_token, other_token):
            reply = self.fetch("/", method="POST",
                               body=self._xsrf_form(own),
                               headers=self.cookie_headers(own))
            self.assertEqual(reply.code, 200)
        # Sending one in the cookie and the other in the body is not allowed.
        mismatched = ((self.xsrf_token, other_token),
                      (other_token, self.xsrf_token))
        for in_cookie, in_body in mismatched:
            with ExpectLog(gen_log, '.*XSRF cookie does not match POST'):
                reply = self.fetch("/", method="POST",
                                   body=self._xsrf_form(in_body),
                                   headers=self.cookie_headers(in_cookie))
            self.assertEqual(reply.code, 403)

    def test_refresh_token(self):
        current = self.xsrf_token
        seen = {current}
        # A user's token is stable over time.  Refreshing the page in one tab
        # might update the cookie while an older tab still has the old cookie
        # in its DOM.  Simulate this scenario by passing a constant token
        # in the body and re-querying for the token.
        for _ in range(5):
            current = self.get_token(current)
            # Tokens are encoded uniquely each time
            seen.add(current)
            reply = self.fetch("/", method="POST",
                               body=self._xsrf_form(self.xsrf_token),
                               headers=self.cookie_headers(current))
            self.assertEqual(reply.code, 200)
        self.assertEqual(len(seen), 6)

    def test_versioning(self):
        # Version 1 still produces distinct tokens per request.
        first_v1 = self.get_token(version=1)
        second_v1 = self.get_token(version=1)
        self.assertNotEqual(first_v1, second_v1)

        # Refreshed v1 tokens are all identical.
        v1_token = self.get_token(version=1)
        for _ in range(5):
            refreshed = self.get_token(v1_token, version=1)
            self.assertEqual(refreshed, v1_token)

        # Upgrade to a v2 version of the same token
        v2_token = self.get_token(v1_token)
        self.assertNotEqual(v1_token, v2_token)
        # Each v1 token can map to many v2 tokens.
        self.assertNotEqual(v2_token, self.get_token(v1_token))

        # The tokens are cross-compatible.
        pairings = ((v1_token, v2_token), (v2_token, v1_token))
        for in_cookie, in_body in pairings:
            reply = self.fetch("/", method="POST",
                               body=self._xsrf_form(in_body),
                               headers=self.cookie_headers(in_cookie))
            self.assertEqual(reply.code, 200)
2742
2743
@wsgi_safe
class XSRFCookieKwargsTest(SimpleHandlerTestCase):
    """Checks that ``xsrf_cookie_kwargs`` shows up on the XSRF cookie."""

    class Handler(RequestHandler):
        def get(self):
            # Writing the token is what makes the response carry the cookie
            # that the test inspects.
            self.write(self.xsrf_token)

    def get_app_kwargs(self):
        return {
            'xsrf_cookies': True,
            'xsrf_cookie_kwargs': {'httponly': True},
        }

    def test_xsrf_httponly(self):
        reply = self.fetch("/")
        set_cookie = reply.headers['Set-Cookie']
        # Compare case-insensitively.
        self.assertIn('httponly;', set_cookie.lower())
2757
2758
@wsgi_safe
class FinishExceptionTest(SimpleHandlerTestCase):
    """Checks that raising ``Finish`` keeps the status, headers and body."""

    class Handler(RequestHandler):
        def get(self):
            self.set_status(401)
            self.set_header('WWW-Authenticate', 'Basic realm="something"')
            # Two ways to deliver the body: as the argument to Finish, or
            # written beforehand and followed by a bare Finish.
            if self.get_argument('finish_value', ''):
                raise Finish('authentication required')
            self.write('authentication required')
            raise Finish()

    def test_finish_exception(self):
        # Both variants must produce the same response.
        for path in ('/', '/?finish_value=1'):
            reply = self.fetch(path)
            self.assertEqual(reply.code, 401)
            self.assertEqual('Basic realm="something"',
                             reply.headers.get('WWW-Authenticate'))
            self.assertEqual(b'authentication required', reply.body)
2778
2779
@wsgi_safe
class DecoratorTest(WebTestCase):
    """Checks the redirects issued by ``@removeslash`` and ``@addslash``."""

    def get_handlers(self):
        class RemoveSlashHandler(RequestHandler):
            @removeslash
            def get(self):
                pass

        class AddSlashHandler(RequestHandler):
            @addslash
            def get(self):
                pass

        routes = [
            ("/removeslash/", RemoveSlashHandler),
            ("/addslash", AddSlashHandler),
        ]
        return routes

    def _assert_permanent_redirect(self, path, location):
        # Fetch without following redirects and check the 301 target.
        reply = self.fetch(path, follow_redirects=False)
        self.assertEqual(reply.code, 301)
        self.assertEqual(reply.headers['Location'], location)

    def test_removeslash(self):
        self._assert_permanent_redirect("/removeslash/", "/removeslash")
        # The query string is carried over to the new location.
        self._assert_permanent_redirect("/removeslash/?foo=bar",
                                        "/removeslash?foo=bar")

    def test_addslash(self):
        self._assert_permanent_redirect("/addslash", "/addslash/")
        # The query string is carried over to the new location.
        self._assert_permanent_redirect("/addslash?foo=bar",
                                        "/addslash/?foo=bar")
2814
2815
@wsgi_safe
class CacheTest(WebTestCase):
    """Checks If-None-Match handling against a handler-chosen ETag."""

    def get_handlers(self):
        class EtagHandler(RequestHandler):
            def get(self, computed_etag):
                self.write(computed_etag)

            def compute_etag(self):
                # Use the text written by get() (taken from the URL) as
                # the ETag.
                return self._write_buffer[0]

        return [('/etag/(.*)', EtagHandler)]

    def _test_etag(self, computed_etag, etags, status_code):
        # Request a resource whose ETag is ``computed_etag`` while sending
        # ``etags`` as If-None-Match, and check the resulting status code.
        path = '/etag/' + computed_etag
        reply = self.fetch(path, headers={'If-None-Match': etags})
        self.assertEqual(reply.code, status_code)

    def test_wildcard_etag(self):
        self._test_etag(computed_etag='"xyzzy"', etags='*', status_code=304)

    def test_strong_etag_match(self):
        self._test_etag(computed_etag='"xyzzy"', etags='"xyzzy"',
                        status_code=304)

    def test_multiple_strong_etag_match(self):
        self._test_etag(computed_etag='"xyzzy1"',
                        etags='"xyzzy1", "xyzzy2"',
                        status_code=304)

    def test_strong_etag_not_match(self):
        self._test_etag(computed_etag='"xyzzy"', etags='"xyzzy1"',
                        status_code=200)

    def test_multiple_strong_etag_not_match(self):
        self._test_etag(computed_etag='"xyzzy"',
                        etags='"xyzzy1", "xyzzy2"',
                        status_code=200)

    def test_weak_etag_match(self):
        self._test_etag(computed_etag='"xyzzy1"', etags='W/"xyzzy1"',
                        status_code=304)

    def test_multiple_weak_etag_match(self):
        self._test_etag(computed_etag='"xyzzy2"',
                        etags='W/"xyzzy1", W/"xyzzy2"',
                        status_code=304)

    def test_weak_etag_not_match(self):
        self._test_etag(computed_etag='"xyzzy2"', etags='W/"xyzzy1"',
                        status_code=200)

    def test_multiple_weak_etag_not_match(self):
        self._test_etag(computed_etag='"xyzzy3"',
                        etags='W/"xyzzy1", W/"xyzzy2"',
                        status_code=200)
2881
2882
@wsgi_safe
class RequestSummaryTest(SimpleHandlerTestCase):
    """Checks that ``_request_summary`` copes with a missing remote_ip."""

    class Handler(RequestHandler):
        def get(self):
            # remote_ip is optional, although it's set by
            # both HTTPServer and WSGIAdapter.
            # Clobber it to make sure it doesn't break logging.
            self.request.remote_ip = None
            summary = self._request_summary()
            self.finish(summary)

    def test_missing_remote_ip(self):
        reply = self.fetch("/")
        self.assertEqual(reply.body, b"GET / (None)")
2896
2897
2898class HTTPErrorTest(unittest.TestCase):
2899    def test_copy(self):
2900        e = HTTPError(403, reason="Go away")
2901        e2 = copy.copy(e)
2902        self.assertIsNot(e, e2)
2903        self.assertEqual(e.status_code, e2.status_code)
2904        self.assertEqual(e.reason, e2.reason)
2905
2906
class ApplicationTest(AsyncTestCase):
    """Smoke test for ``Application.listen``."""

    def test_listen(self):
        # Listen on port 0 of the loopback address, then stop the returned
        # server immediately.
        http_server = Application([]).listen(0, address='127.0.0.1')
        http_server.stop()
2912
2913
2914class URLSpecReverseTest(unittest.TestCase):
2915    def test_reverse(self):
2916        self.assertEqual('/favicon.ico', url(r'/favicon\.ico', None).reverse())
2917        self.assertEqual('/favicon.ico', url(r'^/favicon\.ico$', None).reverse())
2918
2919    def test_non_reversible(self):
2920        # URLSpecs are non-reversible if they include non-constant
2921        # regex features outside capturing groups. Currently, this is
2922        # only strictly enforced for backslash-escaped character
2923        # classes.
2924        paths = [
2925            r'^/api/v\d+/foo/(\w+)$',
2926        ]
2927        for path in paths:
2928            # A URLSpec can still be created even if it cannot be reversed.
2929            url_spec = url(path, None)
2930            try:
2931                result = url_spec.reverse()
2932                self.fail("did not get expected exception when reversing %s. "
2933                          "result: %s" % (path, result))
2934            except ValueError:
2935                pass
2936
2937    def test_reverse_arguments(self):
2938        self.assertEqual('/api/v1/foo/bar',
2939                         url(r'^/api/v1/foo/(\w+)$', None).reverse('bar'))
2940
2941
class RedirectHandlerTest(WebTestCase):
    def get_handlers(self):
        # Two fixed redirects, plus one pattern whose target swaps the
        # first two captured path segments.
        routes = [
            ('/src', WebRedirectHandler, {'url': '/dst'}),
            ('/src2', WebRedirectHandler, {'url': '/dst2?foo=bar'}),
            (r'/(.*?)/(.*?)/(.*)', WebRedirectHandler, {'url': '/{1}/{0}/{2}'}),
        ]
        return routes

    def _assert_permanent_redirect(self, path, location):
        # Fetch without following redirects and check the 301 target.
        reply = self.fetch(path, follow_redirects=False)
        self.assertEqual(reply.code, 301)
        self.assertEqual(reply.headers['Location'], location)

    def test_basic_redirect(self):
        self._assert_permanent_redirect('/src', '/dst')

    def test_redirect_with_argument(self):
        # The request's query string is carried over to the target.
        self._assert_permanent_redirect('/src?foo=bar', '/dst?foo=bar')

    def test_redirect_with_appending_argument(self):
        # The request's query string is appended to the target's own.
        self._assert_permanent_redirect('/src2?foo2=bar2',
                                        '/dst2?foo=bar&foo2=bar2')

    def test_redirect_pattern(self):
        self._assert_permanent_redirect('/a/b/c', '/b/a/c')
2968