1# -*- coding: utf-8 -*-
2''' Tests for the BaseRequest and BaseResponse objects and their subclasses. '''
3
4import unittest
5import sys
6
7import bottle
8from bottle import request, tob, touni, tonat, json_dumps, _e, HTTPError, parse_date
9import tools
10import wsgiref.util
11import base64
12
13from bottle import BaseRequest, BaseResponse, LocalRequest
14
15
try:
    from itertools import product
except ImportError:
    # Fallback used only when itertools does not provide product().
    def product(*args):
        """ Yield the cartesian product of the given iterables as tuples. """
        combos = [[]]
        for choices in [tuple(arg) for arg in args]:
            combos = [head + [item] for head in combos for item in choices]
        for combo in combos:
            yield tuple(combo)
26
27class TestRequest(unittest.TestCase):
28
29    def test_app_property(self):
30        e = {}
31        r = BaseRequest(e)
32        self.assertRaises(RuntimeError, lambda: r.app)
33        e.update({'bottle.app': 5})
34        self.assertEqual(r.app, 5)
35
36    def test_route_property(self):
37        e = {'bottle.route': 5}
38        r = BaseRequest(e)
39        self.assertEqual(r.route, 5)
40
41    def test_url_for_property(self):
42        e = {}
43        r = BaseRequest(e)
44        self.assertRaises(RuntimeError, lambda: r.url_args)
45        e.update({'route.url_args': {'a': 5}})
46        self.assertEqual(r.url_args, {'a': 5})
47
48    def test_path(self):
49        """ PATH_INFO normalization. """
50        # Legal paths
51        tests = [('', '/'), ('x','/x'), ('x/', '/x/'), ('/x', '/x'), ('/x/', '/x/')]
52        for raw, norm in tests:
53            self.assertEqual(norm, BaseRequest({'PATH_INFO': raw}).path)
54        # Strange paths
55        tests = [('///', '/'), ('//x','/x')]
56        for raw, norm in tests:
57            self.assertEqual(norm, BaseRequest({'PATH_INFO': raw}).path)
58        # No path at all
59        self.assertEqual('/', BaseRequest({}).path)
60
61    def test_method(self):
62        self.assertEqual(BaseRequest({}).method, 'GET')
63        self.assertEqual(BaseRequest({'REQUEST_METHOD':'GET'}).method, 'GET')
64        self.assertEqual(BaseRequest({'REQUEST_METHOD':'GeT'}).method, 'GET')
65        self.assertEqual(BaseRequest({'REQUEST_METHOD':'get'}).method, 'GET')
66        self.assertEqual(BaseRequest({'REQUEST_METHOD':'POst'}).method, 'POST')
67        self.assertEqual(BaseRequest({'REQUEST_METHOD':'FanTASY'}).method, 'FANTASY')
68
69    def test_script_name(self):
70        """ SCRIPT_NAME normalization. """
71        # Legal paths
72        tests = [('', '/'), ('x','/x/'), ('x/', '/x/'), ('/x', '/x/'), ('/x/', '/x/')]
73        for raw, norm in tests:
74            self.assertEqual(norm, BaseRequest({'SCRIPT_NAME': raw}).script_name)
75        # Strange paths
76        tests = [('///', '/'), ('///x///','/x/')]
77        for raw, norm in tests:
78            self.assertEqual(norm, BaseRequest({'SCRIPT_NAME': raw}).script_name)
79        # No path at all
80        self.assertEqual('/', BaseRequest({}).script_name)
81
82    def test_pathshift(self):
83        """ Request.path_shift() """
84        def test_shift(s, p, c):
85            request = BaseRequest({'SCRIPT_NAME': s, 'PATH_INFO': p})
86            request.path_shift(c)
87            return [request['SCRIPT_NAME'], request.path]
88        self.assertEqual(['/a/b', '/c/d'], test_shift('/a/b', '/c/d', 0))
89        self.assertEqual(['/a/b', '/c/d/'], test_shift('/a/b', '/c/d/', 0))
90        self.assertEqual(['/a/b/c', '/d'], test_shift('/a/b', '/c/d', 1))
91        self.assertEqual(['/a', '/b/c/d'], test_shift('/a/b', '/c/d', -1))
92        self.assertEqual(['/a/b/c', '/d/'], test_shift('/a/b', '/c/d/', 1))
93        self.assertEqual(['/a', '/b/c/d/'], test_shift('/a/b', '/c/d/', -1))
94        self.assertEqual(['/a/b/c', '/d/'], test_shift('/a/b/', '/c/d/', 1))
95        self.assertEqual(['/a', '/b/c/d/'], test_shift('/a/b/', '/c/d/', -1))
96        self.assertEqual(['/a/b/c/d', '/'], test_shift('/', '/a/b/c/d', 4))
97        self.assertEqual(['/', '/a/b/c/d/'], test_shift('/a/b/c/d', '/', -4))
98        self.assertRaises(AssertionError, test_shift, '/a/b', '/c/d', 3)
99        self.assertRaises(AssertionError, test_shift, '/a/b', '/c/d', -3)
100
101    def test_url(self):
102        """ Environ: URL building """
103        request = BaseRequest({'HTTP_HOST':'example.com'})
104        self.assertEqual('http://example.com/', request.url)
105        request = BaseRequest({'SERVER_NAME':'example.com'})
106        self.assertEqual('http://example.com/', request.url)
107        request = BaseRequest({'SERVER_NAME':'example.com', 'SERVER_PORT':'81'})
108        self.assertEqual('http://example.com:81/', request.url)
109        request = BaseRequest({'wsgi.url_scheme':'https', 'SERVER_NAME':'example.com'})
110        self.assertEqual('https://example.com/', request.url)
111        request = BaseRequest({'HTTP_HOST':'example.com', 'PATH_INFO':'/path',
112                               'QUERY_STRING':'1=b&c=d', 'SCRIPT_NAME':'/sp'})
113        self.assertEqual('http://example.com/sp/path?1=b&c=d', request.url)
114        request = BaseRequest({'HTTP_HOST':'example.com', 'PATH_INFO':'/pa th',
115                               'SCRIPT_NAME':'/s p'})
116        self.assertEqual('http://example.com/s%20p/pa%20th', request.url)
117
118    def test_dict_access(self):
119        """ Environ: request objects are environment dicts """
120        e = {}
121        wsgiref.util.setup_testing_defaults(e)
122        request = BaseRequest(e)
123        self.assertEqual(list(request), list(e.keys()))
124        self.assertEqual(len(request), len(e))
125        for k, v in e.items():
126            self.assertTrue(k in request)
127            self.assertEqual(request[k], v)
128            request[k] = 'test'
129            self.assertEqual(request[k], 'test')
130        del request['PATH_INFO']
131        self.assertTrue('PATH_INFO' not in request)
132
133    def test_readonly_environ(self):
134        request = BaseRequest({'bottle.request.readonly':True})
135        def test(): request['x']='y'
136        self.assertRaises(KeyError, test)
137
138    def test_header_access(self):
139        """ Environ: Request objects decode headers """
140        e = {}
141        wsgiref.util.setup_testing_defaults(e)
142        e['HTTP_SOME_HEADER'] = 'some value'
143        request = BaseRequest(e)
144        request['HTTP_SOME_OTHER_HEADER'] = 'some other value'
145        self.assertTrue('Some-Header' in request.headers)
146        self.assertTrue(request.headers['Some-Header'] == 'some value')
147        self.assertTrue(request.headers['Some-Other-Header'] == 'some other value')
148
149    def test_header_access_special(self):
150        e = {}
151        wsgiref.util.setup_testing_defaults(e)
152        request = BaseRequest(e)
153        request['CONTENT_TYPE'] = 'test'
154        request['CONTENT_LENGTH'] = '123'
155        self.assertEqual(request.headers['Content-Type'], 'test')
156        self.assertEqual(request.headers['Content-Length'], '123')
157
158    def test_cookie_dict(self):
159        """ Environ: Cookie dict """
160        t = dict()
161        t['a=a']      = {'a': 'a'}
162        t['a=a; b=b'] = {'a': 'a', 'b':'b'}
163        t['a=a; a=b'] = {'a': 'b'}
164        for k, v in t.items():
165            request = BaseRequest({'HTTP_COOKIE': k})
166            for n in v:
167                self.assertEqual(v[n], request.cookies[n])
168                self.assertEqual(v[n], request.get_cookie(n))
169
170    def test_get(self):
171        """ Environ: GET data """
172        qs = tonat(tob('a=a&a=1&b=b&c=c&cn=%e7%93%b6'), 'latin1')
173        request = BaseRequest({'QUERY_STRING':qs})
174        self.assertTrue('a' in request.query)
175        self.assertTrue('b' in request.query)
176        self.assertEqual(['a','1'], request.query.getall('a'))
177        self.assertEqual(['b'], request.query.getall('b'))
178        self.assertEqual('1', request.query['a'])
179        self.assertEqual('b', request.query['b'])
180        self.assertEqual(tonat(tob('瓶'), 'latin1'), request.query['cn'])
181        self.assertEqual(touni('瓶'), request.query.cn)
182
183    def test_post(self):
184        """ Environ: POST data """
185        sq = tob('a=a&a=1&b=b&c=&d&cn=%e7%93%b6')
186        e = {}
187        wsgiref.util.setup_testing_defaults(e)
188        e['wsgi.input'].write(sq)
189        e['wsgi.input'].seek(0)
190        e['CONTENT_LENGTH'] = str(len(sq))
191        e['REQUEST_METHOD'] = "POST"
192        request = BaseRequest(e)
193        self.assertTrue('a' in request.POST)
194        self.assertTrue('b' in request.POST)
195        self.assertEqual(['a','1'], request.POST.getall('a'))
196        self.assertEqual(['b'], request.POST.getall('b'))
197        self.assertEqual('1', request.POST['a'])
198        self.assertEqual('b', request.POST['b'])
199        self.assertEqual('', request.POST['c'])
200        self.assertEqual('', request.POST['d'])
201        self.assertEqual(tonat(tob('瓶'), 'latin1'), request.POST['cn'])
202        self.assertEqual(touni('瓶'), request.POST.cn)
203
204    def test_bodypost(self):
205        sq = tob('foobar')
206        e = {}
207        wsgiref.util.setup_testing_defaults(e)
208        e['wsgi.input'].write(sq)
209        e['wsgi.input'].seek(0)
210        e['CONTENT_LENGTH'] = str(len(sq))
211        e['REQUEST_METHOD'] = "POST"
212        request = BaseRequest(e)
213        self.assertEqual('', request.POST['foobar'])
214
215    def test_body_noclose(self):
216        """ Test that the body file handler is not closed after request.POST """
217        sq = tob('a=a&a=1&b=b&c=&d')
218        e = {}
219        wsgiref.util.setup_testing_defaults(e)
220        e['wsgi.input'].write(sq)
221        e['wsgi.input'].seek(0)
222        e['CONTENT_LENGTH'] = str(len(sq))
223        e['REQUEST_METHOD'] = "POST"
224        request = BaseRequest(e)
225        self.assertEqual(sq, request.body.read())
226        request.POST # This caused a body.close() with Python 3.x
227        self.assertEqual(sq, request.body.read())
228
229    def test_params(self):
230        """ Environ: GET and POST are combined in request.param """
231        e = {}
232        wsgiref.util.setup_testing_defaults(e)
233        e['wsgi.input'].write(tob('b=b&c=p'))
234        e['wsgi.input'].seek(0)
235        e['CONTENT_LENGTH'] = '7'
236        e['QUERY_STRING'] = 'a=a&c=g'
237        e['REQUEST_METHOD'] = "POST"
238        request = BaseRequest(e)
239        self.assertEqual(['a','b','c'], sorted(request.params.keys()))
240        self.assertEqual('p', request.params['c'])
241
242    def test_getpostleak(self):
243        """ Environ: GET and POST should not leak into each other """
244        e = {}
245        wsgiref.util.setup_testing_defaults(e)
246        e['wsgi.input'].write(tob('b=b'))
247        e['wsgi.input'].seek(0)
248        e['CONTENT_LENGTH'] = '3'
249        e['QUERY_STRING'] = 'a=a'
250        e['REQUEST_METHOD'] = "POST"
251        request = BaseRequest(e)
252        self.assertEqual(['a'], list(request.GET.keys()))
253        self.assertEqual(['b'], list(request.POST.keys()))
254
255    def test_body(self):
256        """ Environ: Request.body should behave like a file object factory """
257        e = {}
258        wsgiref.util.setup_testing_defaults(e)
259        e['wsgi.input'].write(tob('abc'))
260        e['wsgi.input'].seek(0)
261        e['CONTENT_LENGTH'] = str(3)
262        request = BaseRequest(e)
263        self.assertEqual(tob('abc'), request.body.read())
264        self.assertEqual(tob('abc'), request.body.read(3))
265        self.assertEqual(tob('abc'), request.body.readline())
266        self.assertEqual(tob('abc'), request.body.readline(3))
267
268    def test_bigbody(self):
269        """ Environ: Request.body should handle big uploads using files """
270        e = {}
271        wsgiref.util.setup_testing_defaults(e)
272        e['wsgi.input'].write(tob('x')*1024*1000)
273        e['wsgi.input'].seek(0)
274        e['CONTENT_LENGTH'] = str(1024*1000)
275        request = BaseRequest(e)
276        self.assertTrue(hasattr(request.body, 'fileno'))
277        self.assertEqual(1024*1000, len(request.body.read()))
278        self.assertEqual(1024, len(request.body.read(1024)))
279        self.assertEqual(1024*1000, len(request.body.readline()))
280        self.assertEqual(1024, len(request.body.readline(1024)))
281
282    def test_tobigbody(self):
283        """ Environ: Request.body should truncate to Content-Length bytes """
284        e = {}
285        wsgiref.util.setup_testing_defaults(e)
286        e['wsgi.input'].write(tob('x')*1024)
287        e['wsgi.input'].seek(0)
288        e['CONTENT_LENGTH'] = '42'
289        request = BaseRequest(e)
290        self.assertEqual(42, len(request.body.read()))
291        self.assertEqual(42, len(request.body.read(1024)))
292        self.assertEqual(42, len(request.body.readline()))
293        self.assertEqual(42, len(request.body.readline(1024)))
294
295    def _test_chunked(self, body, expect):
296        e = {}
297        wsgiref.util.setup_testing_defaults(e)
298        e['wsgi.input'].write(tob(body))
299        e['wsgi.input'].seek(0)
300        e['HTTP_TRANSFER_ENCODING'] = 'chunked'
301        if isinstance(expect, str):
302            self.assertEquals(tob(expect), BaseRequest(e).body.read())
303        else:
304            self.assertRaises(expect, lambda: BaseRequest(e).body)
305
306    def test_chunked(self):
307        self._test_chunked('1\r\nx\r\nff\r\n' + 'y'*255 + '\r\n0\r\n',
308                           'x' + 'y'*255)
309        self._test_chunked('8\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')
310        self._test_chunked('0\r\n', '')
311
312    def test_chunked_meta_fields(self):
313        self._test_chunked('8 ; foo\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')
314        self._test_chunked('8;foo\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')
315        self._test_chunked('8;foo=bar\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')
316
317    def test_chunked_not_terminated(self):
318        self._test_chunked('1\r\nx\r\n', HTTPError)
319
320    def test_chunked_wrong_size(self):
321        self._test_chunked('2\r\nx\r\n', HTTPError)
322
323    def test_chunked_illegal_size(self):
324        self._test_chunked('x\r\nx\r\n', HTTPError)
325
326    def test_chunked_not_chunked_at_all(self):
327        self._test_chunked('abcdef', HTTPError)
328
329    def test_multipart(self):
330        """ Environ: POST (multipart files and multible values per key) """
331        fields = [('field1','value1'), ('field2','value2'), ('field2','value3')]
332        files = [('file1','filename1.txt','content1'), ('万难','万难foo.py', 'ä\nö\rü')]
333        e = tools.multipart_environ(fields=fields, files=files)
334        request = BaseRequest(e)
335        # File content
336        self.assertTrue('file1' in request.POST)
337        self.assertTrue('file1' in request.files)
338        self.assertTrue('file1' not in request.forms)
339        cmp = tob('content1') if sys.version_info >= (3,2,0) else 'content1'
340        self.assertEqual(cmp, request.POST['file1'].file.read())
341        # File name and meta data
342        self.assertTrue('万难' in request.POST)
343        self.assertTrue('万难' in request.files)
344        self.assertTrue('万难' not in request.forms)
345        self.assertEqual('foo.py', request.POST['万难'].filename)
346        self.assertTrue(request.files['万难'])
347        self.assertFalse(request.files.file77)
348        # UTF-8 files
349        x = request.POST['万难'].file.read()
350        if (3,2,0) > sys.version_info >= (3,0,0):
351            x = x.encode('utf8')
352        self.assertEqual(tob('ä\nö\rü'), x)
353        # No file
354        self.assertTrue('file3' not in request.POST)
355        self.assertTrue('file3' not in request.files)
356        self.assertTrue('file3' not in request.forms)
357        # Field (single)
358        self.assertEqual('value1', request.POST['field1'])
359        self.assertTrue('field1' not in request.files)
360        self.assertEqual('value1', request.forms['field1'])
361        # Field (multi)
362        self.assertEqual(2, len(request.POST.getall('field2')))
363        self.assertEqual(['value2', 'value3'], request.POST.getall('field2'))
364        self.assertEqual(['value2', 'value3'], request.forms.getall('field2'))
365        self.assertTrue('field2' not in request.files)
366
367    def test_json_empty(self):
368        """ Environ: Request.json property with empty body. """
369        self.assertEqual(BaseRequest({}).json, None)
370
371    def test_json_noheader(self):
372        """ Environ: Request.json property with missing content-type header. """
373        test = dict(a=5, b='test', c=[1,2,3])
374        e = {}
375        wsgiref.util.setup_testing_defaults(e)
376        e['wsgi.input'].write(tob(json_dumps(test)))
377        e['wsgi.input'].seek(0)
378        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
379        self.assertEqual(BaseRequest(e).json, None)
380
381    def test_json_tobig(self):
382        """ Environ: Request.json property with huge body. """
383        test = dict(a=5, tobig='x' * bottle.BaseRequest.MEMFILE_MAX)
384        e = {'CONTENT_TYPE': 'application/json'}
385        wsgiref.util.setup_testing_defaults(e)
386        e['wsgi.input'].write(tob(json_dumps(test)))
387        e['wsgi.input'].seek(0)
388        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
389        self.assertRaises(HTTPError, lambda: BaseRequest(e).json)
390
391    def test_json_valid(self):
392        """ Environ: Request.json property. """
393        test = dict(a=5, b='test', c=[1,2,3])
394        e = {'CONTENT_TYPE': 'application/json; charset=UTF-8'}
395        wsgiref.util.setup_testing_defaults(e)
396        e['wsgi.input'].write(tob(json_dumps(test)))
397        e['wsgi.input'].seek(0)
398        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
399        self.assertEqual(BaseRequest(e).json, test)
400
401    def test_json_forged_header_issue616(self):
402        test = dict(a=5, b='test', c=[1,2,3])
403        e = {'CONTENT_TYPE': 'text/plain;application/json'}
404        wsgiref.util.setup_testing_defaults(e)
405        e['wsgi.input'].write(tob(json_dumps(test)))
406        e['wsgi.input'].seek(0)
407        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
408        self.assertEqual(BaseRequest(e).json, None)
409
410    def test_json_header_empty_body(self):
411        """Request Content-Type is application/json but body is empty"""
412        e = {'CONTENT_TYPE': 'application/json'}
413        wsgiref.util.setup_testing_defaults(e)
414        wsgiref.util.setup_testing_defaults(e)
415        e['CONTENT_LENGTH'] = "0"
416        self.assertEqual(BaseRequest(e).json, None)
417
418    def test_isajax(self):
419        e = {}
420        wsgiref.util.setup_testing_defaults(e)
421        self.assertFalse(BaseRequest(e.copy()).is_ajax)
422        e['HTTP_X_REQUESTED_WITH'] = 'XMLHttpRequest'
423        self.assertTrue(BaseRequest(e.copy()).is_ajax)
424
425    def test_auth(self):
426        user, pwd = 'marc', 'secret'
427        basic = touni(base64.b64encode(tob('%s:%s' % (user, pwd))))
428        r = BaseRequest({})
429        self.assertEqual(r.auth, None)
430        r.environ['HTTP_AUTHORIZATION'] = 'basic %s' % basic
431        self.assertEqual(r.auth, (user, pwd))
432        r.environ['REMOTE_USER'] = user
433        self.assertEqual(r.auth, (user, pwd))
434        del r.environ['HTTP_AUTHORIZATION']
435        self.assertEqual(r.auth, (user, None))
436
437    def test_remote_route(self):
438        ips = ['1.2.3.4', '2.3.4.5', '3.4.5.6']
439        r = BaseRequest({})
440        self.assertEqual(r.remote_route, [])
441        r.environ['HTTP_X_FORWARDED_FOR'] = ', '.join(ips)
442        self.assertEqual(r.remote_route, ips)
443        r.environ['REMOTE_ADDR'] = ips[1]
444        self.assertEqual(r.remote_route, ips)
445        del r.environ['HTTP_X_FORWARDED_FOR']
446        self.assertEqual(r.remote_route, [ips[1]])
447
448    def test_remote_addr(self):
449        ips = ['1.2.3.4', '2.3.4.5', '3.4.5.6']
450        r = BaseRequest({})
451        self.assertEqual(r.remote_addr, None)
452        r.environ['HTTP_X_FORWARDED_FOR'] = ', '.join(ips)
453        self.assertEqual(r.remote_addr, ips[0])
454        r.environ['REMOTE_ADDR'] = ips[1]
455        self.assertEqual(r.remote_addr, ips[0])
456        del r.environ['HTTP_X_FORWARDED_FOR']
457        self.assertEqual(r.remote_addr, ips[1])
458
459    def test_user_defined_attributes(self):
460        for cls in (BaseRequest, LocalRequest):
461            r = cls()
462
463            # New attributes go to the environ dict.
464            r.foo = 'somevalue'
465            self.assertEqual(r.foo, 'somevalue')
466            self.assertTrue('somevalue' in r.environ.values())
467
468            # Unknown attributes raise AttributeError.
469            self.assertRaises(AttributeError, getattr, r, 'somevalue')
470
471
472
473class TestResponse(unittest.TestCase):
474
475    def test_constructor_body(self):
476        self.assertEqual('',
477            BaseResponse('').body)
478
479        self.assertEqual('YAY',
480            BaseResponse('YAY').body)
481
482    def test_constructor_status(self):
483        self.assertEqual(200,
484            BaseResponse('YAY', 200).status_code)
485
486        self.assertEqual('200 OK',
487            BaseResponse('YAY', 200).status_line)
488
489        self.assertEqual('200 YAY',
490            BaseResponse('YAY', '200 YAY').status_line)
491
492        self.assertEqual('200 YAY',
493            BaseResponse('YAY', '200 YAY').status_line)
494
495    def test_constructor_headerlist(self):
496        from functools import partial
497        make_res = partial(BaseResponse, '', 200)
498
499        self.assertTrue('yay',
500            make_res([('x-test','yay')])['x-test'])
501
502    def test_constructor_headerlist(self):
503        from functools import partial
504        make_res = partial(BaseResponse, '', 200)
505
506        self.assertTrue('yay', make_res(x_test='yay')['x-test'])
507
508
509    def test_set_status(self):
510        rs = BaseResponse()
511
512        rs.status = 200
513        self.assertEqual(rs.status, rs.status_line)
514        self.assertEqual(rs.status_code, 200)
515        self.assertEqual(rs.status_line, '200 OK')
516
517        rs.status = 999
518        self.assertEqual(rs.status, rs.status_line)
519        self.assertEqual(rs.status_code, 999)
520        self.assertEqual(rs.status_line, '999 Unknown')
521
522        rs.status = 404
523        self.assertEqual(rs.status, rs.status_line)
524        self.assertEqual(rs.status_code, 404)
525        self.assertEqual(rs.status_line, '404 Not Found')
526
527        def test(): rs.status = -200
528        self.assertRaises(ValueError, test)
529        self.assertEqual(rs.status, rs.status_line) # last value
530        self.assertEqual(rs.status_code, 404) # last value
531        self.assertEqual(rs.status_line, '404 Not Found') # last value
532
533        def test(): rs.status = 5
534        self.assertRaises(ValueError, test)
535        self.assertEqual(rs.status, rs.status_line) # last value
536        self.assertEqual(rs.status_code, 404) # last value
537        self.assertEqual(rs.status_line, '404 Not Found') # last value
538
539        rs.status = '999 Who knows?' # Illegal, but acceptable three digit code
540        self.assertEqual(rs.status, rs.status_line)
541        self.assertEqual(rs.status_code, 999)
542        self.assertEqual(rs.status_line, '999 Who knows?')
543
544        rs.status = 555 # Strange code
545        self.assertEqual(rs.status, rs.status_line)
546        self.assertEqual(rs.status_code, 555)
547        self.assertEqual(rs.status_line, '555 Unknown')
548
549        rs.status = '404 Brain not Found' # Custom reason
550        self.assertEqual(rs.status, rs.status_line)
551        self.assertEqual(rs.status_code, 404)
552        self.assertEqual(rs.status_line, '404 Brain not Found')
553
554        def test(): rs.status = '5 Illegal Code'
555        self.assertRaises(ValueError, test)
556        self.assertEqual(rs.status, rs.status_line) # last value
557        self.assertEqual(rs.status_code, 404) # last value
558        self.assertEqual(rs.status_line, '404 Brain not Found') # last value
559
560        def test(): rs.status = '-99 Illegal Code'
561        self.assertRaises(ValueError, test)
562        self.assertEqual(rs.status, rs.status_line) # last value
563        self.assertEqual(rs.status_code, 404) # last value
564        self.assertEqual(rs.status_line, '404 Brain not Found') # last value
565
566        def test(): rs.status = '1000 Illegal Code'
567        self.assertRaises(ValueError, test)
568        self.assertEqual(rs.status, rs.status_line) # last value
569        self.assertEqual(rs.status_code, 404) # last value
570        self.assertEqual(rs.status_line, '404 Brain not Found') # last value
571
572        def test(): rs.status = '555' # No reason
573        self.assertRaises(ValueError, test)
574        self.assertEqual(rs.status, rs.status_line) # last value
575        self.assertEqual(rs.status_code, 404) # last value
576        self.assertEqual(rs.status_line, '404 Brain not Found') # last value
577
578    def test_content_type(self):
579        rs = BaseResponse()
580        rs.content_type = 'test/some'
581        self.assertEquals('test/some', rs.headers.get('Content-Type'))
582
583    def test_charset(self):
584        rs = BaseResponse()
585        self.assertEqual(rs.charset, 'UTF-8')
586        rs.content_type = 'text/html; charset=latin9'
587        self.assertEqual(rs.charset, 'latin9')
588        rs.content_type = 'text/html'
589        self.assertEqual(rs.charset, 'UTF-8')
590
591    def test_set_cookie(self):
592        r = BaseResponse()
593        r.set_cookie('name1', 'value', max_age=5)
594        r.set_cookie('name2', 'value 2', path='/foo')
595        cookies = [value for name, value in r.headerlist
596                   if name.title() == 'Set-Cookie']
597        cookies.sort()
598        self.assertEqual(cookies[0], 'name1=value; Max-Age=5')
599        self.assertEqual(cookies[1], 'name2="value 2"; Path=/foo')
600
601    def test_set_cookie_maxage(self):
602        import datetime
603        r = BaseResponse()
604        r.set_cookie('name1', 'value', max_age=5)
605        r.set_cookie('name2', 'value', max_age=datetime.timedelta(days=1))
606        cookies = sorted([value for name, value in r.headerlist
607                   if name.title() == 'Set-Cookie'])
608        self.assertEqual(cookies[0], 'name1=value; Max-Age=5')
609        self.assertEqual(cookies[1], 'name2=value; Max-Age=86400')
610
611    def test_set_cookie_expires(self):
612        import datetime
613        r = BaseResponse()
614        r.set_cookie('name1', 'value', expires=42)
615        r.set_cookie('name2', 'value', expires=datetime.datetime(1970,1,1,0,0,43))
616        cookies = sorted([value for name, value in r.headerlist
617                   if name.title() == 'Set-Cookie'])
618        self.assertEqual(cookies[0], 'name1=value; expires=Thu, 01 Jan 1970 00:00:42 GMT')
619        self.assertEqual(cookies[1], 'name2=value; expires=Thu, 01 Jan 1970 00:00:43 GMT')
620
621    def test_delete_cookie(self):
622        response = BaseResponse()
623        response.set_cookie('name', 'value')
624        response.delete_cookie('name')
625        cookies = [value for name, value in response.headerlist
626                   if name.title() == 'Set-Cookie']
627        self.assertTrue('name=;' in cookies[0])
628
629    def test_set_header(self):
630        response = BaseResponse()
631        response['x-test'] = 'foo'
632        headers = [value for name, value in response.headerlist
633                   if name.title() == 'X-Test']
634        self.assertEqual(['foo'], headers)
635        self.assertEqual('foo', response['x-test'])
636
637        response['X-Test'] = 'bar'
638        headers = [value for name, value in response.headerlist
639                   if name.title() == 'X-Test']
640        self.assertEqual(['bar'], headers)
641        self.assertEqual('bar', response['x-test'])
642
643    def test_append_header(self):
644        response = BaseResponse()
645        response.set_header('x-test', 'foo')
646        headers = [value for name, value in response.headerlist
647                   if name.title() == 'X-Test']
648        self.assertEqual(['foo'], headers)
649        self.assertEqual('foo', response['x-test'])
650
651        response.add_header('X-Test', 'bar')
652        headers = [value for name, value in response.headerlist
653                   if name.title() == 'X-Test']
654        self.assertEqual(['foo', 'bar'], headers)
655        self.assertEqual('bar', response['x-test'])
656
657    def test_delete_header(self):
658        response = BaseResponse()
659        response['x-test'] = 'foo'
660        self.assertEqual('foo', response['x-test'])
661        del response['X-tESt']
662        self.assertRaises(KeyError, lambda: response['x-test'])
663
664    def test_non_string_header(self):
665        response = BaseResponse()
666        response['x-test'] = 5
667        self.assertEqual('5', response['x-test'])
668        response['x-test'] = None
669        self.assertEqual('None', response['x-test'])
670        response['x-test'] = touni('瓶')
671        self.assertEqual(tonat(touni('瓶')), response['x-test'])
672
673    def test_prevent_control_characters_in_headers(self):
674        masks = '{}test', 'test{}', 'te{}st'
675        tests = '\n', '\r', '\n\r', '\0'
676
677        # Test HeaderDict
678        apis = 'append', 'replace', '__setitem__', 'setdefault'
679        for api, mask, test in product(apis, masks, tests):
680            hd = bottle.HeaderDict()
681            func = getattr(hd, api)
682            value = mask.replace("{}", test)
683            self.assertRaises(ValueError, func, value, "test-value")
684            self.assertRaises(ValueError, func, "test-name", value)
685
686        # Test functions on BaseResponse
687        apis = 'add_header', 'set_header', '__setitem__'
688        for api, mask, test in product(apis, masks, tests):
689            rs = bottle.BaseResponse()
690            func = getattr(rs, api)
691            value = mask.replace("{}", test)
692            self.assertRaises(ValueError, func, value, "test-value")
693            self.assertRaises(ValueError, func, "test-name", value)
694
695    def test_expires_header(self):
696        import datetime
697        response = BaseResponse()
698        now = datetime.datetime.now()
699        response.expires = now
700
701        def seconds(a, b):
702            td = max(a,b) - min(a,b)
703            return td.days*360*24 + td.seconds
704
705        self.assertEqual(0, seconds(response.expires, now))
706        now2 = datetime.datetime.utcfromtimestamp(
707            parse_date(response.headers['Expires']))
708        self.assertEqual(0, seconds(now, now2))
709
class TestRedirect(unittest.TestCase):
    ''' Tests for bottle.redirect() and the Location header it produces. '''

    def assertRedirect(self, target, result, query=None, status=303, **args):
        ''' Assert that redirecting to `target` results in location `result`.

            :param target: URL passed to ``bottle.redirect()``.
            :param result: Expected value of the ``Location`` header.
            :param query: Optional dict of extra keyword arguments that are
                forwarded to ``bottle.redirect()``.
            :param status: Expected status code of the raised response.
            :param args: Additional WSGI environ entries. For keys starting
                with ``wsgi`` the first underscore is replaced by a dot, so
                ``wsgi_url_scheme`` becomes ``wsgi.url_scheme``.
        '''
        env = {'SERVER_PROTOCOL':'HTTP/1.1'}
        # Iterate over a copy of the keys because `args` is modified.
        for key in list(args):
            if key.startswith('wsgi'):
                args[key.replace('_', '.', 1)] = args[key]
                del args[key]
        env.update(args)
        request.bind(env)
        bottle.response.bind()
        try:
            bottle.redirect(target, **(query or {}))
        except bottle.HTTPResponse:
            r = _e()
        else:
            # Without this branch a redirect() that returns normally would
            # let the test pass without checking anything.
            self.fail('redirect() did not raise HTTPResponse')
        self.assertEqual(status, r.status_code)
        self.assertTrue(r.headers)
        self.assertEqual(result, r.headers['Location'])

    def test_absolute_path(self):
        ''' Targets starting with a slash ignore PATH_INFO and SCRIPT_NAME. '''
        self.assertRedirect('/', 'http://127.0.0.1/')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/some/sub/path/')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/some/sub/file.html')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html',
                            SCRIPT_NAME='/some/sub/path/')
        self.assertRedirect('/foo/test.html', 'http://127.0.0.1/foo/test.html')
        self.assertRedirect('/foo/test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/some/sub/file.html')

    def test_relative_path(self):
        ''' Relative targets are resolved against SCRIPT_NAME + PATH_INFO. '''
        self.assertRedirect('./', 'http://127.0.0.1/')
        self.assertRedirect('./test.html', 'http://127.0.0.1/test.html')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/bar.html')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/test.html',
                            SCRIPT_NAME='/foo/')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/bar/test.html',
                            SCRIPT_NAME='/foo/', PATH_INFO='/bar/baz.html')
        self.assertRedirect('./foo/test.html', 'http://127.0.0.1/foo/test.html')
        self.assertRedirect('./foo/test.html', 'http://127.0.0.1/bar/foo/test.html',
                            PATH_INFO='/bar/file.html')
        self.assertRedirect('../test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/foo/')
        self.assertRedirect('../test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/bar/')
        self.assertRedirect('../test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/foo/bar.html')
        self.assertRedirect('../test.html', 'http://127.0.0.1/test.html',
                            SCRIPT_NAME='/foo/')
        self.assertRedirect('../test.html', 'http://127.0.0.1/foo/test.html',
                            SCRIPT_NAME='/foo/', PATH_INFO='/bar/baz.html')
        self.assertRedirect('../baz/../test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/bar/')

    def test_sheme(self):
        ''' The URL scheme is taken from ``wsgi.url_scheme``. '''
        self.assertRedirect('./test.html', 'https://127.0.0.1/test.html',
                            wsgi_url_scheme='https')
        self.assertRedirect('./test.html', 'https://127.0.0.1:80/test.html',
                            wsgi_url_scheme='https', SERVER_PORT='80')

    def test_host_http_1_0(self):
        ''' HTTP/1.0 requests: host from SERVER_NAME/PORT, status 302. '''
        # No HTTP_HOST, just SERVER_NAME and SERVER_PORT.
        self.assertRedirect('./test.html', 'http://example.com/test.html',
                            SERVER_NAME='example.com',
                            SERVER_PROTOCOL='HTTP/1.0', status=302)
        self.assertRedirect('./test.html', 'http://127.0.0.1:81/test.html',
                            SERVER_PORT='81',
                            SERVER_PROTOCOL='HTTP/1.0', status=302)

    def test_host_http_1_1(self):
        ''' HTTP/1.1 requests: the host is taken from HTTP_HOST. '''
        self.assertRedirect('./test.html', 'http://example.com/test.html',
                            HTTP_HOST='example.com')
        self.assertRedirect('./test.html', 'http://example.com:81/test.html',
                            HTTP_HOST='example.com:81')
        # Trust HTTP_HOST over SERVER_NAME and PORT.
        self.assertRedirect('./test.html', 'http://example.com:81/test.html',
                            HTTP_HOST='example.com:81', SERVER_NAME='foobar')
        self.assertRedirect('./test.html', 'http://example.com:81/test.html',
                            HTTP_HOST='example.com:81', SERVER_PORT='80')

    def test_host_http_proxy(self):
        ''' HTTP_X_FORWARDED_HOST wins over HTTP_HOST. '''
        # Trust proxy headers over original header.
        self.assertRedirect('./test.html', 'http://example.com/test.html',
                            HTTP_X_FORWARDED_HOST='example.com',
                            HTTP_HOST='127.0.0.1')

    def test_specialchars(self):
        ''' The target URL is not quoted automatically. '''
        self.assertRedirect('./te st.html',
                            'http://example.com/a%20a/b%20b/te st.html',
                            HTTP_HOST='example.com', SCRIPT_NAME='/a a/', PATH_INFO='/b b/')

    def test_redirect_preserve_cookies(self):
        ''' Cookies set on the response before redirect() are kept. '''
        env = {'SERVER_PROTOCOL':'HTTP/1.1'}
        request.bind(env)
        bottle.response.bind()
        bottle.response.set_cookie('xxx', 'yyy')
        try:
            bottle.redirect('...')
        except bottle.HTTPResponse:
            raised = _e()
        else:
            # Fail loudly instead of silently skipping the cookie check.
            self.fail('redirect() did not raise HTTPResponse')
        h = [v for (k, v) in raised.headerlist if k == 'Set-Cookie']
        self.assertEqual(h, ['xxx=yyy'])
