1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2010-2021 Edgewall Software
4# All rights reserved.
5#
6# This software is licensed as described in the file COPYING, which
7# you should have received as part of this distribution. The terms
8# are also available at https://trac.edgewall.org/wiki/TracLicense.
9#
10# This software consists of voluntary contributions made by many
11# individuals. For the exact contribution history, see the revision
12# history and logs, available at https://trac.edgewall.org/log/.
13
14import io
15import os.path
16import re
17import textwrap
18import unittest
19
20import trac.env
21from trac.config import ConfigurationError
22from trac.core import Component, TracError, implements
23from trac.db.api import DatabaseManager
24from trac.perm import PermissionError, PermissionSystem
25from trac.resource import ResourceNotFound
26from trac.test import EnvironmentStub, MockRequest, mkdtemp
27from trac.util import create_file
28from trac.web.api import (HTTPForbidden, HTTPInternalServerError,
29    HTTPNotFound, IRequestFilter, IRequestHandler, RequestDone)
30from trac.web.auth import IAuthenticator
31from trac.web.main import FakeSession, RequestDispatcher, Session, \
32                          dispatch_request, get_environments
33
34
class TestStubRequestHandler(Component):
    """Request handler stub that answers `/test-stub` with a template.

    The `filename` and `template` attributes are read by test cases in
    this module to write the template into an environment's templates
    directory.
    """

    implements(IRequestHandler)

    # Name of the template returned by `process_request`.
    filename = 'test_stub.html'

    # Template source; `${greeting}` is filled from the data returned
    # by `process_request`.
    template = textwrap.dedent("""\
        <!DOCTYPE html>
        <html>
          <body>
            <h1>${greeting}</h1>
          </body>
        </html>
        """)

    # IRequestHandler methods

    def match_request(self, req):
        """Return whether the request path is exactly `/test-stub`."""
        path = req.path_info
        return path == '/test-stub'

    def process_request(self, req):
        """Return the template filename and the data to render it."""
        data = {'greeting': 'Hello World'}
        return self.filename, data
55
56
class AuthenticateTestCase(unittest.TestCase):
    """Tests for user authentication performed by `RequestDispatcher`
    through the enabled `IAuthenticator` components.
    """

    # Component classes defined in `setUpClass`. They are kept on the
    # class so that tests can enable them and `tearDownClass` can
    # deregister them.
    authenticators = {}
    request_handlers = []

    @classmethod
    def setUpClass(cls):
        """Define the stub authenticators and the stub request handler.

        The definition order of the authenticators is significant: the
        tests assert on the order in which the dispatcher lists them.
        """
        class UnsuccessfulAuthenticator(Component):
            implements(IAuthenticator)
            def authenticate(self, req):
                return None

        class RaisingAuthenticator(Component):
            implements(IAuthenticator)
            def authenticate(self, req):
                raise TracError("Bad attempt")

        class SuccessfulAuthenticator1(Component):
            implements(IAuthenticator)
            def authenticate(self, req):
                return 'user1'

        class SuccessfulAuthenticator2(Component):
            implements(IAuthenticator)
            def authenticate(self, req):
                return 'user2'

        class AuthenticateRequestHandler(Component):
            implements(IRequestHandler)
            def __init__(self):
                # Number of times `process_request` has been entered.
                self.calls = 0
            def match_request(self, req):
                return bool(req.perm)
            def process_request(self, req):
                self.calls += 1
                # Only the attribute access matters, the value is unused
                # (presumably it triggers authentication -- TODO confirm).
                req.authname
                req.send('')

        # Bind a fresh dict instead of mutating the class-level default
        # in place, so the mapping is not shared with subclasses. This
        # mirrors how `request_handlers` is rebound below.
        cls.authenticators = {
            'success1': SuccessfulAuthenticator1,
            'success2': SuccessfulAuthenticator2,
            'unsuccess': UnsuccessfulAuthenticator,
            'raising': RaisingAuthenticator,
        }
        cls.request_handlers = [AuthenticateRequestHandler]

    @classmethod
    def tearDownClass(cls):
        """Deregister every component class defined in `setUpClass`."""
        from trac.core import ComponentMeta
        for component in list(cls.authenticators.values()) + \
                         cls.request_handlers:
            ComponentMeta.deregister(component)

    def setUp(self):
        self.env = EnvironmentStub(enable=('trac.web.main.*',))
        self.req = MockRequest(self.env)
        self.request_dispatcher = RequestDispatcher(self.env)

    def test_authenticate_returns_first_successful(self):
        """The name from the first successful authenticator is used."""
        self.env.enable_component(self.authenticators['success1'])
        self.env.enable_component(self.authenticators['success2'])
        self.assertEqual(2, len(self.request_dispatcher.authenticators))
        self.assertIsInstance(self.request_dispatcher.authenticators[0],
                              self.authenticators['success1'])
        self.assertIsInstance(self.request_dispatcher.authenticators[1],
                              self.authenticators['success2'])
        self.assertEqual('user1',
                         self.request_dispatcher.authenticate(self.req))

    def test_authenticate_skips_unsuccessful(self):
        """An authenticator returning `None` is skipped."""
        self.env.enable_component(self.authenticators['unsuccess'])
        self.env.enable_component(self.authenticators['success1'])
        self.assertEqual(2, len(self.request_dispatcher.authenticators))
        self.assertIsInstance(self.request_dispatcher.authenticators[0],
                              self.authenticators['unsuccess'])
        self.assertIsInstance(self.request_dispatcher.authenticators[1],
                              self.authenticators['success1'])
        self.assertEqual('user1',
                         self.request_dispatcher.authenticate(self.req))

    def test_authenticate_raises(self):
        """A `TracError` raised by an authenticator yields 'anonymous',
        one warning on the request and an ERROR log entry.
        """
        self.env.enable_component(self.authenticators['raising'])
        self.env.enable_component(self.authenticators['success1'])
        self.assertEqual(2, len(self.request_dispatcher.authenticators))
        self.assertIsInstance(self.request_dispatcher.authenticators[0],
                              self.authenticators['raising'])
        self.assertIsInstance(self.request_dispatcher.authenticators[1],
                              self.authenticators['success1'])
        self.assertEqual('anonymous',
                         self.request_dispatcher.authenticate(self.req))
        self.assertEqual(1, len(self.req.chrome['warnings']))
        expected = "Can't authenticate using RaisingAuthenticator: "
        # The expected text must be a complete line of a logged message;
        # only the first matching message has its level checked.
        for level, message in self.env.log_messages:
            if expected in message.split('\n'):
                self.assertEqual('ERROR', level)
                break
        else:
            self.fail("Expected log message not found: \"%s\"" % expected)

    def test_authenticate_once(self):
        """Dispatching to the stub handler enters it exactly once."""
        self.env.enable_component(self.authenticators['success1'])
        self.env.enable_component(self.request_handlers[0])
        self.env.config.set('trac', 'default_handler',
                            'AuthenticateRequestHandler')
        self.request_dispatcher.set_default_callbacks(self.req)

        with self.assertRaises(RequestDone):
            self.request_dispatcher.dispatch(self.req)

        self.assertEqual(1, len(self.request_dispatcher.authenticators))
        self.assertEqual(1, len(self.request_dispatcher.handlers))
        self.assertEqual(1, self.request_dispatcher.handlers[0].calls)
