# coding: utf-8
"""Tests for managing HTTP issues (malformed requests, etc)."""

import errno
import itertools
import mimetypes
import socket
import sys
from unittest import mock
import urllib.parse
from http.client import HTTPConnection

import cherrypy
from cherrypy._cpcompat import HTTPSConnection

from cherrypy.test import helper


def is_ascii(text):
    """Return True if ``text`` can be encoded as ASCII, False otherwise."""
    try:
        text.encode('ascii')
    except UnicodeEncodeError:
        return False
    return True


def encode_filename(filename):
    """Encode a filename for a multipart/form-data Content-Disposition.

    Return a ``(key, value)`` pair: ``('filename', '"<name>"')`` when the
    name is pure ASCII, otherwise
    ``('filename*', "UTF-8''<percent-encoded name>")``.
    """
    if is_ascii(filename):
        # NOTE: a double quote inside the name is emitted unescaped; the
        # special-characters test below uploads such a name on purpose.
        return 'filename', '"{}"'.format(filename)
    encoded = urllib.parse.quote(filename, encoding='utf-8')
    # Extended form: charset, (empty) language and the percent-encoded
    # name, separated by single quotes.
    return 'filename*', "'".join((
        'UTF-8',
        '',  # lang
        encoded,
    ))


def encode_multipart_formdata(files):
    """Return ``(content_type, body)`` for a multipart/form-data request.

    files: a sequence of (name, filename, value) tuples for multipart
    uploads. ``filename`` and ``value`` must both be strings.

    ``body`` is returned as text; callers encode it to bytes before
    sending it over an ``http.client`` connection.
    """
    BOUNDARY = '________ThIs_Is_tHe_bouNdaRY_$'
    lines = []
    for key, filename, value in files:
        lines.append('--' + BOUNDARY)

        fn_key, encoded = encode_filename(filename)
        lines.append(
            'Content-Disposition: form-data; name="{key}"; '
            '{fn_key}={encoded}'.format(
                key=key, fn_key=fn_key, encoded=encoded,
            )
        )
        ct = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        lines.append('Content-Type: %s' % ct)
        lines.append('')
        lines.append(value)
    lines.append('--' + BOUNDARY + '--')
    lines.append('')
    body = '\r\n'.join(lines)
    content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
    return content_type, body


