1# coding: utf-8
2"""Tests for managing HTTP issues (malformed requests, etc)."""
3
4import errno
5import mimetypes
6import socket
7import sys
8from unittest import mock
9
10import six
11from six.moves.http_client import HTTPConnection
12from six.moves import urllib
13
14import cherrypy
15from cherrypy._cpcompat import HTTPSConnection, quote
16
17from cherrypy.test import helper
18
19
20def is_ascii(text):
21    """
22    Return True if the text encodes as ascii.
23    """
24    try:
25        text.encode('ascii')
26        return True
27    except Exception:
28        pass
29    return False
30
31
32def encode_filename(filename):
33    """
34    Given a filename to be used in a multipart/form-data,
35    encode the name. Return the key and encoded filename.
36    """
37    if is_ascii(filename):
38        return 'filename', '"{filename}"'.format(**locals())
39    encoded = quote(filename, encoding='utf-8')
40    return 'filename*', "'".join((
41        'UTF-8',
42        '',  # lang
43        encoded,
44    ))
45
46
47def encode_multipart_formdata(files):
48    """Return (content_type, body) ready for httplib.HTTP instance.
49
50    files: a sequence of (name, filename, value) tuples for multipart uploads.
51    filename can be a string or a tuple ('filename string', 'encoding')
52    """
53    BOUNDARY = '________ThIs_Is_tHe_bouNdaRY_$'
54    L = []
55    for key, filename, value in files:
56        L.append('--' + BOUNDARY)
57
58        fn_key, encoded = encode_filename(filename)
59        tmpl = \
60            'Content-Disposition: form-data; name="{key}"; {fn_key}={encoded}'
61        L.append(tmpl.format(**locals()))
62        ct = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
63        L.append('Content-Type: %s' % ct)
64        L.append('')
65        L.append(value)
66    L.append('--' + BOUNDARY + '--')
67    L.append('')
68    body = '\r\n'.join(L)
69    content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
70    return content_type, body
71
72
73class HTTPTests(helper.CPWebCase):
74
75    def make_connection(self):
76        if self.scheme == 'https':
77            return HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
78        else:
79            return HTTPConnection('%s:%s' % (self.interface(), self.PORT))
80
81    @staticmethod
82    def setup_server():
83        class Root:
84
85            @cherrypy.expose
86            def index(self, *args, **kwargs):
87                return 'Hello world!'
88
89            @cherrypy.expose
90            @cherrypy.config(**{'request.process_request_body': False})
91            def no_body(self, *args, **kwargs):
92                return 'Hello world!'
93
94            @cherrypy.expose
95            def post_multipart(self, file):
96                """Return a summary ("a * 65536\nb * 65536") of the uploaded
97                file.
98                """
99                contents = file.file.read()
100                summary = []
101                curchar = None
102                count = 0
103                for c in contents:
104                    if c == curchar:
105                        count += 1
106                    else:
107                        if count:
108                            if six.PY3:
109                                curchar = chr(curchar)
110                            summary.append('%s * %d' % (curchar, count))
111                        count = 1
112                        curchar = c
113                if count:
114                    if six.PY3:
115                        curchar = chr(curchar)
116                    summary.append('%s * %d' % (curchar, count))
117                return ', '.join(summary)
118
119            @cherrypy.expose
120            def post_filename(self, myfile):
121                '''Return the name of the file which was uploaded.'''
122                return myfile.filename
123
124        cherrypy.tree.mount(Root())
125        cherrypy.config.update({'server.max_request_body_size': 30000000})
126
127    def test_no_content_length(self):
128        # "The presence of a message-body in a request is signaled by the
129        # inclusion of a Content-Length or Transfer-Encoding header field in
130        # the request's message-headers."
131        #
132        # Send a message with neither header and no body. Even though
133        # the request is of method POST, this should be OK because we set
134        # request.process_request_body to False for our handler.
135        c = self.make_connection()
136        c.request('POST', '/no_body')
137        response = c.getresponse()
138        self.body = response.fp.read()
139        self.status = str(response.status)
140        self.assertStatus(200)
141        self.assertBody(b'Hello world!')
142
143        # Now send a message that has no Content-Length, but does send a body.
144        # Verify that CP times out the socket and responds
145        # with 411 Length Required.
146        if self.scheme == 'https':
147            c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT))
148        else:
149            c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
150
151        # `_get_content_length` is needed for Python 3.6+
152        with mock.patch.object(
153                c,
154                '_get_content_length',
155                lambda body, method: None,
156                create=True):
157            # `_set_content_length` is needed for Python 2.7-3.5
158            with mock.patch.object(c, '_set_content_length', create=True):
159                c.request('POST', '/')
160
161        response = c.getresponse()
162        self.body = response.fp.read()
163        self.status = str(response.status)
164        self.assertStatus(411)
165
166    def test_post_multipart(self):
167        alphabet = 'abcdefghijklmnopqrstuvwxyz'
168        # generate file contents for a large post
169        contents = ''.join([c * 65536 for c in alphabet])
170
171        # encode as multipart form data
172        files = [('file', 'file.txt', contents)]
173        content_type, body = encode_multipart_formdata(files)
174        body = body.encode('Latin-1')
175
176        # post file
177        c = self.make_connection()
178        c.putrequest('POST', '/post_multipart')
179        c.putheader('Content-Type', content_type)
180        c.putheader('Content-Length', str(len(body)))
181        c.endheaders()
182        c.send(body)
183
184        response = c.getresponse()
185        self.body = response.fp.read()
186        self.status = str(response.status)
187        self.assertStatus(200)
188        parts = ['%s * 65536' % ch for ch in alphabet]
189        self.assertBody(', '.join(parts))
190
191    def test_post_filename_with_special_characters(self):
192        '''Testing that we can handle filenames with special characters. This
193        was reported as a bug in:
194           https://github.com/cherrypy/cherrypy/issues/1146/
195           https://github.com/cherrypy/cherrypy/issues/1397/
196           https://github.com/cherrypy/cherrypy/issues/1694/
197        '''
198        # We'll upload a bunch of files with differing names.
199        fnames = [
200            'boop.csv', 'foo, bar.csv', 'bar, xxxx.csv', 'file"name.csv',
201            'file;name.csv', 'file; name.csv', u'test_łóąä.txt',
202        ]
203        for fname in fnames:
204            files = [('myfile', fname, 'yunyeenyunyue')]
205            content_type, body = encode_multipart_formdata(files)
206            body = body.encode('Latin-1')
207
208            # post file
209            c = self.make_connection()
210            c.putrequest('POST', '/post_filename')
211            c.putheader('Content-Type', content_type)
212            c.putheader('Content-Length', str(len(body)))
213            c.endheaders()
214            c.send(body)
215
216            response = c.getresponse()
217            self.body = response.fp.read()
218            self.status = str(response.status)
219            self.assertStatus(200)
220            self.assertBody(fname)
221
222    def test_malformed_request_line(self):
223        if getattr(cherrypy.server, 'using_apache', False):
224            return self.skip('skipped due to known Apache differences...')
225
226        # Test missing version in Request-Line
227        c = self.make_connection()
228        c._output(b'geT /')
229        c._send_output()
230        if hasattr(c, 'strict'):
231            response = c.response_class(c.sock, strict=c.strict, method='GET')
232        else:
233            # Python 3.2 removed the 'strict' feature, saying:
234            # "http.client now always assumes HTTP/1.x compliant servers."
235            response = c.response_class(c.sock, method='GET')
236        response.begin()
237        self.assertEqual(response.status, 400)
238        self.assertEqual(response.fp.read(22), b'Malformed Request-Line')
239        c.close()
240
241    def test_request_line_split_issue_1220(self):
242        params = {
243            'intervenant-entreprise-evenement_classaction':
244                'evenement-mailremerciements',
245            '_path': 'intervenant-entreprise-evenement',
246            'intervenant-entreprise-evenement_action-id': 19404,
247            'intervenant-entreprise-evenement_id': 19404,
248            'intervenant-entreprise_id': 28092,
249        }
250        Request_URI = '/index?' + urllib.parse.urlencode(params)
251        self.assertEqual(len('GET %s HTTP/1.1\r\n' % Request_URI), 256)
252        self.getPage(Request_URI)
253        self.assertBody('Hello world!')
254
255    def test_malformed_header(self):
256        c = self.make_connection()
257        c.putrequest('GET', '/')
258        c.putheader('Content-Type', 'text/plain')
259        # See https://github.com/cherrypy/cherrypy/issues/941
260        c._output(b're, 1.2.3.4#015#012')
261        c.endheaders()
262
263        response = c.getresponse()
264        self.status = str(response.status)
265        self.assertStatus(400)
266        self.body = response.fp.read(20)
267        self.assertBody('Illegal header line.')
268
269    def test_http_over_https(self):
270        if self.scheme != 'https':
271            return self.skip('skipped (not running HTTPS)... ')
272
273        # Try connecting without SSL.
274        conn = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
275        conn.putrequest('GET', '/', skip_host=True)
276        conn.putheader('Host', self.HOST)
277        conn.endheaders()
278        response = conn.response_class(conn.sock, method='GET')
279        try:
280            response.begin()
281            self.assertEqual(response.status, 400)
282            self.body = response.read()
283            self.assertBody('The client sent a plain HTTP request, but this '
284                            'server only speaks HTTPS on this port.')
285        except socket.error:
286            e = sys.exc_info()[1]
287            # "Connection reset by peer" is also acceptable.
288            if e.errno != errno.ECONNRESET:
289                raise
290
291    def test_garbage_in(self):
292        # Connect without SSL regardless of server.scheme
293        c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
294        c._output(b'gjkgjklsgjklsgjkljklsg')
295        c._send_output()
296        response = c.response_class(c.sock, method='GET')
297        try:
298            response.begin()
299            self.assertEqual(response.status, 400)
300            self.assertEqual(response.fp.read(22),
301                             b'Malformed Request-Line')
302            c.close()
303        except socket.error:
304            e = sys.exc_info()[1]
305            # "Connection reset by peer" is also acceptable.
306            if e.errno != errno.ECONNRESET:
307                raise
308