167
168
169class EnvironmentsTestCase(unittest.TestCase):
170
171    dirs = ('mydir1', 'mydir2', '.hidden_dir')
172    files = ('myfile1', 'myfile2', '.dot_file')
173
174    def setUp(self):
175        self.parent_dir = mkdtemp()
176        self.tracignore = os.path.join(self.parent_dir, '.tracignore')
177        for dname in self.dirs:
178            os.mkdir(os.path.join(self.parent_dir, dname))
179        for fname in self.files:
180            create_file(os.path.join(self.parent_dir, fname))
181        self.environ = {
182           'trac.env_paths': [],
183           'trac.env_parent_dir': self.parent_dir,
184        }
185
186    def tearDown(self):
187        for fname in self.files:
188            os.unlink(os.path.join(self.parent_dir, fname))
189        for dname in self.dirs:
190            os.rmdir(os.path.join(self.parent_dir, dname))
191        if os.path.exists(self.tracignore):
192            os.unlink(self.tracignore)
193        os.rmdir(self.parent_dir)
194
195    def env_paths(self, projects):
196        return {project: os.path.normpath(os.path.join(self.parent_dir,
197                                                       project))
198                for project in projects}
199
200    def test_default_tracignore(self):
201        self.assertEqual(self.env_paths(['mydir1', 'mydir2']),
202                         get_environments(self.environ))
203
204    def test_empty_tracignore(self):
205        create_file(self.tracignore)
206        self.assertEqual(self.env_paths(['mydir1', 'mydir2', '.hidden_dir']),
207                         get_environments(self.environ))
208
209    def test_qmark_pattern_tracignore(self):
210        create_file(self.tracignore, 'mydir?')
211        self.assertEqual(self.env_paths(['.hidden_dir']),
212                         get_environments(self.environ))
213
214    def test_star_pattern_tracignore(self):
215        create_file(self.tracignore, 'my*\n.hidden_dir')
216        self.assertEqual({}, get_environments(self.environ))
217
218    def test_combined_tracignore(self):
219        create_file(self.tracignore, 'my*i?1\n\n#mydir2')
220        self.assertEqual(self.env_paths(['mydir2', '.hidden_dir']),
221                         get_environments(self.environ))
222
223
class PreProcessRequestTestCase(unittest.TestCase):
    """Tests for errors raised by a request filter's
    `pre_process_request` method during dispatching.
    """

    # Component classes defined in `setUpClass`.
    components = []

    @classmethod
    def setUpClass(cls):
        """Define a catch-all handler and a filter whose
        `pre_process_request` always raises `TracError`.
        """

        class DefaultHandler(Component):
            implements(IRequestHandler)
            def match_request(self, req):
                return True
            def process_request(self, req):
                pass

        class RequestFilter(Component):
            implements(IRequestFilter)
            def pre_process_request(self, req, handler):
                raise TracError("Raised in pre_process_request")
            def post_process_request(self, req, template, data, metadata):
                return template, data, metadata

        cls.components = [DefaultHandler, RequestFilter]

    @classmethod
    def tearDownClass(cls):
        """Deregister every component class defined in `setUpClass`."""
        from trac.core import ComponentMeta
        for component in cls.components:
            ComponentMeta.deregister(component)

    def setUp(self):
        self.env = EnvironmentStub(enable=['trac.web.*'] +
                                          self.components)
        self.env.config.set('trac', 'default_handler', 'DefaultHandler')

    def test_trac_error_raises_http_internal_server_error(self):
        """TracError in pre_process_request is trapped and an
        HTTPInternalServerError is raised.
        """
        req = MockRequest(self.env)

        # Context-manager form, as used by the other tests in this
        # module; it fails with "HTTPInternalServerError not raised"
        # when nothing is raised.
        with self.assertRaises(HTTPInternalServerError) as cm:
            RequestDispatcher(self.env).dispatch(req)

        self.assertEqual("500 Trac Error (Raised in pre_process_request)",
                         str(cm.exception))
