1# -*- coding: utf-8 -*-
2import contextlib
3import io
4import os
5import sys
6import platform
7import tempfile
8
9from six import text_type as str
10from six.moves import urllib
11from six.moves.http_client import HTTPConnection
12
13import pytest
14import py.path
15
16import cherrypy
17from cherrypy.lib import static
18from cherrypy._cpcompat import HTTPSConnection, ntou, tonative
19from cherrypy.test import helper
20
21
@pytest.fixture
def unicode_filesystem(tmpdir):
    """Pytest fixture that probes ``tmpdir`` for unicode filename support.

    Delegates to ``_check_unicode_filesystem``, which skips the
    requesting test when a non-ASCII file name cannot be created.
    The fixture itself provides no value.
    """
    _check_unicode_filesystem(tmpdir)
25
26
def _check_unicode_filesystem(tmpdir):
    """Skip the current test unless ``tmpdir`` accepts unicode file names.

    A file named with the snowman character is opened for writing
    inside ``tmpdir``. If that attempt raises ``UnicodeEncodeError``,
    ``pytest.skip`` is called with a message naming the file system
    encoding reported by ``sys.getfilesystemencoding()``.
    """
    snowman_path = tmpdir / ntou('☃', 'utf-8')
    template = 'File system encoding ({encoding}) cannot support unicode filenames'
    skip_reason = template.format(encoding=sys.getfilesystemencoding())
    try:
        # Creating (and immediately closing) the file is the whole probe.
        with io.open(str(snowman_path), 'w'):
            pass
    except UnicodeEncodeError:
        pytest.skip(skip_reason)
35
36
def ensure_unicode_filesystem():
    """
    Run the unicode-filename probe against a throwaway directory.

    A fresh temporary directory is created, handed to
    ``_check_unicode_filesystem`` and removed again afterwards,
    whether or not the probe skipped the test.

    TODO: replace with simply pytest fixtures once webtest.TestCase
    no longer implies unittest.
    """
    scratch_dir = py.path.local(tempfile.mkdtemp())
    try:
        _check_unicode_filesystem(scratch_dir)
    finally:
        # Always clean up the temporary directory, even on skip.
        scratch_dir.remove()
47
48
# The file size needs to be big enough such that half the size of it
# won't be socket-buffered (or server-buffered) all in one go. See
# test_file_stream.
MB = 1 << 20
BIGFILE_SIZE = MB * 32

