1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Tests for L{twisted.web.static}.
6"""
7import errno
8import inspect
9import mimetypes
10import os
11import re
12import sys
13import warnings
14from io import BytesIO as StringIO
15from unittest import skipIf
16
17from zope.interface.verify import verifyObject
18
19from twisted.internet import abstract, interfaces
20from twisted.python import compat, log
21from twisted.python.compat import networkString
22from twisted.python.filepath import FilePath
23from twisted.python.runtime import platform
24from twisted.trial.unittest import TestCase
25from twisted.web import http, resource, script, static
26from twisted.web._responses import FOUND
27from twisted.web.server import UnsupportedMethod
28from twisted.web.test._util import _render
29from twisted.web.test.requesthelper import DummyRequest
30
31
class StaticDataTests(TestCase):
    """
    Tests for L{Data}.
    """

    def test_headRequest(self):
        """
        L{Data.render} returns an empty response body for a I{HEAD} request.
        """
        data = static.Data(b"foo", "bar")
        # The path segment is spelled as bytes, matching
        # L{test_invalidMethod} in this class.
        request = DummyRequest([b""])
        request.method = b"HEAD"
        d = _render(data, request)

        def cbRendered(ignored):
            # Nothing may have been written as the body of a HEAD response.
            self.assertEqual(b"".join(request.written), b"")

        d.addCallback(cbRendered)
        return d

    def test_invalidMethod(self):
        """
        L{Data.render} raises L{UnsupportedMethod} in response to a non-I{GET},
        non-I{HEAD} request.
        """
        data = static.Data(b"foo", b"bar")
        request = DummyRequest([b""])
        request.method = b"POST"
        self.assertRaises(UnsupportedMethod, data.render, request)
61
62
63class StaticFileTests(TestCase):
64    """
65    Tests for the basic behavior of L{File}.
66    """
67
68    def _render(self, resource, request):
69        return _render(resource, request)
70
71    def test_ignoredExtTrue(self):
72        """
73        Passing C{1} as the value to L{File}'s C{ignoredExts} argument
74        issues a warning and sets the ignored extensions to the
75        wildcard C{"*"}.
76        """
77        with warnings.catch_warnings(record=True) as caughtWarnings:
78            file = static.File(self.mktemp(), ignoredExts=1)
79            self.assertEqual(file.ignoredExts, ["*"])
80
81        self.assertEqual(len(caughtWarnings), 1)
82
83    def test_ignoredExtFalse(self):
84        """
85        Passing C{1} as the value to L{File}'s C{ignoredExts} argument
86        issues a warning and sets the ignored extensions to the empty
87        list.
88        """
89        with warnings.catch_warnings(record=True) as caughtWarnings:
90            file = static.File(self.mktemp(), ignoredExts=0)
91            self.assertEqual(file.ignoredExts, [])
92
93        self.assertEqual(len(caughtWarnings), 1)
94
95    def test_allowExt(self):
96        """
97        Passing C{1} as the value to L{File}'s C{allowExt} argument
98        issues a warning and sets the ignored extensions to the
99        wildcard C{*}.
100        """
101        with warnings.catch_warnings(record=True) as caughtWarnings:
102            file = static.File(self.mktemp(), ignoredExts=True)
103            self.assertEqual(file.ignoredExts, ["*"])
104
105        self.assertEqual(len(caughtWarnings), 1)
106
107    def test_invalidMethod(self):
108        """
109        L{File.render} raises L{UnsupportedMethod} in response to a non-I{GET},
110        non-I{HEAD} request.
111        """
112        request = DummyRequest([b""])
113        request.method = b"POST"
114        path = FilePath(self.mktemp())
115        path.setContent(b"foo")
116        file = static.File(path.path)
117        self.assertRaises(UnsupportedMethod, file.render, request)
118
119    def test_notFound(self):
120        """
121        If a request is made which encounters a L{File} before a final segment
122        which does not correspond to any file in the path the L{File} was
123        created with, a not found response is sent.
124        """
125        base = FilePath(self.mktemp())
126        base.makedirs()
127        file = static.File(base.path)
128
129        request = DummyRequest([b"foobar"])
130        child = resource.getChildForRequest(file, request)
131
132        d = self._render(child, request)
133
134        def cbRendered(ignored):
135            self.assertEqual(request.responseCode, 404)
136
137        d.addCallback(cbRendered)
138        return d
139
140    def test_emptyChild(self):
141        """
142        The C{''} child of a L{File} which corresponds to a directory in the
143        filesystem is a L{DirectoryLister}.
144        """
145        base = FilePath(self.mktemp())
146        base.makedirs()
147        file = static.File(base.path)
148
149        request = DummyRequest([b""])
150        child = resource.getChildForRequest(file, request)
151        self.assertIsInstance(child, static.DirectoryLister)
152        self.assertEqual(child.path, base.path)
153
154    def test_emptyChildUnicodeParent(self):
155        """
156        The C{u''} child of a L{File} which corresponds to a directory
157        whose path is text is a L{DirectoryLister} that renders to a
158        binary listing.
159
160        @see: U{https://twistedmatrix.com/trac/ticket/9438}
161        """
162        textBase = FilePath(self.mktemp()).asTextMode()
163        textBase.makedirs()
164        textBase.child("text-file").open("w").close()
165        textFile = static.File(textBase.path)
166
167        request = DummyRequest([b""])
168        child = resource.getChildForRequest(textFile, request)
169        self.assertIsInstance(child, static.DirectoryLister)
170
171        nativePath = compat.nativeString(textBase.path)
172        self.assertEqual(child.path, nativePath)
173
174        response = child.render(request)
175        self.assertIsInstance(response, bytes)
176
177    def test_securityViolationNotFound(self):
178        """
179        If a request is made which encounters a L{File} before a final segment
180        which cannot be looked up in the filesystem due to security
181        considerations, a not found response is sent.
182        """
183        base = FilePath(self.mktemp())
184        base.makedirs()
185        file = static.File(base.path)
186
187        request = DummyRequest([b".."])
188        child = resource.getChildForRequest(file, request)
189
190        d = self._render(child, request)
191
192        def cbRendered(ignored):
193            self.assertEqual(request.responseCode, 404)
194
195        d.addCallback(cbRendered)
196        return d
197
198    @skipIf(platform.isWindows(), "Cannot remove read permission on Windows")
199    def test_forbiddenResource(self):
200        """
201        If the file in the filesystem which would satisfy a request cannot be
202        read, L{File.render} sets the HTTP response code to I{FORBIDDEN}.
203        """
204        base = FilePath(self.mktemp())
205        base.setContent(b"")
206        # Make sure we can delete the file later.
207        self.addCleanup(base.chmod, 0o700)
208
209        # Get rid of our own read permission.
210        base.chmod(0)
211
212        file = static.File(base.path)
213        request = DummyRequest([b""])
214        d = self._render(file, request)
215
216        def cbRendered(ignored):
217            self.assertEqual(request.responseCode, 403)
218
219        d.addCallback(cbRendered)
220        return d
221
222    def test_undecodablePath(self):
223        """
224        A request whose path cannot be decoded as UTF-8 receives a not
225        found response, and the failure is logged.
226        """
227        path = self.mktemp()
228        if isinstance(path, bytes):
229            path = path.decode("ascii")
230        base = FilePath(path)
231        base.makedirs()
232
233        file = static.File(base.path)
234        request = DummyRequest([b"\xff"])
235        child = resource.getChildForRequest(file, request)
236
237        d = self._render(child, request)
238
239        def cbRendered(ignored):
240            self.assertEqual(request.responseCode, 404)
241            self.assertEqual(len(self.flushLoggedErrors(UnicodeDecodeError)), 1)
242
243        d.addCallback(cbRendered)
244        return d
245
246    def test_forbiddenResource_default(self):
247        """
248        L{File.forbidden} defaults to L{resource.ForbiddenResource}.
249        """
250        self.assertIsInstance(static.File(b".").forbidden, resource.ForbiddenResource)
251
252    def test_forbiddenResource_customize(self):
253        """
254        The resource rendered for forbidden requests is stored as a class
255        member so that users can customize it.
256        """
257        base = FilePath(self.mktemp())
258        base.setContent(b"")
259        markerResponse = b"custom-forbidden-response"
260
261        def failingOpenForReading():
262            raise OSError(errno.EACCES, "")
263
264        class CustomForbiddenResource(resource.Resource):
265            def render(self, request):
266                return markerResponse
267
268        class CustomStaticFile(static.File):
269            forbidden = CustomForbiddenResource()
270
271        fileResource = CustomStaticFile(base.path)
272        fileResource.openForReading = failingOpenForReading
273        request = DummyRequest([b""])
274
275        result = fileResource.render(request)
276
277        self.assertEqual(markerResponse, result)
278
279    def test_indexNames(self):
280        """
281        If a request is made which encounters a L{File} before a final empty
282        segment, a file in the L{File} instance's C{indexNames} list which
283        exists in the path the L{File} was created with is served as the
284        response to the request.
285        """
286        base = FilePath(self.mktemp())
287        base.makedirs()
288        base.child("foo.bar").setContent(b"baz")
289        file = static.File(base.path)
290        file.indexNames = ["foo.bar"]
291
292        request = DummyRequest([b""])
293        child = resource.getChildForRequest(file, request)
294
295        d = self._render(child, request)
296
297        def cbRendered(ignored):
298            self.assertEqual(b"".join(request.written), b"baz")
299            self.assertEqual(
300                request.responseHeaders.getRawHeaders(b"content-length")[0], b"3"
301            )
302
303        d.addCallback(cbRendered)
304        return d
305
306    def test_staticFile(self):
307        """
308        If a request is made which encounters a L{File} before a final segment
309        which names a file in the path the L{File} was created with, that file
310        is served as the response to the request.
311        """
312        base = FilePath(self.mktemp())
313        base.makedirs()
314        base.child("foo.bar").setContent(b"baz")
315        file = static.File(base.path)
316
317        request = DummyRequest([b"foo.bar"])
318        child = resource.getChildForRequest(file, request)
319
320        d = self._render(child, request)
321
322        def cbRendered(ignored):
323            self.assertEqual(b"".join(request.written), b"baz")
324            self.assertEqual(
325                request.responseHeaders.getRawHeaders(b"content-length")[0], b"3"
326            )
327
328        d.addCallback(cbRendered)
329        return d
330
331    @skipIf(
332        sys.getfilesystemencoding().lower() not in ("utf-8", "mcbs"),
333        "Cannot write unicode filenames with file system encoding of"
334        " {}".format(sys.getfilesystemencoding()),
335    )
336    def test_staticFileUnicodeFileName(self):
337        """
338        A request for a existing unicode file path encoded as UTF-8
339        returns the contents of that file.
340        """
341        name = "\N{GREEK SMALL LETTER ETA WITH PERISPOMENI}"
342        content = b"content"
343
344        base = FilePath(self.mktemp())
345        base.makedirs()
346        base.child(name).setContent(content)
347        file = static.File(base.path)
348
349        request = DummyRequest([name.encode("utf-8")])
350        child = resource.getChildForRequest(file, request)
351
352        d = self._render(child, request)
353
354        def cbRendered(ignored):
355            self.assertEqual(b"".join(request.written), content)
356            self.assertEqual(
357                request.responseHeaders.getRawHeaders(b"content-length")[0],
358                networkString(str(len(content))),
359            )
360
361        d.addCallback(cbRendered)
362        return d
363
364    def test_staticFileDeletedGetChild(self):
365        """
366        A L{static.File} created for a directory which does not exist should
367        return childNotFound from L{static.File.getChild}.
368        """
369        staticFile = static.File(self.mktemp())
370        request = DummyRequest([b"foo.bar"])
371        child = staticFile.getChild(b"foo.bar", request)
372        self.assertEqual(child, staticFile.childNotFound)
373
374    def test_staticFileDeletedRender(self):
375        """
376        A L{static.File} created for a file which does not exist should render
377        its C{childNotFound} page.
378        """
379        staticFile = static.File(self.mktemp())
380        request = DummyRequest([b"foo.bar"])
381        request2 = DummyRequest([b"foo.bar"])
382        d = self._render(staticFile, request)
383        d2 = self._render(staticFile.childNotFound, request2)
384
385        def cbRendered2(ignored):
386            def cbRendered(ignored):
387                self.assertEqual(b"".join(request.written), b"".join(request2.written))
388
389            d.addCallback(cbRendered)
390            return d
391
392        d2.addCallback(cbRendered2)
393        return d2
394
395    def test_getChildChildNotFound_customize(self):
396        """
397        The resource rendered for child not found requests can be customize
398        using a class member.
399        """
400        base = FilePath(self.mktemp())
401        base.setContent(b"")
402        markerResponse = b"custom-child-not-found-response"
403
404        class CustomChildNotFoundResource(resource.Resource):
405            def render(self, request):
406                return markerResponse
407
408        class CustomStaticFile(static.File):
409            childNotFound = CustomChildNotFoundResource()
410
411        fileResource = CustomStaticFile(base.path)
412        request = DummyRequest([b"no-child.txt"])
413
414        child = fileResource.getChild(b"no-child.txt", request)
415        result = child.render(request)
416
417        self.assertEqual(markerResponse, result)
418
419    def test_headRequest(self):
420        """
421        L{static.File.render} returns an empty response body for I{HEAD}
422        requests.
423        """
424        path = FilePath(self.mktemp())
425        path.setContent(b"foo")
426        file = static.File(path.path)
427        request = DummyRequest([b""])
428        request.method = b"HEAD"
429        d = _render(file, request)
430
431        def cbRendered(ignored):
432            self.assertEqual(b"".join(request.written), b"")
433
434        d.addCallback(cbRendered)
435        return d
436
437    def test_processors(self):
438        """
439        If a request is made which encounters a L{File} before a final segment
440        which names a file with an extension which is in the L{File}'s
441        C{processors} mapping, the processor associated with that extension is
442        used to serve the response to the request.
443        """
444        base = FilePath(self.mktemp())
445        base.makedirs()
446        base.child("foo.bar").setContent(
447            b"from twisted.web.static import Data\n"
448            b"resource = Data(b'dynamic world', 'text/plain')\n"
449        )
450
451        file = static.File(base.path)
452        file.processors = {".bar": script.ResourceScript}
453        request = DummyRequest([b"foo.bar"])
454        child = resource.getChildForRequest(file, request)
455
456        d = self._render(child, request)
457
458        def cbRendered(ignored):
459            self.assertEqual(b"".join(request.written), b"dynamic world")
460            self.assertEqual(
461                request.responseHeaders.getRawHeaders(b"content-length")[0], b"13"
462            )
463
464        d.addCallback(cbRendered)
465        return d
466
467    def test_ignoreExt(self):
468        """
469        The list of ignored extensions can be set by passing a value to
470        L{File.__init__} or by calling L{File.ignoreExt} later.
471        """
472        file = static.File(b".")
473        self.assertEqual(file.ignoredExts, [])
474        file.ignoreExt(".foo")
475        file.ignoreExt(".bar")
476        self.assertEqual(file.ignoredExts, [".foo", ".bar"])
477
478        file = static.File(b".", ignoredExts=(".bar", ".baz"))
479        self.assertEqual(file.ignoredExts, [".bar", ".baz"])
480
481    def test_ignoredExtensionsIgnored(self):
482        """
483        A request for the I{base} child of a L{File} succeeds with a resource
484        for the I{base<extension>} file in the path the L{File} was created
485        with if such a file exists and the L{File} has been configured to
486        ignore the I{<extension>} extension.
487        """
488        base = FilePath(self.mktemp())
489        base.makedirs()
490        base.child("foo.bar").setContent(b"baz")
491        base.child("foo.quux").setContent(b"foobar")
492        file = static.File(base.path, ignoredExts=(".bar",))
493
494        request = DummyRequest([b"foo"])
495        child = resource.getChildForRequest(file, request)
496
497        d = self._render(child, request)
498
499        def cbRendered(ignored):
500            self.assertEqual(b"".join(request.written), b"baz")
501
502        d.addCallback(cbRendered)
503        return d
504
505    def test_directoryWithoutTrailingSlashRedirects(self):
506        """
507        A request for a path which is a directory but does not have a trailing
508        slash will be redirected to a URL which does have a slash by L{File}.
509        """
510        base = FilePath(self.mktemp())
511        base.makedirs()
512        base.child("folder").makedirs()
513        file = static.File(base.path)
514
515        request = DummyRequest([b"folder"])
516        request.uri = b"http://dummy/folder#baz?foo=bar"
517        child = resource.getChildForRequest(file, request)
518
519        self.successResultOf(self._render(child, request))
520        self.assertEqual(request.responseCode, FOUND)
521        self.assertEqual(
522            request.responseHeaders.getRawHeaders(b"location"),
523            [b"http://dummy/folder/#baz?foo=bar"],
524        )
525
526    def _makeFilePathWithStringIO(self):
527        """
528        Create a L{File} that when opened for reading, returns a L{StringIO}.
529
530        @return: 2-tuple of the opened "file" and the L{File}.
531        @rtype: L{tuple}
532        """
533        fakeFile = StringIO()
534        path = FilePath(self.mktemp())
535        path.touch()
536        file = static.File(path.path)
537        # Open our file instead of a real one
538        file.open = lambda: fakeFile
539        return fakeFile, file
540
541    def test_HEADClosesFile(self):
542        """
543        A HEAD request opens the file, gets the size, and then closes it after
544        the request.
545        """
546        fakeFile, file = self._makeFilePathWithStringIO()
547        request = DummyRequest([""])
548        request.method = b"HEAD"
549        self.successResultOf(_render(file, request))
550        self.assertEqual(b"".join(request.written), b"")
551        self.assertTrue(fakeFile.closed)
552
553    def test_cachedRequestClosesFile(self):
554        """
555        A GET request that is cached closes the file after the request.
556        """
557        fakeFile, file = self._makeFilePathWithStringIO()
558        request = DummyRequest([""])
559        request.method = b"GET"
560        # This request will always return saying that it is cached
561        request.setLastModified = lambda _: http.CACHED
562        self.successResultOf(_render(file, request))
563        self.assertEqual(b"".join(request.written), b"")
564        self.assertTrue(fakeFile.closed)
565
566
567class StaticMakeProducerTests(TestCase):
568    """
569    Tests for L{File.makeProducer}.
570    """
571
572    def makeResourceWithContent(self, content, type=None, encoding=None):
573        """
574        Make a L{static.File} resource that has C{content} for its content.
575
576        @param content: The L{bytes} to use as the contents of the resource.
577        @param type: Optional value for the content type of the resource.
578        """
579        fileName = FilePath(self.mktemp())
580        fileName.setContent(content)
581        resource = static.File(fileName._asBytesPath())
582        resource.encoding = encoding
583        resource.type = type
584        return resource
585
586    def contentHeaders(self, request):
587        """
588        Extract the content-* headers from the L{DummyRequest} C{request}.
589
590        This returns the subset of C{request.outgoingHeaders} of headers that
591        start with 'content-'.
592        """
593        contentHeaders = {}
594        for k, v in request.responseHeaders.getAllRawHeaders():
595            if k.lower().startswith(b"content-"):
596                contentHeaders[k.lower()] = v[0]
597        return contentHeaders
598
599    def test_noRangeHeaderGivesNoRangeStaticProducer(self):
600        """
601        makeProducer when no Range header is set returns an instance of
602        NoRangeStaticProducer.
603        """
604        resource = self.makeResourceWithContent(b"")
605        request = DummyRequest([])
606        with resource.openForReading() as file:
607            producer = resource.makeProducer(request, file)
608            self.assertIsInstance(producer, static.NoRangeStaticProducer)
609
610    def test_noRangeHeaderSets200OK(self):
611        """
612        makeProducer when no Range header is set sets the responseCode on the
613        request to 'OK'.
614        """
615        resource = self.makeResourceWithContent(b"")
616        request = DummyRequest([])
617        with resource.openForReading() as file:
618            resource.makeProducer(request, file)
619            self.assertEqual(http.OK, request.responseCode)
620
621    def test_noRangeHeaderSetsContentHeaders(self):
622        """
623        makeProducer when no Range header is set sets the Content-* headers
624        for the response.
625        """
626        length = 123
627        contentType = "text/plain"
628        contentEncoding = "gzip"
629        resource = self.makeResourceWithContent(
630            b"a" * length, type=contentType, encoding=contentEncoding
631        )
632        request = DummyRequest([])
633        with resource.openForReading() as file:
634            resource.makeProducer(request, file)
635            self.assertEqual(
636                {
637                    b"content-type": networkString(contentType),
638                    b"content-length": b"%d" % (length,),
639                    b"content-encoding": networkString(contentEncoding),
640                },
641                self.contentHeaders(request),
642            )
643
644    def test_singleRangeGivesSingleRangeStaticProducer(self):
645        """
646        makeProducer when the Range header requests a single byte range
647        returns an instance of SingleRangeStaticProducer.
648        """
649        request = DummyRequest([])
650        request.requestHeaders.addRawHeader(b"range", b"bytes=1-3")
651        resource = self.makeResourceWithContent(b"abcdef")
652        with resource.openForReading() as file:
653            producer = resource.makeProducer(request, file)
654            self.assertIsInstance(producer, static.SingleRangeStaticProducer)
655
656    def test_singleRangeSets206PartialContent(self):
657        """
658        makeProducer when the Range header requests a single, satisfiable byte
659        range sets the response code on the request to 'Partial Content'.
660        """
661        request = DummyRequest([])
662        request.requestHeaders.addRawHeader(b"range", b"bytes=1-3")
663        resource = self.makeResourceWithContent(b"abcdef")
664        with resource.openForReading() as file:
665            resource.makeProducer(request, file)
666            self.assertEqual(http.PARTIAL_CONTENT, request.responseCode)
667
668    def test_singleRangeSetsContentHeaders(self):
669        """
670        makeProducer when the Range header requests a single, satisfiable byte
671        range sets the Content-* headers appropriately.
672        """
673        request = DummyRequest([])
674        request.requestHeaders.addRawHeader(b"range", b"bytes=1-3")
675        contentType = "text/plain"
676        contentEncoding = "gzip"
677        resource = self.makeResourceWithContent(
678            b"abcdef", type=contentType, encoding=contentEncoding
679        )
680        with resource.openForReading() as file:
681            resource.makeProducer(request, file)
682            self.assertEqual(
683                {
684                    b"content-type": networkString(contentType),
685                    b"content-encoding": networkString(contentEncoding),
686                    b"content-range": b"bytes 1-3/6",
687                    b"content-length": b"3",
688                },
689                self.contentHeaders(request),
690            )
691
692    def test_singleUnsatisfiableRangeReturnsSingleRangeStaticProducer(self):
693        """
694        makeProducer still returns an instance of L{SingleRangeStaticProducer}
695        when the Range header requests a single unsatisfiable byte range.
696        """
697        request = DummyRequest([])
698        request.requestHeaders.addRawHeader(b"range", b"bytes=4-10")
699        resource = self.makeResourceWithContent(b"abc")
700        with resource.openForReading() as file:
701            producer = resource.makeProducer(request, file)
702            self.assertIsInstance(producer, static.SingleRangeStaticProducer)
703
704    def test_singleUnsatisfiableRangeSets416ReqestedRangeNotSatisfiable(self):
705        """
706        makeProducer sets the response code of the request to of 'Requested
707        Range Not Satisfiable' when the Range header requests a single
708        unsatisfiable byte range.
709        """
710        request = DummyRequest([])
711        request.requestHeaders.addRawHeader(b"range", b"bytes=4-10")
712        resource = self.makeResourceWithContent(b"abc")
713        with resource.openForReading() as file:
714            resource.makeProducer(request, file)
715            self.assertEqual(http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
716
717    def test_singleUnsatisfiableRangeSetsContentHeaders(self):
718        """
719        makeProducer when the Range header requests a single, unsatisfiable
720        byte range sets the Content-* headers appropriately.
721        """
722        request = DummyRequest([])
723        request.requestHeaders.addRawHeader(b"range", b"bytes=4-10")
724        contentType = "text/plain"
725        resource = self.makeResourceWithContent(b"abc", type=contentType)
726        with resource.openForReading() as file:
727            resource.makeProducer(request, file)
728            self.assertEqual(
729                {
730                    b"content-type": b"text/plain",
731                    b"content-length": b"0",
732                    b"content-range": b"bytes */3",
733                },
734                self.contentHeaders(request),
735            )
736
737    def test_singlePartiallyOverlappingRangeSetsContentHeaders(self):
738        """
739        makeProducer when the Range header requests a single byte range that
740        partly overlaps the resource sets the Content-* headers appropriately.
741        """
742        request = DummyRequest([])
743        request.requestHeaders.addRawHeader(b"range", b"bytes=2-10")
744        contentType = "text/plain"
745        resource = self.makeResourceWithContent(b"abc", type=contentType)
746        with resource.openForReading() as file:
747            resource.makeProducer(request, file)
748            self.assertEqual(
749                {
750                    b"content-type": b"text/plain",
751                    b"content-length": b"1",
752                    b"content-range": b"bytes 2-2/3",
753                },
754                self.contentHeaders(request),
755            )
756
757    def test_multipleRangeGivesMultipleRangeStaticProducer(self):
758        """
759        makeProducer when the Range header requests a single byte range
760        returns an instance of MultipleRangeStaticProducer.
761        """
762        request = DummyRequest([])
763        request.requestHeaders.addRawHeader(b"range", b"bytes=1-3,5-6")
764        resource = self.makeResourceWithContent(b"abcdef")
765        with resource.openForReading() as file:
766            producer = resource.makeProducer(request, file)
767            self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
768
769    def test_multipleRangeSets206PartialContent(self):
770        """
771        makeProducer when the Range header requests a multiple satisfiable
772        byte ranges sets the response code on the request to 'Partial
773        Content'.
774        """
775        request = DummyRequest([])
776        request.requestHeaders.addRawHeader(b"range", b"bytes=1-3,5-6")
777        resource = self.makeResourceWithContent(b"abcdef")
778        with resource.openForReading() as file:
779            resource.makeProducer(request, file)
780            self.assertEqual(http.PARTIAL_CONTENT, request.responseCode)
781
782    def test_mutipleRangeSetsContentHeaders(self):
783        """
784        makeProducer when the Range header requests a single, satisfiable byte
785        range sets the Content-* headers appropriately.
786        """
787        request = DummyRequest([])
788        request.requestHeaders.addRawHeader(b"range", b"bytes=1-3,5-6")
789        resource = self.makeResourceWithContent(b"abcdefghijkl", encoding="gzip")
790        with resource.openForReading() as file:
791            producer = resource.makeProducer(request, file)
792            contentHeaders = self.contentHeaders(request)
793            # The only content-* headers set are content-type and content-length.
794            self.assertEqual(
795                {b"content-length", b"content-type"}, set(contentHeaders.keys())
796            )
797            # The content-length depends on the boundary used in the response.
798            expectedLength = 5
799            for boundary, offset, size in producer.rangeInfo:
800                expectedLength += len(boundary)
801            self.assertEqual(
802                b"%d" % (expectedLength,), contentHeaders[b"content-length"]
803            )
804            # Content-type should be set to a value indicating a multipart
805            # response and the boundary used to separate the parts.
806            self.assertIn(b"content-type", contentHeaders)
807            contentType = contentHeaders[b"content-type"]
808            self.assertNotIdentical(
809                None,
810                re.match(br'multipart/byteranges; boundary="[^"]*"\Z', contentType),
811            )
812            # Content-encoding is not set in the response to a multiple range
813            # response, which is a bit wussy but works well enough with the way
814            # static.File does content-encodings...
815            self.assertNotIn(b"content-encoding", contentHeaders)
816
817    def test_multipleUnsatisfiableRangesReturnsMultipleRangeStaticProducer(self):
818        """
819        makeProducer still returns an instance of L{SingleRangeStaticProducer}
820        when the Range header requests multiple ranges, none of which are
821        satisfiable.
822        """
823        request = DummyRequest([])
824        request.requestHeaders.addRawHeader(b"range", b"bytes=10-12,15-20")
825        resource = self.makeResourceWithContent(b"abc")
826        with resource.openForReading() as file:
827            producer = resource.makeProducer(request, file)
828            self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
829
830    def test_multipleUnsatisfiableRangesSets416ReqestedRangeNotSatisfiable(self):
831        """
832        makeProducer sets the response code of the request to of 'Requested
833        Range Not Satisfiable' when the Range header requests multiple ranges,
834        none of which are satisfiable.
835        """
836        request = DummyRequest([])
837        request.requestHeaders.addRawHeader(b"range", b"bytes=10-12,15-20")
838        resource = self.makeResourceWithContent(b"abc")
839        with resource.openForReading() as file:
840            resource.makeProducer(request, file)
841            self.assertEqual(http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
842
843    def test_multipleUnsatisfiableRangeSetsContentHeaders(self):
844        """
845        makeProducer when the Range header requests multiple ranges, none of
846        which are satisfiable, sets the Content-* headers appropriately.
847        """
848        request = DummyRequest([])
849        request.requestHeaders.addRawHeader(b"range", b"bytes=4-10")
850        contentType = "text/plain"
851        request.requestHeaders.addRawHeader(b"range", b"bytes=10-12,15-20")
852        resource = self.makeResourceWithContent(b"abc", type=contentType)
853        with resource.openForReading() as file:
854            resource.makeProducer(request, file)
855            self.assertEqual(
856                {
857                    b"content-length": b"0",
858                    b"content-range": b"bytes */3",
859                    b"content-type": b"text/plain",
860                },
861                self.contentHeaders(request),
862            )
863
864    def test_oneSatisfiableRangeIsEnough(self):
865        """
866        makeProducer when the Range header requests multiple ranges, at least
867        one of which matches, sets the response code to 'Partial Content'.
868        """
869        request = DummyRequest([])
870        request.requestHeaders.addRawHeader(b"range", b"bytes=1-3,100-200")
871        resource = self.makeResourceWithContent(b"abcdef")
872        with resource.openForReading() as file:
873            resource.makeProducer(request, file)
874            self.assertEqual(http.PARTIAL_CONTENT, request.responseCode)
875
876
class StaticProducerTests(TestCase):
    """
    Tests for the abstract L{StaticProducer}.
    """

    def test_stopProducingClosesFile(self):
        """
        Calling L{StaticProducer.stopProducing} closes the file object the
        producer reads its data from.
        """
        source = StringIO()
        static.StaticProducer(None, source).stopProducing()
        self.assertTrue(source.closed)

    def test_stopProducingSetsRequestToNone(self):
        """
        Calling L{StaticProducer.stopProducing} resets the C{request}
        attribute to L{None}, which signals to the resumeProducing methods of
        subclasses that no more data should be produced.
        """
        stoppedProducer = static.StaticProducer(DummyRequest([]), StringIO())
        stoppedProducer.stopProducing()
        self.assertIdentical(None, stoppedProducer.request)
902
903
class NoRangeStaticProducerTests(TestCase):
    """
    Tests for L{NoRangeStaticProducer}.
    """

    def test_implementsIPullProducer(self):
        """
        L{NoRangeStaticProducer} provides L{IPullProducer}.
        """
        producer = static.NoRangeStaticProducer(None, None)
        verifyObject(interfaces.IPullProducer, producer)

    def test_resumeProducingProducesContent(self):
        """
        L{NoRangeStaticProducer.resumeProducing} writes the content of the
        resource to the request.
        """
        payload = b"abcdef"
        request = DummyRequest([])
        # start calls registerProducer on the DummyRequest, which pulls all
        # output from the producer, so this single call is enough.
        static.NoRangeStaticProducer(request, StringIO(payload)).start()
        self.assertEqual(payload, b"".join(request.written))

    def test_resumeProducingBuffersOutput(self):
        """
        L{NoRangeStaticProducer.start} hands at most
        C{abstract.FileDescriptor.bufferSize} bytes of resource content to the
        request per write.
        """
        bufferSize = abstract.FileDescriptor.bufferSize
        # Two full buffers plus one trailing byte.
        payload = b"a" * (2 * bufferSize + 1)
        request = DummyRequest([])
        # start calls registerProducer on the DummyRequest, which pulls all
        # output from the producer, so this single call is enough.
        static.NoRangeStaticProducer(request, StringIO(payload)).start()
        expectedWrites = [
            payload[position : position + bufferSize]
            for position in range(0, len(payload), bufferSize)
        ]
        self.assertEqual(expectedWrites, request.written)

    def test_finishCalledWhenDone(self):
        """
        L{NoRangeStaticProducer.resumeProducing} calls finish() on the request
        once all of the content has been produced.
        """
        finishResults = []
        request = DummyRequest([])
        request.notifyFinish().addCallback(finishResults.append)
        # start calls registerProducer on the DummyRequest, which pulls all
        # output from the producer, so this single call is enough.
        static.NoRangeStaticProducer(request, StringIO(b"abcdef")).start()
        self.assertEqual([None], finishResults)
962
963
class SingleRangeStaticProducerTests(TestCase):
    """
    Tests for L{SingleRangeStaticProducer}.
    """

    def test_implementsIPullProducer(self):
        """
        L{SingleRangeStaticProducer} provides L{IPullProducer}.
        """
        producer = static.SingleRangeStaticProducer(None, None, None, None)
        verifyObject(interfaces.IPullProducer, producer)

    def test_resumeProducingProducesContent(self):
        """
        L{SingleRangeStaticProducer.resumeProducing} writes the requested
        number of bytes, beginning at the requested offset, from the resource
        to the request.
        """
        payload = b"abcdef"
        offset, size = 1, 3
        request = DummyRequest([])
        producer = static.SingleRangeStaticProducer(
            request, StringIO(payload), offset, size
        )
        # DummyRequest.registerProducer pulls all output from the producer, so
        # calling start is all that is needed.
        producer.start()
        self.assertEqual(payload[offset : offset + size], b"".join(request.written))

    def test_resumeProducingBuffersOutput(self):
        """
        L{SingleRangeStaticProducer.start} hands at most
        C{abstract.FileDescriptor.bufferSize} bytes of resource content to the
        request per write.
        """
        bufferSize = abstract.FileDescriptor.bufferSize
        payload = b"abc" * bufferSize
        # One full buffer followed by a ten byte remainder.
        offset, size = 1, bufferSize + 10
        request = DummyRequest([])
        producer = static.SingleRangeStaticProducer(
            request, StringIO(payload), offset, size
        )
        # DummyRequest.registerProducer pulls all output from the producer, so
        # calling start is all that is needed.
        producer.start()
        firstWriteEnd = offset + bufferSize
        expectedWrites = [
            payload[offset:firstWriteEnd],
            payload[firstWriteEnd : offset + size],
        ]
        self.assertEqual(expectedWrites, request.written)

    def test_finishCalledWhenDone(self):
        """
        L{SingleRangeStaticProducer.resumeProducing} calls finish() on the
        request once all of the content has been produced.
        """
        finishResults = []
        request = DummyRequest([])
        request.notifyFinish().addCallback(finishResults.append)
        # start calls registerProducer on the DummyRequest, which pulls all
        # output from the producer, so this single call is enough.
        static.SingleRangeStaticProducer(request, StringIO(b"abcdef"), 1, 1).start()
        self.assertEqual([None], finishResults)
1027
1028
class MultipleRangeStaticProducerTests(TestCase):
    """
    Tests for L{MultipleRangeStaticProducer}.
    """

    def test_implementsIPullProducer(self):
        """
        L{MultipleRangeStaticProducer} provides L{IPullProducer}.
        """
        producer = static.MultipleRangeStaticProducer(None, None, None)
        verifyObject(interfaces.IPullProducer, producer)

    def test_resumeProducingProducesContent(self):
        """
        L{MultipleRangeStaticProducer.resumeProducing} writes each requested
        chunk of the resource to the request, preceded by the boundary that
        was supplied for it.
        """
        # Each entry is (boundary, offset, size).
        rangeInfo = [(b"1", 1, 3), (b"2", 5, 1)]
        request = DummyRequest([])
        producer = static.MultipleRangeStaticProducer(
            request, StringIO(b"abcdef"), rangeInfo
        )
        # DummyRequest.registerProducer pulls all output from the producer, so
        # calling start is all that is needed.
        producer.start()
        self.assertEqual(b"1bcd2f", b"".join(request.written))

    def test_resumeProducingBuffersOutput(self):
        """
        L{MultipleRangeStaticProducer.start} writes about
        C{abstract.FileDescriptor.bufferSize} bytes of content from the
        resource to the request at once.

        To be specific about the 'about' above: it can write slightly more,
        for example in the case where the first boundary plus the first chunk
        is less than C{bufferSize} but first boundary plus the first chunk
        plus the second boundary is more, but this is unimportant as in
        practice the boundaries are fairly small.  On the other side, it is
        important for performance to bundle up several small chunks into one
        call to request.write.
        """
        payload = b"0123456789" * 2
        # Each entry is (boundary, offset, size).
        rangeInfo = [(b"a", 0, 2), (b"b", 5, 10), (b"c", 0, 0)]
        request = DummyRequest([])
        producer = static.MultipleRangeStaticProducer(
            request, StringIO(payload), rangeInfo
        )
        # Use a tiny buffer so that the second range has to be split across
        # two writes.
        producer.bufferSize = 10
        # DummyRequest.registerProducer pulls all output from the producer, so
        # calling start is all that is needed.
        producer.start()
        firstWrite = b"a" + payload[0:2] + b"b" + payload[5:11]
        secondWrite = payload[11:15] + b"c"
        self.assertEqual([firstWrite, secondWrite], request.written)

    def test_finishCalledWhenDone(self):
        """
        L{MultipleRangeStaticProducer.resumeProducing} calls finish() on the
        request once all of the content has been produced.
        """
        finishResults = []
        request = DummyRequest([])
        request.notifyFinish().addCallback(finishResults.append)
        producer = static.MultipleRangeStaticProducer(
            request, StringIO(b"abcdef"), [(b"", 1, 2)]
        )
        # start calls registerProducer on the DummyRequest, which pulls all
        # output from the producer, so this single call is enough.
        producer.start()
        self.assertEqual([None], finishResults)
1104
1105
class RangeTests(TestCase):
    """
    Tests for I{Range-Header} support in L{twisted.web.static.File}.

    @type file: L{file}
    @ivar file: Temporary (binary) file containing the content to be served.

    @type resource: L{static.File}
    @ivar resource: A leaf web resource using C{file} as content.

    @type request: L{DummyRequest}
    @ivar request: A fake request, requesting C{resource}.

    @type catcher: L{list}
    @ivar catcher: List which gathers all log information.
    """

    def setUp(self):
        """
        Create a temporary file with a fixed payload of 64 bytes.  Create a
        resource for that file and create a request which will be for that
        resource.  Each test can set a different range header to test different
        aspects of the implementation.
        """
        path = FilePath(self.mktemp())
        # This is just a jumble of random stuff.  It's supposed to be a good
        # set of data for this test, particularly in order to avoid
        # accidentally seeing the right result by having a byte sequence
        # repeated at different locations or by having byte values which are
        # somehow correlated with their position in the string.
        self.payload = (
            b"\xf8u\xf3E\x8c7\xce\x00\x9e\xb6a0y0S\xf0\xef\xac\xb7"
            b"\xbe\xb5\x17M\x1e\x136k{\x1e\xbe\x0c\x07\x07\t\xd0"
            b"\xbckY\xf5I\x0b\xb8\x88oZ\x1d\x85b\x1a\xcdk\xf2\x1d"
            b"&\xfd%\xdd\x82q/A\x10Y\x8b"
        )
        path.setContent(self.payload)
        self.file = path.open()
        self.resource = static.File(self.file.name)
        self.resource.isLeaf = 1
        self.request = DummyRequest([b""])
        self.request.uri = self.file.name
        self.catcher = []
        log.addObserver(self.catcher.append)

    def tearDown(self):
        """
        Clean up the resource file and the log observer.
        """
        self.file.close()
        log.removeObserver(self.catcher.append)

    def _assertLogged(self, expected):
        """
        Assert that the most recent log event carries the message C{expected}
        and that it is the only event which was logged.

        @param expected: The text expected as the first element of the
            C{"message"} entry of the log event.
        """
        logItem = self.catcher.pop()
        self.assertEqual(logItem["message"][0], expected)
        # Report whatever is left over: the expected event has already been
        # popped, so the remaining entries are the unexpected ones.
        self.assertEqual(
            self.catcher, [], f"An additional log occurred: {self.catcher!r}"
        )

    def _firstResponseHeader(self, name):
        """
        Return the first raw value of the response header C{name} set on
        C{self.request}.
        """
        return self.request.responseHeaders.getRawHeaders(name)[0]

    def _renderMultipleRanges(self, startEnds):
        """
        Render C{self.resource} for a request asking for every C{(start, end)}
        pair in C{startEnds}, assert that the response is 'Partial Content'
        and return the parts parsed out of the multipart body.
        """
        rangeHeaderValue = b",".join(
            networkString(f"{s}-{e}") for (s, e) in startEnds
        )
        self.request.requestHeaders.addRawHeader(b"range", b"bytes=" + rangeHeaderValue)
        self.resource.render(self.request)
        self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
        boundary = re.match(
            b'^multipart/byteranges; boundary="(.*)"$',
            self._firstResponseHeader(b"content-type"),
        ).group(1)
        return self.parseMultipartBody(b"".join(self.request.written), boundary)

    def test_invalidRanges(self):
        """
        L{File._parseRangeHeader} raises L{ValueError} when passed
        syntactically invalid byte ranges.
        """
        parse = self.resource._parseRangeHeader
        invalidHeaders = [
            # there's no =
            b"bytes",
            # unknown isn't a valid Bytes-Unit
            b"unknown=1-2",
            # there's no - in =stuff
            b"bytes=3",
            # both start and end are empty
            b"bytes=-",
            # start isn't an integer
            b"bytes=foo-",
            # end isn't an integer
            b"bytes=-foo",
            # end isn't equal to or greater than start
            b"bytes=5-4",
        ]
        for header in invalidHeaders:
            self.assertRaises(ValueError, parse, header)

    def test_rangeMissingStop(self):
        """
        A single bytes range without an explicit stop position is parsed into a
        two-tuple giving the start position and L{None}.
        """
        self.assertEqual(self.resource._parseRangeHeader(b"bytes=0-"), [(0, None)])

    def test_rangeMissingStart(self):
        """
        A single bytes range without an explicit start position is parsed into
        a two-tuple of L{None} and the end position.
        """
        self.assertEqual(self.resource._parseRangeHeader(b"bytes=-3"), [(None, 3)])

    def test_range(self):
        """
        A single bytes range with explicit start and stop positions is parsed
        into a two-tuple of those positions.
        """
        self.assertEqual(self.resource._parseRangeHeader(b"bytes=2-5"), [(2, 5)])

    def test_rangeWithSpace(self):
        """
        A single bytes range with whitespace in allowed places is parsed in
        the same way as it would be without the whitespace.
        """
        spacedHeaders = [
            b" bytes=1-2 ",
            b"bytes =1-2 ",
            b"bytes= 1-2",
            b"bytes=1 -2",
            b"bytes=1- 2",
            b"bytes=1-2 ",
        ]
        for header in spacedHeaders:
            self.assertEqual(self.resource._parseRangeHeader(header), [(1, 2)])

    def test_nullRangeElements(self):
        """
        If there are multiple byte ranges but only one is non-null, the
        non-null range is parsed and its start and stop returned.
        """
        self.assertEqual(
            self.resource._parseRangeHeader(b"bytes=1-2,\r\n, ,\t"), [(1, 2)]
        )

    def test_multipleRanges(self):
        """
        If multiple byte ranges are specified their starts and stops are
        returned.
        """
        self.assertEqual(
            self.resource._parseRangeHeader(b"bytes=1-2,3-4"), [(1, 2), (3, 4)]
        )

    def test_bodyLength(self):
        """
        A correct response to a range request is as long as the length of the
        requested range.
        """
        self.request.requestHeaders.addRawHeader(b"range", b"bytes=0-43")
        self.resource.render(self.request)
        self.assertEqual(len(b"".join(self.request.written)), 44)

    def test_invalidRangeRequest(self):
        """
        An incorrect range request (RFC 2616 defines a correct range request as
        a Bytes-Unit followed by a '=' character followed by a specific range.
        Only 'bytes' is defined) results in the range header value being logged
        and a normal 200 response being sent.
        """
        rangeHeader = b"foobar=0-43"
        self.request.requestHeaders.addRawHeader(b"range", rangeHeader)
        self.resource.render(self.request)
        expected = f"Ignoring malformed Range header {rangeHeader.decode()!r}"
        self._assertLogged(expected)
        self.assertEqual(b"".join(self.request.written), self.payload)
        self.assertEqual(self.request.responseCode, http.OK)
        self.assertEqual(
            self._firstResponseHeader(b"content-length"),
            b"%d" % (len(self.payload),),
        )

    def parseMultipartBody(self, body, boundary):
        """
        Parse C{body} as a multipart MIME response separated by C{boundary}.

        Note that this will fail the calling test on certain syntactic
        problems.

        @param body: The complete response body, as L{bytes}.
        @param boundary: The boundary announced in the content-type header,
            without the leading dashes.

        @return: A L{list} with one L{dict} per part, holding the part's
            content type (C{b"contentType"}), the C{(start, end, size)}
            groups of its content-range header (C{b"contentRange"}) and its
            payload (C{b"body"}).
        """
        sep = b"\r\n--" + boundary
        parts = body.split(sep)
        # The body starts with a separator and ends with the closing one.
        self.assertEqual(b"", parts[0])
        self.assertEqual(b"--\r\n", parts[-1])
        parsed_parts = []
        for part in parts[1:-1]:
            # Every part consists of exactly two header lines, an empty line
            # and the payload.
            before, header1, header2, blank, partBody = part.split(b"\r\n", 4)
            headers = header1 + b"\n" + header2
            self.assertEqual(b"", before)
            self.assertEqual(b"", blank)
            partContentTypeValue = re.search(
                b"^content-type: (.*)$", headers, re.I | re.M
            ).group(1)
            start, end, size = re.search(
                b"^content-range: bytes ([0-9]+)-([0-9]+)/([0-9]+)$",
                headers,
                re.I | re.M,
            ).groups()
            parsed_parts.append(
                {
                    b"contentType": partContentTypeValue,
                    b"contentRange": (start, end, size),
                    b"body": partBody,
                }
            )
        return parsed_parts

    def test_multipleRangeRequest(self):
        """
        The response to a request for multiple bytes ranges is a MIME-ish
        multipart response.
        """
        startEnds = [(0, 2), (20, 30), (40, 50)]
        parts = self._renderMultipleRanges(startEnds)
        self.assertEqual(len(startEnds), len(parts))
        for part, (s, e) in zip(parts, startEnds):
            self.assertEqual(networkString(self.resource.type), part[b"contentType"])
            start, end, size = part[b"contentRange"]
            self.assertEqual(int(start), s)
            self.assertEqual(int(end), e)
            self.assertEqual(int(size), self.resource.getFileSize())
            self.assertEqual(self.payload[s : e + 1], part[b"body"])

    def test_multipleRangeRequestWithRangeOverlappingEnd(self):
        """
        The response to a request for multiple bytes ranges is a MIME-ish
        multipart response, even when one of the ranges falls off the end of
        the resource.
        """
        startEnds = [(0, 2), (40, len(self.payload) + 10)]
        parts = self._renderMultipleRanges(startEnds)
        self.assertEqual(len(startEnds), len(parts))
        for part, (s, e) in zip(parts, startEnds):
            self.assertEqual(networkString(self.resource.type), part[b"contentType"])
            start, end, size = part[b"contentRange"]
            self.assertEqual(int(start), s)
            # A range running past the end is clipped to the last byte.
            self.assertEqual(int(end), min(e, self.resource.getFileSize() - 1))
            self.assertEqual(int(size), self.resource.getFileSize())
            self.assertEqual(self.payload[s : e + 1], part[b"body"])

    def test_implicitEnd(self):
        """
        If the end byte position is omitted, then it is treated as if the
        length of the resource was specified by the end byte position.
        """
        self.request.requestHeaders.addRawHeader(b"range", b"bytes=23-")
        self.resource.render(self.request)
        written = b"".join(self.request.written)
        self.assertEqual(written, self.payload[23:])
        self.assertEqual(len(written), 41)
        self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
        self.assertEqual(
            self._firstResponseHeader(b"content-range"), b"bytes 23-63/64"
        )
        self.assertEqual(self._firstResponseHeader(b"content-length"), b"41")

    def test_implicitStart(self):
        """
        If the start byte position is omitted but the end byte position is
        supplied, then the range is treated as requesting the last N bytes of
        the resource, where N is the end byte position.
        """
        self.request.requestHeaders.addRawHeader(b"range", b"bytes=-17")
        self.resource.render(self.request)
        written = b"".join(self.request.written)
        self.assertEqual(written, self.payload[-17:])
        self.assertEqual(len(written), 17)
        self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
        self.assertEqual(
            self._firstResponseHeader(b"content-range"), b"bytes 47-63/64"
        )
        self.assertEqual(self._firstResponseHeader(b"content-length"), b"17")

    def test_explicitRange(self):
        """
        A correct response to a bytes range header request from A to B starts
        with the A'th byte and ends with (including) the B'th byte. The first
        byte of a page is numbered with 0.
        """
        self.request.requestHeaders.addRawHeader(b"range", b"bytes=3-43")
        self.resource.render(self.request)
        written = b"".join(self.request.written)
        self.assertEqual(written, self.payload[3:44])
        self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
        self.assertEqual(
            self._firstResponseHeader(b"content-range"), b"bytes 3-43/64"
        )
        self.assertEqual(
            b"%d" % (len(written),), self._firstResponseHeader(b"content-length")
        )

    def test_explicitRangeOverlappingEnd(self):
        """
        A correct response to a bytes range header request from A to B when B
        is past the end of the resource starts with the A'th byte and ends
        with the last byte of the resource. The first byte of a page is
        numbered with 0.
        """
        self.request.requestHeaders.addRawHeader(b"range", b"bytes=40-100")
        self.resource.render(self.request)
        written = b"".join(self.request.written)
        self.assertEqual(written, self.payload[40:])
        self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
        self.assertEqual(
            self._firstResponseHeader(b"content-range"), b"bytes 40-63/64"
        )
        self.assertEqual(
            b"%d" % (len(written),), self._firstResponseHeader(b"content-length")
        )

    def test_statusCodeRequestedRangeNotSatisfiable(self):
        """
        If a range is syntactically invalid due to the start being greater than
        the end, the range header is ignored (the request is responded to as if
        it were not present).
        """
        self.request.requestHeaders.addRawHeader(b"range", b"bytes=20-13")
        self.resource.render(self.request)
        self.assertEqual(self.request.responseCode, http.OK)
        self.assertEqual(b"".join(self.request.written), self.payload)
        self.assertEqual(
            self._firstResponseHeader(b"content-length"),
            b"%d" % (len(self.payload),),
        )

    def test_invalidStartBytePos(self):
        """
        If a range is unsatisfiable due to the start not being less than the
        length of the resource, the response is 416 (Requested range not
        satisfiable) and no data is written to the response body (RFC 2616,
        section 14.35.1).
        """
        self.request.requestHeaders.addRawHeader(b"range", b"bytes=67-108")
        self.resource.render(self.request)
        self.assertEqual(
            self.request.responseCode, http.REQUESTED_RANGE_NOT_SATISFIABLE
        )
        self.assertEqual(b"".join(self.request.written), b"")
        self.assertEqual(self._firstResponseHeader(b"content-length"), b"0")
        # Sections 10.4.17 and 14.16
        self.assertEqual(
            self._firstResponseHeader(b"content-range"),
            networkString("bytes */%d" % (len(self.payload),)),
        )
1474
1475
1476class DirectoryListerTests(TestCase):
1477    """
1478    Tests for L{static.DirectoryLister}.
1479    """
1480
1481    def _request(self, uri):
1482        request = DummyRequest([b""])
1483        request.uri = uri
1484        return request
1485
1486    def test_renderHeader(self):
1487        """
1488        L{static.DirectoryLister} prints the request uri as header of the
1489        rendered content.
1490        """
1491        path = FilePath(self.mktemp())
1492        path.makedirs()
1493
1494        lister = static.DirectoryLister(path.path)
1495        data = lister.render(self._request(b"foo"))
1496        self.assertIn(b"<h1>Directory listing for foo</h1>", data)
1497        self.assertIn(b"<title>Directory listing for foo</title>", data)
1498
1499    def test_renderUnquoteHeader(self):
1500        """
1501        L{static.DirectoryLister} unquote the request uri before printing it.
1502        """
1503        path = FilePath(self.mktemp())
1504        path.makedirs()
1505
1506        lister = static.DirectoryLister(path.path)
1507        data = lister.render(self._request(b"foo%20bar"))
1508        self.assertIn(b"<h1>Directory listing for foo bar</h1>", data)
1509        self.assertIn(b"<title>Directory listing for foo bar</title>", data)
1510
1511    def test_escapeHeader(self):
1512        """
1513        L{static.DirectoryLister} escape "&", "<" and ">" after unquoting the
1514        request uri.
1515        """
1516        path = FilePath(self.mktemp())
1517        path.makedirs()
1518
1519        lister = static.DirectoryLister(path.path)
1520        data = lister.render(self._request(b"foo%26bar"))
1521        self.assertIn(b"<h1>Directory listing for foo&amp;bar</h1>", data)
1522        self.assertIn(b"<title>Directory listing for foo&amp;bar</title>", data)
1523
1524    def test_renderFiles(self):
1525        """
1526        L{static.DirectoryLister} is able to list all the files inside a
1527        directory.
1528        """
1529        path = FilePath(self.mktemp())
1530        path.makedirs()
1531        path.child("file1").setContent(b"content1")
1532        path.child("file2").setContent(b"content2" * 1000)
1533
1534        lister = static.DirectoryLister(path.path)
1535        data = lister.render(self._request(b"foo"))
1536        body = b"""<tr class="odd">
1537    <td><a href="file1">file1</a></td>
1538    <td>8B</td>
1539    <td>[text/html]</td>
1540    <td></td>
1541</tr>
1542<tr class="even">
1543    <td><a href="file2">file2</a></td>
1544    <td>7K</td>
1545    <td>[text/html]</td>
1546    <td></td>
1547</tr>"""
1548        self.assertIn(body, data)
1549
1550    def test_renderDirectories(self):
1551        """
1552        L{static.DirectoryLister} is able to list all the directories inside
1553        a directory.
1554        """
1555        path = FilePath(self.mktemp())
1556        path.makedirs()
1557        path.child("dir1").makedirs()
1558        path.child("dir2 & 3").makedirs()
1559
1560        lister = static.DirectoryLister(path.path)
1561        data = lister.render(self._request(b"foo"))
1562        body = b"""<tr class="odd">
1563    <td><a href="dir1/">dir1/</a></td>
1564    <td></td>
1565    <td>[Directory]</td>
1566    <td></td>
1567</tr>
1568<tr class="even">
1569    <td><a href="dir2%20%26%203/">dir2 &amp; 3/</a></td>
1570    <td></td>
1571    <td>[Directory]</td>
1572    <td></td>
1573</tr>"""
1574        self.assertIn(body, data)
1575
1576    def test_renderFiltered(self):
1577        """
1578        L{static.DirectoryLister} takes an optional C{dirs} argument that
1579        filter out the list of directories and files printed.
1580        """
1581        path = FilePath(self.mktemp())
1582        path.makedirs()
1583        path.child("dir1").makedirs()
1584        path.child("dir2").makedirs()
1585        path.child("dir3").makedirs()
1586        lister = static.DirectoryLister(path.path, dirs=["dir1", "dir3"])
1587        data = lister.render(self._request(b"foo"))
1588        body = b"""<tr class="odd">
1589    <td><a href="dir1/">dir1/</a></td>
1590    <td></td>
1591    <td>[Directory]</td>
1592    <td></td>
1593</tr>
1594<tr class="even">
1595    <td><a href="dir3/">dir3/</a></td>
1596    <td></td>
1597    <td>[Directory]</td>
1598    <td></td>
1599</tr>"""
1600        self.assertIn(body, data)
1601
1602    def test_oddAndEven(self):
1603        """
1604        L{static.DirectoryLister} gives an alternate class for each odd and
1605        even rows in the table.
1606        """
1607        lister = static.DirectoryLister(None)
1608        elements = [
1609            {"href": "", "text": "", "size": "", "type": "", "encoding": ""}
1610            for i in range(5)
1611        ]
1612        content = lister._buildTableContent(elements)
1613
1614        self.assertEqual(len(content), 5)
1615        self.assertTrue(content[0].startswith('<tr class="odd">'))
1616        self.assertTrue(content[1].startswith('<tr class="even">'))
1617        self.assertTrue(content[2].startswith('<tr class="odd">'))
1618        self.assertTrue(content[3].startswith('<tr class="even">'))
1619        self.assertTrue(content[4].startswith('<tr class="odd">'))
1620
1621    def test_contentType(self):
1622        """
1623        L{static.DirectoryLister} produces a MIME-type that indicates that it is
1624        HTML, and includes its charset (UTF-8).
1625        """
1626        path = FilePath(self.mktemp())
1627        path.makedirs()
1628        lister = static.DirectoryLister(path.path)
1629        req = self._request(b"")
1630        lister.render(req)
1631        self.assertEqual(
1632            req.responseHeaders.getRawHeaders(b"content-type")[0],
1633            b"text/html; charset=utf-8",
1634        )
1635
1636    def test_mimeTypeAndEncodings(self):
1637        """
1638        L{static.DirectoryLister} is able to detect mimetype and encoding of
1639        listed files.
1640        """
1641        path = FilePath(self.mktemp())
1642        path.makedirs()
1643        path.child("file1.txt").setContent(b"file1")
1644        path.child("file2.py").setContent(b"python")
1645        path.child("file3.conf.gz").setContent(b"conf compressed")
1646        path.child("file4.diff.bz2").setContent(b"diff compressed")
1647        directory = os.listdir(path.path)
1648        directory.sort()
1649
1650        contentTypes = {
1651            ".txt": "text/plain",
1652            ".py": "text/python",
1653            ".conf": "text/configuration",
1654            ".diff": "text/diff",
1655        }
1656
1657        lister = static.DirectoryLister(path.path, contentTypes=contentTypes)
1658        dirs, files = lister._getFilesAndDirectories(directory)
1659        self.assertEqual(dirs, [])
1660        self.assertEqual(
1661            files,
1662            [
1663                {
1664                    "encoding": "",
1665                    "href": "file1.txt",
1666                    "size": "5B",
1667                    "text": "file1.txt",
1668                    "type": "[text/plain]",
1669                },
1670                {
1671                    "encoding": "",
1672                    "href": "file2.py",
1673                    "size": "6B",
1674                    "text": "file2.py",
1675                    "type": "[text/python]",
1676                },
1677                {
1678                    "encoding": "[gzip]",
1679                    "href": "file3.conf.gz",
1680                    "size": "15B",
1681                    "text": "file3.conf.gz",
1682                    "type": "[text/configuration]",
1683                },
1684                {
1685                    "encoding": "[bzip2]",
1686                    "href": "file4.diff.bz2",
1687                    "size": "15B",
1688                    "text": "file4.diff.bz2",
1689                    "type": "[text/diff]",
1690                },
1691            ],
1692        )
1693
1694    @skipIf(not platform._supportsSymlinks(), "No symlink support")
1695    def test_brokenSymlink(self):
1696        """
1697        If on the file in the listing points to a broken symlink, it should not
1698        be returned by L{static.DirectoryLister._getFilesAndDirectories}.
1699        """
1700        path = FilePath(self.mktemp())
1701        path.makedirs()
1702        file1 = path.child("file1")
1703        file1.setContent(b"file1")
1704        file1.linkTo(path.child("file2"))
1705        file1.remove()
1706
1707        lister = static.DirectoryLister(path.path)
1708        directory = os.listdir(path.path)
1709        directory.sort()
1710        dirs, files = lister._getFilesAndDirectories(directory)
1711        self.assertEqual(dirs, [])
1712        self.assertEqual(files, [])
1713
1714    def test_childrenNotFound(self):
1715        """
1716        Any child resource of L{static.DirectoryLister} renders an HTTP
1717        I{NOT FOUND} response code.
1718        """
1719        path = FilePath(self.mktemp())
1720        path.makedirs()
1721        lister = static.DirectoryLister(path.path)
1722        request = self._request(b"")
1723        child = resource.getChildForRequest(lister, request)
1724        result = _render(child, request)
1725
1726        def cbRendered(ignored):
1727            self.assertEqual(request.responseCode, http.NOT_FOUND)
1728
1729        result.addCallback(cbRendered)
1730        return result
1731
1732    def test_repr(self):
1733        """
1734        L{static.DirectoryLister.__repr__} gives the path of the lister.
1735        """
1736        path = FilePath(self.mktemp())
1737        lister = static.DirectoryLister(path.path)
1738        self.assertEqual(repr(lister), f"<DirectoryLister of {path.path!r}>")
1739        self.assertEqual(str(lister), f"<DirectoryLister of {path.path!r}>")
1740
1741    def test_formatFileSize(self):
1742        """
1743        L{static.formatFileSize} format an amount of bytes into a more readable
1744        format.
1745        """
1746        self.assertEqual(static.formatFileSize(0), "0B")
1747        self.assertEqual(static.formatFileSize(123), "123B")
1748        self.assertEqual(static.formatFileSize(4567), "4K")
1749        self.assertEqual(static.formatFileSize(8900000), "8M")
1750        self.assertEqual(static.formatFileSize(1234000000), "1G")
1751        self.assertEqual(static.formatFileSize(1234567890000), "1149G")
1752
1753
class LoadMimeTypesTests(TestCase):
    """
    Tests for the MIME type loading routine.

    @cvar UNSET: A sentinel to signify that C{self.paths} has not been set by
        the mock init.
    """

    UNSET = object()

    def setUp(self):
        """
        Reset the recorded C{paths} to the L{UNSET} sentinel before each test.
        """
        self.paths = self.UNSET

    def _fakeInit(self, paths):
        """
        A mock L{mimetypes.init} that records the value of the passed C{paths}
        argument.

        @param paths: The paths that will be recorded.
        """
        self.paths = paths

    def test_defaultArgumentIsNone(self):
        """
        By default, L{None} is passed to C{mimetypes.init}.
        """
        static.loadMimeTypes(init=self._fakeInit)
        self.assertIs(self.paths, None)

    def test_extraLocationsWork(self):
        """
        Passed MIME type files are passed to C{mimetypes.init}.
        """
        paths = ["x", "y", "z"]
        static.loadMimeTypes(paths, init=self._fakeInit)
        # Identity, not equality: the very same list object is forwarded.
        self.assertIs(self.paths, paths)

    def test_usesGlobalInitFunction(self):
        """
        By default, C{mimetypes.init} is called.
        """
        # Checking mimetypes.inited doesn't always work, because
        # something, somewhere, calls mimetypes.init. Yay global
        # mutable state :)
        #
        # Inspect the declared default of the C{init} parameter instead.
        # inspect.signature is always available on the Python versions this
        # file supports; the former inspect.getargspec fallback was
        # unreachable and that function was removed in Python 3.11.
        signature = inspect.signature(static.loadMimeTypes)
        self.assertIs(signature.parameters["init"].default, mimetypes.init)
1805
1806
class StaticDeprecationTests(TestCase):
    """
    Tests for deprecated names in L{twisted.web.static}.
    """

    def test_addSlashDeprecated(self):
        """
        Calling L{twisted.web.static.addSlash} emits exactly one deprecation
        warning with the expected message.
        """
        from twisted.web.static import addSlash

        addSlash(DummyRequest([b""]))

        # Collect only the warnings attributed to this test method.
        emitted = self.flushWarnings([self.test_addSlashDeprecated])
        expectedMessage = (
            "twisted.web.static.addSlash was deprecated in Twisted 16.0.0"
        )
        self.assertEqual(len(emitted), 1)
        self.assertEqual(emitted[0]["message"], expectedMessage)
1822