271
272
class ProcessRequestTestCase(unittest.TestCase):
    """Tests for how exceptions raised in `process_request` are turned
    into HTTP errors by `RequestDispatcher.dispatch`.
    """

    # Component classes defined in `setUpClass`.
    request_handlers = []

    @classmethod
    def setUpClass(cls):
        """Define a catch-all handler that raises the exception class
        stored in the request's `exc_class` attribute.
        """

        class DefaultHandler(Component):
            implements(IRequestHandler)
            def match_request(self, req):
                return True
            def process_request(self, req):
                raise req.exc_class("Raised in process_request")

        cls.request_handlers = [DefaultHandler]

    @classmethod
    def tearDownClass(cls):
        """Deregister every component class defined in `setUpClass`."""
        from trac.core import ComponentMeta
        for component in cls.request_handlers:
            ComponentMeta.deregister(component)

    def setUp(self):
        self.env = EnvironmentStub(enable=['trac.web.*'] +
                                          self.request_handlers)
        self.env.config.set('trac', 'default_handler', 'DefaultHandler')

    def _assert_dispatch_raises(self, exc_class, http_error, expected):
        """Dispatch a request whose handler raises `exc_class` and
        verify the HTTP error that results.

        :param exc_class: exception class the handler will raise
        :param http_error: exception class expected from `dispatch`
        :param expected: expected `str()` of the raised HTTP error
        """
        req = MockRequest(self.env)
        req.exc_class = exc_class

        with self.assertRaises(http_error) as cm:
            RequestDispatcher(self.env).dispatch(req)

        self.assertEqual(expected, str(cm.exception))

    def test_permission_error_raises_http_forbidden(self):
        """PermissionError in process_request is trapped and an
        HTTPForbidden error is raised.
        """
        self._assert_dispatch_raises(
            PermissionError, HTTPForbidden,
            "403 Forbidden (Raised in process_request "
            "privileges are required to perform this operation. You "
            "don't have the required permissions.)")

    def test_resource_not_found_raises_http_not_found(self):
        """ResourceNotFound error in process_request is trapped and an
        HTTPNotFound error is raised.
        """
        self._assert_dispatch_raises(
            ResourceNotFound, HTTPNotFound,
            "404 Trac Error (Raised in process_request)")

    def test_trac_error_raises_http_internal_server_error(self):
        """TracError in process_request is trapped and an
        HTTPInternalServerError is raised.
        """
        self._assert_dispatch_raises(
            TracError, HTTPInternalServerError,
            "500 Trac Error (Raised in process_request)")

    def test_not_implemented_error_raises_http_internal_server_error(self):
        """NotImplementedError in process_request is trapped and an
        HTTPInternalServerError is raised.
        """
        self._assert_dispatch_raises(
            NotImplementedError, HTTPInternalServerError,
            "500 Not Implemented Error (Raised in "
            "process_request)")
