# -*- coding: utf-8 -*-
''' Tests for the BaseRequest and BaseResponse objects and their subclasses. '''

import unittest
import sys

import bottle
from bottle import request, tob, touni, tonat, json_dumps, _e, HTTPError, parse_date
import tools
import wsgiref.util
import base64

from bottle import BaseRequest, BaseResponse, LocalRequest


try:
    from itertools import product
except ImportError:
    # Fallback for interpreters whose itertools has no product().
    def product(*args):
        """ Yield the cartesian product of the given iterables as tuples. """
        pools = map(tuple, args)
        result = [[]]
        for pool in pools:
            result = [x + [y] for x in result for y in pool]
        for prod in result:
            yield tuple(prod)


class TestRequest(unittest.TestCase):
    """ Tests for BaseRequest (and LocalRequest) built from WSGI environs. """

    def test_app_property(self):
        """ Request.app raises RuntimeError until 'bottle.app' is set. """
        e = {}
        r = BaseRequest(e)
        self.assertRaises(RuntimeError, lambda: r.app)
        e.update({'bottle.app': 5})
        self.assertEqual(r.app, 5)

    def test_route_property(self):
        """ Request.route returns the 'bottle.route' environ value. """
        e = {'bottle.route': 5}
        r = BaseRequest(e)
        self.assertEqual(r.route, 5)

    def test_url_for_property(self):
        """ Request.url_args raises RuntimeError until 'route.url_args' is set. """
        e = {}
        r = BaseRequest(e)
        self.assertRaises(RuntimeError, lambda: r.url_args)
        e.update({'route.url_args': {'a': 5}})
        self.assertEqual(r.url_args, {'a': 5})

    def test_path(self):
        """ PATH_INFO normalization. """
        # Legal paths
        tests = [('', '/'), ('x','/x'), ('x/', '/x/'), ('/x', '/x'), ('/x/', '/x/')]
        for raw, norm in tests:
            self.assertEqual(norm, BaseRequest({'PATH_INFO': raw}).path)
        # Strange paths
        tests = [('///', '/'), ('//x','/x')]
        for raw, norm in tests:
            self.assertEqual(norm, BaseRequest({'PATH_INFO': raw}).path)
        # No path at all
        self.assertEqual('/', BaseRequest({}).path)

    def test_method(self):
        """ REQUEST_METHOD defaults to GET and is upper-cased. """
        self.assertEqual(BaseRequest({}).method, 'GET')
        self.assertEqual(BaseRequest({'REQUEST_METHOD':'GET'}).method, 'GET')
        self.assertEqual(BaseRequest({'REQUEST_METHOD':'GeT'}).method, 'GET')
        self.assertEqual(BaseRequest({'REQUEST_METHOD':'get'}).method, 'GET')
        self.assertEqual(BaseRequest({'REQUEST_METHOD':'POst'}).method, 'POST')
        self.assertEqual(BaseRequest({'REQUEST_METHOD':'FanTASY'}).method, 'FANTASY')

    def test_script_name(self):
        """ SCRIPT_NAME normalization. """
        # Legal paths
        tests = [('', '/'), ('x','/x/'), ('x/', '/x/'), ('/x', '/x/'), ('/x/', '/x/')]
        for raw, norm in tests:
            self.assertEqual(norm, BaseRequest({'SCRIPT_NAME': raw}).script_name)
        # Strange paths
        tests = [('///', '/'), ('///x///','/x/')]
        for raw, norm in tests:
            self.assertEqual(norm, BaseRequest({'SCRIPT_NAME': raw}).script_name)
        # No path at all
        self.assertEqual('/', BaseRequest({}).script_name)

    def test_pathshift(self):
        """ Request.path_shift() """
        def test_shift(s, p, c):
            request = BaseRequest({'SCRIPT_NAME': s, 'PATH_INFO': p})
            request.path_shift(c)
            return [request['SCRIPT_NAME'], request.path]
        self.assertEqual(['/a/b', '/c/d'], test_shift('/a/b', '/c/d', 0))
        self.assertEqual(['/a/b', '/c/d/'], test_shift('/a/b', '/c/d/', 0))
        self.assertEqual(['/a/b/c', '/d'], test_shift('/a/b', '/c/d', 1))
        self.assertEqual(['/a', '/b/c/d'], test_shift('/a/b', '/c/d', -1))
        self.assertEqual(['/a/b/c', '/d/'], test_shift('/a/b', '/c/d/', 1))
        self.assertEqual(['/a', '/b/c/d/'], test_shift('/a/b', '/c/d/', -1))
        self.assertEqual(['/a/b/c', '/d/'], test_shift('/a/b/', '/c/d/', 1))
        self.assertEqual(['/a', '/b/c/d/'], test_shift('/a/b/', '/c/d/', -1))
        self.assertEqual(['/a/b/c/d', '/'], test_shift('/', '/a/b/c/d', 4))
        self.assertEqual(['/', '/a/b/c/d/'], test_shift('/a/b/c/d', '/', -4))
        self.assertRaises(AssertionError, test_shift, '/a/b', '/c/d', 3)
        self.assertRaises(AssertionError, test_shift, '/a/b', '/c/d', -3)

    def test_url(self):
        """ Environ: URL building """
        request = BaseRequest({'HTTP_HOST':'example.com'})
        self.assertEqual('http://example.com/', request.url)
        request = BaseRequest({'SERVER_NAME':'example.com'})
        self.assertEqual('http://example.com/', request.url)
        request = BaseRequest({'SERVER_NAME':'example.com', 'SERVER_PORT':'81'})
        self.assertEqual('http://example.com:81/', request.url)
        request = BaseRequest({'wsgi.url_scheme':'https', 'SERVER_NAME':'example.com'})
        self.assertEqual('https://example.com/', request.url)
        request = BaseRequest({'HTTP_HOST':'example.com', 'PATH_INFO':'/path',
                               'QUERY_STRING':'1=b&c=d', 'SCRIPT_NAME':'/sp'})
        self.assertEqual('http://example.com/sp/path?1=b&c=d', request.url)
        request = BaseRequest({'HTTP_HOST':'example.com', 'PATH_INFO':'/pa th',
                               'SCRIPT_NAME':'/s p'})
        self.assertEqual('http://example.com/s%20p/pa%20th', request.url)

    def test_dict_access(self):
        """ Environ: request objects are environment dicts """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        request = BaseRequest(e)
        self.assertEqual(list(request), list(e.keys()))
        self.assertEqual(len(request), len(e))
        for k, v in e.items():
            self.assertTrue(k in request)
            self.assertEqual(request[k], v)
            request[k] = 'test'
            self.assertEqual(request[k], 'test')
        del request['PATH_INFO']
        self.assertTrue('PATH_INFO' not in request)

    def test_readonly_environ(self):
        """ Item assignment raises KeyError when 'bottle.request.readonly' is set. """
        request = BaseRequest({'bottle.request.readonly':True})
        def test(): request['x']='y'
        self.assertRaises(KeyError, test)

    def test_header_access(self):
        """ Environ: Request objects decode headers """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['HTTP_SOME_HEADER'] = 'some value'
        request = BaseRequest(e)
        request['HTTP_SOME_OTHER_HEADER'] = 'some other value'
        self.assertTrue('Some-Header' in request.headers)
        # assertEqual (rather than assertTrue(a == b)) reports both values
        # when the comparison fails.
        self.assertEqual('some value', request.headers['Some-Header'])
        self.assertEqual('some other value', request.headers['Some-Other-Header'])

    def test_header_access_special(self):
        """ CONTENT_TYPE and CONTENT_LENGTH are exposed through Request.headers. """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        request = BaseRequest(e)
        request['CONTENT_TYPE'] = 'test'
        request['CONTENT_LENGTH'] = '123'
        self.assertEqual(request.headers['Content-Type'], 'test')
        self.assertEqual(request.headers['Content-Length'], '123')

    def test_cookie_dict(self):
        """ Environ: Cookie dict """
        t = dict()
        t['a=a'] = {'a': 'a'}
        t['a=a; b=b'] = {'a': 'a', 'b':'b'}
        t['a=a; a=b'] = {'a': 'b'}
        for k, v in t.items():
            request = BaseRequest({'HTTP_COOKIE': k})
            for n in v:
                self.assertEqual(v[n], request.cookies[n])
                self.assertEqual(v[n], request.get_cookie(n))

    def test_get(self):
        """ Environ: GET data """
        qs = tonat(tob('a=a&a=1&b=b&c=c&cn=%e7%93%b6'), 'latin1')
        request = BaseRequest({'QUERY_STRING':qs})
        self.assertTrue('a' in request.query)
        self.assertTrue('b' in request.query)
        self.assertEqual(['a','1'], request.query.getall('a'))
        self.assertEqual(['b'], request.query.getall('b'))
        self.assertEqual('1', request.query['a'])
        self.assertEqual('b', request.query['b'])
        self.assertEqual(tonat(tob('瓶'), 'latin1'), request.query['cn'])
        self.assertEqual(touni('瓶'), request.query.cn)

    def test_post(self):
        """ Environ: POST data """
        sq = tob('a=a&a=1&b=b&c=&d&cn=%e7%93%b6')
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(sq)
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(sq))
        e['REQUEST_METHOD'] = "POST"
        request = BaseRequest(e)
        self.assertTrue('a' in request.POST)
        self.assertTrue('b' in request.POST)
        self.assertEqual(['a','1'], request.POST.getall('a'))
        self.assertEqual(['b'], request.POST.getall('b'))
        self.assertEqual('1', request.POST['a'])
        self.assertEqual('b', request.POST['b'])
        self.assertEqual('', request.POST['c'])
        self.assertEqual('', request.POST['d'])
        self.assertEqual(tonat(tob('瓶'), 'latin1'), request.POST['cn'])
        self.assertEqual(touni('瓶'), request.POST.cn)

    def test_bodypost(self):
        """ A POST body without '=' becomes a key with an empty value. """
        sq = tob('foobar')
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(sq)
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(sq))
        e['REQUEST_METHOD'] = "POST"
        request = BaseRequest(e)
        self.assertEqual('', request.POST['foobar'])

    def test_body_noclose(self):
        """ Test that the body file handler is not closed after request.POST """
        sq = tob('a=a&a=1&b=b&c=&d')
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(sq)
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(sq))
        e['REQUEST_METHOD'] = "POST"
        request = BaseRequest(e)
        self.assertEqual(sq, request.body.read())
        request.POST # This caused a body.close() with Python 3.x
        self.assertEqual(sq, request.body.read())

    def test_params(self):
        """ Environ: GET and POST are combined in request.param """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob('b=b&c=p'))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = '7'
        e['QUERY_STRING'] = 'a=a&c=g'
        e['REQUEST_METHOD'] = "POST"
        request = BaseRequest(e)
        self.assertEqual(['a','b','c'], sorted(request.params.keys()))
        self.assertEqual('p', request.params['c'])

    def test_getpostleak(self):
        """ Environ: GET and POST should not leak into each other """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob('b=b'))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = '3'
        e['QUERY_STRING'] = 'a=a'
        e['REQUEST_METHOD'] = "POST"
        request = BaseRequest(e)
        self.assertEqual(['a'], list(request.GET.keys()))
        self.assertEqual(['b'], list(request.POST.keys()))

    def test_body(self):
        """ Environ: Request.body should behave like a file object factory """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob('abc'))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(3)
        request = BaseRequest(e)
        self.assertEqual(tob('abc'), request.body.read())
        self.assertEqual(tob('abc'), request.body.read(3))
        self.assertEqual(tob('abc'), request.body.readline())
        self.assertEqual(tob('abc'), request.body.readline(3))

    def test_bigbody(self):
        """ Environ: Request.body should handle big uploads using files """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob('x')*1024*1000)
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(1024*1000)
        request = BaseRequest(e)
        self.assertTrue(hasattr(request.body, 'fileno'))
        self.assertEqual(1024*1000, len(request.body.read()))
        self.assertEqual(1024, len(request.body.read(1024)))
        self.assertEqual(1024*1000, len(request.body.readline()))
        self.assertEqual(1024, len(request.body.readline(1024)))

    def test_tobigbody(self):
        """ Environ: Request.body should truncate to Content-Length bytes """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob('x')*1024)
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = '42'
        request = BaseRequest(e)
        self.assertEqual(42, len(request.body.read()))
        self.assertEqual(42, len(request.body.read(1024)))
        self.assertEqual(42, len(request.body.readline()))
        self.assertEqual(42, len(request.body.readline(1024)))

    def _test_chunked(self, body, expect):
        """ Send `body` with 'Transfer-Encoding: chunked' and check the result.

            If `expect` is a string, the decoded request body must equal it.
            Otherwise `expect` is an exception class that accessing
            Request.body must raise.
        """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(body))
        e['wsgi.input'].seek(0)
        e['HTTP_TRANSFER_ENCODING'] = 'chunked'
        if isinstance(expect, str):
            # assertEqual: the assertEquals alias was removed in Python 3.12.
            self.assertEqual(tob(expect), BaseRequest(e).body.read())
        else:
            self.assertRaises(expect, lambda: BaseRequest(e).body)

    def test_chunked(self):
        """ Well-formed chunked bodies are decoded. """
        self._test_chunked('1\r\nx\r\nff\r\n' + 'y'*255 + '\r\n0\r\n',
                           'x' + 'y'*255)
        self._test_chunked('8\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')
        self._test_chunked('0\r\n', '')

    def test_chunked_meta_fields(self):
        """ Text after ';' in a chunk size line does not affect decoding. """
        self._test_chunked('8 ; foo\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')
        self._test_chunked('8;foo\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')
        self._test_chunked('8;foo=bar\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')

    def test_chunked_not_terminated(self):
        """ A chunked body without the terminating zero chunk is an HTTPError. """
        self._test_chunked('1\r\nx\r\n', HTTPError)

    def test_chunked_wrong_size(self):
        """ A chunk shorter than its announced size is an HTTPError. """
        self._test_chunked('2\r\nx\r\n', HTTPError)

    def test_chunked_illegal_size(self):
        """ A non-hexadecimal chunk size is an HTTPError. """
        self._test_chunked('x\r\nx\r\n', HTTPError)

    def test_chunked_not_chunked_at_all(self):
        """ A plain body sent as 'chunked' is an HTTPError. """
        self._test_chunked('abcdef', HTTPError)

    def test_multipart(self):
        """ Environ: POST (multipart files and multible values per key) """
        fields = [('field1','value1'), ('field2','value2'), ('field2','value3')]
        files = [('file1','filename1.txt','content1'), ('万难','万难foo.py', 'ä\nö\rü')]
        e = tools.multipart_environ(fields=fields, files=files)
        request = BaseRequest(e)
        # File content
        self.assertTrue('file1' in request.POST)
        self.assertTrue('file1' in request.files)
        self.assertTrue('file1' not in request.forms)
        # Named `expected` so the Python 2 builtin cmp() is not shadowed.
        expected = tob('content1') if sys.version_info >= (3,2,0) else 'content1'
        self.assertEqual(expected, request.POST['file1'].file.read())
        # File name and meta data
        self.assertTrue('万难' in request.POST)
        self.assertTrue('万难' in request.files)
        self.assertTrue('万难' not in request.forms)
        self.assertEqual('foo.py', request.POST['万难'].filename)
        self.assertTrue(request.files['万难'])
        self.assertFalse(request.files.file77)
        # UTF-8 files
        x = request.POST['万难'].file.read()
        if (3,2,0) > sys.version_info >= (3,0,0):
            x = x.encode('utf8')
        self.assertEqual(tob('ä\nö\rü'), x)
        # No file
        self.assertTrue('file3' not in request.POST)
        self.assertTrue('file3' not in request.files)
        self.assertTrue('file3' not in request.forms)
        # Field (single)
        self.assertEqual('value1', request.POST['field1'])
        self.assertTrue('field1' not in request.files)
        self.assertEqual('value1', request.forms['field1'])
        # Field (multi)
        self.assertEqual(2, len(request.POST.getall('field2')))
        self.assertEqual(['value2', 'value3'], request.POST.getall('field2'))
        self.assertEqual(['value2', 'value3'], request.forms.getall('field2'))
        self.assertTrue('field2' not in request.files)

    def test_json_empty(self):
        """ Environ: Request.json property with empty body. """
        self.assertEqual(BaseRequest({}).json, None)

    def test_json_noheader(self):
        """ Environ: Request.json property with missing content-type header. """
        test = dict(a=5, b='test', c=[1,2,3])
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(json_dumps(test)))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
        self.assertEqual(BaseRequest(e).json, None)

    def test_json_tobig(self):
        """ Environ: Request.json property with huge body. """
        test = dict(a=5, tobig='x' * bottle.BaseRequest.MEMFILE_MAX)
        e = {'CONTENT_TYPE': 'application/json'}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(json_dumps(test)))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
        self.assertRaises(HTTPError, lambda: BaseRequest(e).json)

    def test_json_valid(self):
        """ Environ: Request.json property. """
        test = dict(a=5, b='test', c=[1,2,3])
        e = {'CONTENT_TYPE': 'application/json; charset=UTF-8'}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(json_dumps(test)))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
        self.assertEqual(BaseRequest(e).json, test)

    def test_json_forged_header_issue616(self):
        """ 'application/json' hidden behind another content type is ignored. """
        test = dict(a=5, b='test', c=[1,2,3])
        e = {'CONTENT_TYPE': 'text/plain;application/json'}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(json_dumps(test)))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
        self.assertEqual(BaseRequest(e).json, None)

    def test_json_header_empty_body(self):
        """Request Content-Type is application/json but body is empty"""
        e = {'CONTENT_TYPE': 'application/json'}
        wsgiref.util.setup_testing_defaults(e)
        e['CONTENT_LENGTH'] = "0"
        self.assertEqual(BaseRequest(e).json, None)

    def test_isajax(self):
        """ Request.is_ajax is true only with 'X-Requested-With: XMLHttpRequest'. """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        self.assertFalse(BaseRequest(e.copy()).is_ajax)
        e['HTTP_X_REQUESTED_WITH'] = 'XMLHttpRequest'
        self.assertTrue(BaseRequest(e.copy()).is_ajax)

    def test_auth(self):
        """ Request.auth reads HTTP basic auth and falls back to REMOTE_USER. """
        user, pwd = 'marc', 'secret'
        basic = touni(base64.b64encode(tob('%s:%s' % (user, pwd))))
        r = BaseRequest({})
        self.assertEqual(r.auth, None)
        r.environ['HTTP_AUTHORIZATION'] = 'basic %s' % basic
        self.assertEqual(r.auth, (user, pwd))
        r.environ['REMOTE_USER'] = user
        self.assertEqual(r.auth, (user, pwd))
        del r.environ['HTTP_AUTHORIZATION']
        self.assertEqual(r.auth, (user, None))

    def test_remote_route(self):
        """ Request.remote_route prefers X-Forwarded-For over REMOTE_ADDR. """
        ips = ['1.2.3.4', '2.3.4.5', '3.4.5.6']
        r = BaseRequest({})
        self.assertEqual(r.remote_route, [])
        r.environ['HTTP_X_FORWARDED_FOR'] = ', '.join(ips)
        self.assertEqual(r.remote_route, ips)
        r.environ['REMOTE_ADDR'] = ips[1]
        self.assertEqual(r.remote_route, ips)
        del r.environ['HTTP_X_FORWARDED_FOR']
        self.assertEqual(r.remote_route, [ips[1]])

    def test_remote_addr(self):
        """ Request.remote_addr is the first X-Forwarded-For entry, else REMOTE_ADDR. """
        ips = ['1.2.3.4', '2.3.4.5', '3.4.5.6']
        r = BaseRequest({})
        self.assertEqual(r.remote_addr, None)
        r.environ['HTTP_X_FORWARDED_FOR'] = ', '.join(ips)
        self.assertEqual(r.remote_addr, ips[0])
        r.environ['REMOTE_ADDR'] = ips[1]
        self.assertEqual(r.remote_addr, ips[0])
        del r.environ['HTTP_X_FORWARDED_FOR']
        self.assertEqual(r.remote_addr, ips[1])

    def test_user_defined_attributes(self):
        """ Custom attributes can be set on BaseRequest and LocalRequest. """
        for cls in (BaseRequest, LocalRequest):
            r = cls()

            # New attributes go to the environ dict.
            r.foo = 'somevalue'
            self.assertEqual(r.foo, 'somevalue')
            self.assertTrue('somevalue' in r.environ.values())

            # Unknown attributes raise AttributeError.
            self.assertRaises(AttributeError, getattr, r, 'somevalue')



