1# Copyright (c) Twisted Matrix Laboratories. 2# See LICENSE for details. 3 4""" 5Tests for L{twisted.web.static}. 6""" 7import errno 8import inspect 9import mimetypes 10import os 11import re 12import sys 13import warnings 14from io import BytesIO as StringIO 15from unittest import skipIf 16 17from zope.interface.verify import verifyObject 18 19from twisted.internet import abstract, interfaces 20from twisted.python import compat, log 21from twisted.python.compat import networkString 22from twisted.python.filepath import FilePath 23from twisted.python.runtime import platform 24from twisted.trial.unittest import TestCase 25from twisted.web import http, resource, script, static 26from twisted.web._responses import FOUND 27from twisted.web.server import UnsupportedMethod 28from twisted.web.test._util import _render 29from twisted.web.test.requesthelper import DummyRequest 30 31 32class StaticDataTests(TestCase): 33 """ 34 Tests for L{Data}. 35 """ 36 37 def test_headRequest(self): 38 """ 39 L{Data.render} returns an empty response body for a I{HEAD} request. 40 """ 41 data = static.Data(b"foo", "bar") 42 request = DummyRequest([""]) 43 request.method = b"HEAD" 44 d = _render(data, request) 45 46 def cbRendered(ignored): 47 self.assertEqual(b"".join(request.written), b"") 48 49 d.addCallback(cbRendered) 50 return d 51 52 def test_invalidMethod(self): 53 """ 54 L{Data.render} raises L{UnsupportedMethod} in response to a non-I{GET}, 55 non-I{HEAD} request. 56 """ 57 data = static.Data(b"foo", b"bar") 58 request = DummyRequest([b""]) 59 request.method = b"POST" 60 self.assertRaises(UnsupportedMethod, data.render, request) 61 62 63class StaticFileTests(TestCase): 64 """ 65 Tests for the basic behavior of L{File}. 
66 """ 67 68 def _render(self, resource, request): 69 return _render(resource, request) 70 71 def test_ignoredExtTrue(self): 72 """ 73 Passing C{1} as the value to L{File}'s C{ignoredExts} argument 74 issues a warning and sets the ignored extensions to the 75 wildcard C{"*"}. 76 """ 77 with warnings.catch_warnings(record=True) as caughtWarnings: 78 file = static.File(self.mktemp(), ignoredExts=1) 79 self.assertEqual(file.ignoredExts, ["*"]) 80 81 self.assertEqual(len(caughtWarnings), 1) 82 83 def test_ignoredExtFalse(self): 84 """ 85 Passing C{1} as the value to L{File}'s C{ignoredExts} argument 86 issues a warning and sets the ignored extensions to the empty 87 list. 88 """ 89 with warnings.catch_warnings(record=True) as caughtWarnings: 90 file = static.File(self.mktemp(), ignoredExts=0) 91 self.assertEqual(file.ignoredExts, []) 92 93 self.assertEqual(len(caughtWarnings), 1) 94 95 def test_allowExt(self): 96 """ 97 Passing C{1} as the value to L{File}'s C{allowExt} argument 98 issues a warning and sets the ignored extensions to the 99 wildcard C{*}. 100 """ 101 with warnings.catch_warnings(record=True) as caughtWarnings: 102 file = static.File(self.mktemp(), ignoredExts=True) 103 self.assertEqual(file.ignoredExts, ["*"]) 104 105 self.assertEqual(len(caughtWarnings), 1) 106 107 def test_invalidMethod(self): 108 """ 109 L{File.render} raises L{UnsupportedMethod} in response to a non-I{GET}, 110 non-I{HEAD} request. 111 """ 112 request = DummyRequest([b""]) 113 request.method = b"POST" 114 path = FilePath(self.mktemp()) 115 path.setContent(b"foo") 116 file = static.File(path.path) 117 self.assertRaises(UnsupportedMethod, file.render, request) 118 119 def test_notFound(self): 120 """ 121 If a request is made which encounters a L{File} before a final segment 122 which does not correspond to any file in the path the L{File} was 123 created with, a not found response is sent. 
124 """ 125 base = FilePath(self.mktemp()) 126 base.makedirs() 127 file = static.File(base.path) 128 129 request = DummyRequest([b"foobar"]) 130 child = resource.getChildForRequest(file, request) 131 132 d = self._render(child, request) 133 134 def cbRendered(ignored): 135 self.assertEqual(request.responseCode, 404) 136 137 d.addCallback(cbRendered) 138 return d 139 140 def test_emptyChild(self): 141 """ 142 The C{''} child of a L{File} which corresponds to a directory in the 143 filesystem is a L{DirectoryLister}. 144 """ 145 base = FilePath(self.mktemp()) 146 base.makedirs() 147 file = static.File(base.path) 148 149 request = DummyRequest([b""]) 150 child = resource.getChildForRequest(file, request) 151 self.assertIsInstance(child, static.DirectoryLister) 152 self.assertEqual(child.path, base.path) 153 154 def test_emptyChildUnicodeParent(self): 155 """ 156 The C{u''} child of a L{File} which corresponds to a directory 157 whose path is text is a L{DirectoryLister} that renders to a 158 binary listing. 159 160 @see: U{https://twistedmatrix.com/trac/ticket/9438} 161 """ 162 textBase = FilePath(self.mktemp()).asTextMode() 163 textBase.makedirs() 164 textBase.child("text-file").open("w").close() 165 textFile = static.File(textBase.path) 166 167 request = DummyRequest([b""]) 168 child = resource.getChildForRequest(textFile, request) 169 self.assertIsInstance(child, static.DirectoryLister) 170 171 nativePath = compat.nativeString(textBase.path) 172 self.assertEqual(child.path, nativePath) 173 174 response = child.render(request) 175 self.assertIsInstance(response, bytes) 176 177 def test_securityViolationNotFound(self): 178 """ 179 If a request is made which encounters a L{File} before a final segment 180 which cannot be looked up in the filesystem due to security 181 considerations, a not found response is sent. 
182 """ 183 base = FilePath(self.mktemp()) 184 base.makedirs() 185 file = static.File(base.path) 186 187 request = DummyRequest([b".."]) 188 child = resource.getChildForRequest(file, request) 189 190 d = self._render(child, request) 191 192 def cbRendered(ignored): 193 self.assertEqual(request.responseCode, 404) 194 195 d.addCallback(cbRendered) 196 return d 197 198 @skipIf(platform.isWindows(), "Cannot remove read permission on Windows") 199 def test_forbiddenResource(self): 200 """ 201 If the file in the filesystem which would satisfy a request cannot be 202 read, L{File.render} sets the HTTP response code to I{FORBIDDEN}. 203 """ 204 base = FilePath(self.mktemp()) 205 base.setContent(b"") 206 # Make sure we can delete the file later. 207 self.addCleanup(base.chmod, 0o700) 208 209 # Get rid of our own read permission. 210 base.chmod(0) 211 212 file = static.File(base.path) 213 request = DummyRequest([b""]) 214 d = self._render(file, request) 215 216 def cbRendered(ignored): 217 self.assertEqual(request.responseCode, 403) 218 219 d.addCallback(cbRendered) 220 return d 221 222 def test_undecodablePath(self): 223 """ 224 A request whose path cannot be decoded as UTF-8 receives a not 225 found response, and the failure is logged. 226 """ 227 path = self.mktemp() 228 if isinstance(path, bytes): 229 path = path.decode("ascii") 230 base = FilePath(path) 231 base.makedirs() 232 233 file = static.File(base.path) 234 request = DummyRequest([b"\xff"]) 235 child = resource.getChildForRequest(file, request) 236 237 d = self._render(child, request) 238 239 def cbRendered(ignored): 240 self.assertEqual(request.responseCode, 404) 241 self.assertEqual(len(self.flushLoggedErrors(UnicodeDecodeError)), 1) 242 243 d.addCallback(cbRendered) 244 return d 245 246 def test_forbiddenResource_default(self): 247 """ 248 L{File.forbidden} defaults to L{resource.ForbiddenResource}. 
249 """ 250 self.assertIsInstance(static.File(b".").forbidden, resource.ForbiddenResource) 251 252 def test_forbiddenResource_customize(self): 253 """ 254 The resource rendered for forbidden requests is stored as a class 255 member so that users can customize it. 256 """ 257 base = FilePath(self.mktemp()) 258 base.setContent(b"") 259 markerResponse = b"custom-forbidden-response" 260 261 def failingOpenForReading(): 262 raise OSError(errno.EACCES, "") 263 264 class CustomForbiddenResource(resource.Resource): 265 def render(self, request): 266 return markerResponse 267 268 class CustomStaticFile(static.File): 269 forbidden = CustomForbiddenResource() 270 271 fileResource = CustomStaticFile(base.path) 272 fileResource.openForReading = failingOpenForReading 273 request = DummyRequest([b""]) 274 275 result = fileResource.render(request) 276 277 self.assertEqual(markerResponse, result) 278 279 def test_indexNames(self): 280 """ 281 If a request is made which encounters a L{File} before a final empty 282 segment, a file in the L{File} instance's C{indexNames} list which 283 exists in the path the L{File} was created with is served as the 284 response to the request. 285 """ 286 base = FilePath(self.mktemp()) 287 base.makedirs() 288 base.child("foo.bar").setContent(b"baz") 289 file = static.File(base.path) 290 file.indexNames = ["foo.bar"] 291 292 request = DummyRequest([b""]) 293 child = resource.getChildForRequest(file, request) 294 295 d = self._render(child, request) 296 297 def cbRendered(ignored): 298 self.assertEqual(b"".join(request.written), b"baz") 299 self.assertEqual( 300 request.responseHeaders.getRawHeaders(b"content-length")[0], b"3" 301 ) 302 303 d.addCallback(cbRendered) 304 return d 305 306 def test_staticFile(self): 307 """ 308 If a request is made which encounters a L{File} before a final segment 309 which names a file in the path the L{File} was created with, that file 310 is served as the response to the request. 
311 """ 312 base = FilePath(self.mktemp()) 313 base.makedirs() 314 base.child("foo.bar").setContent(b"baz") 315 file = static.File(base.path) 316 317 request = DummyRequest([b"foo.bar"]) 318 child = resource.getChildForRequest(file, request) 319 320 d = self._render(child, request) 321 322 def cbRendered(ignored): 323 self.assertEqual(b"".join(request.written), b"baz") 324 self.assertEqual( 325 request.responseHeaders.getRawHeaders(b"content-length")[0], b"3" 326 ) 327 328 d.addCallback(cbRendered) 329 return d 330 331 @skipIf( 332 sys.getfilesystemencoding().lower() not in ("utf-8", "mcbs"), 333 "Cannot write unicode filenames with file system encoding of" 334 " {}".format(sys.getfilesystemencoding()), 335 ) 336 def test_staticFileUnicodeFileName(self): 337 """ 338 A request for a existing unicode file path encoded as UTF-8 339 returns the contents of that file. 340 """ 341 name = "\N{GREEK SMALL LETTER ETA WITH PERISPOMENI}" 342 content = b"content" 343 344 base = FilePath(self.mktemp()) 345 base.makedirs() 346 base.child(name).setContent(content) 347 file = static.File(base.path) 348 349 request = DummyRequest([name.encode("utf-8")]) 350 child = resource.getChildForRequest(file, request) 351 352 d = self._render(child, request) 353 354 def cbRendered(ignored): 355 self.assertEqual(b"".join(request.written), content) 356 self.assertEqual( 357 request.responseHeaders.getRawHeaders(b"content-length")[0], 358 networkString(str(len(content))), 359 ) 360 361 d.addCallback(cbRendered) 362 return d 363 364 def test_staticFileDeletedGetChild(self): 365 """ 366 A L{static.File} created for a directory which does not exist should 367 return childNotFound from L{static.File.getChild}. 
368 """ 369 staticFile = static.File(self.mktemp()) 370 request = DummyRequest([b"foo.bar"]) 371 child = staticFile.getChild(b"foo.bar", request) 372 self.assertEqual(child, staticFile.childNotFound) 373 374 def test_staticFileDeletedRender(self): 375 """ 376 A L{static.File} created for a file which does not exist should render 377 its C{childNotFound} page. 378 """ 379 staticFile = static.File(self.mktemp()) 380 request = DummyRequest([b"foo.bar"]) 381 request2 = DummyRequest([b"foo.bar"]) 382 d = self._render(staticFile, request) 383 d2 = self._render(staticFile.childNotFound, request2) 384 385 def cbRendered2(ignored): 386 def cbRendered(ignored): 387 self.assertEqual(b"".join(request.written), b"".join(request2.written)) 388 389 d.addCallback(cbRendered) 390 return d 391 392 d2.addCallback(cbRendered2) 393 return d2 394 395 def test_getChildChildNotFound_customize(self): 396 """ 397 The resource rendered for child not found requests can be customize 398 using a class member. 399 """ 400 base = FilePath(self.mktemp()) 401 base.setContent(b"") 402 markerResponse = b"custom-child-not-found-response" 403 404 class CustomChildNotFoundResource(resource.Resource): 405 def render(self, request): 406 return markerResponse 407 408 class CustomStaticFile(static.File): 409 childNotFound = CustomChildNotFoundResource() 410 411 fileResource = CustomStaticFile(base.path) 412 request = DummyRequest([b"no-child.txt"]) 413 414 child = fileResource.getChild(b"no-child.txt", request) 415 result = child.render(request) 416 417 self.assertEqual(markerResponse, result) 418 419 def test_headRequest(self): 420 """ 421 L{static.File.render} returns an empty response body for I{HEAD} 422 requests. 
423 """ 424 path = FilePath(self.mktemp()) 425 path.setContent(b"foo") 426 file = static.File(path.path) 427 request = DummyRequest([b""]) 428 request.method = b"HEAD" 429 d = _render(file, request) 430 431 def cbRendered(ignored): 432 self.assertEqual(b"".join(request.written), b"") 433 434 d.addCallback(cbRendered) 435 return d 436 437 def test_processors(self): 438 """ 439 If a request is made which encounters a L{File} before a final segment 440 which names a file with an extension which is in the L{File}'s 441 C{processors} mapping, the processor associated with that extension is 442 used to serve the response to the request. 443 """ 444 base = FilePath(self.mktemp()) 445 base.makedirs() 446 base.child("foo.bar").setContent( 447 b"from twisted.web.static import Data\n" 448 b"resource = Data(b'dynamic world', 'text/plain')\n" 449 ) 450 451 file = static.File(base.path) 452 file.processors = {".bar": script.ResourceScript} 453 request = DummyRequest([b"foo.bar"]) 454 child = resource.getChildForRequest(file, request) 455 456 d = self._render(child, request) 457 458 def cbRendered(ignored): 459 self.assertEqual(b"".join(request.written), b"dynamic world") 460 self.assertEqual( 461 request.responseHeaders.getRawHeaders(b"content-length")[0], b"13" 462 ) 463 464 d.addCallback(cbRendered) 465 return d 466 467 def test_ignoreExt(self): 468 """ 469 The list of ignored extensions can be set by passing a value to 470 L{File.__init__} or by calling L{File.ignoreExt} later. 
471 """ 472 file = static.File(b".") 473 self.assertEqual(file.ignoredExts, []) 474 file.ignoreExt(".foo") 475 file.ignoreExt(".bar") 476 self.assertEqual(file.ignoredExts, [".foo", ".bar"]) 477 478 file = static.File(b".", ignoredExts=(".bar", ".baz")) 479 self.assertEqual(file.ignoredExts, [".bar", ".baz"]) 480 481 def test_ignoredExtensionsIgnored(self): 482 """ 483 A request for the I{base} child of a L{File} succeeds with a resource 484 for the I{base<extension>} file in the path the L{File} was created 485 with if such a file exists and the L{File} has been configured to 486 ignore the I{<extension>} extension. 487 """ 488 base = FilePath(self.mktemp()) 489 base.makedirs() 490 base.child("foo.bar").setContent(b"baz") 491 base.child("foo.quux").setContent(b"foobar") 492 file = static.File(base.path, ignoredExts=(".bar",)) 493 494 request = DummyRequest([b"foo"]) 495 child = resource.getChildForRequest(file, request) 496 497 d = self._render(child, request) 498 499 def cbRendered(ignored): 500 self.assertEqual(b"".join(request.written), b"baz") 501 502 d.addCallback(cbRendered) 503 return d 504 505 def test_directoryWithoutTrailingSlashRedirects(self): 506 """ 507 A request for a path which is a directory but does not have a trailing 508 slash will be redirected to a URL which does have a slash by L{File}. 509 """ 510 base = FilePath(self.mktemp()) 511 base.makedirs() 512 base.child("folder").makedirs() 513 file = static.File(base.path) 514 515 request = DummyRequest([b"folder"]) 516 request.uri = b"http://dummy/folder#baz?foo=bar" 517 child = resource.getChildForRequest(file, request) 518 519 self.successResultOf(self._render(child, request)) 520 self.assertEqual(request.responseCode, FOUND) 521 self.assertEqual( 522 request.responseHeaders.getRawHeaders(b"location"), 523 [b"http://dummy/folder/#baz?foo=bar"], 524 ) 525 526 def _makeFilePathWithStringIO(self): 527 """ 528 Create a L{File} that when opened for reading, returns a L{StringIO}. 
529 530 @return: 2-tuple of the opened "file" and the L{File}. 531 @rtype: L{tuple} 532 """ 533 fakeFile = StringIO() 534 path = FilePath(self.mktemp()) 535 path.touch() 536 file = static.File(path.path) 537 # Open our file instead of a real one 538 file.open = lambda: fakeFile 539 return fakeFile, file 540 541 def test_HEADClosesFile(self): 542 """ 543 A HEAD request opens the file, gets the size, and then closes it after 544 the request. 545 """ 546 fakeFile, file = self._makeFilePathWithStringIO() 547 request = DummyRequest([""]) 548 request.method = b"HEAD" 549 self.successResultOf(_render(file, request)) 550 self.assertEqual(b"".join(request.written), b"") 551 self.assertTrue(fakeFile.closed) 552 553 def test_cachedRequestClosesFile(self): 554 """ 555 A GET request that is cached closes the file after the request. 556 """ 557 fakeFile, file = self._makeFilePathWithStringIO() 558 request = DummyRequest([""]) 559 request.method = b"GET" 560 # This request will always return saying that it is cached 561 request.setLastModified = lambda _: http.CACHED 562 self.successResultOf(_render(file, request)) 563 self.assertEqual(b"".join(request.written), b"") 564 self.assertTrue(fakeFile.closed) 565 566 567class StaticMakeProducerTests(TestCase): 568 """ 569 Tests for L{File.makeProducer}. 570 """ 571 572 def makeResourceWithContent(self, content, type=None, encoding=None): 573 """ 574 Make a L{static.File} resource that has C{content} for its content. 575 576 @param content: The L{bytes} to use as the contents of the resource. 577 @param type: Optional value for the content type of the resource. 578 """ 579 fileName = FilePath(self.mktemp()) 580 fileName.setContent(content) 581 resource = static.File(fileName._asBytesPath()) 582 resource.encoding = encoding 583 resource.type = type 584 return resource 585 586 def contentHeaders(self, request): 587 """ 588 Extract the content-* headers from the L{DummyRequest} C{request}. 
589 590 This returns the subset of C{request.outgoingHeaders} of headers that 591 start with 'content-'. 592 """ 593 contentHeaders = {} 594 for k, v in request.responseHeaders.getAllRawHeaders(): 595 if k.lower().startswith(b"content-"): 596 contentHeaders[k.lower()] = v[0] 597 return contentHeaders 598 599 def test_noRangeHeaderGivesNoRangeStaticProducer(self): 600 """ 601 makeProducer when no Range header is set returns an instance of 602 NoRangeStaticProducer. 603 """ 604 resource = self.makeResourceWithContent(b"") 605 request = DummyRequest([]) 606 with resource.openForReading() as file: 607 producer = resource.makeProducer(request, file) 608 self.assertIsInstance(producer, static.NoRangeStaticProducer) 609 610 def test_noRangeHeaderSets200OK(self): 611 """ 612 makeProducer when no Range header is set sets the responseCode on the 613 request to 'OK'. 614 """ 615 resource = self.makeResourceWithContent(b"") 616 request = DummyRequest([]) 617 with resource.openForReading() as file: 618 resource.makeProducer(request, file) 619 self.assertEqual(http.OK, request.responseCode) 620 621 def test_noRangeHeaderSetsContentHeaders(self): 622 """ 623 makeProducer when no Range header is set sets the Content-* headers 624 for the response. 625 """ 626 length = 123 627 contentType = "text/plain" 628 contentEncoding = "gzip" 629 resource = self.makeResourceWithContent( 630 b"a" * length, type=contentType, encoding=contentEncoding 631 ) 632 request = DummyRequest([]) 633 with resource.openForReading() as file: 634 resource.makeProducer(request, file) 635 self.assertEqual( 636 { 637 b"content-type": networkString(contentType), 638 b"content-length": b"%d" % (length,), 639 b"content-encoding": networkString(contentEncoding), 640 }, 641 self.contentHeaders(request), 642 ) 643 644 def test_singleRangeGivesSingleRangeStaticProducer(self): 645 """ 646 makeProducer when the Range header requests a single byte range 647 returns an instance of SingleRangeStaticProducer. 
648 """ 649 request = DummyRequest([]) 650 request.requestHeaders.addRawHeader(b"range", b"bytes=1-3") 651 resource = self.makeResourceWithContent(b"abcdef") 652 with resource.openForReading() as file: 653 producer = resource.makeProducer(request, file) 654 self.assertIsInstance(producer, static.SingleRangeStaticProducer) 655 656 def test_singleRangeSets206PartialContent(self): 657 """ 658 makeProducer when the Range header requests a single, satisfiable byte 659 range sets the response code on the request to 'Partial Content'. 660 """ 661 request = DummyRequest([]) 662 request.requestHeaders.addRawHeader(b"range", b"bytes=1-3") 663 resource = self.makeResourceWithContent(b"abcdef") 664 with resource.openForReading() as file: 665 resource.makeProducer(request, file) 666 self.assertEqual(http.PARTIAL_CONTENT, request.responseCode) 667 668 def test_singleRangeSetsContentHeaders(self): 669 """ 670 makeProducer when the Range header requests a single, satisfiable byte 671 range sets the Content-* headers appropriately. 672 """ 673 request = DummyRequest([]) 674 request.requestHeaders.addRawHeader(b"range", b"bytes=1-3") 675 contentType = "text/plain" 676 contentEncoding = "gzip" 677 resource = self.makeResourceWithContent( 678 b"abcdef", type=contentType, encoding=contentEncoding 679 ) 680 with resource.openForReading() as file: 681 resource.makeProducer(request, file) 682 self.assertEqual( 683 { 684 b"content-type": networkString(contentType), 685 b"content-encoding": networkString(contentEncoding), 686 b"content-range": b"bytes 1-3/6", 687 b"content-length": b"3", 688 }, 689 self.contentHeaders(request), 690 ) 691 692 def test_singleUnsatisfiableRangeReturnsSingleRangeStaticProducer(self): 693 """ 694 makeProducer still returns an instance of L{SingleRangeStaticProducer} 695 when the Range header requests a single unsatisfiable byte range. 
696 """ 697 request = DummyRequest([]) 698 request.requestHeaders.addRawHeader(b"range", b"bytes=4-10") 699 resource = self.makeResourceWithContent(b"abc") 700 with resource.openForReading() as file: 701 producer = resource.makeProducer(request, file) 702 self.assertIsInstance(producer, static.SingleRangeStaticProducer) 703 704 def test_singleUnsatisfiableRangeSets416ReqestedRangeNotSatisfiable(self): 705 """ 706 makeProducer sets the response code of the request to of 'Requested 707 Range Not Satisfiable' when the Range header requests a single 708 unsatisfiable byte range. 709 """ 710 request = DummyRequest([]) 711 request.requestHeaders.addRawHeader(b"range", b"bytes=4-10") 712 resource = self.makeResourceWithContent(b"abc") 713 with resource.openForReading() as file: 714 resource.makeProducer(request, file) 715 self.assertEqual(http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode) 716 717 def test_singleUnsatisfiableRangeSetsContentHeaders(self): 718 """ 719 makeProducer when the Range header requests a single, unsatisfiable 720 byte range sets the Content-* headers appropriately. 721 """ 722 request = DummyRequest([]) 723 request.requestHeaders.addRawHeader(b"range", b"bytes=4-10") 724 contentType = "text/plain" 725 resource = self.makeResourceWithContent(b"abc", type=contentType) 726 with resource.openForReading() as file: 727 resource.makeProducer(request, file) 728 self.assertEqual( 729 { 730 b"content-type": b"text/plain", 731 b"content-length": b"0", 732 b"content-range": b"bytes */3", 733 }, 734 self.contentHeaders(request), 735 ) 736 737 def test_singlePartiallyOverlappingRangeSetsContentHeaders(self): 738 """ 739 makeProducer when the Range header requests a single byte range that 740 partly overlaps the resource sets the Content-* headers appropriately. 
741 """ 742 request = DummyRequest([]) 743 request.requestHeaders.addRawHeader(b"range", b"bytes=2-10") 744 contentType = "text/plain" 745 resource = self.makeResourceWithContent(b"abc", type=contentType) 746 with resource.openForReading() as file: 747 resource.makeProducer(request, file) 748 self.assertEqual( 749 { 750 b"content-type": b"text/plain", 751 b"content-length": b"1", 752 b"content-range": b"bytes 2-2/3", 753 }, 754 self.contentHeaders(request), 755 ) 756 757 def test_multipleRangeGivesMultipleRangeStaticProducer(self): 758 """ 759 makeProducer when the Range header requests a single byte range 760 returns an instance of MultipleRangeStaticProducer. 761 """ 762 request = DummyRequest([]) 763 request.requestHeaders.addRawHeader(b"range", b"bytes=1-3,5-6") 764 resource = self.makeResourceWithContent(b"abcdef") 765 with resource.openForReading() as file: 766 producer = resource.makeProducer(request, file) 767 self.assertIsInstance(producer, static.MultipleRangeStaticProducer) 768 769 def test_multipleRangeSets206PartialContent(self): 770 """ 771 makeProducer when the Range header requests a multiple satisfiable 772 byte ranges sets the response code on the request to 'Partial 773 Content'. 774 """ 775 request = DummyRequest([]) 776 request.requestHeaders.addRawHeader(b"range", b"bytes=1-3,5-6") 777 resource = self.makeResourceWithContent(b"abcdef") 778 with resource.openForReading() as file: 779 resource.makeProducer(request, file) 780 self.assertEqual(http.PARTIAL_CONTENT, request.responseCode) 781 782 def test_mutipleRangeSetsContentHeaders(self): 783 """ 784 makeProducer when the Range header requests a single, satisfiable byte 785 range sets the Content-* headers appropriately. 
786 """ 787 request = DummyRequest([]) 788 request.requestHeaders.addRawHeader(b"range", b"bytes=1-3,5-6") 789 resource = self.makeResourceWithContent(b"abcdefghijkl", encoding="gzip") 790 with resource.openForReading() as file: 791 producer = resource.makeProducer(request, file) 792 contentHeaders = self.contentHeaders(request) 793 # The only content-* headers set are content-type and content-length. 794 self.assertEqual( 795 {b"content-length", b"content-type"}, set(contentHeaders.keys()) 796 ) 797 # The content-length depends on the boundary used in the response. 798 expectedLength = 5 799 for boundary, offset, size in producer.rangeInfo: 800 expectedLength += len(boundary) 801 self.assertEqual( 802 b"%d" % (expectedLength,), contentHeaders[b"content-length"] 803 ) 804 # Content-type should be set to a value indicating a multipart 805 # response and the boundary used to separate the parts. 806 self.assertIn(b"content-type", contentHeaders) 807 contentType = contentHeaders[b"content-type"] 808 self.assertNotIdentical( 809 None, 810 re.match(br'multipart/byteranges; boundary="[^"]*"\Z', contentType), 811 ) 812 # Content-encoding is not set in the response to a multiple range 813 # response, which is a bit wussy but works well enough with the way 814 # static.File does content-encodings... 815 self.assertNotIn(b"content-encoding", contentHeaders) 816 817 def test_multipleUnsatisfiableRangesReturnsMultipleRangeStaticProducer(self): 818 """ 819 makeProducer still returns an instance of L{SingleRangeStaticProducer} 820 when the Range header requests multiple ranges, none of which are 821 satisfiable. 
        """
        request = DummyRequest([])
        request.requestHeaders.addRawHeader(b"range", b"bytes=10-12,15-20")
        resource = self.makeResourceWithContent(b"abc")
        with resource.openForReading() as file:
            producer = resource.makeProducer(request, file)
            self.assertIsInstance(producer, static.MultipleRangeStaticProducer)

    def test_multipleUnsatisfiableRangesSets416ReqestedRangeNotSatisfiable(self):
        """
        makeProducer sets the response code of the request to 'Requested
        Range Not Satisfiable' when the Range header requests multiple ranges,
        none of which are satisfiable.
        """
        request = DummyRequest([])
        request.requestHeaders.addRawHeader(b"range", b"bytes=10-12,15-20")
        resource = self.makeResourceWithContent(b"abc")
        with resource.openForReading() as file:
            resource.makeProducer(request, file)
            self.assertEqual(http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)

    def test_multipleUnsatisfiableRangeSetsContentHeaders(self):
        """
        makeProducer when the Range header requests multiple ranges, none of
        which are satisfiable, sets the Content-* headers appropriately.
        """
        request = DummyRequest([])
        # NOTE(review): two separate Range header values are added to the same
        # request here (a single range first, then the multi-range value).
        # Which of them makeProducer actually consults depends on how the
        # request and L{static.File} read the header, which is not visible in
        # this file -- confirm that this test really exercises the
        # multiple-range code path before relying on it.
        request.requestHeaders.addRawHeader(b"range", b"bytes=4-10")
        contentType = "text/plain"
        request.requestHeaders.addRawHeader(b"range", b"bytes=10-12,15-20")
        resource = self.makeResourceWithContent(b"abc", type=contentType)
        with resource.openForReading() as file:
            resource.makeProducer(request, file)
            self.assertEqual(
                {
                    b"content-length": b"0",
                    b"content-range": b"bytes */3",
                    b"content-type": b"text/plain",
                },
                self.contentHeaders(request),
            )

    def test_oneSatisfiableRangeIsEnough(self):
        """
        makeProducer when the Range header requests multiple ranges, at least
        one of which matches, sets the response code to 'Partial Content'.
868 """ 869 request = DummyRequest([]) 870 request.requestHeaders.addRawHeader(b"range", b"bytes=1-3,100-200") 871 resource = self.makeResourceWithContent(b"abcdef") 872 with resource.openForReading() as file: 873 resource.makeProducer(request, file) 874 self.assertEqual(http.PARTIAL_CONTENT, request.responseCode) 875 876 877class StaticProducerTests(TestCase): 878 """ 879 Tests for the abstract L{StaticProducer}. 880 """ 881 882 def test_stopProducingClosesFile(self): 883 """ 884 L{StaticProducer.stopProducing} closes the file object the producer is 885 producing data from. 886 """ 887 fileObject = StringIO() 888 producer = static.StaticProducer(None, fileObject) 889 producer.stopProducing() 890 self.assertTrue(fileObject.closed) 891 892 def test_stopProducingSetsRequestToNone(self): 893 """ 894 L{StaticProducer.stopProducing} sets the request instance variable to 895 None, which indicates to subclasses' resumeProducing methods that no 896 more data should be produced. 897 """ 898 fileObject = StringIO() 899 producer = static.StaticProducer(DummyRequest([]), fileObject) 900 producer.stopProducing() 901 self.assertIdentical(None, producer.request) 902 903 904class NoRangeStaticProducerTests(TestCase): 905 """ 906 Tests for L{NoRangeStaticProducer}. 907 """ 908 909 def test_implementsIPullProducer(self): 910 """ 911 L{NoRangeStaticProducer} implements L{IPullProducer}. 912 """ 913 verifyObject(interfaces.IPullProducer, static.NoRangeStaticProducer(None, None)) 914 915 def test_resumeProducingProducesContent(self): 916 """ 917 L{NoRangeStaticProducer.resumeProducing} writes content from the 918 resource to the request. 919 """ 920 request = DummyRequest([]) 921 content = b"abcdef" 922 producer = static.NoRangeStaticProducer(request, StringIO(content)) 923 # start calls registerProducer on the DummyRequest, which pulls all 924 # output from the producer and so we just need this one call. 
925 producer.start() 926 self.assertEqual(content, b"".join(request.written)) 927 928 def test_resumeProducingBuffersOutput(self): 929 """ 930 L{NoRangeStaticProducer.start} writes at most 931 C{abstract.FileDescriptor.bufferSize} bytes of content from the 932 resource to the request at once. 933 """ 934 request = DummyRequest([]) 935 bufferSize = abstract.FileDescriptor.bufferSize 936 content = b"a" * (2 * bufferSize + 1) 937 producer = static.NoRangeStaticProducer(request, StringIO(content)) 938 # start calls registerProducer on the DummyRequest, which pulls all 939 # output from the producer and so we just need this one call. 940 producer.start() 941 expected = [ 942 content[0:bufferSize], 943 content[bufferSize : 2 * bufferSize], 944 content[2 * bufferSize :], 945 ] 946 self.assertEqual(expected, request.written) 947 948 def test_finishCalledWhenDone(self): 949 """ 950 L{NoRangeStaticProducer.resumeProducing} calls finish() on the request 951 after it is done producing content. 952 """ 953 request = DummyRequest([]) 954 finishDeferred = request.notifyFinish() 955 callbackList = [] 956 finishDeferred.addCallback(callbackList.append) 957 producer = static.NoRangeStaticProducer(request, StringIO(b"abcdef")) 958 # start calls registerProducer on the DummyRequest, which pulls all 959 # output from the producer and so we just need this one call. 960 producer.start() 961 self.assertEqual([None], callbackList) 962 963 964class SingleRangeStaticProducerTests(TestCase): 965 """ 966 Tests for L{SingleRangeStaticProducer}. 967 """ 968 969 def test_implementsIPullProducer(self): 970 """ 971 L{SingleRangeStaticProducer} implements L{IPullProducer}. 
972 """ 973 verifyObject( 974 interfaces.IPullProducer, 975 static.SingleRangeStaticProducer(None, None, None, None), 976 ) 977 978 def test_resumeProducingProducesContent(self): 979 """ 980 L{SingleRangeStaticProducer.resumeProducing} writes the given amount 981 of content, starting at the given offset, from the resource to the 982 request. 983 """ 984 request = DummyRequest([]) 985 content = b"abcdef" 986 producer = static.SingleRangeStaticProducer(request, StringIO(content), 1, 3) 987 # DummyRequest.registerProducer pulls all output from the producer, so 988 # we just need to call start. 989 producer.start() 990 self.assertEqual(content[1:4], b"".join(request.written)) 991 992 def test_resumeProducingBuffersOutput(self): 993 """ 994 L{SingleRangeStaticProducer.start} writes at most 995 C{abstract.FileDescriptor.bufferSize} bytes of content from the 996 resource to the request at once. 997 """ 998 request = DummyRequest([]) 999 bufferSize = abstract.FileDescriptor.bufferSize 1000 content = b"abc" * bufferSize 1001 producer = static.SingleRangeStaticProducer( 1002 request, StringIO(content), 1, bufferSize + 10 1003 ) 1004 # DummyRequest.registerProducer pulls all output from the producer, so 1005 # we just need to call start. 1006 producer.start() 1007 expected = [ 1008 content[1 : bufferSize + 1], 1009 content[bufferSize + 1 : bufferSize + 11], 1010 ] 1011 self.assertEqual(expected, request.written) 1012 1013 def test_finishCalledWhenDone(self): 1014 """ 1015 L{SingleRangeStaticProducer.resumeProducing} calls finish() on the 1016 request after it is done producing content. 1017 """ 1018 request = DummyRequest([]) 1019 finishDeferred = request.notifyFinish() 1020 callbackList = [] 1021 finishDeferred.addCallback(callbackList.append) 1022 producer = static.SingleRangeStaticProducer(request, StringIO(b"abcdef"), 1, 1) 1023 # start calls registerProducer on the DummyRequest, which pulls all 1024 # output from the producer and so we just need this one call. 
1025 producer.start() 1026 self.assertEqual([None], callbackList) 1027 1028 1029class MultipleRangeStaticProducerTests(TestCase): 1030 """ 1031 Tests for L{MultipleRangeStaticProducer}. 1032 """ 1033 1034 def test_implementsIPullProducer(self): 1035 """ 1036 L{MultipleRangeStaticProducer} implements L{IPullProducer}. 1037 """ 1038 verifyObject( 1039 interfaces.IPullProducer, 1040 static.MultipleRangeStaticProducer(None, None, None), 1041 ) 1042 1043 def test_resumeProducingProducesContent(self): 1044 """ 1045 L{MultipleRangeStaticProducer.resumeProducing} writes the requested 1046 chunks of content from the resource to the request, with the supplied 1047 boundaries in between each chunk. 1048 """ 1049 request = DummyRequest([]) 1050 content = b"abcdef" 1051 producer = static.MultipleRangeStaticProducer( 1052 request, StringIO(content), [(b"1", 1, 3), (b"2", 5, 1)] 1053 ) 1054 # DummyRequest.registerProducer pulls all output from the producer, so 1055 # we just need to call start. 1056 producer.start() 1057 self.assertEqual(b"1bcd2f", b"".join(request.written)) 1058 1059 def test_resumeProducingBuffersOutput(self): 1060 """ 1061 L{MultipleRangeStaticProducer.start} writes about 1062 C{abstract.FileDescriptor.bufferSize} bytes of content from the 1063 resource to the request at once. 1064 1065 To be specific about the 'about' above: it can write slightly more, 1066 for example in the case where the first boundary plus the first chunk 1067 is less than C{bufferSize} but first boundary plus the first chunk 1068 plus the second boundary is more, but this is unimportant as in 1069 practice the boundaries are fairly small. On the other side, it is 1070 important for performance to bundle up several small chunks into one 1071 call to request.write. 
1072 """ 1073 request = DummyRequest([]) 1074 content = b"0123456789" * 2 1075 producer = static.MultipleRangeStaticProducer( 1076 request, StringIO(content), [(b"a", 0, 2), (b"b", 5, 10), (b"c", 0, 0)] 1077 ) 1078 producer.bufferSize = 10 1079 # DummyRequest.registerProducer pulls all output from the producer, so 1080 # we just need to call start. 1081 producer.start() 1082 expected = [ 1083 b"a" + content[0:2] + b"b" + content[5:11], 1084 content[11:15] + b"c", 1085 ] 1086 self.assertEqual(expected, request.written) 1087 1088 def test_finishCalledWhenDone(self): 1089 """ 1090 L{MultipleRangeStaticProducer.resumeProducing} calls finish() on the 1091 request after it is done producing content. 1092 """ 1093 request = DummyRequest([]) 1094 finishDeferred = request.notifyFinish() 1095 callbackList = [] 1096 finishDeferred.addCallback(callbackList.append) 1097 producer = static.MultipleRangeStaticProducer( 1098 request, StringIO(b"abcdef"), [(b"", 1, 2)] 1099 ) 1100 # start calls registerProducer on the DummyRequest, which pulls all 1101 # output from the producer and so we just need this one call. 1102 producer.start() 1103 self.assertEqual([None], callbackList) 1104 1105 1106class RangeTests(TestCase): 1107 """ 1108 Tests for I{Range-Header} support in L{twisted.web.static.File}. 1109 1110 @type file: L{file} 1111 @ivar file: Temporary (binary) file containing the content to be served. 1112 1113 @type resource: L{static.File} 1114 @ivar resource: A leaf web resource using C{file} as content. 1115 1116 @type request: L{DummyRequest} 1117 @ivar request: A fake request, requesting C{resource}. 1118 1119 @type catcher: L{list} 1120 @ivar catcher: List which gathers all log information. 1121 """ 1122 1123 def setUp(self): 1124 """ 1125 Create a temporary file with a fixed payload of 64 bytes. Create a 1126 resource for that file and create a request which will be for that 1127 resource. 
Each test can set a different range header to test different 1128 aspects of the implementation. 1129 """ 1130 path = FilePath(self.mktemp()) 1131 # This is just a jumble of random stuff. It's supposed to be a good 1132 # set of data for this test, particularly in order to avoid 1133 # accidentally seeing the right result by having a byte sequence 1134 # repeated at different locations or by having byte values which are 1135 # somehow correlated with their position in the string. 1136 self.payload = ( 1137 b"\xf8u\xf3E\x8c7\xce\x00\x9e\xb6a0y0S\xf0\xef\xac\xb7" 1138 b"\xbe\xb5\x17M\x1e\x136k{\x1e\xbe\x0c\x07\x07\t\xd0" 1139 b"\xbckY\xf5I\x0b\xb8\x88oZ\x1d\x85b\x1a\xcdk\xf2\x1d" 1140 b"&\xfd%\xdd\x82q/A\x10Y\x8b" 1141 ) 1142 path.setContent(self.payload) 1143 self.file = path.open() 1144 self.resource = static.File(self.file.name) 1145 self.resource.isLeaf = 1 1146 self.request = DummyRequest([b""]) 1147 self.request.uri = self.file.name 1148 self.catcher = [] 1149 log.addObserver(self.catcher.append) 1150 1151 def tearDown(self): 1152 """ 1153 Clean up the resource file and the log observer. 1154 """ 1155 self.file.close() 1156 log.removeObserver(self.catcher.append) 1157 1158 def _assertLogged(self, expected): 1159 """ 1160 Asserts that a given log message occurred with an expected message. 1161 """ 1162 logItem = self.catcher.pop() 1163 self.assertEqual(logItem["message"][0], expected) 1164 self.assertEqual(self.catcher, [], f"An additional log occurred: {logItem!r}") 1165 1166 def test_invalidRanges(self): 1167 """ 1168 L{File._parseRangeHeader} raises L{ValueError} when passed 1169 syntactically invalid byte ranges. 
1170 """ 1171 f = self.resource._parseRangeHeader 1172 1173 # there's no = 1174 self.assertRaises(ValueError, f, b"bytes") 1175 1176 # unknown isn't a valid Bytes-Unit 1177 self.assertRaises(ValueError, f, b"unknown=1-2") 1178 1179 # there's no - in =stuff 1180 self.assertRaises(ValueError, f, b"bytes=3") 1181 1182 # both start and end are empty 1183 self.assertRaises(ValueError, f, b"bytes=-") 1184 1185 # start isn't an integer 1186 self.assertRaises(ValueError, f, b"bytes=foo-") 1187 1188 # end isn't an integer 1189 self.assertRaises(ValueError, f, b"bytes=-foo") 1190 1191 # end isn't equal to or greater than start 1192 self.assertRaises(ValueError, f, b"bytes=5-4") 1193 1194 def test_rangeMissingStop(self): 1195 """ 1196 A single bytes range without an explicit stop position is parsed into a 1197 two-tuple giving the start position and L{None}. 1198 """ 1199 self.assertEqual(self.resource._parseRangeHeader(b"bytes=0-"), [(0, None)]) 1200 1201 def test_rangeMissingStart(self): 1202 """ 1203 A single bytes range without an explicit start position is parsed into 1204 a two-tuple of L{None} and the end position. 1205 """ 1206 self.assertEqual(self.resource._parseRangeHeader(b"bytes=-3"), [(None, 3)]) 1207 1208 def test_range(self): 1209 """ 1210 A single bytes range with explicit start and stop positions is parsed 1211 into a two-tuple of those positions. 1212 """ 1213 self.assertEqual(self.resource._parseRangeHeader(b"bytes=2-5"), [(2, 5)]) 1214 1215 def test_rangeWithSpace(self): 1216 """ 1217 A single bytes range with whitespace in allowed places is parsed in 1218 the same way as it would be without the whitespace. 
1219 """ 1220 self.assertEqual(self.resource._parseRangeHeader(b" bytes=1-2 "), [(1, 2)]) 1221 self.assertEqual(self.resource._parseRangeHeader(b"bytes =1-2 "), [(1, 2)]) 1222 self.assertEqual(self.resource._parseRangeHeader(b"bytes= 1-2"), [(1, 2)]) 1223 self.assertEqual(self.resource._parseRangeHeader(b"bytes=1 -2"), [(1, 2)]) 1224 self.assertEqual(self.resource._parseRangeHeader(b"bytes=1- 2"), [(1, 2)]) 1225 self.assertEqual(self.resource._parseRangeHeader(b"bytes=1-2 "), [(1, 2)]) 1226 1227 def test_nullRangeElements(self): 1228 """ 1229 If there are multiple byte ranges but only one is non-null, the 1230 non-null range is parsed and its start and stop returned. 1231 """ 1232 self.assertEqual( 1233 self.resource._parseRangeHeader(b"bytes=1-2,\r\n, ,\t"), [(1, 2)] 1234 ) 1235 1236 def test_multipleRanges(self): 1237 """ 1238 If multiple byte ranges are specified their starts and stops are 1239 returned. 1240 """ 1241 self.assertEqual( 1242 self.resource._parseRangeHeader(b"bytes=1-2,3-4"), [(1, 2), (3, 4)] 1243 ) 1244 1245 def test_bodyLength(self): 1246 """ 1247 A correct response to a range request is as long as the length of the 1248 requested range. 1249 """ 1250 self.request.requestHeaders.addRawHeader(b"range", b"bytes=0-43") 1251 self.resource.render(self.request) 1252 self.assertEqual(len(b"".join(self.request.written)), 44) 1253 1254 def test_invalidRangeRequest(self): 1255 """ 1256 An incorrect range request (RFC 2616 defines a correct range request as 1257 a Bytes-Unit followed by a '=' character followed by a specific range. 1258 Only 'bytes' is defined) results in the range header value being logged 1259 and a normal 200 response being sent. 
1260 """ 1261 range = b"foobar=0-43" 1262 self.request.requestHeaders.addRawHeader(b"range", range) 1263 self.resource.render(self.request) 1264 expected = f"Ignoring malformed Range header {range.decode()!r}" 1265 self._assertLogged(expected) 1266 self.assertEqual(b"".join(self.request.written), self.payload) 1267 self.assertEqual(self.request.responseCode, http.OK) 1268 self.assertEqual( 1269 self.request.responseHeaders.getRawHeaders(b"content-length")[0], 1270 b"%d" % (len(self.payload),), 1271 ) 1272 1273 def parseMultipartBody(self, body, boundary): 1274 """ 1275 Parse C{body} as a multipart MIME response separated by C{boundary}. 1276 1277 Note that this with fail the calling test on certain syntactic 1278 problems. 1279 """ 1280 sep = b"\r\n--" + boundary 1281 parts = body.split(sep) 1282 self.assertEqual(b"", parts[0]) 1283 self.assertEqual(b"--\r\n", parts[-1]) 1284 parsed_parts = [] 1285 for part in parts[1:-1]: 1286 before, header1, header2, blank, partBody = part.split(b"\r\n", 4) 1287 headers = header1 + b"\n" + header2 1288 self.assertEqual(b"", before) 1289 self.assertEqual(b"", blank) 1290 partContentTypeValue = re.search( 1291 b"^content-type: (.*)$", headers, re.I | re.M 1292 ).group(1) 1293 start, end, size = re.search( 1294 b"^content-range: bytes ([0-9]+)-([0-9]+)/([0-9]+)$", 1295 headers, 1296 re.I | re.M, 1297 ).groups() 1298 parsed_parts.append( 1299 { 1300 b"contentType": partContentTypeValue, 1301 b"contentRange": (start, end, size), 1302 b"body": partBody, 1303 } 1304 ) 1305 return parsed_parts 1306 1307 def test_multipleRangeRequest(self): 1308 """ 1309 The response to a request for multiple bytes ranges is a MIME-ish 1310 multipart response. 
1311 """ 1312 startEnds = [(0, 2), (20, 30), (40, 50)] 1313 rangeHeaderValue = b",".join( 1314 [networkString(f"{s}-{e}") for (s, e) in startEnds] 1315 ) 1316 self.request.requestHeaders.addRawHeader(b"range", b"bytes=" + rangeHeaderValue) 1317 self.resource.render(self.request) 1318 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT) 1319 boundary = re.match( 1320 b'^multipart/byteranges; boundary="(.*)"$', 1321 self.request.responseHeaders.getRawHeaders(b"content-type")[0], 1322 ).group(1) 1323 parts = self.parseMultipartBody(b"".join(self.request.written), boundary) 1324 self.assertEqual(len(startEnds), len(parts)) 1325 for part, (s, e) in zip(parts, startEnds): 1326 self.assertEqual(networkString(self.resource.type), part[b"contentType"]) 1327 start, end, size = part[b"contentRange"] 1328 self.assertEqual(int(start), s) 1329 self.assertEqual(int(end), e) 1330 self.assertEqual(int(size), self.resource.getFileSize()) 1331 self.assertEqual(self.payload[s : e + 1], part[b"body"]) 1332 1333 def test_multipleRangeRequestWithRangeOverlappingEnd(self): 1334 """ 1335 The response to a request for multiple bytes ranges is a MIME-ish 1336 multipart response, even when one of the ranged falls off the end of 1337 the resource. 
1338 """ 1339 startEnds = [(0, 2), (40, len(self.payload) + 10)] 1340 rangeHeaderValue = b",".join( 1341 [networkString(f"{s}-{e}") for (s, e) in startEnds] 1342 ) 1343 self.request.requestHeaders.addRawHeader(b"range", b"bytes=" + rangeHeaderValue) 1344 self.resource.render(self.request) 1345 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT) 1346 boundary = re.match( 1347 b'^multipart/byteranges; boundary="(.*)"$', 1348 self.request.responseHeaders.getRawHeaders(b"content-type")[0], 1349 ).group(1) 1350 parts = self.parseMultipartBody(b"".join(self.request.written), boundary) 1351 self.assertEqual(len(startEnds), len(parts)) 1352 for part, (s, e) in zip(parts, startEnds): 1353 self.assertEqual(networkString(self.resource.type), part[b"contentType"]) 1354 start, end, size = part[b"contentRange"] 1355 self.assertEqual(int(start), s) 1356 self.assertEqual(int(end), min(e, self.resource.getFileSize() - 1)) 1357 self.assertEqual(int(size), self.resource.getFileSize()) 1358 self.assertEqual(self.payload[s : e + 1], part[b"body"]) 1359 1360 def test_implicitEnd(self): 1361 """ 1362 If the end byte position is omitted, then it is treated as if the 1363 length of the resource was specified by the end byte position. 
1364 """ 1365 self.request.requestHeaders.addRawHeader(b"range", b"bytes=23-") 1366 self.resource.render(self.request) 1367 self.assertEqual(b"".join(self.request.written), self.payload[23:]) 1368 self.assertEqual(len(b"".join(self.request.written)), 41) 1369 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT) 1370 self.assertEqual( 1371 self.request.responseHeaders.getRawHeaders(b"content-range")[0], 1372 b"bytes 23-63/64", 1373 ) 1374 self.assertEqual( 1375 self.request.responseHeaders.getRawHeaders(b"content-length")[0], b"41" 1376 ) 1377 1378 def test_implicitStart(self): 1379 """ 1380 If the start byte position is omitted but the end byte position is 1381 supplied, then the range is treated as requesting the last -N bytes of 1382 the resource, where N is the end byte position. 1383 """ 1384 self.request.requestHeaders.addRawHeader(b"range", b"bytes=-17") 1385 self.resource.render(self.request) 1386 self.assertEqual(b"".join(self.request.written), self.payload[-17:]) 1387 self.assertEqual(len(b"".join(self.request.written)), 17) 1388 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT) 1389 self.assertEqual( 1390 self.request.responseHeaders.getRawHeaders(b"content-range")[0], 1391 b"bytes 47-63/64", 1392 ) 1393 self.assertEqual( 1394 self.request.responseHeaders.getRawHeaders(b"content-length")[0], b"17" 1395 ) 1396 1397 def test_explicitRange(self): 1398 """ 1399 A correct response to a bytes range header request from A to B starts 1400 with the A'th byte and ends with (including) the B'th byte. The first 1401 byte of a page is numbered with 0. 
1402 """ 1403 self.request.requestHeaders.addRawHeader(b"range", b"bytes=3-43") 1404 self.resource.render(self.request) 1405 written = b"".join(self.request.written) 1406 self.assertEqual(written, self.payload[3:44]) 1407 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT) 1408 self.assertEqual( 1409 self.request.responseHeaders.getRawHeaders(b"content-range")[0], 1410 b"bytes 3-43/64", 1411 ) 1412 self.assertEqual( 1413 b"%d" % (len(written),), 1414 self.request.responseHeaders.getRawHeaders(b"content-length")[0], 1415 ) 1416 1417 def test_explicitRangeOverlappingEnd(self): 1418 """ 1419 A correct response to a bytes range header request from A to B when B 1420 is past the end of the resource starts with the A'th byte and ends 1421 with the last byte of the resource. The first byte of a page is 1422 numbered with 0. 1423 """ 1424 self.request.requestHeaders.addRawHeader(b"range", b"bytes=40-100") 1425 self.resource.render(self.request) 1426 written = b"".join(self.request.written) 1427 self.assertEqual(written, self.payload[40:]) 1428 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT) 1429 self.assertEqual( 1430 self.request.responseHeaders.getRawHeaders(b"content-range")[0], 1431 b"bytes 40-63/64", 1432 ) 1433 self.assertEqual( 1434 b"%d" % (len(written),), 1435 self.request.responseHeaders.getRawHeaders(b"content-length")[0], 1436 ) 1437 1438 def test_statusCodeRequestedRangeNotSatisfiable(self): 1439 """ 1440 If a range is syntactically invalid due to the start being greater than 1441 the end, the range header is ignored (the request is responded to as if 1442 it were not present). 
1443 """ 1444 self.request.requestHeaders.addRawHeader(b"range", b"bytes=20-13") 1445 self.resource.render(self.request) 1446 self.assertEqual(self.request.responseCode, http.OK) 1447 self.assertEqual(b"".join(self.request.written), self.payload) 1448 self.assertEqual( 1449 self.request.responseHeaders.getRawHeaders(b"content-length")[0], 1450 b"%d" % (len(self.payload),), 1451 ) 1452 1453 def test_invalidStartBytePos(self): 1454 """ 1455 If a range is unsatisfiable due to the start not being less than the 1456 length of the resource, the response is 416 (Requested range not 1457 satisfiable) and no data is written to the response body (RFC 2616, 1458 section 14.35.1). 1459 """ 1460 self.request.requestHeaders.addRawHeader(b"range", b"bytes=67-108") 1461 self.resource.render(self.request) 1462 self.assertEqual( 1463 self.request.responseCode, http.REQUESTED_RANGE_NOT_SATISFIABLE 1464 ) 1465 self.assertEqual(b"".join(self.request.written), b"") 1466 self.assertEqual( 1467 self.request.responseHeaders.getRawHeaders(b"content-length")[0], b"0" 1468 ) 1469 # Sections 10.4.17 and 14.16 1470 self.assertEqual( 1471 self.request.responseHeaders.getRawHeaders(b"content-range")[0], 1472 networkString("bytes */%d" % (len(self.payload),)), 1473 ) 1474 1475 1476class DirectoryListerTests(TestCase): 1477 """ 1478 Tests for L{static.DirectoryLister}. 1479 """ 1480 1481 def _request(self, uri): 1482 request = DummyRequest([b""]) 1483 request.uri = uri 1484 return request 1485 1486 def test_renderHeader(self): 1487 """ 1488 L{static.DirectoryLister} prints the request uri as header of the 1489 rendered content. 
1490 """ 1491 path = FilePath(self.mktemp()) 1492 path.makedirs() 1493 1494 lister = static.DirectoryLister(path.path) 1495 data = lister.render(self._request(b"foo")) 1496 self.assertIn(b"<h1>Directory listing for foo</h1>", data) 1497 self.assertIn(b"<title>Directory listing for foo</title>", data) 1498 1499 def test_renderUnquoteHeader(self): 1500 """ 1501 L{static.DirectoryLister} unquote the request uri before printing it. 1502 """ 1503 path = FilePath(self.mktemp()) 1504 path.makedirs() 1505 1506 lister = static.DirectoryLister(path.path) 1507 data = lister.render(self._request(b"foo%20bar")) 1508 self.assertIn(b"<h1>Directory listing for foo bar</h1>", data) 1509 self.assertIn(b"<title>Directory listing for foo bar</title>", data) 1510 1511 def test_escapeHeader(self): 1512 """ 1513 L{static.DirectoryLister} escape "&", "<" and ">" after unquoting the 1514 request uri. 1515 """ 1516 path = FilePath(self.mktemp()) 1517 path.makedirs() 1518 1519 lister = static.DirectoryLister(path.path) 1520 data = lister.render(self._request(b"foo%26bar")) 1521 self.assertIn(b"<h1>Directory listing for foo&bar</h1>", data) 1522 self.assertIn(b"<title>Directory listing for foo&bar</title>", data) 1523 1524 def test_renderFiles(self): 1525 """ 1526 L{static.DirectoryLister} is able to list all the files inside a 1527 directory. 
1528 """ 1529 path = FilePath(self.mktemp()) 1530 path.makedirs() 1531 path.child("file1").setContent(b"content1") 1532 path.child("file2").setContent(b"content2" * 1000) 1533 1534 lister = static.DirectoryLister(path.path) 1535 data = lister.render(self._request(b"foo")) 1536 body = b"""<tr class="odd"> 1537 <td><a href="file1">file1</a></td> 1538 <td>8B</td> 1539 <td>[text/html]</td> 1540 <td></td> 1541</tr> 1542<tr class="even"> 1543 <td><a href="file2">file2</a></td> 1544 <td>7K</td> 1545 <td>[text/html]</td> 1546 <td></td> 1547</tr>""" 1548 self.assertIn(body, data) 1549 1550 def test_renderDirectories(self): 1551 """ 1552 L{static.DirectoryLister} is able to list all the directories inside 1553 a directory. 1554 """ 1555 path = FilePath(self.mktemp()) 1556 path.makedirs() 1557 path.child("dir1").makedirs() 1558 path.child("dir2 & 3").makedirs() 1559 1560 lister = static.DirectoryLister(path.path) 1561 data = lister.render(self._request(b"foo")) 1562 body = b"""<tr class="odd"> 1563 <td><a href="dir1/">dir1/</a></td> 1564 <td></td> 1565 <td>[Directory]</td> 1566 <td></td> 1567</tr> 1568<tr class="even"> 1569 <td><a href="dir2%20%26%203/">dir2 & 3/</a></td> 1570 <td></td> 1571 <td>[Directory]</td> 1572 <td></td> 1573</tr>""" 1574 self.assertIn(body, data) 1575 1576 def test_renderFiltered(self): 1577 """ 1578 L{static.DirectoryLister} takes an optional C{dirs} argument that 1579 filter out the list of directories and files printed. 
1580 """ 1581 path = FilePath(self.mktemp()) 1582 path.makedirs() 1583 path.child("dir1").makedirs() 1584 path.child("dir2").makedirs() 1585 path.child("dir3").makedirs() 1586 lister = static.DirectoryLister(path.path, dirs=["dir1", "dir3"]) 1587 data = lister.render(self._request(b"foo")) 1588 body = b"""<tr class="odd"> 1589 <td><a href="dir1/">dir1/</a></td> 1590 <td></td> 1591 <td>[Directory]</td> 1592 <td></td> 1593</tr> 1594<tr class="even"> 1595 <td><a href="dir3/">dir3/</a></td> 1596 <td></td> 1597 <td>[Directory]</td> 1598 <td></td> 1599</tr>""" 1600 self.assertIn(body, data) 1601 1602 def test_oddAndEven(self): 1603 """ 1604 L{static.DirectoryLister} gives an alternate class for each odd and 1605 even rows in the table. 1606 """ 1607 lister = static.DirectoryLister(None) 1608 elements = [ 1609 {"href": "", "text": "", "size": "", "type": "", "encoding": ""} 1610 for i in range(5) 1611 ] 1612 content = lister._buildTableContent(elements) 1613 1614 self.assertEqual(len(content), 5) 1615 self.assertTrue(content[0].startswith('<tr class="odd">')) 1616 self.assertTrue(content[1].startswith('<tr class="even">')) 1617 self.assertTrue(content[2].startswith('<tr class="odd">')) 1618 self.assertTrue(content[3].startswith('<tr class="even">')) 1619 self.assertTrue(content[4].startswith('<tr class="odd">')) 1620 1621 def test_contentType(self): 1622 """ 1623 L{static.DirectoryLister} produces a MIME-type that indicates that it is 1624 HTML, and includes its charset (UTF-8). 1625 """ 1626 path = FilePath(self.mktemp()) 1627 path.makedirs() 1628 lister = static.DirectoryLister(path.path) 1629 req = self._request(b"") 1630 lister.render(req) 1631 self.assertEqual( 1632 req.responseHeaders.getRawHeaders(b"content-type")[0], 1633 b"text/html; charset=utf-8", 1634 ) 1635 1636 def test_mimeTypeAndEncodings(self): 1637 """ 1638 L{static.DirectoryLister} is able to detect mimetype and encoding of 1639 listed files. 
1640 """ 1641 path = FilePath(self.mktemp()) 1642 path.makedirs() 1643 path.child("file1.txt").setContent(b"file1") 1644 path.child("file2.py").setContent(b"python") 1645 path.child("file3.conf.gz").setContent(b"conf compressed") 1646 path.child("file4.diff.bz2").setContent(b"diff compressed") 1647 directory = os.listdir(path.path) 1648 directory.sort() 1649 1650 contentTypes = { 1651 ".txt": "text/plain", 1652 ".py": "text/python", 1653 ".conf": "text/configuration", 1654 ".diff": "text/diff", 1655 } 1656 1657 lister = static.DirectoryLister(path.path, contentTypes=contentTypes) 1658 dirs, files = lister._getFilesAndDirectories(directory) 1659 self.assertEqual(dirs, []) 1660 self.assertEqual( 1661 files, 1662 [ 1663 { 1664 "encoding": "", 1665 "href": "file1.txt", 1666 "size": "5B", 1667 "text": "file1.txt", 1668 "type": "[text/plain]", 1669 }, 1670 { 1671 "encoding": "", 1672 "href": "file2.py", 1673 "size": "6B", 1674 "text": "file2.py", 1675 "type": "[text/python]", 1676 }, 1677 { 1678 "encoding": "[gzip]", 1679 "href": "file3.conf.gz", 1680 "size": "15B", 1681 "text": "file3.conf.gz", 1682 "type": "[text/configuration]", 1683 }, 1684 { 1685 "encoding": "[bzip2]", 1686 "href": "file4.diff.bz2", 1687 "size": "15B", 1688 "text": "file4.diff.bz2", 1689 "type": "[text/diff]", 1690 }, 1691 ], 1692 ) 1693 1694 @skipIf(not platform._supportsSymlinks(), "No symlink support") 1695 def test_brokenSymlink(self): 1696 """ 1697 If on the file in the listing points to a broken symlink, it should not 1698 be returned by L{static.DirectoryLister._getFilesAndDirectories}. 
1699 """ 1700 path = FilePath(self.mktemp()) 1701 path.makedirs() 1702 file1 = path.child("file1") 1703 file1.setContent(b"file1") 1704 file1.linkTo(path.child("file2")) 1705 file1.remove() 1706 1707 lister = static.DirectoryLister(path.path) 1708 directory = os.listdir(path.path) 1709 directory.sort() 1710 dirs, files = lister._getFilesAndDirectories(directory) 1711 self.assertEqual(dirs, []) 1712 self.assertEqual(files, []) 1713 1714 def test_childrenNotFound(self): 1715 """ 1716 Any child resource of L{static.DirectoryLister} renders an HTTP 1717 I{NOT FOUND} response code. 1718 """ 1719 path = FilePath(self.mktemp()) 1720 path.makedirs() 1721 lister = static.DirectoryLister(path.path) 1722 request = self._request(b"") 1723 child = resource.getChildForRequest(lister, request) 1724 result = _render(child, request) 1725 1726 def cbRendered(ignored): 1727 self.assertEqual(request.responseCode, http.NOT_FOUND) 1728 1729 result.addCallback(cbRendered) 1730 return result 1731 1732 def test_repr(self): 1733 """ 1734 L{static.DirectoryLister.__repr__} gives the path of the lister. 1735 """ 1736 path = FilePath(self.mktemp()) 1737 lister = static.DirectoryLister(path.path) 1738 self.assertEqual(repr(lister), f"<DirectoryLister of {path.path!r}>") 1739 self.assertEqual(str(lister), f"<DirectoryLister of {path.path!r}>") 1740 1741 def test_formatFileSize(self): 1742 """ 1743 L{static.formatFileSize} format an amount of bytes into a more readable 1744 format. 1745 """ 1746 self.assertEqual(static.formatFileSize(0), "0B") 1747 self.assertEqual(static.formatFileSize(123), "123B") 1748 self.assertEqual(static.formatFileSize(4567), "4K") 1749 self.assertEqual(static.formatFileSize(8900000), "8M") 1750 self.assertEqual(static.formatFileSize(1234000000), "1G") 1751 self.assertEqual(static.formatFileSize(1234567890000), "1149G") 1752 1753 1754class LoadMimeTypesTests(TestCase): 1755 """ 1756 Tests for the MIME type loading routine. 
1757 1758 @cvar UNSET: A sentinel to signify that C{self.paths} has not been set by 1759 the mock init. 1760 """ 1761 1762 UNSET = object() 1763 1764 def setUp(self): 1765 self.paths = self.UNSET 1766 1767 def _fakeInit(self, paths): 1768 """ 1769 A mock L{mimetypes.init} that records the value of the passed C{paths} 1770 argument. 1771 1772 @param paths: The paths that will be recorded. 1773 """ 1774 self.paths = paths 1775 1776 def test_defaultArgumentIsNone(self): 1777 """ 1778 By default, L{None} is passed to C{mimetypes.init}. 1779 """ 1780 static.loadMimeTypes(init=self._fakeInit) 1781 self.assertIdentical(self.paths, None) 1782 1783 def test_extraLocationsWork(self): 1784 """ 1785 Passed MIME type files are passed to C{mimetypes.init}. 1786 """ 1787 paths = ["x", "y", "z"] 1788 static.loadMimeTypes(paths, init=self._fakeInit) 1789 self.assertIdentical(self.paths, paths) 1790 1791 def test_usesGlobalInitFunction(self): 1792 """ 1793 By default, C{mimetypes.init} is called. 1794 """ 1795 # Checking mimetypes.inited doesn't always work, because 1796 # something, somewhere, calls mimetypes.init. Yay global 1797 # mutable state :) 1798 if getattr(inspect, "signature", None): 1799 signature = inspect.signature(static.loadMimeTypes) 1800 self.assertIs(signature.parameters["init"].default, mimetypes.init) 1801 else: 1802 args, _, _, defaults = inspect.getargspec(static.loadMimeTypes) 1803 defaultInit = defaults[args.index("init")] 1804 self.assertIs(defaultInit, mimetypes.init) 1805 1806 1807class StaticDeprecationTests(TestCase): 1808 def test_addSlashDeprecated(self): 1809 """ 1810 L{twisted.web.static.addSlash} is deprecated. 1811 """ 1812 from twisted.web.static import addSlash 1813 1814 addSlash(DummyRequest([b""])) 1815 1816 warnings = self.flushWarnings([self.test_addSlashDeprecated]) 1817 self.assertEqual(len(warnings), 1) 1818 self.assertEqual( 1819 warnings[0]["message"], 1820 "twisted.web.static.addSlash was deprecated in Twisted 16.0.0", 1821 ) 1822