1# coding: utf-8
2"""Tests for managing HTTP issues (malformed requests, etc)."""
3
4import errno
5import mimetypes
6import socket
7import sys
8from unittest import mock
9import urllib.parse
10from http.client import HTTPConnection
11
12import cherrypy
13from cherrypy._cpcompat import HTTPSConnection
14
15from cherrypy.test import helper
16
17
def is_ascii(text):
    """
    Report whether ``text`` can be encoded as ASCII.

    Return True when ``text.encode('ascii')`` succeeds and False when
    it raises any ``Exception`` (including when ``text`` has no usable
    ``encode`` method).
    """
    try:
        text.encode('ascii')
    except Exception:
        # Deliberately broad: any failure simply means "not ASCII".
        return False
    return True
28
29
def encode_filename(filename):
    """
    Encode a filename for a multipart/form-data Content-Disposition.

    Return a ``(parameter_name, parameter_value)`` pair:

    * for an ASCII filename, ``('filename', '"<filename>"')`` -- the
      name wrapped in double quotes, otherwise unmodified;
    * for any other filename, ``('filename*', "UTF-8''<quoted>")`` where
      ``<quoted>`` is the UTF-8 percent-encoding of the name.
    """
    if not is_ascii(filename):
        quoted = urllib.parse.quote(filename, encoding='utf-8')
        # Extended value layout is charset'lang'value; lang is left empty.
        return 'filename*', "UTF-8''" + quoted
    return 'filename', '"{}"'.format(filename)
43
44
def encode_multipart_formdata(files):
    """Return (content_type, body) for a multipart/form-data upload.

    files: an iterable of (name, filename, value) tuples, one per part.
    ``name`` is the form field name, ``filename`` is a string that is
    encoded with ``encode_filename`` and also used to guess the part's
    Content-Type, and ``value`` is the part's payload as a string.

    ``body`` is returned as a string with CRLF line endings; the caller
    is responsible for encoding it to bytes.
    """
    BOUNDARY = '________ThIs_Is_tHe_bouNdaRY_$'
    delimiter = '--' + BOUNDARY
    disposition = (
        'Content-Disposition: form-data; name="{key}"; {fn_key}={encoded}'
    )

    lines = []
    for key, filename, value in files:
        fn_key, encoded = encode_filename(filename)
        # Fall back to a generic binary type when the extension is unknown.
        guessed_type = mimetypes.guess_type(filename)[0]
        lines.extend((
            delimiter,
            disposition.format(key=key, fn_key=fn_key, encoded=encoded),
            'Content-Type: %s' % (guessed_type or 'application/octet-stream'),
            '',  # blank line separating part headers from the payload
            value,
        ))
    # Closing delimiter, then an empty item so the body ends with CRLF.
    lines.extend((delimiter + '--', ''))

    body = '\r\n'.join(lines)
    return 'multipart/form-data; boundary=%s' % BOUNDARY, body