class TestResponse(unittest.TestCase):
    """ Tests for BaseResponse: constructor, status, headers and cookies. """

    def test_constructor_body(self):
        """ The first constructor argument is stored as the body. """
        self.assertEqual('',
            BaseResponse('').body)

        self.assertEqual('YAY',
            BaseResponse('YAY').body)

    def test_constructor_status(self):
        """ The second constructor argument sets the status. """
        self.assertEqual(200,
            BaseResponse('YAY', 200).status_code)

        self.assertEqual('200 OK',
            BaseResponse('YAY', 200).status_line)

        self.assertEqual('200 YAY',
            BaseResponse('YAY', '200 YAY').status_line)

        self.assertEqual('200 YAY',
            BaseResponse('YAY', '200 YAY').status_line)

    def test_constructor_headerlist(self):
        """ Headers passed to the constructor as a list of tuples are set. """
        from functools import partial
        make_res = partial(BaseResponse, '', 200)

        # assertEqual, not assertTrue: assertTrue('yay', x) treats x as the
        # failure message and can never fail.
        self.assertEqual('yay',
            make_res([('x-test','yay')])['x-test'])

    def test_constructor_header_kwargs(self):
        """ Headers passed to the constructor as keyword arguments are set. """
        # This test used to share its name with the header-list test above,
        # which silently replaced that test in the class namespace.
        from functools import partial
        make_res = partial(BaseResponse, '', 200)

        self.assertEqual('yay', make_res(x_test='yay')['x-test'])


    def test_set_status(self):
        """ Assigning Response.status: valid values are applied, invalid
            values raise ValueError and leave the previous status intact.
        """
        rs = BaseResponse()

        rs.status = 200
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 200)
        self.assertEqual(rs.status_line, '200 OK')

        rs.status = 999
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 999)
        self.assertEqual(rs.status_line, '999 Unknown')

        rs.status = 404
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 404)
        self.assertEqual(rs.status_line, '404 Not Found')

        def test(): rs.status = -200
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Not Found') # last value

        def test(): rs.status = 5
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Not Found') # last value

        rs.status = '999 Who knows?' # Illegal, but acceptable three digit code
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 999)
        self.assertEqual(rs.status_line, '999 Who knows?')

        rs.status = 555 # Strange code
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 555)
        self.assertEqual(rs.status_line, '555 Unknown')

        rs.status = '404 Brain not Found' # Custom reason
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 404)
        self.assertEqual(rs.status_line, '404 Brain not Found')

        def test(): rs.status = '5 Illegal Code'
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Brain not Found') # last value

        def test(): rs.status = '-99 Illegal Code'
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Brain not Found') # last value

        def test(): rs.status = '1000 Illegal Code'
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Brain not Found') # last value

        def test(): rs.status = '555' # No reason
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Brain not Found') # last value

    def test_content_type(self):
        """ Response.content_type writes the Content-Type header. """
        rs = BaseResponse()
        rs.content_type = 'test/some'
        # assertEqual: the assertEquals alias was removed in Python 3.12.
        self.assertEqual('test/some', rs.headers.get('Content-Type'))

    def test_charset(self):
        """ Response.charset follows the content type and defaults to UTF-8. """
        rs = BaseResponse()
        self.assertEqual(rs.charset, 'UTF-8')
        rs.content_type = 'text/html; charset=latin9'
        self.assertEqual(rs.charset, 'latin9')
        rs.content_type = 'text/html'
        self.assertEqual(rs.charset, 'UTF-8')

    def test_set_cookie(self):
        """ set_cookie() emits one Set-Cookie header per cookie. """
        r = BaseResponse()
        r.set_cookie('name1', 'value', max_age=5)
        r.set_cookie('name2', 'value 2', path='/foo')
        cookies = [value for name, value in r.headerlist
                   if name.title() == 'Set-Cookie']
        cookies.sort()
        self.assertEqual(cookies[0], 'name1=value; Max-Age=5')
        self.assertEqual(cookies[1], 'name2="value 2"; Path=/foo')

    def test_set_cookie_maxage(self):
        """ max_age accepts seconds as an int or a timedelta. """
        import datetime
        r = BaseResponse()
        r.set_cookie('name1', 'value', max_age=5)
        r.set_cookie('name2', 'value', max_age=datetime.timedelta(days=1))
        cookies = sorted([value for name, value in r.headerlist
                   if name.title() == 'Set-Cookie'])
        self.assertEqual(cookies[0], 'name1=value; Max-Age=5')
        self.assertEqual(cookies[1], 'name2=value; Max-Age=86400')

    def test_set_cookie_expires(self):
        """ expires accepts a timestamp or a datetime. """
        import datetime
        r = BaseResponse()
        r.set_cookie('name1', 'value', expires=42)
        r.set_cookie('name2', 'value', expires=datetime.datetime(1970,1,1,0,0,43))
        cookies = sorted([value for name, value in r.headerlist
                   if name.title() == 'Set-Cookie'])
        self.assertEqual(cookies[0], 'name1=value; expires=Thu, 01 Jan 1970 00:00:42 GMT')
        self.assertEqual(cookies[1], 'name2=value; expires=Thu, 01 Jan 1970 00:00:43 GMT')

    def test_delete_cookie(self):
        """ delete_cookie() replaces the cookie with an empty value. """
        response = BaseResponse()
        response.set_cookie('name', 'value')
        response.delete_cookie('name')
        cookies = [value for name, value in response.headerlist
                   if name.title() == 'Set-Cookie']
        self.assertTrue('name=;' in cookies[0])

    def test_set_header(self):
        """ Item assignment replaces a header, ignoring the case of its name. """
        response = BaseResponse()
        response['x-test'] = 'foo'
        headers = [value for name, value in response.headerlist
                   if name.title() == 'X-Test']
        self.assertEqual(['foo'], headers)
        self.assertEqual('foo', response['x-test'])

        response['X-Test'] = 'bar'
        headers = [value for name, value in response.headerlist
                   if name.title() == 'X-Test']
        self.assertEqual(['bar'], headers)
        self.assertEqual('bar', response['x-test'])

    def test_append_header(self):
        """ add_header() keeps existing values; lookup returns the last one. """
        response = BaseResponse()
        response.set_header('x-test', 'foo')
        headers = [value for name, value in response.headerlist
                   if name.title() == 'X-Test']
        self.assertEqual(['foo'], headers)
        self.assertEqual('foo', response['x-test'])

        response.add_header('X-Test', 'bar')
        headers = [value for name, value in response.headerlist
                   if name.title() == 'X-Test']
        self.assertEqual(['foo', 'bar'], headers)
        self.assertEqual('bar', response['x-test'])

    def test_delete_header(self):
        """ Deleting a header ignores the case of its name. """
        response = BaseResponse()
        response['x-test'] = 'foo'
        self.assertEqual('foo', response['x-test'])
        del response['X-tESt']
        self.assertRaises(KeyError, lambda: response['x-test'])

    def test_non_string_header(self):
        """ Non-string header values are converted to native strings. """
        response = BaseResponse()
        response['x-test'] = 5
        self.assertEqual('5', response['x-test'])
        response['x-test'] = None
        self.assertEqual('None', response['x-test'])
        response['x-test'] = touni('瓶')
        self.assertEqual(tonat(touni('瓶')), response['x-test'])

    def test_prevent_control_characters_in_headers(self):
        """ Header names and values containing control characters are rejected. """
        masks = '{}test', 'test{}', 'te{}st'
        tests = '\n', '\r', '\n\r', '\0'

        # Test HeaderDict
        apis = 'append', 'replace', '__setitem__', 'setdefault'
        for api, mask, test in product(apis, masks, tests):
            hd = bottle.HeaderDict()
            func = getattr(hd, api)
            value = mask.replace("{}", test)
            self.assertRaises(ValueError, func, value, "test-value")
            self.assertRaises(ValueError, func, "test-name", value)

        # Test functions on BaseResponse
        apis = 'add_header', 'set_header', '__setitem__'
        for api, mask, test in product(apis, masks, tests):
            rs = bottle.BaseResponse()
            func = getattr(rs, api)
            value = mask.replace("{}", test)
            self.assertRaises(ValueError, func, value, "test-value")
            self.assertRaises(ValueError, func, "test-name", value)

    def test_expires_header(self):
        """ Response.expires round-trips a datetime with second precision. """
        import datetime
        response = BaseResponse()
        now = datetime.datetime.now()
        response.expires = now

        def seconds(a, b):
            # Absolute difference in whole seconds; microseconds are ignored
            # on purpose because HTTP dates carry none.
            td = max(a,b) - min(a,b)
            return td.days * 24 * 3600 + td.seconds

        self.assertEqual(0, seconds(response.expires, now))
        now2 = datetime.datetime.utcfromtimestamp(
            parse_date(response.headers['Expires']))
        self.assertEqual(0, seconds(now, now2))