class HTTPTests(helper.CPWebCase):
    """Tests for the server's handling of unusual HTTP requests."""

    def make_connection(self):
        """Return a new client connection to the server under test.

        An ``HTTPSConnection`` is used when ``self.scheme`` is ``'https'``,
        a plain ``HTTPConnection`` otherwise.
        """
        conn_class = (
            HTTPSConnection if self.scheme == 'https' else HTTPConnection
        )
        return conn_class('%s:%s' % (self.interface(), self.PORT))

    def _upload(self, path, content_type, body):
        """POST the bytes ``body`` to ``path`` and record the response.

        The raw response body and the status (as a string) are stored on
        ``self.body`` and ``self.status`` for the assert helpers. The
        connection is closed even if the exchange fails.
        """
        c = self.make_connection()
        try:
            c.putrequest('POST', path)
            c.putheader('Content-Type', content_type)
            c.putheader('Content-Length', str(len(body)))
            c.endheaders()
            c.send(body)

            response = c.getresponse()
            self.body = response.fp.read()
            self.status = str(response.status)
        finally:
            c.close()

    @staticmethod
    def setup_server():
        """Mount the handlers used by these tests and raise the body limit."""
        class Root:

            @cherrypy.expose
            def index(self, *args, **kwargs):
                return 'Hello world!'

            @cherrypy.expose
            @cherrypy.config(**{'request.process_request_body': False})
            def no_body(self, *args, **kwargs):
                return 'Hello world!'

            @cherrypy.expose
            def post_multipart(self, file):
                """Return a run-length summary of the uploaded file.

                Each run of identical bytes becomes ``"<char> * <count>"``
                and the runs are joined with ``", "``, for example
                ``"a * 65536, b * 65536"``.
                """
                contents = file.file.read()
                return ', '.join(
                    '%s * %d' % (chr(byte), sum(1 for _ in run))
                    for byte, run in itertools.groupby(contents)
                )

            @cherrypy.expose
            def post_filename(self, myfile):
                """Return the name of the file which was uploaded."""
                return myfile.filename

        cherrypy.tree.mount(Root())
        cherrypy.config.update({'server.max_request_body_size': 30000000})

    def test_no_content_length(self):
        # "The presence of a message-body in a request is signaled by the
        # inclusion of a Content-Length or Transfer-Encoding header field in
        # the request's message-headers."
        #
        # Send a message with neither header and no body. Even though
        # the request is of method POST, this should be OK because we set
        # request.process_request_body to False for our handler.
        c = self.make_connection()
        try:
            c.request('POST', '/no_body')
            response = c.getresponse()
            self.body = response.fp.read()
            self.status = str(response.status)
        finally:
            c.close()
        self.assertStatus(200)
        self.assertBody(b'Hello world!')

        # Now POST to a handler that does not have that setting, with the
        # client's automatic Content-Length header patched out.
        # Verify that CP responds with 411 Length Required.
        c = self.make_connection()
        try:
            # `_get_content_length` is needed for Python 3.6+;
            # `_set_content_length` is needed for Python 2.7-3.5.
            with mock.patch.object(
                c,
                '_get_content_length',
                lambda body, method: None,
                create=True,
            ), mock.patch.object(c, '_set_content_length', create=True):
                c.request('POST', '/')

            response = c.getresponse()
            self.body = response.fp.read()
            self.status = str(response.status)
        finally:
            c.close()
        self.assertStatus(411)

    def test_post_multipart(self):
        alphabet = 'abcdefghijklmnopqrstuvwxyz'
        # generate file contents for a large post
        contents = ''.join([c * 65536 for c in alphabet])

        # encode as multipart form data
        files = [('file', 'file.txt', contents)]
        content_type, body = encode_multipart_formdata(files)
        body = body.encode('Latin-1')

        # post file
        self._upload('/post_multipart', content_type, body)
        self.assertStatus(200)
        parts = ['%s * 65536' % ch for ch in alphabet]
        self.assertBody(', '.join(parts))

    def test_post_filename_with_special_characters(self):
        """Testing that we can handle filenames with special characters.

        This was reported as a bug in:

        * https://github.com/cherrypy/cherrypy/issues/1146/
        * https://github.com/cherrypy/cherrypy/issues/1397/
        * https://github.com/cherrypy/cherrypy/issues/1694/
        """
        # We'll upload a bunch of files with differing names.
        fnames = [
            'boop.csv', 'foo, bar.csv', 'bar, xxxx.csv', 'file"name.csv',
            'file;name.csv', 'file; name.csv', u'test_łóąä.txt',
        ]
        for fname in fnames:
            files = [('myfile', fname, 'yunyeenyunyue')]
            content_type, body = encode_multipart_formdata(files)
            body = body.encode('Latin-1')

            # post file
            self._upload('/post_filename', content_type, body)
            self.assertStatus(200)
            self.assertBody(fname)

    def test_malformed_request_line(self):
        if getattr(cherrypy.server, 'using_apache', False):
            return self.skip('skipped due to known Apache differences...')

        # Test missing version in Request-Line
        c = self.make_connection()
        try:
            c._output(b'geT /')
            c._send_output()
            if hasattr(c, 'strict'):
                response = c.response_class(
                    c.sock, strict=c.strict, method='GET')
            else:
                # Python 3.2 removed the 'strict' feature, saying:
                # "http.client now always assumes HTTP/1.x compliant
                # servers."
                response = c.response_class(c.sock, method='GET')
            response.begin()
            self.assertEqual(response.status, 400)
            self.assertEqual(
                response.fp.read(22), b'Malformed Request-Line')
        finally:
            c.close()

    def test_request_line_split_issue_1220(self):
        params = {
            'intervenant-entreprise-evenement_classaction':
                'evenement-mailremerciements',
            '_path': 'intervenant-entreprise-evenement',
            'intervenant-entreprise-evenement_action-id': 19404,
            'intervenant-entreprise-evenement_id': 19404,
            'intervenant-entreprise_id': 28092,
        }
        request_uri = '/index?' + urllib.parse.urlencode(params)
        # The request line must be exactly 256 bytes long for this test
        # to exercise the case it was written for.
        self.assertEqual(len('GET %s HTTP/1.1\r\n' % request_uri), 256)
        self.getPage(request_uri)
        self.assertBody('Hello world!')

    def test_malformed_header(self):
        c = self.make_connection()
        try:
            c.putrequest('GET', '/')
            c.putheader('Content-Type', 'text/plain')
            # See https://github.com/cherrypy/cherrypy/issues/941
            c._output(b're, 1.2.3.4#015#012')
            c.endheaders()

            response = c.getresponse()
            self.status = str(response.status)
            self.assertStatus(400)
            self.body = response.fp.read(20)
            self.assertBody('Illegal header line.')
        finally:
            c.close()

    def test_http_over_https(self):
        if self.scheme != 'https':
            return self.skip('skipped (not running HTTPS)... ')

        # Try connecting without SSL.
        conn = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
        try:
            conn.putrequest('GET', '/', skip_host=True)
            conn.putheader('Host', self.HOST)
            conn.endheaders()
            response = conn.response_class(conn.sock, method='GET')
            try:
                response.begin()
                self.assertEqual(response.status, 400)
                self.body = response.read()
                self.assertBody(
                    'The client sent a plain HTTP request, but this '
                    'server only speaks HTTPS on this port.')
            except socket.error:
                e = sys.exc_info()[1]
                # "Connection reset by peer" is also acceptable.
                if e.errno != errno.ECONNRESET:
                    raise
        finally:
            conn.close()

    def test_garbage_in(self):
        # Connect without SSL regardless of server.scheme
        c = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
        try:
            c._output(b'gjkgjklsgjklsgjkljklsg')
            c._send_output()
            response = c.response_class(c.sock, method='GET')
            try:
                response.begin()
                self.assertEqual(response.status, 400)
                self.assertEqual(response.fp.read(22),
                                 b'Malformed Request-Line')
            except socket.error:
                e = sys.exc_info()[1]
                # "Connection reset by peer" is also acceptable.
                if e.errno != errno.ECONNRESET:
                    raise
        finally:
            # Close on every path, not only after a successful read.
            c.close()