361
362
class PostProcessRequestTestCase(unittest.TestCase):
    """Test cases for handling of the optional `metadata` value in
    RequestDispatcher._post_process_request."""

    # Request filter classes defined in `setUpClass`, keyed by a short
    # name used by the tests.
    request_filter = {}

    @classmethod
    def setUpClass(cls):
        """Define the request filters exercised by the tests."""
        class RequestFilterReturns2Args(Component):
            implements(IRequestFilter)
            def pre_process_request(self, req, handler):
                return handler
            def post_process_request(self, req, template, data, metadata):
                # Modifies `metadata` in place but returns only two values.
                if metadata is not None:
                    metadata['text'] = True
                return template, data

        class RequestFilterReturns3Args(Component):
            implements(IRequestFilter)
            def pre_process_request(self, req, handler):
                return handler
            def post_process_request(self, req, template, data, metadata):
                if metadata is not None:
                    metadata['domain'] = 'en_US'
                return template, data, metadata

        class RequestFilterRedirectOnPermError(Component):
            implements(IRequestHandler, IRequestFilter)
            def match_request(self, req):
                return re.match(r'/perm-error', req.path_info)
            def process_request(self, req):
                req.entered_process_request = True
                raise PermissionError("No permission to view")
            def pre_process_request(self, req, handler):
                return handler
            def post_process_request(self, req, template, data, content_type):
                # A triple of `None` is what the filter receives after an
                # exception was raised in process_request.
                if (template, data, content_type) == (None, None, None):
                    req.entered_post_process_request = True
                    req.redirect(req.href('/redirect-target'))
                return template, data, content_type

        # Bind a fresh dict instead of mutating the class-level default
        # in place, so the mapping is not shared with subclasses.
        cls.request_filter = {
            '2Arg': RequestFilterReturns2Args,
            '3Arg': RequestFilterReturns3Args,
            'RedirectOnPermError': RequestFilterRedirectOnPermError,
        }

    @classmethod
    def tearDownClass(cls):
        """Deregister every component class defined in `setUpClass`."""
        from trac.core import ComponentMeta
        for component in cls.request_filter.values():
            ComponentMeta.deregister(component)

    def setUp(self):
        self.env = EnvironmentStub(enable=('trac.web.main.*',
                                           self.request_filter['2Arg'],
                                           self.request_filter['3Arg']))
        self.req = MockRequest(self.env)

    def test_post_process_request_error_handling(self):
        """The post_process_request methods are called with a triple of
        `None` values when an exception is raised in process_request or
        post_process_request, or an empty response is returned by
        process_request.
        """
        request_dispatcher = RequestDispatcher(self.env)
        args = (None,) * 3
        resp = request_dispatcher._post_process_request(self.req, *args)
        self.assertEqual(2, len(request_dispatcher.filters))
        self.assertEqual((None, None, None), resp)

    def test_post_process_request_with_2_args(self):
        """Without a `metadata` argument, a three-item result is still
        returned and both filters' metadata entries are present.
        """
        request_dispatcher = RequestDispatcher(self.env)
        args = ('template.html', {})
        resp = request_dispatcher._post_process_request(self.req, *args)
        self.assertEqual(2, len(request_dispatcher.filters))
        self.assertEqual(3, len(resp))
        self.assertEqual(2, len(resp[2]))
        self.assertEqual('en_US', resp[2]['domain'])
        self.assertTrue(resp[2]['text'])

    def test_post_process_request_with_3_args(self):
        """A `metadata` dict passed in is kept and extended by both
        filters.
        """
        request_dispatcher = RequestDispatcher(self.env)
        args = ('template.html', {}, {'content_type': 'text/html'})
        resp = request_dispatcher._post_process_request(self.req, *args)
        self.assertEqual(2, len(request_dispatcher.filters))
        self.assertEqual(3, len(resp))
        self.assertEqual('text/html', resp[2]['content_type'])
        self.assertEqual('en_US', resp[2]['domain'])
        self.assertTrue(resp[2]['text'])
        self.assertEqual(args, resp)

    def test_redirect_on_permission_error(self):
        """The post_process_request method can redirect during exception
        handling from an exception raised in process_request.
        """
        self.env.enable_component(self.request_filter['RedirectOnPermError'])
        dispatcher = RequestDispatcher(self.env)
        req = MockRequest(self.env, method='GET', path_info='/perm-error')
        req.entered_process_request = False
        req.entered_post_process_request = False

        with self.assertRaises(RequestDone):
            dispatcher.dispatch(req)

        self.assertTrue(req.entered_process_request)
        self.assertTrue(req.entered_post_process_request)
