# -*- coding: utf-8 -*-
import contextlib
import io
import os
import sys
import platform
import tempfile

from six import text_type as str
from six.moves import urllib
from six.moves.http_client import HTTPConnection

import pytest
import py.path

import cherrypy
from cherrypy.lib import static
from cherrypy._cpcompat import HTTPSConnection, ntou, tonative
from cherrypy.test import helper


@pytest.fixture
def unicode_filesystem(tmpdir):
    """Skip the test when ``tmpdir`` cannot hold unicode filenames."""
    _check_unicode_filesystem(tmpdir)


def _check_unicode_filesystem(tmpdir):
    """Probe ``tmpdir`` with a non-ASCII filename; skip on failure.

    An empty file named with a single snowman character is created
    inside ``tmpdir``.  If that raises ``UnicodeEncodeError`` the
    running test is skipped through ``pytest.skip`` with a message
    naming the file system encoding.
    """
    probe = tmpdir / ntou('☃', 'utf-8')
    template = (
        'File system encoding ({encoding}) cannot support unicode filenames'
    )
    skip_reason = template.format(encoding=sys.getfilesystemencoding())
    try:
        # The text conversion is kept inside the ``try`` on purpose so
        # that an encode error raised there is handled as well.
        with io.open(str(probe), 'w'):
            pass
    except UnicodeEncodeError:
        pytest.skip(skip_reason)


def ensure_unicode_filesystem():
    """Run the unicode-filename probe in a throwaway directory.

    The directory is made with ``tempfile.mkdtemp`` and removed again
    whether or not the probe skips the test.

    TODO: replace with simply pytest fixtures once webtest.TestCase
    no longer implies unittest.
    """
    scratch = py.path.local(tempfile.mkdtemp())
    try:
        _check_unicode_filesystem(scratch)
    finally:
        scratch.remove()


# Directory holding this test module, and the two files inside its
# ``static`` sub-directory that the tests create on demand.
curdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
has_space_filepath = os.path.join(curdir, 'static', 'has space.html')
bigfile_filepath = os.path.join(curdir, 'static', 'bigfile.log')

# The file size needs to be big enough such that half the size of it
# won't be socket-buffered (or server-buffered) all in one go. See
# test_file_stream.
MB = 2 ** 20
BIGFILE_SIZE = 32 * MB