817
class TestWSGIHeaderDict(unittest.TestCase):
    ''' Header lookups through bottle.WSGIHeaderDict on top of an environ. '''

    def setUp(self):
        ''' Create an empty environ dict and a header view wrapping it. '''
        environ = {}
        self.env = environ
        self.headers = bottle.WSGIHeaderDict(environ)

    def test_empty(self):
        ''' A view over an empty environ has no entries. '''
        empty = bottle.WSGIHeaderDict({})
        self.assertEqual(0, len(empty))

    def test_native(self):
        ''' A native string value is returned as stored. '''
        self.env['HTTP_TEST_HEADER'] = 'foobar'
        found = self.headers['Test-header']
        self.assertEqual(found, 'foobar')

    def test_bytes(self):
        ''' A value converted with tob() compares equal to the native string. '''
        raw = tob('foobar')
        self.env['HTTP_TEST_HEADER'] = raw
        found = self.headers['Test-Header']
        self.assertEqual(found, 'foobar')

    def test_unicode(self):
        ''' A value converted with touni() compares equal to the native string. '''
        raw = touni('foobar')
        self.env['HTTP_TEST_HEADER'] = raw
        found = self.headers['Test-Header']
        self.assertEqual(found, 'foobar')

    def test_dict(self):
        ''' Membership, get() and item access for several spellings of a name. '''
        spellings = 'foo-bar Foo-Bar foo-Bar FOO-BAR'.split()

        def lookup(name):
            return self.headers[name]

        # Nothing is stored yet: every spelling must be reported as missing.
        for name in spellings:
            self.assertFalse(name in self.headers)
            self.assertEqual(self.headers.get(name), None)
            self.assertEqual(self.headers.get(name, 5), 5)
            self.assertRaises(KeyError, lookup, name)

        self.env['HTTP_FOO_BAR'] = 'test'

        # One environ entry must now be visible under every spelling.
        for name in spellings:
            self.assertTrue(name in self.headers)
            self.assertEqual(self.headers.get(name), 'test')
            self.assertEqual(self.headers.get(name, 5), 'test')
849
850
851
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__': #pragma: no cover
    unittest.main()
854