473
474
class RequestDispatcherTestCase(unittest.TestCase):
    """Tests for `RequestDispatcher`: session retrieval, sending files
    with an xsendfile header and the `[http-headers]` configuration.
    """

    def setUp(self):
        """Create an environment on disk holding the stub template and
        a small file used by the `send_file` tests.
        """
        self.env = EnvironmentStub(path=mkdtemp())
        os.mkdir(self.env.templates_dir)
        filepath = os.path.join(self.env.templates_dir,
                                TestStubRequestHandler.filename)
        create_file(filepath, TestStubRequestHandler.template)
        self.filename = os.path.join(self.env.path, 'test.txt')
        self.data = b'contents\n'
        create_file(self.filename, self.data, 'wb')

    def tearDown(self):
        self.env.reset_db_and_disk()

    def _insert_session(self):
        """Insert a user with name and email and return
        `(sid, name, email)`.
        """
        sid = '1234567890abcdef'
        name = 'First Last'
        email = 'first.last@example.com'
        self.env.insert_users([(sid, name, email, 0)])
        return sid, name, email

    def _content(self):
        """Yield a response body in three byte chunks."""
        yield b'line1,'
        yield b'line2,'
        yield b'line3\n'

    def test_invalid_default_date_format_raises_exception(self):
        """Reading `default_date_format` raises ConfigurationError when
        the configured value is invalid.
        """
        self.env.config.set('trac', 'default_date_format', 'ĭšo8601')

        self.assertEqual('ĭšo8601',
                         self.env.config.get('trac', 'default_date_format'))
        self.assertRaises(ConfigurationError, getattr,
                          RequestDispatcher(self.env), 'default_date_format')

    def test_get_session_returns_session(self):
        """Session is returned when database is reachable."""
        sid, name, email = self._insert_session()
        req = MockRequest(self.env, path_info='/test-stub',
                          cookie='trac_session=%s;' % sid)
        request_dispatcher = RequestDispatcher(self.env)
        request_dispatcher.set_default_callbacks(req)

        self.assertRaises(RequestDone, request_dispatcher.dispatch, req)

        self.assertIsInstance(req.session, Session)
        self.assertEqual(sid, req.session.sid)
        self.assertEqual(name, req.session['name'])
        self.assertEqual(email, req.session['email'])
        self.assertFalse(req.session.authenticated)
        self.assertEqual('200 Ok', req.status_sent[0])
        self.assertIn(b'<h1>Hello World</h1>', req.response_sent.getvalue())

    def test_get_session_returns_fake_session(self):
        """Fake session is returned when database is not reachable."""
        sid = self._insert_session()[0]
        request_dispatcher = RequestDispatcher(self.env)

        def get_session(req):
            """Simulates an unreachable database."""
            _get_connector = DatabaseManager.get_connector

            def get_connector(db_manager):
                raise TracError("Database not reachable")

            DatabaseManager.get_connector = get_connector
            try:
                DatabaseManager(self.env).shutdown()
                return request_dispatcher._get_session(req)
            finally:
                # Always undo the class-level patch, otherwise a failure
                # here would leave the database unreachable for every
                # test that runs afterwards.
                DatabaseManager.get_connector = _get_connector

        req = MockRequest(self.env, path_info='/test-stub',
                          cookie='trac_session=%s;' % sid)
        req.callbacks['session'] = get_session

        self.assertRaises(RequestDone, request_dispatcher.dispatch, req)

        self.assertIn(('DEBUG', "Chosen handler is <Component trac.web.tests"
                                ".main.TestStubRequestHandler>"),
                      self.env.log_messages)
        self.assertIn(('ERROR', "can't retrieve session: TracError: Database "
                                "not reachable"), self.env.log_messages)
        self.assertIsInstance(req.session, FakeSession)
        self.assertIsNone(req.session.sid)
        self.assertNotIn('name', req.session)
        self.assertNotIn('email', req.session)
        self.assertFalse(req.session.authenticated)
        self.assertEqual('200 Ok', req.status_sent[0])
        self.assertIn(b'<h1>Hello World</h1>', req.response_sent.getvalue())

    def test_invalid_session_id_returns_fake_session(self):
        """Fake session is returned when session id is invalid."""
        sid = 'a' * 23 + '$'  # last char invalid, sid must be alphanumeric.
        req = MockRequest(self.env, path_info='/test-stub',
                          cookie='trac_session=%s;' % sid)
        request_dispatcher = RequestDispatcher(self.env)
        request_dispatcher.set_default_callbacks(req)

        with self.assertRaises(RequestDone):
            request_dispatcher.dispatch(req)

        self.assertIn(('DEBUG', "Chosen handler is <Component trac.web.tests"
                                ".main.TestStubRequestHandler>"),
                      self.env.log_messages)
        self.assertIn(('WARNING', "can't retrieve session: "
                                  "Session ID must be alphanumeric."),
                      self.env.log_messages)
        self.assertIsInstance(req.session, FakeSession)
        self.assertIsNone(req.session.sid)
        self.assertEqual('200 Ok', req.status_sent[0])
        self.assertIn(b'<h1>Hello World</h1>', req.response_sent.getvalue())

    def test_set_valid_xsendfile_header(self):
        """Send file using xsendfile header."""
        self.env.config.set('trac', 'use_xsendfile', True)
        self.env.config.set('trac', 'xsendfile_header', 'X-Accel-Redirect')

        req = MockRequest(self.env)
        request_dispatcher = RequestDispatcher(self.env)
        request_dispatcher.set_default_callbacks(req)

        # File is sent using xsendfile.
        self.assertRaises(RequestDone, req.send_file, self.filename)
        self.assertEqual(['200 Ok'], req.status_sent)
        self.assertEqual('text/plain', req.headers_sent['Content-Type'])
        self.assertEqual(self.filename, req.headers_sent['X-Accel-Redirect'])
        self.assertNotIn('X-Sendfile', req.headers_sent)
        self.assertIsNone(req._response)
        self.assertEqual(b'', req.response_sent.getvalue())

    def _test_file_not_sent_using_xsendfile_header(self, xsendfile_header):
        """Send the test file and verify that `xsendfile_header` is not
        among the sent headers and that a file wrapper is the response.

        :param xsendfile_header: header name that must not be sent
        """
        req = MockRequest(self.env)
        request_dispatcher = RequestDispatcher(self.env)
        request_dispatcher.set_default_callbacks(req)

        # File is not sent using xsendfile.
        self.assertRaises(RequestDone, req.send_file, self.filename)
        try:
            self.assertEqual(['200 Ok'], req.status_sent)
            self.assertEqual('text/plain', req.headers_sent['Content-Type'])
            self.assertNotIn(xsendfile_header, req.headers_sent)
            self.assertEqual('_FileWrapper', type(req._response).__name__)
            self.assertEqual(b'', req.response_sent.getvalue())
        finally:
            # Close the response even when an assertion fails, so the
            # open file does not outlive the test. `getattr` keeps an
            # unexpected response type from masking the real failure.
            close = getattr(req._response, 'close', None)
            if close is not None:
                close()

    def test_set_invalid_xsendfile_header(self):
        """Not sent by xsendfile header because header is invalid."""
        xsendfile_header = '(X-SendFile)'
        self.env.config.set('trac', 'use_xsendfile', True)
        self.env.config.set('trac', 'xsendfile_header', xsendfile_header)

        self._test_file_not_sent_using_xsendfile_header(xsendfile_header)

    def test_xsendfile_is_disabled(self):
        """Not sent by xsendfile header because xsendfile is disabled."""
        xsendfile_header = 'X-SendFile'
        self.env.config.set('trac', 'use_xsendfile', False)
        self.env.config.set('trac', 'xsendfile_header', xsendfile_header)

        self._test_file_not_sent_using_xsendfile_header(xsendfile_header)

    def _test_configurable_headers(self, method):
        """Configure valid and invalid `[http-headers]` entries, send a
        response through `method` and verify which headers were sent.

        :param method: callable taking the request; it is expected to
                       raise `RequestDone`
        """
        # Reserved headers not allowed.
        content_type = 'not-allowed'
        self.env.config.set('http-headers', 'Content-Type', content_type)
        # Control code not allowed.
        custom1 = '\x00custom1'
        self.env.config.set('http-headers', 'X-Custom-1', custom1)
        # Many special characters allowed in header name.
        custom2 = 'Custom2-!#$%&\'*+.^_`|~'
        self.env.config.set('http-headers', custom2, 'custom2')
        # Some special characters not allowed in header name.
        self.env.config.set('http-headers', 'X-Custom-(3)', 'custom3')

        req = MockRequest(self.env, method='POST')
        request_dispatcher = RequestDispatcher(self.env)
        request_dispatcher.set_default_callbacks(req)
        self.assertRaises(RequestDone, method, req)

        self.assertNotEqual(content_type,
                            req.headers_sent.get('Content-Type'))
        self.assertNotIn('x-custom-1', req.headers_sent)
        self.assertIn(custom2.lower(), req.headers_sent)
        self.assertNotIn('x-custom-(3)', req.headers_sent)
        self.assertIn(('WARNING', "[http-headers] invalid headers are ignored: "
                                  "'content-type': 'not-allowed', "
                                  "'x-custom-1': '\\x00custom1', "
                                  "'x-custom-(3)': 'custom3'"),
                      self.env.log_messages)

    def test_send_configurable_headers(self):
        """Configurable headers are applied by `req.send`."""
        def send(req):
            req.send(self._content())

        self._test_configurable_headers(send)

    def test_send_error_configurable_headers(self):
        """Configurable headers are applied by `req.send_error`."""
        def send_error(req):
            req.send_error(None, self._content())

        self._test_configurable_headers(send_error)

    def test_send_configurable_headers_no_override(self):
        """Headers in request not overridden by configurable headers."""
        self.env.config.set('http-headers', 'X-XSS-Protection', '1; mode=block')
        request_dispatcher = RequestDispatcher(self.env)
        req1 = MockRequest(self.env)
        request_dispatcher.set_default_callbacks(req1)

        self.assertRaises(RequestDone, req1.send, self._content())

        # The configured header is sent under its lower-cased name.
        self.assertNotIn('X-XSS-protection', req1.headers_sent)
        self.assertIn('x-xss-protection', req1.headers_sent)
        self.assertEqual('1; mode=block',
                         req1.headers_sent['x-xss-protection'])

        req2 = MockRequest(self.env, method='POST')
        request_dispatcher.set_default_callbacks(req2)

        self.assertRaises(RequestDone, req2.send, self._content())

        # For the POST request the header value '0' is sent instead of
        # the configured one.
        self.assertNotIn('x-xss-protection', req2.headers_sent)
        self.assertIn('X-XSS-Protection', req2.headers_sent)
        self.assertEqual('0', req2.headers_sent['X-XSS-Protection'])
