1# coding: utf-8 2"""Tests for managing HTTP issues (malformed requests, etc).""" 3 4import errno 5import mimetypes 6import socket 7import sys 8from unittest import mock 9 10import six 11from six.moves.http_client import HTTPConnection 12from six.moves import urllib 13 14import cherrypy 15from cherrypy._cpcompat import HTTPSConnection, quote 16 17from cherrypy.test import helper 18 19 20def is_ascii(text): 21 """ 22 Return True if the text encodes as ascii. 23 """ 24 try: 25 text.encode('ascii') 26 return True 27 except Exception: 28 pass 29 return False 30 31 32def encode_filename(filename): 33 """ 34 Given a filename to be used in a multipart/form-data, 35 encode the name. Return the key and encoded filename. 36 """ 37 if is_ascii(filename): 38 return 'filename', '"{filename}"'.format(**locals()) 39 encoded = quote(filename, encoding='utf-8') 40 return 'filename*', "'".join(( 41 'UTF-8', 42 '', # lang 43 encoded, 44 )) 45 46 47def encode_multipart_formdata(files): 48 """Return (content_type, body) ready for httplib.HTTP instance. 49 50 files: a sequence of (name, filename, value) tuples for multipart uploads. 51 filename can be a string or a tuple ('filename string', 'encoding') 52 """ 53 BOUNDARY = '________ThIs_Is_tHe_bouNdaRY_$' 54 L = [] 55 for key, filename, value in files: 56 L.append('--' + BOUNDARY) 57 58 fn_key, encoded = encode_filename(filename) 59 tmpl = \ 60 'Content-Disposition: form-data; name="{key}"; {fn_key}={encoded}' 61 L.append(tmpl.format(**locals())) 62 ct = mimetypes.guess_type(filename)[0] or 'application/octet-stream' 63 L.append('Content-Type: %s' % ct) 64 L.append('') 65 L.append(value) 66 L.append('--' + BOUNDARY + '--') 67 L.append('') 68 body = '\r\n'.join(L) 69 content_type = 'multipart/form-data; boundary=%s' % BOUNDARY 70 return content_type, body 71 72 73class HTTPTests(helper.CPWebCase): 74 75 def make_connection(self): 76 if self.scheme == 'https': 77 return HTTPSConnection('%s:%s' % (self.interface(), self.PORT)) 78 else: 79 return HTTPConnection('%s:%s' % (self.interface(), self.PORT)) 80 81 @staticmethod 82 def setup_server(): 83 class Root: 84 85 @cherrypy.expose 86 def index(self, *args, **kwargs): 87 return 'Hello world!' 88 89 @cherrypy.expose 90 @cherrypy.config(**{'request.process_request_body': False}) 91 def no_body(self, *args, **kwargs): 92 return 'Hello world!' 93 94 @cherrypy.expose 95 def post_multipart(self, file): 96 """Return a summary ("a * 65536\nb * 65536") of the uploaded 97 file. 98 """ 99 contents = file.file.read() 100 summary = [] 101 curchar = None 102 count = 0 103 for c in contents: 104 if c == curchar: 105 count += 1 106 else: 107 if count: 108 if six.PY3: 109 curchar = chr(curchar) 110 summary.append('%s * %d' % (curchar, count)) 111 count = 1 112 curchar = c 113 if count: 114 if six.PY3: 115 curchar = chr(curchar) 116 summary.append('%s * %d' % (curchar, count)) 117 return ', '.join(summary) 118 119 @cherrypy.expose 120 def post_filename(self, myfile): 121 '''Return the name of the file which was uploaded.''' 122 return myfile.filename 123 124 cherrypy.tree.mount(Root()) 125 cherrypy.config.update({'server.max_request_body_size': 30000000}) 126 127 def test_no_content_length(self): 128 # "The presence of a message-body in a request is signaled by the 129 # inclusion of a Content-Length or Transfer-Encoding header field in 130 # the request's message-headers." 131 # 132 # Send a message with neither header and no body. Even though 133 # the request is of method POST, this should be OK because we set 134 # request.process_request_body to False for our handler. 135 c = self.make_connection() 136 c.request('POST', '/no_body') 137 response = c.getresponse() 138 self.body = response.fp.read() 139 self.status = str(response.status) 140 self.assertStatus(200) 141 self.assertBody(b'Hello world!') 142 143 # Now send a message that has no Content-Length, but does send a body. 144 # Verify that CP times out the socket and responds 145 # with 411 Length Required. 146 if self.scheme == 'https': 147 c = HTTPSConnection('%s:%s' % (self.interface(), self.PORT)) 148 else: 149 c = HTTPConnection('%s:%s' % (self.interface(), self.PORT)) 150 151 # `_get_content_length` is needed for Python 3.6+ 152 with mock.patch.object( 153 c, 154 '_get_content_length', 155 lambda body, method: None, 156 create=True): 157 # `_set_content_length` is needed for Python 2.7-3.5 158 with mock.patch.object(c, '_set_content_length', create=True): 159 c.request('POST', '/') 160 161 response = c.getresponse() 162 self.body = response.fp.read() 163 self.status = str(response.status) 164 self.assertStatus(411) 165 166 def test_post_multipart(self): 167 alphabet = 'abcdefghijklmnopqrstuvwxyz' 168 # generate file contents for a large post 169 contents = ''.join([c * 65536 for c in alphabet]) 170 171 # encode as multipart form data 172 files = [('file', 'file.txt', contents)] 173 content_type, body = encode_multipart_formdata(files) 174 body = body.encode('Latin-1') 175 176 # post file 177 c = self.make_connection() 178 c.putrequest('POST', '/post_multipart') 179 c.putheader('Content-Type', content_type) 180 c.putheader('Content-Length', str(len(body))) 181 c.endheaders() 182 c.send(body) 183 184 response = c.getresponse() 185 self.body = response.fp.read() 186 self.status = str(response.status) 187 self.assertStatus(200) 188 parts = ['%s * 65536' % ch for ch in alphabet] 189 self.assertBody(', '.join(parts)) 190 191 def test_post_filename_with_special_characters(self): 192 '''Testing that we can handle filenames with special characters. This 193 was reported as a bug in: 194 https://github.com/cherrypy/cherrypy/issues/1146/ 195 https://github.com/cherrypy/cherrypy/issues/1397/ 196 https://github.com/cherrypy/cherrypy/issues/1694/ 197 ''' 198 # We'll upload a bunch of files with differing names. 199 fnames = [ 200 'boop.csv', 'foo, bar.csv', 'bar, xxxx.csv', 'file"name.csv', 201 'file;name.csv', 'file; name.csv', u'test_łóąä.txt', 202 ] 203 for fname in fnames: 204 files = [('myfile', fname, 'yunyeenyunyue')] 205 content_type, body = encode_multipart_formdata(files) 206 body = body.encode('Latin-1') 207 208 # post file 209 c = self.make_connection() 210 c.putrequest('POST', '/post_filename') 211 c.putheader('Content-Type', content_type) 212 c.putheader('Content-Length', str(len(body))) 213 c.endheaders() 214 c.send(body) 215 216 response = c.getresponse() 217 self.body = response.fp.read() 218 self.status = str(response.status) 219 self.assertStatus(200) 220 self.assertBody(fname) 221 222 def test_malformed_request_line(self): 223 if getattr(cherrypy.server, 'using_apache', False): 224 return self.skip('skipped due to known Apache differences...') 225 226 # Test missing version in Request-Line 227 c = self.make_connection() 228 c._output(b'geT /') 229 c._send_output() 230 if hasattr(c, 'strict'): 231 response = c.response_class(c.sock, strict=c.strict, method='GET') 232 else: 233 # Python 3.2 removed the 'strict' feature, saying: 234 # "http.client now always assumes HTTP/1.x compliant servers." 235 response = c.response_class(c.sock, method='GET') 236 response.begin() 237 self.assertEqual(response.status, 400) 238 self.assertEqual(response.fp.read(22), b'Malformed Request-Line') 239 c.close() 240 241 def test_request_line_split_issue_1220(self): 242 params = { 243 'intervenant-entreprise-evenement_classaction': 244 'evenement-mailremerciements', 245 '_path': 'intervenant-entreprise-evenement', 246 'intervenant-entreprise-evenement_action-id': 19404, 247 'intervenant-entreprise-evenement_id': 19404, 248 'intervenant-entreprise_id': 28092, 249 } 250 Request_URI = '/index?' + urllib.parse.urlencode(params) 251 self.assertEqual(len('GET %s HTTP/1.1\r\n' % Request_URI), 256) 252 self.getPage(Request_URI) 253 self.assertBody('Hello world!') 254 255 def test_malformed_header(self): 256 c = self.make_connection() 257 c.putrequest('GET', '/') 258 c.putheader('Content-Type', 'text/plain') 259 # See https://github.com/cherrypy/cherrypy/issues/941 260 c._output(b're, 1.2.3.4#015#012') 261 c.endheaders() 262 263 response = c.getresponse() 264 self.status = str(response.status) 265 self.assertStatus(400) 266 self.body = response.fp.read(20) 267 self.assertBody('Illegal header line.') 268 269 def test_http_over_https(self): 270 if self.scheme != 'https': 271 return self.skip('skipped (not running HTTPS)... ') 272 273 # Try connecting without SSL. 274 conn = HTTPConnection('%s:%s' % (self.interface(), self.PORT)) 275 conn.putrequest('GET', '/', skip_host=True) 276 conn.putheader('Host', self.HOST) 277 conn.endheaders() 278 response = conn.response_class(conn.sock, method='GET') 279 try: 280 response.begin() 281 self.assertEqual(response.status, 400) 282 self.body = response.read() 283 self.assertBody('The client sent a plain HTTP request, but this ' 284 'server only speaks HTTPS on this port.') 285 except socket.error: 286 e = sys.exc_info()[1] 287 # "Connection reset by peer" is also acceptable. 288 if e.errno != errno.ECONNRESET: 289 raise 290 291 def test_garbage_in(self): 292 # Connect without SSL regardless of server.scheme 293 c = HTTPConnection('%s:%s' % (self.interface(), self.PORT)) 294 c._output(b'gjkgjklsgjklsgjkljklsg') 295 c._send_output() 296 response = c.response_class(c.sock, method='GET') 297 try: 298 response.begin() 299 self.assertEqual(response.status, 400) 300 self.assertEqual(response.fp.read(22), 301 b'Malformed Request-Line') 302 c.close() 303 except socket.error: 304 e = sys.exc_info()[1] 305 # "Connection reset by peer" is also acceptable. 306 if e.errno != errno.ECONNRESET: 307 raise 308