# Directory of this module, resolved against the current working
# directory; the generated fixture files live in its "static" subdir.
curdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
bigfile_filepath = os.path.join(curdir, 'static', 'bigfile.log')
has_space_filepath = os.path.join(curdir, 'static', 'has space.html')
58
59
class StaticTest(helper.CPWebCase):
    """Exercise CherryPy's static file/directory tools and helpers.

    ``setup_server`` mounts a small application tree with several
    ``tools.staticdir`` / ``tools.staticfile`` configurations; the
    test methods then request pages from it and assert on status,
    headers and body.
    """

    @staticmethod
    def setup_server():
        """Create fixture files and mount the applications under test.

        Writes ``has space.html`` and the ``BIGFILE_SIZE``-byte
        ``bigfile.log`` into the ``static`` directory when they are
        missing (or, for the big file, of the wrong size), then grafts
        a ``VirtualHost`` onto ``cherrypy.tree`` that dispatches
        ``virt.net`` to a second application and everything else to
        the root application.
        """
        if not os.path.exists(has_space_filepath):
            with open(has_space_filepath, 'wb') as f:
                f.write(b'Hello, world\r\n')
        needs_bigfile = (
            not os.path.exists(bigfile_filepath) or
            os.path.getsize(bigfile_filepath) != BIGFILE_SIZE
        )
        if needs_bigfile:
            with open(bigfile_filepath, 'wb') as f:
                f.write(b'x' * BIGFILE_SIZE)

        class Root:
            """Root handlers built on ``cherrypy.lib.static`` helpers."""

            @cherrypy.expose
            @cherrypy.config(**{'response.stream': True})
            def bigfile(self):
                """Serve the big fixture file with streaming enabled."""
                # Keep a reference so ``tell`` can inspect the file later.
                self.f = static.serve_file(bigfile_filepath)
                return self.f

            @cherrypy.expose
            def tell(self):
                """Report the read position of the file last served.

                Returns an empty string once the underlying input is
                closed, otherwise the ``repr`` of its ``tell()`` value.
                """
                if self.f.input.closed:
                    return ''
                # presumably strips the Python 2 ``long`` suffix
                return repr(self.f.input.tell()).rstrip('L')

            @cherrypy.expose
            def fileobj(self):
                """Serve ``style.css`` from an already-open file object."""
                f = open(os.path.join(curdir, 'style.css'), 'rb')
                return static.serve_fileobj(f, content_type='text/css')

            @cherrypy.expose
            def bytesio(self):
                """Serve a fixed 14-byte payload from a ``BytesIO``."""
                f = io.BytesIO(b'Fee\nfie\nfo\nfum')
                return static.serve_fileobj(f, content_type='text/plain')

        class Static:
            """Dynamic handlers that sit behind the static tools."""

            @cherrypy.expose
            def index(self):
                return 'You want the Baron? You can have the Baron!'

            @cherrypy.expose
            def dynamic(self):
                return 'This is a DYNAMIC page'

        root = Root()
        root.static = Static()

        rootconf = {
            '/static': {
                'tools.staticdir.on': True,
                'tools.staticdir.dir': 'static',
                'tools.staticdir.root': curdir,
            },
            '/static-long': {
                'tools.staticdir.on': True,
                'tools.staticdir.dir': r'\\?\%s' % curdir,
            },
            '/style.css': {
                'tools.staticfile.on': True,
                'tools.staticfile.filename': os.path.join(curdir, 'style.css'),
            },
            '/docroot': {
                'tools.staticdir.on': True,
                'tools.staticdir.root': curdir,
                'tools.staticdir.dir': 'static',
                'tools.staticdir.index': 'index.html',
            },
            # Deliberately missing .dir/.root; see test_config_errors.
            '/error': {
                'tools.staticdir.on': True,
                'request.show_tracebacks': True,
            },
            '/404test': {
                'tools.staticdir.on': True,
                'tools.staticdir.root': curdir,
                'tools.staticdir.dir': 'static',
                'error_page.404': error_page_404,
            }
        }
        rootApp = cherrypy.Application(root)
        rootApp.merge(rootconf)

        test_app_conf = {
            '/test': {
                'tools.staticdir.index': 'index.html',
                'tools.staticdir.on': True,
                'tools.staticdir.root': curdir,
                'tools.staticdir.dir': 'static',
            },
        }
        testApp = cherrypy.Application(Static())
        testApp.merge(test_app_conf)

        vhost = cherrypy._cpwsgi.VirtualHost(rootApp, {'virt.net': testApp})
        cherrypy.tree.graft(vhost)

    @staticmethod
    def teardown_server():
        """Remove the fixture files generated by ``setup_server``.

        Removal is best-effort: a file that cannot be deleted is left
        in place rather than failing the teardown.
        """
        for f in (has_space_filepath, bigfile_filepath):
            if os.path.exists(f):
                try:
                    os.unlink(f)
                except OSError:
                    # Best-effort cleanup (e.g. file still in use or
                    # already removed); unrelated errors still surface.
                    pass

    def test_static(self):
        """Files are served via staticdir and staticfile."""
        self.getPage('/static/index.html')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html')
        self.assertBody('Hello, world\r\n')

        # Using a staticdir.root value in a subdir...
        self.getPage('/docroot/index.html')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html')
        self.assertBody('Hello, world\r\n')

        # Check a filename with spaces in it
        self.getPage('/static/has%20space.html')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html')
        self.assertBody('Hello, world\r\n')

        self.getPage('/style.css')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/css')
        # Note: The body should be exactly 'Dummy stylesheet\n', but
        #   unfortunately some tools such as WinZip sometimes turn \n
        #   into \r\n on Windows when extracting the CherryPy tarball so
        #   we just check the content
        self.assertMatchesBody('^Dummy stylesheet')

    @pytest.mark.skipif(platform.system() != 'Windows', reason='Windows only')
    def test_static_longpath(self):
        """Test serving of a file in subdir of a Windows long-path
        staticdir."""
        self.getPage('/static-long/static/index.html')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html')
        self.assertBody('Hello, world\r\n')

    def test_fallthrough(self):
        """Requests with no matching static file reach dynamic handlers."""
        # Test that NotFound will then try dynamic handlers (see [878]).
        self.getPage('/static/dynamic')
        self.assertBody('This is a DYNAMIC page')

        # Check a directory via fall-through to dynamic handler.
        self.getPage('/static/')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html;charset=utf-8')
        self.assertBody('You want the Baron? You can have the Baron!')

    def test_index(self):
        """``staticdir.index`` serves the index file for a directory."""
        # Check a directory via "staticdir.index".
        self.getPage('/docroot/')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html')
        self.assertBody('Hello, world\r\n')
        # The same page should be returned even if redirected.
        self.getPage('/docroot')
        self.assertStatus(301)
        self.assertHeader('Location', '%s/docroot/' % self.base())
        self.assertMatchesBody(
            "This resource .* <a href=(['\"])%s/docroot/\\1>"
            '%s/docroot/</a>.'
            % (self.base(), self.base())
        )

    def test_config_errors(self):
        """A staticdir section without dir/root yields a 500 TypeError."""
        # Check that we get an error if no .file or .dir
        self.getPage('/error/thing.html')
        self.assertErrorPage(500)
        # The TypeError wording differs between interpreter versions.
        if sys.version_info >= (3, 3):
            errmsg = (
                r'TypeError: staticdir\(\) missing 2 '
                'required positional arguments'
            )
        else:
            errmsg = (
                r'TypeError: staticdir\(\) takes at least 2 '
                r'(positional )?arguments \(0 given\)'
            )
        self.assertMatchesBody(errmsg.encode('ascii'))

    def test_security(self):
        """Up-level (``..``) paths are rejected with 400 or 403."""
        # Test up-level security
        self.getPage('/static/../../test/style.css')
        self.assertStatus((400, 403))

    def test_modif(self):
        """``If-Modified-Since`` with the served date returns a bare 304."""
        # Test modified-since on a reasonably-large file
        self.getPage('/static/dirback.jpg')
        self.assertStatus('200 OK')
        lastmod = ''
        for k, v in self.headers:
            if k == 'Last-Modified':
                lastmod = v
        ims = ('If-Modified-Since', lastmod)
        self.getPage('/static/dirback.jpg', headers=[ims])
        self.assertStatus(304)
        self.assertNoHeader('Content-Type')
        self.assertNoHeader('Content-Length')
        self.assertNoHeader('Content-Disposition')
        self.assertBody('')

    def test_755_vhost(self):
        """Directory redirects under a virtual host keep that host name."""
        self.getPage('/test/', [('Host', 'virt.net')])
        self.assertStatus(200)
        self.getPage('/test', [('Host', 'virt.net')])
        self.assertStatus(301)
        self.assertHeader('Location', self.scheme + '://virt.net/test/')

    def test_serve_fileobj(self):
        """``serve_fileobj`` serves an open file with the given type."""
        self.getPage('/fileobj')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/css;charset=utf-8')
        self.assertMatchesBody('^Dummy stylesheet')

    def test_serve_bytesio(self):
        """``serve_fileobj`` serves a ``BytesIO`` with a Content-Length."""
        self.getPage('/bytesio')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/plain;charset=utf-8')
        self.assertHeader('Content-Length', 14)
        self.assertMatchesBody('Fee\nfie\nfo\nfum')

    @pytest.mark.xfail(reason='#1475')
    def test_file_stream(self):
        """The big file is streamed rather than read in one go.

        While the body is read in 64K chunks, ``/tell`` is polled over
        a separate connection to compare the server-side file position
        with the number of bytes received so far.
        """
        if cherrypy.server.protocol_version != 'HTTP/1.1':
            return self.skip()

        self.PROTOCOL = 'HTTP/1.1'

        # Make an initial request
        self.persistent = True
        conn = self.HTTP_CONN
        conn.putrequest('GET', '/bigfile', skip_host=True)
        conn.putheader('Host', self.HOST)
        conn.endheaders()
        response = conn.response_class(conn.sock, method='GET')
        response.begin()
        self.assertEqual(response.status, 200)

        # Connection class for the side-channel '/tell' requests; it
        # does not change while the body is being read.
        if self.scheme == 'https':
            newconn = HTTPSConnection
        else:
            newconn = HTTPConnection

        body = b''
        remaining = BIGFILE_SIZE
        while remaining > 0:
            data = response.fp.read(65536)
            if not data:
                break
            body += data
            remaining -= len(data)

            s, h, b = helper.webtest.openURL(
                b'/tell', headers=[], host=self.HOST, port=self.PORT,
                http_conn=newconn)
            if not b:
                # The file was closed on the server.
                tell_position = BIGFILE_SIZE
            else:
                tell_position = int(b)

            read_so_far = len(body)

            # It is difficult for us to force the server to only read
            # the bytes that we ask for - there are going to be buffers
            # in between.
            #
            # CherryPy will attempt to write as much data as it can to
            # the socket, and we don't have a way to determine what that
            # size will be. So we make the following assumption - by
            # the time we have read in the entire file on the server,
            # we will have at least received half of it. If this is not
            # the case, then this is an indicator that either:
            #   - machines that are running this test are using buffer
            #     sizes greater than half of BIGFILE_SIZE; or
            #   - streaming is broken.
            #
            # At the time of writing, we seem to have encountered
            # buffer sizes bigger than 512K, so we've increased
            # BIGFILE_SIZE to 4MB and in 2016 to 20MB and then 32MB.
            # This test is going to keep failing according to the
            # improvements in hardware and OS buffers.
            if tell_position >= BIGFILE_SIZE:
                if read_so_far < (BIGFILE_SIZE / 2):
                    self.fail(
                        'The file should have advanced to position %r, but '
                        'has already advanced to the end of the file. It '
                        'may not be streamed as intended, or at the wrong '
                        'chunk size (64k)' % read_so_far)
            elif tell_position < read_so_far:
                self.fail(
                    'The file should have advanced to position %r, but has '
                    'only advanced to position %r. It may not be streamed '
                    'as intended, or at the wrong chunk size (64k)' %
                    (read_so_far, tell_position))

        if body != b'x' * BIGFILE_SIZE:
            self.fail("Body != 'x' * %d. Got %r instead (%d bytes)." %
                      (BIGFILE_SIZE, body[:50], len(body)))
        conn.close()

    def test_file_stream_deadlock(self):
        """An aborted streaming download does not block a later one."""
        if cherrypy.server.protocol_version != 'HTTP/1.1':
            return self.skip()

        self.PROTOCOL = 'HTTP/1.1'

        # Make an initial request but abort early.
        self.persistent = True
        conn = self.HTTP_CONN
        conn.putrequest('GET', '/bigfile', skip_host=True)
        conn.putheader('Host', self.HOST)
        conn.endheaders()
        response = conn.response_class(conn.sock, method='GET')
        response.begin()
        self.assertEqual(response.status, 200)
        body = response.fp.read(65536)
        if body != b'x' * len(body):
            self.fail("Body != 'x' * %d. Got %r instead (%d bytes)." %
                      (65536, body[:50], len(body)))
        response.close()
        conn.close()

        # Make a second request, which should fetch the whole file.
        self.persistent = False
        self.getPage('/bigfile')
        if self.body != b'x' * BIGFILE_SIZE:
            # Report the size of the body under test (self.body), not
            # the partial chunk read from the first, aborted request.
            self.fail("Body != 'x' * %d. Got %r instead (%d bytes)." %
                      (BIGFILE_SIZE, self.body[:50], len(self.body)))

    def test_error_page_with_serve_file(self):
        """A custom 404 page produced via ``serve_file`` is returned."""
        self.getPage('/404test/yunyeen')
        self.assertStatus(404)
        self.assertInBody("I couldn't find that thing")

    def test_null_bytes(self):
        """A path containing a NUL byte results in 404 Not Found."""
        self.getPage('/static/\x00')
        self.assertStatus('404 Not Found')

    @staticmethod
    @contextlib.contextmanager
    def unicode_file():
        """Context manager providing a static file with a unicode name.

        The file is written (UTF-8 encoded) into the ``static``
        directory on entry and removed again on exit.
        """
        filename = ntou('Слава Україні.html', 'utf-8')
        filepath = os.path.join(curdir, 'static', filename)
        with io.open(filepath, 'w', encoding='utf-8') as strm:
            strm.write(ntou('Героям Слава!', 'utf-8'))
        try:
            yield
        finally:
            os.remove(filepath)

    py27_on_windows = (
        platform.system() == 'Windows' and
        sys.version_info < (3,)
    )
    @pytest.mark.xfail(py27_on_windows, reason='#1544')  # noqa: E301
    def test_unicode(self):
        """A static file with a non-ASCII name is served when URL-quoted."""
        ensure_unicode_filesystem()
        with self.unicode_file():
            url = ntou('/static/Слава Україні.html', 'utf-8')
            # quote function requires str
            url = tonative(url, 'utf-8')
            url = urllib.parse.quote(url)
            self.getPage(url)

            expected = ntou('Героям Слава!', 'utf-8')
            self.assertInBody(expected)
434
435
def error_page_404(status, message, traceback, version):
    """Custom 404 error page handler backed by a static file.

    The ``status``, ``message``, ``traceback`` and ``version``
    arguments are accepted but not used; the response is always
    ``static/404.html`` served as ``text/html``.
    """
    return static.serve_file(
        os.path.join(curdir, 'static', '404.html'),
        content_type='text/html',
    )
439