697
698
class HdfdumpTestCase(unittest.TestCase):
    """Test for the response sent when the request carries the
    `hdfdump` argument.
    """

    # Component classes defined in `setUpClass`.
    components = []

    @classmethod
    def setUpClass(cls):
        """Define a catch-all handler returning a template and data."""
        class HdfdumpRequestHandler(Component):
            implements(IRequestHandler)
            def match_request(self, req):
                return True
            def process_request(self, req):
                return 'error.html', {'name': 'value'}

        cls.components = [HdfdumpRequestHandler]

    @classmethod
    def tearDownClass(cls):
        """Deregister every component class defined in `setUpClass`."""
        from trac.core import ComponentMeta
        for handler_class in cls.components:
            ComponentMeta.deregister(handler_class)

    def setUp(self):
        env = self.env = EnvironmentStub(enable=['trac.web.*'])
        self.req = MockRequest(env, args={'hdfdump': '1'})
        self.request_dispatcher = RequestDispatcher(env)

    def test_hdfdump(self):
        """The handler's data is sent as plain text."""
        self.env.config.set('trac', 'default_handler', 'HdfdumpRequestHandler')
        with self.assertRaises(RequestDone):
            self.request_dispatcher.dispatch(self.req)
        sent = self.req
        self.assertIn(b"{'name': 'value'}", sent.response_sent.getvalue())
        self.assertEqual('text/plain;charset=utf-8',
                         sent.headers_sent['Content-Type'])