69
70
class HTTPTests(helper.CPWebCase):
    """Check how the server copes with unusual or malformed HTTP input.

    Most tests talk to the server through a raw ``http.client``
    connection so that they can send requests a well-behaved client
    would never produce.  Every connection opened by a test is closed
    in a ``finally`` clause so sockets are released even when an
    assertion fails.
    """

    def make_connection(self):
        """Return a client connection object for the server under test.

        An ``HTTPSConnection`` is used when ``self.scheme`` is
        ``'https'``; otherwise a plain ``HTTPConnection`` is used.
        """
        netloc = '%s:%s' % (self.interface(), self.PORT)
        if self.scheme == 'https':
            return HTTPSConnection(netloc)
        else:
            return HTTPConnection(netloc)

    @staticmethod
    def setup_server():
        """Mount the test application and raise the body size limit."""
        class Root:

            @cherrypy.expose
            def index(self, *args, **kwargs):
                return 'Hello world!'

            @cherrypy.expose
            @cherrypy.config(**{'request.process_request_body': False})
            def no_body(self, *args, **kwargs):
                return 'Hello world!'

            @cherrypy.expose
            def post_multipart(self, file):
                """Return a run-length summary of the uploaded file.

                Each run of identical items in the file contents becomes
                "<char> * <count>" and the runs are joined with ", "
                (for example "a * 65536, b * 65536").
                """
                contents = file.file.read()
                summary = []
                curchar = None
                count = 0
                # NOTE: chr() below needs an int, so this presumably
                # relies on ``contents`` being bytes (iteration yields
                # ints) -- confirm against the upload object's type.
                for c in contents:
                    if c == curchar:
                        count += 1
                    else:
                        # A new run starts: flush the previous one, if any.
                        if count:
                            curchar = chr(curchar)
                            summary.append('%s * %d' % (curchar, count))
                        count = 1
                        curchar = c
                # Flush the final run (skipped when the file was empty).
                if count:
                    curchar = chr(curchar)
                    summary.append('%s * %d' % (curchar, count))
                return ', '.join(summary)

            @cherrypy.expose
            def post_filename(self, myfile):
                '''Return the name of the file which was uploaded.'''
                return myfile.filename

        cherrypy.tree.mount(Root())
        cherrypy.config.update({'server.max_request_body_size': 30000000})

    def test_no_content_length(self):
        # "The presence of a message-body in a request is signaled by the
        # inclusion of a Content-Length or Transfer-Encoding header field in
        # the request's message-headers."
        #
        # Send a message with neither header and no body. Even though
        # the request is of method POST, this should be OK because we set
        # request.process_request_body to False for our handler.
        c = self.make_connection()
        try:
            c.request('POST', '/no_body')
            response = c.getresponse()
            self.body = response.fp.read()
            self.status = str(response.status)
        finally:
            c.close()
        self.assertStatus(200)
        self.assertBody(b'Hello world!')

        # Now POST, again without a Content-Length header, to a handler
        # that does not disable request body processing. The client is
        # patched below so that it cannot add the header itself.
        # Verify that the server responds with 411 Length Required.
        c = self.make_connection()
        try:
            # `_get_content_length` is needed for Python 3.6+
            with mock.patch.object(
                    c,
                    '_get_content_length',
                    lambda body, method: None,
                    create=True):
                # `_set_content_length` is needed for Python 2.7-3.5
                with mock.patch.object(
                        c, '_set_content_length', create=True):
                    c.request('POST', '/')

            response = c.getresponse()
            self.body = response.fp.read()
            self.status = str(response.status)
        finally:
            c.close()
        self.assertStatus(411)

    def test_post_multipart(self):
        alphabet = 'abcdefghijklmnopqrstuvwxyz'
        # generate file contents for a large post
        contents = ''.join([c * 65536 for c in alphabet])

        # encode as multipart form data
        files = [('file', 'file.txt', contents)]
        content_type, body = encode_multipart_formdata(files)
        body = body.encode('Latin-1')

        # post file
        c = self.make_connection()
        try:
            c.putrequest('POST', '/post_multipart')
            c.putheader('Content-Type', content_type)
            c.putheader('Content-Length', str(len(body)))
            c.endheaders()
            c.send(body)

            response = c.getresponse()
            self.body = response.fp.read()
            self.status = str(response.status)
        finally:
            c.close()
        self.assertStatus(200)
        parts = ['%s * 65536' % ch for ch in alphabet]
        self.assertBody(', '.join(parts))

    def test_post_filename_with_special_characters(self):
        """Testing that we can handle filenames with special characters.

        This was reported as a bug in:

        * https://github.com/cherrypy/cherrypy/issues/1146/
        * https://github.com/cherrypy/cherrypy/issues/1397/
        * https://github.com/cherrypy/cherrypy/issues/1694/
        """
        # We'll upload a bunch of files with differing names.
        fnames = [
            'boop.csv', 'foo, bar.csv', 'bar, xxxx.csv', 'file"name.csv',
            'file;name.csv', 'file; name.csv', u'test_łóąä.txt',
        ]
        for fname in fnames:
            files = [('myfile', fname, 'yunyeenyunyue')]
            content_type, body = encode_multipart_formdata(files)
            body = body.encode('Latin-1')

            # post file (one fresh connection per filename)
            c = self.make_connection()
            try:
                c.putrequest('POST', '/post_filename')
                c.putheader('Content-Type', content_type)
                c.putheader('Content-Length', str(len(body)))
                c.endheaders()
                c.send(body)

                response = c.getresponse()
                self.body = response.fp.read()
                self.status = str(response.status)
            finally:
                c.close()
            self.assertStatus(200)
            self.assertBody(fname)

    def test_malformed_request_line(self):
        if getattr(cherrypy.server, 'using_apache', False):
            return self.skip('skipped due to known Apache differences...')

        # Test missing version in Request-Line
        c = self.make_connection()
        try:
            c._output(b'geT /')
            c._send_output()
            if hasattr(c, 'strict'):
                response = c.response_class(
                    c.sock, strict=c.strict, method='GET')
            else:
                # Python 3.2 removed the 'strict' feature, saying:
                # "http.client now always assumes HTTP/1.x compliant
                # servers."
                response = c.response_class(c.sock, method='GET')
            try:
                response.begin()
                self.assertEqual(response.status, 400)
                self.assertEqual(
                    response.fp.read(22), b'Malformed Request-Line')
            finally:
                # The response was built by hand, so the connection does
                # not know about it and will not close it for us.
                response.close()
        finally:
            c.close()

    def test_request_line_split_issue_1220(self):
        params = {
            'intervenant-entreprise-evenement_classaction':
                'evenement-mailremerciements',
            '_path': 'intervenant-entreprise-evenement',
            'intervenant-entreprise-evenement_action-id': 19404,
            'intervenant-entreprise-evenement_id': 19404,
            'intervenant-entreprise_id': 28092,
        }
        Request_URI = '/index?' + urllib.parse.urlencode(params)
        # Guard the fixture: the request line must be exactly 256
        # characters long for this test to be meaningful.
        self.assertEqual(len('GET %s HTTP/1.1\r\n' % Request_URI), 256)
        self.getPage(Request_URI)
        self.assertBody('Hello world!')

    def test_malformed_header(self):
        c = self.make_connection()
        try:
            c.putrequest('GET', '/')
            c.putheader('Content-Type', 'text/plain')
            # See https://github.com/cherrypy/cherrypy/issues/941
            c._output(b're, 1.2.3.4#015#012')
            c.endheaders()

            response = c.getresponse()
            self.status = str(response.status)
            self.assertStatus(400)
            self.body = response.fp.read(20)
            self.assertBody('Illegal header line.')
        finally:
            c.close()

    def test_http_over_https(self):
        if self.scheme != 'https':
            return self.skip('skipped (not running HTTPS)... ')

        # Try connecting without SSL.
        conn = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
        try:
            conn.putrequest('GET', '/', skip_host=True)
            conn.putheader('Host', self.HOST)
            conn.endheaders()
            response = conn.response_class(conn.sock, method='GET')
            try:
                response.begin()
                self.assertEqual(response.status, 400)
                self.body = response.read()
                self.assertBody('The client sent a plain HTTP request, but this '
                                'server only speaks HTTPS on this port.')
            except socket.error:
                e = sys.exc_info()[1]
                # "Connection reset by peer" is also acceptable.
                if e.errno != errno.ECONNRESET:
                    raise
            finally:
                # Hand-built response: close it explicitly.
                response.close()
        finally:
            conn.close()

    def test_garbage_in(self):
        # Connect without SSL regardless of server.scheme
        c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
        try:
            c._output(b'gjkgjklsgjklsgjkljklsg')
            c._send_output()
            response = c.response_class(c.sock, method='GET')
            try:
                response.begin()
                self.assertEqual(response.status, 400)
                self.assertEqual(response.fp.read(22),
                                 b'Malformed Request-Line')
            except socket.error:
                e = sys.exc_info()[1]
                # "Connection reset by peer" is also acceptable.
                if e.errno != errno.ECONNRESET:
                    raise
            finally:
                # Hand-built response: close it explicitly.
                response.close()
        finally:
            # Close on every path, not only after a successful assertion.
            c.close()
306