1# -*- coding: utf-8 -*- 2# 3# Copyright (C) 2010-2021 Edgewall Software 4# All rights reserved. 5# 6# This software is licensed as described in the file COPYING, which 7# you should have received as part of this distribution. The terms 8# are also available at https://trac.edgewall.org/wiki/TracLicense. 9# 10# This software consists of voluntary contributions made by many 11# individuals. For the exact contribution history, see the revision 12# history and logs, available at https://trac.edgewall.org/log/. 13 14import io 15import os.path 16import re 17import textwrap 18import unittest 19 20import trac.env 21from trac.config import ConfigurationError 22from trac.core import Component, TracError, implements 23from trac.db.api import DatabaseManager 24from trac.perm import PermissionError, PermissionSystem 25from trac.resource import ResourceNotFound 26from trac.test import EnvironmentStub, MockRequest, mkdtemp 27from trac.util import create_file 28from trac.web.api import (HTTPForbidden, HTTPInternalServerError, 29 HTTPNotFound, IRequestFilter, IRequestHandler, RequestDone) 30from trac.web.auth import IAuthenticator 31from trac.web.main import FakeSession, RequestDispatcher, Session, \ 32 dispatch_request, get_environments 33 34 35class TestStubRequestHandler(Component): 36 37 implements(IRequestHandler) 38 39 filename = 'test_stub.html' 40 41 template = textwrap.dedent("""\ 42 <!DOCTYPE html> 43 <html> 44 <body> 45 <h1>${greeting}</h1> 46 </body> 47 </html> 48 """) 49 50 def match_request(self, req): 51 return req.path_info == '/test-stub' 52 53 def process_request(self, req): 54 return self.filename, {'greeting': 'Hello World'} 55 56 57class AuthenticateTestCase(unittest.TestCase): 58 59 authenticators = {} 60 request_handlers = [] 61 62 @classmethod 63 def setUpClass(cls): 64 class UnsuccessfulAuthenticator(Component): 65 implements(IAuthenticator) 66 def authenticate(self, req): 67 return None 68 69 class RaisingAuthenticator(Component): 70 implements(IAuthenticator) 71 def authenticate(self, req): 72 raise TracError("Bad attempt") 73 74 class SuccessfulAuthenticator1(Component): 75 implements(IAuthenticator) 76 def authenticate(self, req): 77 return 'user1' 78 79 class SuccessfulAuthenticator2(Component): 80 implements(IAuthenticator) 81 def authenticate(self, req): 82 return 'user2' 83 84 class AuthenticateRequestHandler(Component): 85 implements(IRequestHandler) 86 def __init__(self): 87 self.calls = 0 88 def match_request(self, req): 89 return bool(req.perm) 90 def process_request(self, req): 91 self.calls += 1 92 req.authname 93 req.send('') 94 95 cls.authenticators['success1'] = SuccessfulAuthenticator1 96 cls.authenticators['success2'] = SuccessfulAuthenticator2 97 cls.authenticators['unsuccess'] = UnsuccessfulAuthenticator 98 cls.authenticators['raising'] = RaisingAuthenticator 99 cls.request_handlers = [AuthenticateRequestHandler] 100 101 @classmethod 102 def tearDownClass(cls): 103 from trac.core import ComponentMeta 104 for component in list(cls.authenticators.values()) + \ 105 cls.request_handlers: 106 ComponentMeta.deregister(component) 107 108 def setUp(self): 109 self.env = EnvironmentStub(enable=('trac.web.main.*',)) 110 self.req = MockRequest(self.env) 111 self.request_dispatcher = RequestDispatcher(self.env) 112 113 def test_authenticate_returns_first_successful(self): 114 self.env.enable_component(self.authenticators['success1']) 115 self.env.enable_component(self.authenticators['success2']) 116 self.assertEqual(2, len(self.request_dispatcher.authenticators)) 117 self.assertIsInstance(self.request_dispatcher.authenticators[0], 118 self.authenticators['success1']) 119 self.assertIsInstance(self.request_dispatcher.authenticators[1], 120 self.authenticators['success2']) 121 self.assertEqual('user1', 122 self.request_dispatcher.authenticate(self.req)) 123 124 def test_authenticate_skips_unsuccessful(self): 125 self.env.enable_component(self.authenticators['unsuccess']) 126 self.env.enable_component(self.authenticators['success1']) 127 self.assertEqual(2, len(self.request_dispatcher.authenticators)) 128 self.assertIsInstance(self.request_dispatcher.authenticators[0], 129 self.authenticators['unsuccess']) 130 self.assertIsInstance(self.request_dispatcher.authenticators[1], 131 self.authenticators['success1']) 132 self.assertEqual('user1', 133 self.request_dispatcher.authenticate(self.req)) 134 135 def test_authenticate_raises(self): 136 self.env.enable_component(self.authenticators['raising']) 137 self.env.enable_component(self.authenticators['success1']) 138 self.assertEqual(2, len(self.request_dispatcher.authenticators)) 139 self.assertIsInstance(self.request_dispatcher.authenticators[0], 140 self.authenticators['raising']) 141 self.assertIsInstance(self.request_dispatcher.authenticators[1], 142 self.authenticators['success1']) 143 self.assertEqual('anonymous', 144 self.request_dispatcher.authenticate(self.req)) 145 self.assertEqual(1, len(self.req.chrome['warnings'])) 146 expected = "Can't authenticate using RaisingAuthenticator: " 147 for level, message in self.env.log_messages: 148 if expected in message.split('\n'): 149 self.assertEqual('ERROR', level) 150 break 151 else: 152 self.fail("Expected log message not found: \"%s\"" % expected) 153 154 def test_authenticate_once(self): 155 self.env.enable_component(self.authenticators['success1']) 156 self.env.enable_component(self.request_handlers[0]) 157 self.env.config.set('trac', 'default_handler', 158 'AuthenticateRequestHandler') 159 self.request_dispatcher.set_default_callbacks(self.req) 160 161 with self.assertRaises(RequestDone): 162 self.request_dispatcher.dispatch(self.req) 163 164 self.assertEqual(1, len(self.request_dispatcher.authenticators)) 165 self.assertEqual(1, len(self.request_dispatcher.handlers)) 166 self.assertEqual(1, self.request_dispatcher.handlers[0].calls) 167 168 169class EnvironmentsTestCase(unittest.TestCase): 170 171 dirs = ('mydir1', 'mydir2', '.hidden_dir') 172 files = ('myfile1', 'myfile2', '.dot_file') 173 174 def setUp(self): 175 self.parent_dir = mkdtemp() 176 self.tracignore = os.path.join(self.parent_dir, '.tracignore') 177 for dname in self.dirs: 178 os.mkdir(os.path.join(self.parent_dir, dname)) 179 for fname in self.files: 180 create_file(os.path.join(self.parent_dir, fname)) 181 self.environ = { 182 'trac.env_paths': [], 183 'trac.env_parent_dir': self.parent_dir, 184 } 185 186 def tearDown(self): 187 for fname in self.files: 188 os.unlink(os.path.join(self.parent_dir, fname)) 189 for dname in self.dirs: 190 os.rmdir(os.path.join(self.parent_dir, dname)) 191 if os.path.exists(self.tracignore): 192 os.unlink(self.tracignore) 193 os.rmdir(self.parent_dir) 194 195 def env_paths(self, projects): 196 return {project: os.path.normpath(os.path.join(self.parent_dir, 197 project)) 198 for project in projects} 199 200 def test_default_tracignore(self): 201 self.assertEqual(self.env_paths(['mydir1', 'mydir2']), 202 get_environments(self.environ)) 203 204 def test_empty_tracignore(self): 205 create_file(self.tracignore) 206 self.assertEqual(self.env_paths(['mydir1', 'mydir2', '.hidden_dir']), 207 get_environments(self.environ)) 208 209 def test_qmark_pattern_tracignore(self): 210 create_file(self.tracignore, 'mydir?') 211 self.assertEqual(self.env_paths(['.hidden_dir']), 212 get_environments(self.environ)) 213 214 def test_star_pattern_tracignore(self): 215 create_file(self.tracignore, 'my*\n.hidden_dir') 216 self.assertEqual({}, get_environments(self.environ)) 217 218 def test_combined_tracignore(self): 219 create_file(self.tracignore, 'my*i?1\n\n#mydir2') 220 self.assertEqual(self.env_paths(['mydir2', '.hidden_dir']), 221 get_environments(self.environ)) 222 223 224class PreProcessRequestTestCase(unittest.TestCase): 225 226 components = [] 227 228 @classmethod 229 def setUpClass(cls): 230 231 class DefaultHandler(Component): 232 implements(IRequestHandler) 233 def match_request(self, req): 234 return True 235 def process_request(self, req): 236 pass 237 238 class RequestFilter(Component): 239 implements(IRequestFilter) 240 def pre_process_request(self, req, handler): 241 raise TracError("Raised in pre_process_request") 242 def post_process_request(self, req, template, data, metadata): 243 return template, data, metadata 244 245 cls.components = [DefaultHandler, RequestFilter] 246 247 @classmethod 248 def tearDownClass(cls): 249 from trac.core import ComponentMeta 250 for component in cls.components: 251 ComponentMeta.deregister(component) 252 253 def setUp(self): 254 self.env = EnvironmentStub(enable=['trac.web.*'] + 255 self.components) 256 self.env.config.set('trac', 'default_handler', 'DefaultHandler') 257 258 def test_trac_error_raises_http_internal_server_error(self): 259 """TracError in pre_process_request is trapped and an 260 HTTPInternalServerError is raised. 261 """ 262 req = MockRequest(self.env) 263 264 try: 265 RequestDispatcher(self.env).dispatch(req) 266 except HTTPInternalServerError as e: 267 self.assertEqual("500 Trac Error (Raised in pre_process_request)", 268 str(e)) 269 else: 270 self.fail("HTTPInternalServerError not raised") 271 272 273class ProcessRequestTestCase(unittest.TestCase): 274 275 request_handlers = [] 276 277 @classmethod 278 def setUpClass(cls): 279 280 class DefaultHandler(Component): 281 implements(IRequestHandler) 282 def match_request(self, req): 283 return True 284 def process_request(self, req): 285 raise req.exc_class("Raised in process_request") 286 287 cls.request_handlers = [DefaultHandler] 288 289 @classmethod 290 def tearDownClass(cls): 291 from trac.core import ComponentMeta 292 for component in cls.request_handlers: 293 ComponentMeta.deregister(component) 294 295 def setUp(self): 296 self.env = EnvironmentStub(enable=['trac.web.*'] + 297 self.request_handlers) 298 self.env.config.set('trac', 'default_handler', 'DefaultHandler') 299 300 def test_permission_error_raises_http_forbidden(self): 301 """TracError in process_request is trapped and an HTTPForbidden 302 error is raised. 303 """ 304 req = MockRequest(self.env) 305 req.exc_class = PermissionError 306 307 try: 308 RequestDispatcher(self.env).dispatch(req) 309 except HTTPForbidden as e: 310 self.assertEqual( 311 "403 Forbidden (Raised in process_request " 312 "privileges are required to perform this operation. You " 313 "don't have the required permissions.)", str(e)) 314 else: 315 self.fail("HTTPForbidden not raised") 316 317 def test_resource_not_found_raises_http_not_found(self): 318 """ResourceNotFound error in process_request is trapped and an 319 HTTPNotFound error is raised. 320 """ 321 req = MockRequest(self.env) 322 req.exc_class = ResourceNotFound 323 324 try: 325 RequestDispatcher(self.env).dispatch(req) 326 except HTTPNotFound as e: 327 self.assertEqual("404 Trac Error (Raised in process_request)", 328 str(e)) 329 else: 330 self.fail("HTTPNotFound not raised") 331 332 def test_trac_error_raises_http_internal_server_error(self): 333 """TracError in process_request is trapped and an 334 HTTPInternalServerError is raised. 335 """ 336 req = MockRequest(self.env) 337 req.exc_class = TracError 338 339 try: 340 RequestDispatcher(self.env).dispatch(req) 341 except HTTPInternalServerError as e: 342 self.assertEqual("500 Trac Error (Raised in process_request)", 343 str(e)) 344 else: 345 self.fail("HTTPInternalServerError not raised") 346 347 def test_not_implemented_error_raises_http_internal_server_error(self): 348 """NotImplementedError in process_request is trapped and an 349 HTTPInternalServerError is raised. 350 """ 351 req = MockRequest(self.env) 352 req.exc_class = NotImplementedError 353 354 try: 355 RequestDispatcher(self.env).dispatch(req) 356 except HTTPInternalServerError as e: 357 self.assertEqual("500 Not Implemented Error (Raised in " 358 "process_request)", str(e)) 359 else: 360 self.fail("HTTPInternalServerError not raised") 361 362 363class PostProcessRequestTestCase(unittest.TestCase): 364 """Test cases for handling of the optional `method` argument in 365 RequestDispatcher._post_process_request.""" 366 367 request_filter = {} 368 369 @classmethod 370 def setUpClass(cls): 371 class RequestFilterReturns2Args(Component): 372 implements(IRequestFilter) 373 def pre_process_request(self, req, handler): 374 return handler 375 def post_process_request(self, req, template, data, metadata): 376 if metadata is not None: 377 metadata['text'] = True 378 return template, data 379 380 class RequestFilterReturns3Args(Component): 381 implements(IRequestFilter) 382 def pre_process_request(self, req, handler): 383 return handler 384 def post_process_request(self, req, template, data, metadata): 385 if metadata is not None: 386 metadata['domain'] = 'en_US' 387 return template, data, metadata 388 389 class RequestFilterRedirectOnPermError(Component): 390 implements(IRequestHandler, IRequestFilter) 391 def match_request(self, req): 392 return re.match(r'/perm-error', req.path_info) 393 def process_request(self, req): 394 req.entered_process_request = True 395 raise PermissionError("No permission to view") 396 def pre_process_request(self, req, handler): 397 return handler 398 def post_process_request(self, req, template, data, content_type): 399 if (template, data, content_type) == (None, None, None): 400 req.entered_post_process_request = True 401 req.redirect(req.href('/redirect-target')) 402 return template, data, content_type 403 404 cls.request_filter['2Arg'] = RequestFilterReturns2Args 405 cls.request_filter['3Arg'] = RequestFilterReturns3Args 406 cls.request_filter['RedirectOnPermError'] = \ 407 RequestFilterRedirectOnPermError 408 409 @classmethod 410 def tearDownClass(cls): 411 from trac.core import ComponentMeta 412 for component in cls.request_filter.values(): 413 ComponentMeta.deregister(component) 414 415 def setUp(self): 416 self.env = EnvironmentStub(enable=('trac.web.main.*', 417 self.request_filter['2Arg'], 418 self.request_filter['3Arg'])) 419 self.req = MockRequest(self.env) 420 421 def test_post_process_request_error_handling(self): 422 """The post_process_request methods are called with a triple of 423 `None` values when an exception is raised in process_request or 424 post_process_request, or an empty response is returned by 425 process_request. 426 """ 427 request_dispatcher = RequestDispatcher(self.env) 428 args = (None,) * 3 429 resp = request_dispatcher._post_process_request(self.req, *args) 430 self.assertEqual(2, len(request_dispatcher.filters)) 431 self.assertEqual((None, None, None), resp) 432 433 def test_post_process_request_with_2_args(self): 434 request_dispatcher = RequestDispatcher(self.env) 435 args = ('template.html', {}) 436 resp = request_dispatcher._post_process_request(self.req, *args) 437 self.assertEqual(2, len(request_dispatcher.filters)) 438 self.assertEqual(3, len(resp)) 439 self.assertEqual(2, len(resp[2])) 440 self.assertEqual('en_US', resp[2]['domain']) 441 self.assertTrue(resp[2]['text']) 442 443 def test_post_process_request_with_3_args(self): 444 request_dispatcher = RequestDispatcher(self.env) 445 args = ('template.html', {}, {'content_type': 'text/html'}) 446 resp = request_dispatcher._post_process_request(self.req, *args) 447 self.assertEqual(2, len(request_dispatcher.filters)) 448 self.assertEqual(3, len(resp)) 449 self.assertEqual('text/html', resp[2]['content_type']) 450 self.assertEqual('en_US', resp[2]['domain']) 451 self.assertTrue(resp[2]['text']) 452 self.assertEqual(args, resp) 453 454 def test_redirect_on_permission_error(self): 455 """The post_process_request method can redirect during exception 456 handling from an exception raised in process_request. 457 """ 458 self.env.enable_component(self.request_filter['RedirectOnPermError']) 459 dispatcher = RequestDispatcher(self.env) 460 req = MockRequest(self.env, method='GET', path_info='/perm-error') 461 req.entered_process_request = False 462 req.entered_post_process_request = False 463 464 try: 465 dispatcher.dispatch(req) 466 except RequestDone: 467 pass 468 else: 469 self.fail("RequestDone not raised") 470 471 self.assertTrue(req.entered_process_request) 472 self.assertTrue(req.entered_post_process_request) 473 474 475class RequestDispatcherTestCase(unittest.TestCase): 476 477 def setUp(self): 478 self.env = EnvironmentStub(path=mkdtemp()) 479 os.mkdir(self.env.templates_dir) 480 filepath = os.path.join(self.env.templates_dir, 481 TestStubRequestHandler.filename) 482 create_file(filepath, TestStubRequestHandler.template) 483 self.filename = os.path.join(self.env.path, 'test.txt') 484 self.data = b'contents\n' 485 create_file(self.filename, self.data, 'wb') 486 487 def tearDown(self): 488 self.env.reset_db_and_disk() 489 490 def _insert_session(self): 491 sid = '1234567890abcdef' 492 name = 'First Last' 493 email = 'first.last@example.com' 494 self.env.insert_users([(sid, name, email, 0)]) 495 return sid, name, email 496 497 def _content(self): 498 yield b'line1,' 499 yield b'line2,' 500 yield b'line3\n' 501 502 def test_invalid_default_date_format_raises_exception(self): 503 self.env.config.set('trac', 'default_date_format', 'ĭšo8601') 504 505 self.assertEqual('ĭšo8601', 506 self.env.config.get('trac', 'default_date_format')) 507 self.assertRaises(ConfigurationError, getattr, 508 RequestDispatcher(self.env), 'default_date_format') 509 510 def test_get_session_returns_session(self): 511 """Session is returned when database is reachable.""" 512 sid, name, email = self._insert_session() 513 req = MockRequest(self.env, path_info='/test-stub', 514 cookie='trac_session=%s;' % sid) 515 request_dispatcher = RequestDispatcher(self.env) 516 request_dispatcher.set_default_callbacks(req) 517 518 self.assertRaises(RequestDone, request_dispatcher.dispatch, req) 519 520 self.assertIsInstance(req.session, Session) 521 self.assertEqual(sid, req.session.sid) 522 self.assertEqual(name, req.session['name']) 523 self.assertEqual(email, req.session['email']) 524 self.assertFalse(req.session.authenticated) 525 self.assertEqual('200 Ok', req.status_sent[0]) 526 self.assertIn(b'<h1>Hello World</h1>', req.response_sent.getvalue()) 527 528 def test_get_session_returns_fake_session(self): 529 """Fake session is returned when database is not reachable.""" 530 sid = self._insert_session()[0] 531 request_dispatcher = RequestDispatcher(self.env) 532 533 def get_session(req): 534 """Simulates an unreachable database.""" 535 _get_connector = DatabaseManager.get_connector 536 537 def get_connector(self): 538 raise TracError("Database not reachable") 539 540 DatabaseManager.get_connector = get_connector 541 DatabaseManager(self.env).shutdown() 542 session = request_dispatcher._get_session(req) 543 DatabaseManager.get_connector = _get_connector 544 return session 545 546 req = MockRequest(self.env, path_info='/test-stub', 547 cookie='trac_session=%s;' % sid) 548 req.callbacks['session'] = get_session 549 550 self.assertRaises(RequestDone, request_dispatcher.dispatch, req) 551 552 self.assertIn(('DEBUG', "Chosen handler is <Component trac.web.tests" 553 ".main.TestStubRequestHandler>"), 554 self.env.log_messages) 555 self.assertIn(('ERROR', "can't retrieve session: TracError: Database " 556 "not reachable"), self.env.log_messages) 557 self.assertIsInstance(req.session, FakeSession) 558 self.assertIsNone(req.session.sid) 559 self.assertNotIn('name', req.session) 560 self.assertNotIn('email', req.session) 561 self.assertFalse(req.session.authenticated) 562 self.assertEqual('200 Ok', req.status_sent[0]) 563 self.assertIn(b'<h1>Hello World</h1>', req.response_sent.getvalue()) 564 565 def test_invalid_session_id_returns_fake_session(self): 566 """Fake session is returned when session id is invalid.""" 567 sid = 'a' * 23 + '$' # last char invalid, sid must be alphanumeric. 568 req = MockRequest(self.env, path_info='/test-stub', 569 cookie='trac_session=%s;' % sid) 570 request_dispatcher = RequestDispatcher(self.env) 571 request_dispatcher.set_default_callbacks(req) 572 573 with self.assertRaises(RequestDone): 574 request_dispatcher.dispatch(req) 575 576 self.assertIn(('DEBUG', "Chosen handler is <Component trac.web.tests" 577 ".main.TestStubRequestHandler>"), 578 self.env.log_messages) 579 self.assertIn(('WARNING', "can't retrieve session: " 580 "Session ID must be alphanumeric."), 581 self.env.log_messages) 582 self.assertIsInstance(req.session, FakeSession) 583 self.assertIsNone(req.session.sid) 584 self.assertEqual('200 Ok', req.status_sent[0]) 585 self.assertIn(b'<h1>Hello World</h1>', req.response_sent.getvalue()) 586 587 def test_set_valid_xsendfile_header(self): 588 """Send file using xsendfile header.""" 589 self.env.config.set('trac', 'use_xsendfile', True) 590 self.env.config.set('trac', 'xsendfile_header', 'X-Accel-Redirect') 591 592 req = MockRequest(self.env) 593 request_dispatcher = RequestDispatcher(self.env) 594 request_dispatcher.set_default_callbacks(req) 595 596 # File is sent using xsendfile. 597 self.assertRaises(RequestDone, req.send_file, self.filename) 598 self.assertEqual(['200 Ok'], req.status_sent) 599 self.assertEqual('text/plain', req.headers_sent['Content-Type']) 600 self.assertEqual(self.filename, req.headers_sent['X-Accel-Redirect']) 601 self.assertNotIn('X-Sendfile', req.headers_sent) 602 self.assertIsNone(req._response) 603 self.assertEqual(b'', req.response_sent.getvalue()) 604 605 def _test_file_not_sent_using_xsendfile_header(self, xsendfile_header): 606 req = MockRequest(self.env) 607 request_dispatcher = RequestDispatcher(self.env) 608 request_dispatcher.set_default_callbacks(req) 609 610 # File is not sent using xsendfile. 611 self.assertRaises(RequestDone, req.send_file, self.filename) 612 self.assertEqual(['200 Ok'], req.status_sent) 613 self.assertEqual('text/plain', req.headers_sent['Content-Type']) 614 self.assertNotIn(xsendfile_header, req.headers_sent) 615 self.assertEqual('_FileWrapper', type(req._response).__name__) 616 self.assertEqual(b'', req.response_sent.getvalue()) 617 req._response.close() 618 619 def test_set_invalid_xsendfile_header(self): 620 """Not sent by xsendfile header because header is invalid.""" 621 xsendfile_header = '(X-SendFile)' 622 self.env.config.set('trac', 'use_xsendfile', True) 623 self.env.config.set('trac', 'xsendfile_header', xsendfile_header) 624 625 self._test_file_not_sent_using_xsendfile_header(xsendfile_header) 626 627 def test_xsendfile_is_disabled(self): 628 """Not sent by xsendfile header because xsendfile is disabled.""" 629 xsendfile_header = 'X-SendFile' 630 self.env.config.set('trac', 'use_xsendfile', False) 631 self.env.config.set('trac', 'xsendfile_header', xsendfile_header) 632 633 self._test_file_not_sent_using_xsendfile_header(xsendfile_header) 634 635 def _test_configurable_headers(self, method): 636 # Reserved headers not allowed. 637 content_type = 'not-allowed' 638 self.env.config.set('http-headers', 'Content-Type', content_type) 639 # Control code not allowed. 640 custom1 = '\x00custom1' 641 self.env.config.set('http-headers', 'X-Custom-1', custom1) 642 # Many special characters allowed in header name. 643 custom2 = 'Custom2-!#$%&\'*+.^_`|~' 644 self.env.config.set('http-headers', custom2, 'custom2') 645 # Some special characters not allowed in header name. 646 self.env.config.set('http-headers', 'X-Custom-(3)', 'custom3') 647 648 req = MockRequest(self.env, method='POST') 649 request_dispatcher = RequestDispatcher(self.env) 650 request_dispatcher.set_default_callbacks(req) 651 self.assertRaises(RequestDone, method, req) 652 653 self.assertNotEqual('not-allowed', req.headers_sent.get('Content-Type')) 654 self.assertNotIn('x-custom-1', req.headers_sent) 655 self.assertIn(custom2.lower(), req.headers_sent) 656 self.assertNotIn('x-custom-(3)', req.headers_sent) 657 self.assertIn(('WARNING', "[http-headers] invalid headers are ignored: " 658 "'content-type': 'not-allowed', " 659 "'x-custom-1': '\\x00custom1', " 660 "'x-custom-(3)': 'custom3'"), 661 self.env.log_messages) 662 663 def test_send_configurable_headers(self): 664 def send(req): 665 req.send(self._content()) 666 667 self._test_configurable_headers(send) 668 669 def test_send_error_configurable_headers(self): 670 def send_error(req): 671 req.send_error(None, self._content()) 672 673 self._test_configurable_headers(send_error) 674 675 def test_send_configurable_headers_no_override(self): 676 """Headers in request not overridden by configurable headers.""" 677 self.env.config.set('http-headers', 'X-XSS-Protection', '1; mode=block') 678 request_dispatcher = RequestDispatcher(self.env) 679 req1 = MockRequest(self.env) 680 request_dispatcher.set_default_callbacks(req1) 681 682 self.assertRaises(RequestDone, req1.send, self._content()) 683 684 self.assertNotIn('X-XSS-protection', req1.headers_sent) 685 self.assertIn('x-xss-protection', req1.headers_sent) 686 self.assertEqual('1; mode=block', 687 req1.headers_sent['x-xss-protection']) 688 689 req2 = MockRequest(self.env, method='POST') 690 request_dispatcher.set_default_callbacks(req2) 691 692 self.assertRaises(RequestDone, req2.send, self._content()) 693 694 self.assertNotIn('x-xss-protection', req2.headers_sent) 695 self.assertIn('X-XSS-Protection', req2.headers_sent) 696 self.assertEqual('0', req2.headers_sent['X-XSS-Protection']) 697 698 699class HdfdumpTestCase(unittest.TestCase): 700 701 components = [] 702 703 @classmethod 704 def setUpClass(cls): 705 class HdfdumpRequestHandler(Component): 706 implements(IRequestHandler) 707 def match_request(self, req): 708 return True 709 def process_request(self, req): 710 data = {'name': 'value'} 711 return 'error.html', data 712 713 cls.components = [HdfdumpRequestHandler] 714 715 @classmethod 716 def tearDownClass(cls): 717 from trac.core import ComponentMeta 718 for component in cls.components: 719 ComponentMeta.deregister(component) 720 721 def setUp(self): 722 self.env = EnvironmentStub(enable=['trac.web.*']) 723 self.req = MockRequest(self.env, args={'hdfdump': '1'}) 724 self.request_dispatcher = RequestDispatcher(self.env) 725 726 def test_hdfdump(self): 727 self.env.config.set('trac', 'default_handler', 'HdfdumpRequestHandler') 728 self.assertRaises(RequestDone, self.request_dispatcher.dispatch, 729 self.req) 730 self.assertIn(b"{'name': 'value'}", 731 self.req.response_sent.getvalue()) 732 self.assertEqual('text/plain;charset=utf-8', 733 self.req.headers_sent['Content-Type']) 734 735 736class SendErrorTestCase(unittest.TestCase): 737 738 use_chunked_encoding = False 739 740 components = None 741 env_path = None 742 743 @classmethod 744 def setUpClass(cls): 745 class RaiseExceptionHandler(Component): 746 implements(IRequestHandler) 747 748 def match_request(self, req): 749 if req.path_info.startswith('/raise-exception'): 750 return True 751 752 def process_request(self, req): 753 if req.args.get('type') == 'tracerror': 754 raise TracError("The TracError message") 755 else: 756 raise Exception("The Exception message") 757 758 cls.components = [RaiseExceptionHandler] 759 cls.env_path = mkdtemp() 760 env = trac.env.Environment(path=cls.env_path, create=True) 761 PermissionSystem(env).grant_permission('admin', 'TRAC_ADMIN') 762 env.shutdown() 763 764 @classmethod 765 def tearDownClass(cls): 766 from trac.core import ComponentMeta 767 for component in cls.components: 768 ComponentMeta.deregister(component) 769 if cls.env_path in trac.env.env_cache: 770 trac.env.env_cache[cls.env_path].shutdown() 771 del trac.env.env_cache[cls.env_path] 772 EnvironmentStub(path=cls.env_path, destroying=True).reset_db_and_disk() 773 774 def _make_environ(self, scheme='http', server_name='example.org', 775 server_port=80, method='GET', script_name='/', 776 env_path=None, **kwargs): 777 environ = {'wsgi.url_scheme': scheme, 'wsgi.input': io.BytesIO(), 778 'REQUEST_METHOD': method, 'SERVER_NAME': server_name, 779 'SERVER_PORT': server_port, 'SCRIPT_NAME': script_name, 780 'trac.env_path': env_path or self.env_path, 781 'wsgi.run_once': False} 782 environ.update(kwargs) 783 return environ 784 785 def _make_start_response(self): 786 self.status_sent = [] 787 self.headers_sent = {} 788 self.response_sent = io.BytesIO() 789 790 def start_response(status, headers, exc_info=None): 791 self.status_sent.append(status) 792 self.headers_sent.update(dict(headers)) 793 return self.response_sent.write 794 795 return start_response 796 797 def _set_config(self, admin_trac_url): 798 env = trac.env.open_environment(self.env_path, use_cache=True) 799 env.config.set('trac', 'use_chunked_encoding', 800 self.use_chunked_encoding) 801 env.config.set('project', 'admin_trac_url', admin_trac_url) 802 env.config.save() 803 804 def assert_internal_error(self, content): 805 self.assertEqual('500 Internal Server Error', self.status_sent[0]) 806 self.assertEqual('text/html;charset=utf-8', 807 self.headers_sent['Content-Type']) 808 self.assertIn(b'<h1>Oops\xe2\x80\xa6</h1>', content) 809 810 def test_trac_error(self): 811 self._set_config(admin_trac_url='.') 812 environ = self._make_environ(PATH_INFO='/raise-exception', 813 QUERY_STRING='type=tracerror') 814 dispatch_request(environ, self._make_start_response()) 815 816 content = self.response_sent.getvalue() 817 self.assertEqual('500 Internal Server Error', self.status_sent[0]) 818 self.assertEqual('text/html;charset=utf-8', 819 self.headers_sent['Content-Type']) 820 self.assertIn(b'<h1>Trac Error</h1>', content) 821 self.assertIn(b'<p class="message">The TracError message</p>', content) 822 self.assertNotIn(b'<strong>Trac detected an internal error:</strong>', 823 content) 824 self.assertNotIn(b'There was an internal error in Trac.', content) 825 826 def test_internal_error_for_non_admin(self): 827 self._set_config(admin_trac_url='.') 828 environ = self._make_environ(PATH_INFO='/raise-exception') 829 830 dispatch_request(environ, self._make_start_response()) 831 content = self.response_sent.getvalue() 832 833 self.assert_internal_error(content) 834 self.assertIn(b'There was an internal error in Trac.', content) 835 self.assertIn(b'<p>\nTo that end, you could', content) 836 self.assertNotIn(b'This is probably a local installation issue.', 837 content) 838 self.assertNotIn(b'<h2>Found a bug in Trac?</h2>', content) 839 840 def test_internal_error_with_admin_trac_url_for_non_admin(self): 841 self._set_config(admin_trac_url='http://example.org/admin') 842 environ = self._make_environ(PATH_INFO='/raise-exception') 843 844 dispatch_request(environ, self._make_start_response()) 845 content = self.response_sent.getvalue() 846 847 self.assert_internal_error(content) 848 self.assertIn(b'There was an internal error in Trac.', content) 849 self.assertIn(b'<p>\nTo that end, you could', content) 850 self.assertIn(b' action="http://example.org/admin/newticket#"', content) 851 self.assertNotIn(b'This is probably a local installation issue.', 852 content) 853 self.assertNotIn(b'<h2>Found a bug in Trac?</h2>', content) 854 855 def test_internal_error_without_admin_trac_url_for_non_admin(self): 856 self._set_config(admin_trac_url='') 857 environ = self._make_environ(PATH_INFO='/raise-exception') 858 859 dispatch_request(environ, self._make_start_response()) 860 content = self.response_sent.getvalue() 861 862 self.assert_internal_error(content) 863 self.assertIn(b'There was an internal error in Trac.', content) 864 self.assertNotIn(b'<p>\nTo that end, you could', content) 865 self.assertNotIn(b'This is probably a local installation issue.', 866 content) 867 self.assertNotIn(b'<h2>Found a bug in Trac?</h2>', content) 868 869 def test_internal_error_for_admin(self): 870 self._set_config(admin_trac_url='.') 871 environ = self._make_environ(PATH_INFO='/raise-exception', 872 REMOTE_USER='admin') 873 874 dispatch_request(environ, self._make_start_response()) 875 content = self.response_sent.getvalue() 876 877 self.assert_internal_error(content) 878 self.assertNotIn(b'There was an internal error in Trac.', content) 879 self.assertIn(b'This is probably a local installation issue.', content) 880 self.assertNotIn(b'a ticket at the admin Trac to report', content) 881 self.assertIn(b'<h2>Found a bug in Trac?</h2>', content) 882 self.assertIn(b'<p>\nOtherwise, please', content) 883 self.assertIn(b' action="https://trac.edgewall.org/newticket"', 884 content) 885 886 def test_internal_error_with_admin_trac_url_for_admin(self): 887 self._set_config(admin_trac_url='http://example.org/admin') 888 environ = self._make_environ(PATH_INFO='/raise-exception', 889 REMOTE_USER='admin') 890 891 dispatch_request(environ, self._make_start_response()) 892 content = self.response_sent.getvalue() 893 894 self.assert_internal_error(content) 895 self.assertNotIn(b'There was an internal error in Trac.', content) 896 self.assertIn(b'This is probably a local installation issue.', content) 897 self.assertIn(b'a ticket at the admin Trac to report', content) 898 self.assertIn(b' action="http://example.org/admin/newticket#"', content) 899 self.assertIn(b'<h2>Found a bug in Trac?</h2>', content) 900 self.assertIn(b'<p>\nOtherwise, please', content) 901 self.assertIn(b' action="https://trac.edgewall.org/newticket"', 902 content) 903 904 def test_internal_error_without_admin_trac_url_for_admin(self): 905 self._set_config(admin_trac_url='') 906 environ = self._make_environ(PATH_INFO='/raise-exception', 907 REMOTE_USER='admin') 908 909 dispatch_request(environ, self._make_start_response()) 910 content = self.response_sent.getvalue() 911 912 self.assert_internal_error(content) 913 self.assertNotIn(b'There was an internal error in Trac.', content) 914 self.assertIn(b'This is probably a local installation issue.', content) 915 self.assertNotIn(b'a ticket at the admin Trac to report', content) 916 self.assertIn(b'<h2>Found a bug in Trac?</h2>', content) 917 self.assertIn(b'<p>\nOtherwise, please', content) 918 self.assertIn(b' action="https://trac.edgewall.org/newticket"', 919 content) 920 921 def test_environment_not_found(self): 922 """User error reported when environment is not found.""" 923 env_path = self.env_path + '$' # Arbitrarily modified path 924 environ = self._make_environ(PATH_INFO='/', env_path=env_path) 925 926 dispatch_request(environ, self._make_start_response()) 927 content = self.response_sent.getvalue() 928 929 self.assertEqual( 930 "Trac Error\n" 931 "\n" 932 "TracError: No Trac environment found at {}\n" 933 "FileNotFoundError: [Errno 2] No such file or directory: '{}'" 934 .format(env_path, os.path.join(env_path, 'VERSION')) 935 .encode('utf-8'), content) 936 937 938class SendErrorUseChunkedEncodingTestCase(SendErrorTestCase): 939 940 use_chunked_encoding = True 941 942 943def test_suite(): 944 suite = unittest.TestSuite() 945 suite.addTest(unittest.makeSuite(AuthenticateTestCase)) 946 suite.addTest(unittest.makeSuite(EnvironmentsTestCase)) 947 suite.addTest(unittest.makeSuite(PreProcessRequestTestCase)) 948 suite.addTest(unittest.makeSuite(ProcessRequestTestCase)) 949 suite.addTest(unittest.makeSuite(PostProcessRequestTestCase)) 950 suite.addTest(unittest.makeSuite(RequestDispatcherTestCase)) 951 suite.addTest(unittest.makeSuite(HdfdumpTestCase)) 952 suite.addTest(unittest.makeSuite(SendErrorTestCase)) 953 suite.addTest(unittest.makeSuite(SendErrorUseChunkedEncodingTestCase)) 954 return suite 955 956 957if __name__ == '__main__': 958 unittest.main(defaultTest='test_suite') 959