class StaticTest(helper.CPWebCase):
    """Tests for the static file/dir tools and ``cherrypy.lib.static``."""

    @staticmethod
    def setup_server():
        """Create the fixture files and mount the applications under test.

        Writes ``has space.html`` and the ``BIGFILE_SIZE``-byte
        ``bigfile.log`` when they are missing (or, for the big file,
        the wrong size), then grafts a virtual-host dispatcher serving
        the root application and, for host ``virt.net``, a second one.
        """
        if not os.path.exists(has_space_filepath):
            with open(has_space_filepath, 'wb') as f:
                f.write(b'Hello, world\r\n')
        needs_bigfile = (
            not os.path.exists(bigfile_filepath) or
            os.path.getsize(bigfile_filepath) != BIGFILE_SIZE
        )
        if needs_bigfile:
            with open(bigfile_filepath, 'wb') as f:
                f.write(b'x' * BIGFILE_SIZE)

        class Root:
            """Root application exposing the dynamically served files."""

            @cherrypy.expose
            @cherrypy.config(**{'response.stream': True})
            def bigfile(self):
                """Serve the big file, remembering the body for ``tell``."""
                self.f = static.serve_file(bigfile_filepath)
                return self.f

            @cherrypy.expose
            def tell(self):
                """Return the big file's read position, or '' once closed."""
                if self.f.input.closed:
                    return ''
                # rstrip('L') drops the suffix repr() puts on a Python 2 long.
                return repr(self.f.input.tell()).rstrip('L')

            @cherrypy.expose
            def fileobj(self):
                """Serve style.css through an already-open file object."""
                f = open(os.path.join(curdir, 'style.css'), 'rb')
                return static.serve_fileobj(f, content_type='text/css')

            @cherrypy.expose
            def bytesio(self):
                """Serve a small in-memory ``BytesIO`` buffer."""
                f = io.BytesIO(b'Fee\nfie\nfo\nfum')
                return static.serve_fileobj(f, content_type='text/plain')

        class Static:
            """Dynamic handlers mounted where a static dir is configured."""

            @cherrypy.expose
            def index(self):
                return 'You want the Baron? You can have the Baron!'

            @cherrypy.expose
            def dynamic(self):
                return 'This is a DYNAMIC page'

        root = Root()
        root.static = Static()

        rootconf = {
            '/static': {
                'tools.staticdir.on': True,
                'tools.staticdir.dir': 'static',
                'tools.staticdir.root': curdir,
            },
            '/static-long': {
                'tools.staticdir.on': True,
                'tools.staticdir.dir': r'\\?\%s' % curdir,
            },
            '/style.css': {
                'tools.staticfile.on': True,
                'tools.staticfile.filename': os.path.join(curdir, 'style.css'),
            },
            '/docroot': {
                'tools.staticdir.on': True,
                'tools.staticdir.root': curdir,
                'tools.staticdir.dir': 'static',
                'tools.staticdir.index': 'index.html',
            },
            '/error': {
                # Deliberately missing .dir/.root; see test_config_errors.
                'tools.staticdir.on': True,
                'request.show_tracebacks': True,
            },
            '/404test': {
                'tools.staticdir.on': True,
                'tools.staticdir.root': curdir,
                'tools.staticdir.dir': 'static',
                'error_page.404': error_page_404,
            }
        }
        rootApp = cherrypy.Application(root)
        rootApp.merge(rootconf)

        test_app_conf = {
            '/test': {
                'tools.staticdir.index': 'index.html',
                'tools.staticdir.on': True,
                'tools.staticdir.root': curdir,
                'tools.staticdir.dir': 'static',
            },
        }
        testApp = cherrypy.Application(Static())
        testApp.merge(test_app_conf)

        vhost = cherrypy._cpwsgi.VirtualHost(rootApp, {'virt.net': testApp})
        cherrypy.tree.graft(vhost)

    @staticmethod
    def teardown_server():
        """Remove the fixture files written by ``setup_server``."""
        for f in (has_space_filepath, bigfile_filepath):
            try:
                os.unlink(f)
            except OSError:
                # Best-effort cleanup: the file may already be gone or
                # still be held open; neither should fail the run.
                pass

    def test_static(self):
        """Files are served from staticdir and staticfile locations."""
        self.getPage('/static/index.html')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html')
        self.assertBody('Hello, world\r\n')

        # Using a staticdir.root value in a subdir...
        self.getPage('/docroot/index.html')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html')
        self.assertBody('Hello, world\r\n')

        # Check a filename with spaces in it
        self.getPage('/static/has%20space.html')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html')
        self.assertBody('Hello, world\r\n')

        self.getPage('/style.css')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/css')
        # Note: The body should be exactly 'Dummy stylesheet\n', but
        #   unfortunately some tools such as WinZip sometimes turn \n
        #   into \r\n on Windows when extracting the CherryPy tarball so
        #   we just check the content
        self.assertMatchesBody('^Dummy stylesheet')

    @pytest.mark.skipif(platform.system() != 'Windows', reason='Windows only')
    def test_static_longpath(self):
        """Test serving of a file in subdir of a Windows long-path
        staticdir."""
        self.getPage('/static-long/static/index.html')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html')
        self.assertBody('Hello, world\r\n')

    def test_fallthrough(self):
        """Paths with no static file fall through to dynamic handlers."""
        # Test that NotFound will then try dynamic handlers (see [878]).
        self.getPage('/static/dynamic')
        self.assertBody('This is a DYNAMIC page')

        # Check a directory via fall-through to dynamic handler.
        self.getPage('/static/')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html;charset=utf-8')
        self.assertBody('You want the Baron? You can have the Baron!')

    def test_index(self):
        """``staticdir.index`` serves the index file for a directory."""
        # Check a directory via "staticdir.index".
        self.getPage('/docroot/')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/html')
        self.assertBody('Hello, world\r\n')
        # The same page should be returned even if redirected.
        self.getPage('/docroot')
        self.assertStatus(301)
        self.assertHeader('Location', '%s/docroot/' % self.base())
        self.assertMatchesBody(
            "This resource .* <a href=(['\"])%s/docroot/\\1>"
            '%s/docroot/</a>.'
            % (self.base(), self.base())
        )

    def test_config_errors(self):
        """A staticdir section without .dir yields a 500 with traceback."""
        # Check that we get an error if no .file or .dir
        self.getPage('/error/thing.html')
        self.assertErrorPage(500)
        # The TypeError wording differs between Python versions.
        if sys.version_info >= (3, 3):
            errmsg = (
                r'TypeError: staticdir\(\) missing 2 '
                'required positional arguments'
            )
        else:
            errmsg = (
                r'TypeError: staticdir\(\) takes at least 2 '
                r'(positional )?arguments \(0 given\)'
            )
        self.assertMatchesBody(errmsg.encode('ascii'))

    def test_security(self):
        """Up-level (``..``) paths are rejected."""
        # Test up-level security
        self.getPage('/static/../../test/style.css')
        self.assertStatus((400, 403))

    def test_modif(self):
        """A matching If-Modified-Since produces an empty 304 response."""
        # Test modified-since on a reasonably-large file
        self.getPage('/static/dirback.jpg')
        self.assertStatus('200 OK')
        lastmod = ''
        for k, v in self.headers:
            if k == 'Last-Modified':
                lastmod = v
        ims = ('If-Modified-Since', lastmod)
        self.getPage('/static/dirback.jpg', headers=[ims])
        self.assertStatus(304)
        self.assertNoHeader('Content-Type')
        self.assertNoHeader('Content-Length')
        self.assertNoHeader('Content-Disposition')
        self.assertBody('')

    def test_755_vhost(self):
        """The directory redirect keeps the virtual host in Location."""
        self.getPage('/test/', [('Host', 'virt.net')])
        self.assertStatus(200)
        self.getPage('/test', [('Host', 'virt.net')])
        self.assertStatus(301)
        self.assertHeader('Location', self.scheme + '://virt.net/test/')

    def test_serve_fileobj(self):
        """``serve_fileobj`` serves an open file object."""
        self.getPage('/fileobj')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/css;charset=utf-8')
        self.assertMatchesBody('^Dummy stylesheet')

    def test_serve_bytesio(self):
        """``serve_fileobj`` serves a ``BytesIO`` with a Content-Length."""
        self.getPage('/bytesio')
        self.assertStatus('200 OK')
        self.assertHeader('Content-Type', 'text/plain;charset=utf-8')
        self.assertHeader('Content-Length', 14)
        self.assertMatchesBody('Fee\nfie\nfo\nfum')

    @pytest.mark.xfail(reason='#1475')
    def test_file_stream(self):
        """The big file is streamed rather than read in one go.

        Reads the whole response over a persistent connection, then
        asks ``/tell`` (on a second connection) how far the server has
        read the file and compares the two positions.
        """
        if cherrypy.server.protocol_version != 'HTTP/1.1':
            return self.skip()

        self.PROTOCOL = 'HTTP/1.1'

        # Make an initial request
        self.persistent = True
        conn = self.HTTP_CONN
        try:
            conn.putrequest('GET', '/bigfile', skip_host=True)
            conn.putheader('Host', self.HOST)
            conn.endheaders()
            response = conn.response_class(conn.sock, method='GET')
            response.begin()
            self.assertEqual(response.status, 200)

            # Collect the chunks and join once: ``bytes +=`` would
            # re-copy the growing body on every 64k read.
            chunks = []
            remaining = BIGFILE_SIZE
            while remaining > 0:
                data = response.fp.read(65536)
                if not data:
                    break
                chunks.append(data)
                remaining -= len(data)
            body = b''.join(chunks)

            if self.scheme == 'https':
                newconn = HTTPSConnection
            else:
                newconn = HTTPConnection
            s, h, b = helper.webtest.openURL(
                b'/tell', headers=[], host=self.HOST, port=self.PORT,
                http_conn=newconn)
            if not b:
                # The file was closed on the server.
                tell_position = BIGFILE_SIZE
            else:
                tell_position = int(b)

            read_so_far = len(body)

            # It is difficult for us to force the server to only read
            # the bytes that we ask for - there are going to be buffers
            # in between.
            #
            # CherryPy will attempt to write as much data as it can to
            # the socket, and we don't have a way to determine what
            # that size will be. So we make the following assumption -
            # by the time we have read in the entire file on the
            # server, we will have at least received half of it. If
            # this is not the case, then this is an indicator that
            # either:
            #   - machines that are running this test are using buffer
            #     sizes greater than half of BIGFILE_SIZE; or
            #   - streaming is broken.
            #
            # At the time of writing, we seem to have encountered
            # buffer sizes bigger than 512K, so we've increased
            # BIGFILE_SIZE to 4MB and in 2016 to 20MB and then 32MB.
            # This test is going to keep failing according to the
            # improvements in hardware and OS buffers.
            if tell_position >= BIGFILE_SIZE:
                if read_so_far < (BIGFILE_SIZE / 2):
                    self.fail(
                        'The file should have advanced to position %r, but '
                        'has already advanced to the end of the file. It '
                        'may not be streamed as intended, or at the wrong '
                        'chunk size (64k)' % read_so_far)
            elif tell_position < read_so_far:
                self.fail(
                    'The file should have advanced to position %r, but has '
                    'only advanced to position %r. It may not be streamed '
                    'as intended, or at the wrong chunk size (64k)' %
                    (read_so_far, tell_position))

            if body != b'x' * BIGFILE_SIZE:
                self.fail("Body != 'x' * %d. Got %r instead (%d bytes)." %
                          (BIGFILE_SIZE, body[:50], len(body)))
        finally:
            # Close the connection even when an assertion above fails.
            conn.close()

    def test_file_stream_deadlock(self):
        """Aborting one streamed download must not block the next one."""
        if cherrypy.server.protocol_version != 'HTTP/1.1':
            return self.skip()

        self.PROTOCOL = 'HTTP/1.1'

        # Make an initial request but abort early.
        self.persistent = True
        conn = self.HTTP_CONN
        try:
            conn.putrequest('GET', '/bigfile', skip_host=True)
            conn.putheader('Host', self.HOST)
            conn.endheaders()
            response = conn.response_class(conn.sock, method='GET')
            try:
                response.begin()
                self.assertEqual(response.status, 200)
                body = response.fp.read(65536)
                if body != b'x' * len(body):
                    self.fail(
                        "Body != 'x' * %d. Got %r instead (%d bytes)." %
                        (65536, body[:50], len(body)))
            finally:
                response.close()
        finally:
            # Release the connection even if a check above failed.
            conn.close()

        # Make a second request, which should fetch the whole file.
        self.persistent = False
        self.getPage('/bigfile')
        if self.body != b'x' * BIGFILE_SIZE:
            # Report the length of the body that was actually compared,
            # not of the partial read from the first request.
            self.fail("Body != 'x' * %d. Got %r instead (%d bytes)." %
                      (BIGFILE_SIZE, self.body[:50], len(self.body)))

    def test_error_page_with_serve_file(self):
        """A custom 404 page built with ``serve_file`` is returned."""
        self.getPage('/404test/yunyeen')
        self.assertStatus(404)
        self.assertInBody("I couldn't find that thing")

    def test_null_bytes(self):
        """A NUL byte in the path gives 404."""
        self.getPage('/static/\x00')
        self.assertStatus('404 Not Found')

    @staticmethod
    @contextlib.contextmanager
    def unicode_file():
        """Create a unicode-named file in ``static`` for the ``with`` body.

        The file is removed again when the context exits.
        """
        filename = ntou('Слава Україні.html', 'utf-8')
        filepath = os.path.join(curdir, 'static', filename)
        with io.open(filepath, 'w', encoding='utf-8') as strm:
            strm.write(ntou('Героям Слава!', 'utf-8'))
        try:
            yield
        finally:
            os.remove(filepath)

    py27_on_windows = (
        platform.system() == 'Windows' and
        sys.version_info < (3,)
    )
    @pytest.mark.xfail(py27_on_windows, reason='#1544')  # noqa: E301
    def test_unicode(self):
        """A file with a non-ASCII name is served via a quoted URL."""
        ensure_unicode_filesystem()
        with self.unicode_file():
            url = ntou('/static/Слава Україні.html', 'utf-8')
            # quote function requires str
            url = tonative(url, 'utf-8')
            url = urllib.parse.quote(url)
            self.getPage(url)

            expected = ntou('Героям Слава!', 'utf-8')
            self.assertInBody(expected)


def error_page_404(status, message, traceback, version):
    """Serve ``static/404.html`` as the body of a 404 error page.

    The four arguments are accepted but not used; this function is
    installed as the ``error_page.404`` config value for ``/404test``.
    """
    path = os.path.join(curdir, 'static', '404.html')
    return static.serve_file(path, content_type='text/html')