class TestRedirect(unittest.TestCase):
    """ Tests for the Location header and status produced by bottle.redirect(). """

    def assertRedirect(self, target, result, query=None, status=303, **args):
        """ Assert that redirect(target) raises the expected HTTPResponse.

            `result` is the expected Location header and `status` the expected
            status code. `query` is an optional dict passed to redirect() as
            keyword arguments. Remaining keyword arguments are added to the
            environ; for names starting with 'wsgi' the first underscore is
            replaced by a dot (wsgi_url_scheme -> wsgi.url_scheme).
        """
        env = {'SERVER_PROTOCOL':'HTTP/1.1'}
        for key in list(args):
            if key.startswith('wsgi'):
                args[key.replace('_', '.', 1)] = args[key]
                del args[key]
        env.update(args)
        request.bind(env)
        bottle.response.bind()
        try:
            bottle.redirect(target, **(query or {}))
        except bottle.HTTPResponse:
            r = _e()
            self.assertEqual(status, r.status_code)
            self.assertTrue(r.headers)
            self.assertEqual(result, r.headers['Location'])
        else:
            # Without this branch the check would pass vacuously.
            self.fail('redirect(%r) did not raise HTTPResponse' % (target,))

    def test_absolute_path(self):
        """ Absolute targets ignore PATH_INFO and SCRIPT_NAME. """
        self.assertRedirect('/', 'http://127.0.0.1/')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/some/sub/path/')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/some/sub/file.html')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html',
                            SCRIPT_NAME='/some/sub/path/')
        self.assertRedirect('/foo/test.html', 'http://127.0.0.1/foo/test.html')
        self.assertRedirect('/foo/test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/some/sub/file.html')

    def test_relative_path(self):
        """ Relative targets are resolved against SCRIPT_NAME and PATH_INFO. """
        self.assertRedirect('./', 'http://127.0.0.1/')
        self.assertRedirect('./test.html', 'http://127.0.0.1/test.html')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/bar.html')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/test.html',
                            SCRIPT_NAME='/foo/')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/bar/test.html',
                            SCRIPT_NAME='/foo/', PATH_INFO='/bar/baz.html')
        self.assertRedirect('./foo/test.html', 'http://127.0.0.1/foo/test.html')
        self.assertRedirect('./foo/test.html', 'http://127.0.0.1/bar/foo/test.html',
                            PATH_INFO='/bar/file.html')
        self.assertRedirect('../test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/foo/')
        self.assertRedirect('../test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/bar/')
        self.assertRedirect('../test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/foo/bar.html')
        self.assertRedirect('../test.html', 'http://127.0.0.1/test.html',
                            SCRIPT_NAME='/foo/')
        self.assertRedirect('../test.html', 'http://127.0.0.1/foo/test.html',
                            SCRIPT_NAME='/foo/', PATH_INFO='/bar/baz.html')
        self.assertRedirect('../baz/../test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/bar/')

    def test_sheme(self):
        """ The redirect URL uses the scheme from wsgi.url_scheme. """
        self.assertRedirect('./test.html', 'https://127.0.0.1/test.html',
                            wsgi_url_scheme='https')
        self.assertRedirect('./test.html', 'https://127.0.0.1:80/test.html',
                            wsgi_url_scheme='https', SERVER_PORT='80')

    def test_host_http_1_0(self):
        """ HTTP/1.0 requests get a 302 built from SERVER_NAME/SERVER_PORT. """
        # No HTTP_HOST, just SERVER_NAME and SERVER_PORT.
        self.assertRedirect('./test.html', 'http://example.com/test.html',
                            SERVER_NAME='example.com',
                            SERVER_PROTOCOL='HTTP/1.0', status=302)
        self.assertRedirect('./test.html', 'http://127.0.0.1:81/test.html',
                            SERVER_PORT='81',
                            SERVER_PROTOCOL='HTTP/1.0', status=302)

    def test_host_http_1_1(self):
        """ HTTP_HOST determines host and port of the redirect URL. """
        self.assertRedirect('./test.html', 'http://example.com/test.html',
                            HTTP_HOST='example.com')
        self.assertRedirect('./test.html', 'http://example.com:81/test.html',
                            HTTP_HOST='example.com:81')
        # Trust HTTP_HOST over SERVER_NAME and PORT.
        self.assertRedirect('./test.html', 'http://example.com:81/test.html',
                            HTTP_HOST='example.com:81', SERVER_NAME='foobar')
        self.assertRedirect('./test.html', 'http://example.com:81/test.html',
                            HTTP_HOST='example.com:81', SERVER_PORT='80')

    def test_host_http_proxy(self):
        """ X-Forwarded-Host takes precedence over the Host header. """
        # Trust proxy headers over original header.
        self.assertRedirect('./test.html', 'http://example.com/test.html',
                            HTTP_X_FORWARDED_HOST='example.com',
                            HTTP_HOST='127.0.0.1')

    def test_specialchars(self):
        ''' The target URL is not quoted automatically. '''
        self.assertRedirect('./te st.html',
                            'http://example.com/a%20a/b%20b/te st.html',
                            HTTP_HOST='example.com', SCRIPT_NAME='/a a/', PATH_INFO='/b b/')

    def test_redirect_preserve_cookies(self):
        """ Cookies set before redirect() are carried by the raised response. """
        env = {'SERVER_PROTOCOL':'HTTP/1.1'}
        request.bind(env)
        bottle.response.bind()
        try:
            bottle.response.set_cookie('xxx', 'yyy')
            bottle.redirect('...')
        except bottle.HTTPResponse:
            h = [v for (k, v) in _e().headerlist if k == 'Set-Cookie']
            self.assertEqual(h, ['xxx=yyy'])
        else:
            # Without this branch the check would pass vacuously.
            self.fail('redirect() did not raise HTTPResponse')