734
735
class SendErrorTestCase(unittest.TestCase):
    """Check the error responses written by `dispatch_request`.

    `setUpClass` creates an environment in a temporary directory, grants
    ``TRAC_ADMIN`` to the user ``admin`` and registers a request handler
    that raises for every path starting with ``/raise-exception``. Each
    test then dispatches a WSGI request and inspects the status, headers
    and body captured through a stub ``start_response`` callable.

    The `use_chunked_encoding` class attribute is written to the
    ``[trac] use_chunked_encoding`` option by `_set_config`.
    """

    use_chunked_encoding = False

    components = None
    env_path = None

    @classmethod
    def setUpClass(cls):
        """Register the raising handler and create the environment."""
        class RaiseExceptionHandler(Component):
            implements(IRequestHandler)

            def match_request(self, req):
                if not req.path_info.startswith('/raise-exception'):
                    return None
                return True

            def process_request(self, req):
                # ``type=tracerror`` selects a TracError, anything else
                # raises a plain Exception.
                if req.args.get('type') != 'tracerror':
                    raise Exception("The Exception message")
                raise TracError("The TracError message")

        cls.components = [RaiseExceptionHandler]
        cls.env_path = mkdtemp()
        created = trac.env.Environment(path=cls.env_path, create=True)
        PermissionSystem(created).grant_permission('admin', 'TRAC_ADMIN')
        created.shutdown()

    @classmethod
    def tearDownClass(cls):
        """Deregister the handler and remove the environment."""
        from trac.core import ComponentMeta
        for registered in cls.components:
            ComponentMeta.deregister(registered)

        # Shut down and forget the cached environment, if there is one,
        # before the files are removed.
        cache = trac.env.env_cache
        path = cls.env_path
        if path in cache:
            cache[path].shutdown()
            del cache[path]
        EnvironmentStub(path=path, destroying=True).reset_db_and_disk()

    def _make_environ(self, scheme='http', server_name='example.org',
                      server_port=80, method='GET', script_name='/',
                      env_path=None, **kwargs):
        """Return a WSGI environ dictionary for a single request.

        :param env_path: environment path to use; falls back to
                         `self.env_path` when not given (or empty).
        :param kwargs: extra environ entries, which take precedence
                       over the entries built from the other arguments.
        """
        target_path = env_path or self.env_path
        environ = {
            'wsgi.url_scheme': scheme,
            'wsgi.input': io.BytesIO(),
            'REQUEST_METHOD': method,
            'SERVER_NAME': server_name,
            'SERVER_PORT': server_port,
            'SCRIPT_NAME': script_name,
            'trac.env_path': target_path,
            'wsgi.run_once': False,
        }
        environ.update(kwargs)
        return environ

    def _make_start_response(self):
        """Reset the captured response and return a ``start_response``.

        The returned callable records the status in `self.status_sent`,
        merges the headers into `self.headers_sent` and hands back the
        ``write`` method of `self.response_sent`.
        """
        self.status_sent = []
        self.headers_sent = {}
        self.response_sent = io.BytesIO()

        def capture(status, headers, exc_info=None):
            self.status_sent.append(status)
            received = dict(headers)
            self.headers_sent.update(received)
            return self.response_sent.write

        return capture

    def _set_config(self, admin_trac_url):
        """Save the options used by the tests in the environment."""
        environment = trac.env.open_environment(self.env_path,
                                                use_cache=True)
        config = environment.config
        config.set('trac', 'use_chunked_encoding', self.use_chunked_encoding)
        config.set('project', 'admin_trac_url', admin_trac_url)
        config.save()

    def _fetch_content(self, **environ_kwargs):
        """Dispatch one request and return the bytes that were written.

        The keyword arguments are passed on to `_make_environ`.
        """
        environ = self._make_environ(**environ_kwargs)
        dispatch_request(environ, self._make_start_response())
        return self.response_sent.getvalue()

    def _assert_html_500(self):
        """Assert that a 500 status and an HTML content type were sent."""
        self.assertEqual('500 Internal Server Error', self.status_sent[0])
        self.assertEqual('text/html;charset=utf-8',
                         self.headers_sent['Content-Type'])

    def assert_internal_error(self, content):
        """Assert that `content` is the internal error ("Oops") page."""
        self._assert_html_500()
        self.assertIn(b'<h1>Oops\xe2\x80\xa6</h1>', content)

    def test_trac_error(self):
        self._set_config(admin_trac_url='.')
        content = self._fetch_content(PATH_INFO='/raise-exception',
                                      QUERY_STRING='type=tracerror')

        self._assert_html_500()
        self.assertIn(b'<h1>Trac Error</h1>', content)
        self.assertIn(b'<p class="message">The TracError message</p>', content)
        self.assertNotIn(
            b'<strong>Trac detected an internal error:</strong>', content)
        self.assertNotIn(b'There was an internal error in Trac.', content)

    def test_internal_error_for_non_admin(self):
        self._set_config(admin_trac_url='.')
        content = self._fetch_content(PATH_INFO='/raise-exception')

        self.assert_internal_error(content)
        self.assertIn(b'There was an internal error in Trac.', content)
        self.assertIn(b'<p>\nTo that end, you could', content)
        self.assertNotIn(
            b'This is probably a local installation issue.', content)
        self.assertNotIn(b'<h2>Found a bug in Trac?</h2>', content)

    def test_internal_error_with_admin_trac_url_for_non_admin(self):
        self._set_config(admin_trac_url='http://example.org/admin')
        content = self._fetch_content(PATH_INFO='/raise-exception')

        self.assert_internal_error(content)
        self.assertIn(b'There was an internal error in Trac.', content)
        self.assertIn(b'<p>\nTo that end, you could', content)
        self.assertIn(b' action="http://example.org/admin/newticket#"', content)
        self.assertNotIn(
            b'This is probably a local installation issue.', content)
        self.assertNotIn(b'<h2>Found a bug in Trac?</h2>', content)

    def test_internal_error_without_admin_trac_url_for_non_admin(self):
        self._set_config(admin_trac_url='')
        content = self._fetch_content(PATH_INFO='/raise-exception')

        self.assert_internal_error(content)
        self.assertIn(b'There was an internal error in Trac.', content)
        self.assertNotIn(b'<p>\nTo that end, you could', content)
        self.assertNotIn(
            b'This is probably a local installation issue.', content)
        self.assertNotIn(b'<h2>Found a bug in Trac?</h2>', content)

    def test_internal_error_for_admin(self):
        self._set_config(admin_trac_url='.')
        content = self._fetch_content(PATH_INFO='/raise-exception',
                                      REMOTE_USER='admin')

        self.assert_internal_error(content)
        self.assertNotIn(b'There was an internal error in Trac.', content)
        self.assertIn(b'This is probably a local installation issue.', content)
        self.assertNotIn(b'a ticket at the admin Trac to report', content)
        self.assertIn(b'<h2>Found a bug in Trac?</h2>', content)
        self.assertIn(b'<p>\nOtherwise, please', content)
        self.assertIn(
            b' action="https://trac.edgewall.org/newticket"', content)

    def test_internal_error_with_admin_trac_url_for_admin(self):
        self._set_config(admin_trac_url='http://example.org/admin')
        content = self._fetch_content(PATH_INFO='/raise-exception',
                                      REMOTE_USER='admin')

        self.assert_internal_error(content)
        self.assertNotIn(b'There was an internal error in Trac.', content)
        self.assertIn(b'This is probably a local installation issue.', content)
        self.assertIn(b'a ticket at the admin Trac to report', content)
        self.assertIn(b' action="http://example.org/admin/newticket#"', content)
        self.assertIn(b'<h2>Found a bug in Trac?</h2>', content)
        self.assertIn(b'<p>\nOtherwise, please', content)
        self.assertIn(
            b' action="https://trac.edgewall.org/newticket"', content)

    def test_internal_error_without_admin_trac_url_for_admin(self):
        self._set_config(admin_trac_url='')
        content = self._fetch_content(PATH_INFO='/raise-exception',
                                      REMOTE_USER='admin')

        self.assert_internal_error(content)
        self.assertNotIn(b'There was an internal error in Trac.', content)
        self.assertIn(b'This is probably a local installation issue.', content)
        self.assertNotIn(b'a ticket at the admin Trac to report', content)
        self.assertIn(b'<h2>Found a bug in Trac?</h2>', content)
        self.assertIn(b'<p>\nOtherwise, please', content)
        self.assertIn(
            b' action="https://trac.edgewall.org/newticket"', content)

    def test_environment_not_found(self):
        """User error reported when environment is not found."""
        missing_path = self.env_path + '$'  # Arbitrarily modified path
        content = self._fetch_content(PATH_INFO='/', env_path=missing_path)

        version_file = os.path.join(missing_path, 'VERSION')
        expected = (
            "Trac Error\n"
            "\n"
            "TracError: No Trac environment found at {}\n"
            "FileNotFoundError: [Errno 2] No such file or directory: '{}'"
            .format(missing_path, version_file))
        self.assertEqual(expected.encode('utf-8'), content)
936
937
class SendErrorUseChunkedEncodingTestCase(SendErrorTestCase):
    """Run the tests inherited from `SendErrorTestCase` with the
    `use_chunked_encoding` class attribute set to true.
    """

    use_chunked_encoding = True
941
942
def test_suite():
    """Return a suite holding the tests of the classes listed below.

    The tests are loaded with the default test loader rather than
    ``unittest.makeSuite``, which was deprecated in Python 3.11 and
    removed in Python 3.13. The loader uses the same ``test`` method
    prefix and ordering, so the resulting suite is unchanged.

    :return: a `unittest.TestSuite` with one sub-suite per test case
             class, in the order the classes are listed.
    """
    load_tests = unittest.defaultTestLoader.loadTestsFromTestCase
    test_cases = (
        AuthenticateTestCase,
        EnvironmentsTestCase,
        PreProcessRequestTestCase,
        ProcessRequestTestCase,
        PostProcessRequestTestCase,
        RequestDispatcherTestCase,
        HdfdumpTestCase,
        SendErrorTestCase,
        SendErrorUseChunkedEncodingTestCase,
    )
    suite = unittest.TestSuite()
    for test_case in test_cases:
        suite.addTest(load_tests(test_case))
    return suite
955
956
if __name__ == '__main__':
    # When executed as a script, run the suite returned by test_suite().
    unittest.main(defaultTest='test_suite')
959