class TestWSGIHeaderDict(unittest.TestCase):
    """ Tests for WSGIHeaderDict, a header view on a WSGI environ dict. """

    def setUp(self):
        """ Create an empty environ and a WSGIHeaderDict wrapping it. """
        self.env = {}
        self.headers = bottle.WSGIHeaderDict(self.env)

    def test_empty(self):
        """ An empty environ yields no headers. """
        self.assertEqual(0, len(bottle.WSGIHeaderDict({})))

    def test_native(self):
        """ Native string values are returned unchanged. """
        self.env['HTTP_TEST_HEADER'] = 'foobar'
        self.assertEqual(self.headers['Test-header'], 'foobar')

    def test_bytes(self):
        """ Byte string values compare equal to the native string. """
        self.env['HTTP_TEST_HEADER'] = tob('foobar')
        self.assertEqual(self.headers['Test-Header'], 'foobar')

    def test_unicode(self):
        """ Unicode values compare equal to the native string. """
        self.env['HTTP_TEST_HEADER'] = touni('foobar')
        self.assertEqual(self.headers['Test-Header'], 'foobar')

    def test_dict(self):
        """ Lookups, get() and membership tests ignore the case of the name. """
        for key in 'foo-bar Foo-Bar foo-Bar FOO-BAR'.split():
            self.assertTrue(key not in self.headers)
            self.assertEqual(self.headers.get(key), None)
            self.assertEqual(self.headers.get(key, 5), 5)
            self.assertRaises(KeyError, lambda x: self.headers[x], key)
        self.env['HTTP_FOO_BAR'] = 'test'
        for key in 'foo-bar Foo-Bar foo-Bar FOO-BAR'.split():
            self.assertTrue(key in self.headers)
            self.assertEqual(self.headers.get(key), 'test')
            self.assertEqual(self.headers.get(key, 5), 'test')



if __name__ == '__main__': #pragma: no cover
    unittest.main()