1import re 2import socket 3import time 4 5import pytest 6from conftest import run_process 7from unit.applications.lang.python import TestApplicationPython 8from unit.option import option 9from unit.utils import waitforsocket 10 11 12class TestProxy(TestApplicationPython): 13 prerequisites = {'modules': {'python': 'any'}} 14 15 SERVER_PORT = 7999 16 17 @staticmethod 18 def run_server(server_port): 19 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 20 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 21 22 server_address = ('', server_port) 23 sock.bind(server_address) 24 sock.listen(5) 25 26 def recvall(sock): 27 buff_size = 4096 28 data = b'' 29 while True: 30 part = sock.recv(buff_size) 31 data += part 32 if len(part) < buff_size: 33 break 34 return data 35 36 req = b"""HTTP/1.1 200 OK 37Content-Length: 10 38 39""" 40 41 while True: 42 connection, client_address = sock.accept() 43 44 data = recvall(connection).decode() 45 46 to_send = req 47 48 m = re.search(r'X-Len: (\d+)', data) 49 if m: 50 to_send += b'X' * int(m.group(1)) 51 52 connection.sendall(to_send) 53 54 connection.close() 55 56 def get_http10(self, *args, **kwargs): 57 return self.get(*args, http_10=True, **kwargs) 58 59 def post_http10(self, *args, **kwargs): 60 return self.post(*args, http_10=True, **kwargs) 61 62 def setup_method(self): 63 run_process(self.run_server, self.SERVER_PORT) 64 waitforsocket(self.SERVER_PORT) 65 66 assert 'success' in self.conf( 67 { 68 "listeners": { 69 "*:7080": {"pass": "routes"}, 70 "*:7081": {"pass": "applications/mirror"}, 71 }, 72 "routes": [{"action": {"proxy": "http://127.0.0.1:7081"}}], 73 "applications": { 74 "mirror": { 75 "type": "python", 76 "processes": {"spare": 0}, 77 "path": option.test_dir + "/python/mirror", 78 "working_directory": option.test_dir 79 + "/python/mirror", 80 "module": "wsgi", 81 }, 82 "custom_header": { 83 "type": "python", 84 "processes": {"spare": 0}, 85 "path": option.test_dir + "/python/custom_header", 86 "working_directory": option.test_dir 87 + "/python/custom_header", 88 "module": "wsgi", 89 }, 90 "delayed": { 91 "type": "python", 92 "processes": {"spare": 0}, 93 "path": option.test_dir + "/python/delayed", 94 "working_directory": option.test_dir 95 + "/python/delayed", 96 "module": "wsgi", 97 }, 98 }, 99 } 100 ), 'proxy initial configuration' 101 102 def test_proxy_http10(self): 103 for _ in range(10): 104 assert self.get_http10()['status'] == 200, 'status' 105 106 def test_proxy_chain(self): 107 assert 'success' in self.conf( 108 { 109 "listeners": { 110 "*:7080": {"pass": "routes/first"}, 111 "*:7081": {"pass": "routes/second"}, 112 "*:7082": {"pass": "routes/third"}, 113 "*:7083": {"pass": "routes/fourth"}, 114 "*:7084": {"pass": "routes/fifth"}, 115 "*:7085": {"pass": "applications/mirror"}, 116 }, 117 "routes": { 118 "first": [{"action": {"proxy": "http://127.0.0.1:7081"}}], 119 "second": [{"action": {"proxy": "http://127.0.0.1:7082"}}], 120 "third": [{"action": {"proxy": "http://127.0.0.1:7083"}}], 121 "fourth": [{"action": {"proxy": "http://127.0.0.1:7084"}}], 122 "fifth": [{"action": {"proxy": "http://127.0.0.1:7085"}}], 123 }, 124 "applications": { 125 "mirror": { 126 "type": "python", 127 "processes": {"spare": 0}, 128 "path": option.test_dir + "/python/mirror", 129 "working_directory": option.test_dir 130 + "/python/mirror", 131 "module": "wsgi", 132 } 133 }, 134 } 135 ), 'proxy chain configuration' 136 137 assert self.get_http10()['status'] == 200, 'status' 138 139 def test_proxy_body(self): 140 payload = '0123456789' 141 for _ in range(10): 142 resp = self.post_http10(body=payload) 143 144 assert resp['status'] == 200, 'status' 145 assert resp['body'] == payload, 'body' 146 147 payload = 'X' * 4096 148 for _ in range(10): 149 resp = self.post_http10(body=payload) 150 151 assert resp['status'] == 200, 'status' 152 assert resp['body'] == payload, 'body' 153 154 payload = 'X' * 4097 155 for _ in range(10): 156 resp = self.post_http10(body=payload) 157 158 assert resp['status'] == 200, 'status' 159 assert resp['body'] == payload, 'body' 160 161 payload = 'X' * 4096 * 256 162 for _ in range(10): 163 resp = self.post_http10(body=payload, read_buffer_size=4096 * 128) 164 165 assert resp['status'] == 200, 'status' 166 assert resp['body'] == payload, 'body' 167 168 payload = 'X' * 4096 * 257 169 for _ in range(10): 170 resp = self.post_http10(body=payload, read_buffer_size=4096 * 128) 171 172 assert resp['status'] == 200, 'status' 173 assert resp['body'] == payload, 'body' 174 175 assert 'success' in self.conf( 176 {'http': {'max_body_size': 32 * 1024 * 1024}}, 'settings' 177 ) 178 179 payload = '0123456789abcdef' * 32 * 64 * 1024 180 resp = self.post_http10(body=payload, read_buffer_size=1024 * 1024) 181 assert resp['status'] == 200, 'status' 182 assert resp['body'] == payload, 'body' 183 184 def test_proxy_parallel(self): 185 payload = 'X' * 4096 * 257 186 buff_size = 4096 * 258 187 188 socks = [] 189 for i in range(10): 190 _, sock = self.post_http10( 191 body=payload + str(i), 192 start=True, 193 no_recv=True, 194 read_buffer_size=buff_size, 195 ) 196 socks.append(sock) 197 198 for i in range(10): 199 resp = self.recvall(socks[i], buff_size=buff_size).decode() 200 socks[i].close() 201 202 resp = self._resp_to_dict(resp) 203 204 assert resp['status'] == 200, 'status' 205 assert resp['body'] == payload + str(i), 'body' 206 207 def test_proxy_header(self): 208 assert 'success' in self.conf( 209 {"pass": "applications/custom_header"}, 'listeners/*:7081' 210 ), 'custom_header configure' 211 212 header_value = 'blah' 213 assert ( 214 self.get_http10( 215 headers={'Host': 'localhost', 'Custom-Header': header_value} 216 )['headers']['Custom-Header'] 217 == header_value 218 ), 'custom header' 219 220 header_value = r'(),/:;<=>?@[\]{}\t !#$%&\'*+-.^_`|~' 221 assert ( 222 self.get_http10( 223 headers={'Host': 'localhost', 'Custom-Header': header_value} 224 )['headers']['Custom-Header'] 225 == header_value 226 ), 'custom header 2' 227 228 header_value = 'X' * 4096 229 assert ( 230 self.get_http10( 231 headers={'Host': 'localhost', 'Custom-Header': header_value} 232 )['headers']['Custom-Header'] 233 == header_value 234 ), 'custom header 3' 235 236 header_value = 'X' * 8191 237 assert ( 238 self.get_http10( 239 headers={'Host': 'localhost', 'Custom-Header': header_value} 240 )['headers']['Custom-Header'] 241 == header_value 242 ), 'custom header 4' 243 244 header_value = 'X' * 8192 245 assert ( 246 self.get_http10( 247 headers={'Host': 'localhost', 'Custom-Header': header_value} 248 )['status'] 249 == 431 250 ), 'custom header 5' 251 252 def test_proxy_fragmented(self): 253 _, sock = self.http( 254 b"""GET / HTT""", raw=True, start=True, no_recv=True 255 ) 256 257 time.sleep(1) 258 259 sock.sendall("P/1.0\r\nHost: localhos".encode()) 260 261 time.sleep(1) 262 263 sock.sendall("t\r\n\r\n".encode()) 264 265 assert re.search( 266 '200 OK', self.recvall(sock).decode() 267 ), 'fragmented send' 268 sock.close() 269 270 def test_proxy_fragmented_close(self): 271 _, sock = self.http( 272 b"""GET / HTT""", raw=True, start=True, no_recv=True 273 ) 274 275 time.sleep(1) 276 277 sock.sendall("P/1.0\r\nHo".encode()) 278 279 sock.close() 280 281 def test_proxy_fragmented_body(self): 282 _, sock = self.http( 283 b"""GET / HTT""", raw=True, start=True, no_recv=True 284 ) 285 286 time.sleep(1) 287 288 sock.sendall("P/1.0\r\nHost: localhost\r\n".encode()) 289 sock.sendall("Content-Length: 30000\r\n".encode()) 290 291 time.sleep(1) 292 293 sock.sendall("\r\n".encode()) 294 sock.sendall(("X" * 10000).encode()) 295 296 time.sleep(1) 297 298 sock.sendall(("X" * 10000).encode()) 299 300 time.sleep(1) 301 302 sock.sendall(("X" * 10000).encode()) 303 304 resp = self._resp_to_dict(self.recvall(sock).decode()) 305 sock.close() 306 307 assert resp['status'] == 200, 'status' 308 assert resp['body'] == "X" * 30000, 'body' 309 310 def test_proxy_fragmented_body_close(self): 311 _, sock = self.http( 312 b"""GET / HTT""", raw=True, start=True, no_recv=True 313 ) 314 315 time.sleep(1) 316 317 sock.sendall("P/1.0\r\nHost: localhost\r\n".encode()) 318 sock.sendall("Content-Length: 30000\r\n".encode()) 319 320 time.sleep(1) 321 322 sock.sendall("\r\n".encode()) 323 sock.sendall(("X" * 10000).encode()) 324 325 sock.close() 326 327 def test_proxy_nowhere(self): 328 assert 'success' in self.conf( 329 [{"action": {"proxy": "http://127.0.0.1:7082"}}], 'routes' 330 ), 'proxy path changed' 331 332 assert self.get_http10()['status'] == 502, 'status' 333 334 def test_proxy_ipv6(self): 335 assert 'success' in self.conf( 336 { 337 "*:7080": {"pass": "routes"}, 338 "[::1]:7081": {'application': 'mirror'}, 339 }, 340 'listeners', 341 ), 'add ipv6 listener configure' 342 343 assert 'success' in self.conf( 344 [{"action": {"proxy": "http://[::1]:7081"}}], 'routes' 345 ), 'proxy ipv6 configure' 346 347 assert self.get_http10()['status'] == 200, 'status' 348 349 def test_proxy_unix(self, temp_dir): 350 addr = temp_dir + '/sock' 351 352 assert 'success' in self.conf( 353 { 354 "*:7080": {"pass": "routes"}, 355 "unix:" + addr: {'application': 'mirror'}, 356 }, 357 'listeners', 358 ), 'add unix listener configure' 359 360 assert 'success' in self.conf( 361 [{"action": {"proxy": 'http://unix:' + addr}}], 'routes' 362 ), 'proxy unix configure' 363 364 assert self.get_http10()['status'] == 200, 'status' 365 366 def test_proxy_delayed(self): 367 assert 'success' in self.conf( 368 {"pass": "applications/delayed"}, 'listeners/*:7081' 369 ), 'delayed configure' 370 371 body = '0123456789' * 1000 372 resp = self.post_http10( 373 headers={ 374 'Host': 'localhost', 375 'Content-Type': 'text/html', 376 'Content-Length': str(len(body)), 377 'X-Parts': '2', 378 'X-Delay': '1', 379 }, 380 body=body, 381 ) 382 383 assert resp['status'] == 200, 'status' 384 assert resp['body'] == body, 'body' 385 386 resp = self.post_http10( 387 headers={ 388 'Host': 'localhost', 389 'Content-Type': 'text/html', 390 'Content-Length': str(len(body)), 391 'X-Parts': '2', 392 'X-Delay': '1', 393 }, 394 body=body, 395 ) 396 397 assert resp['status'] == 200, 'status' 398 assert resp['body'] == body, 'body' 399 400 def test_proxy_delayed_close(self): 401 assert 'success' in self.conf( 402 {"pass": "applications/delayed"}, 'listeners/*:7081' 403 ), 'delayed configure' 404 405 _, sock = self.post_http10( 406 headers={ 407 'Host': 'localhost', 408 'Content-Type': 'text/html', 409 'Content-Length': '10000', 410 'X-Parts': '3', 411 'X-Delay': '1', 412 }, 413 body='0123456789' * 1000, 414 start=True, 415 no_recv=True, 416 ) 417 418 assert re.search('200 OK', sock.recv(100).decode()), 'first' 419 sock.close() 420 421 _, sock = self.post_http10( 422 headers={ 423 'Host': 'localhost', 424 'Content-Type': 'text/html', 425 'Content-Length': '10000', 426 'X-Parts': '3', 427 'X-Delay': '1', 428 }, 429 body='0123456789' * 1000, 430 start=True, 431 no_recv=True, 432 ) 433 434 assert re.search('200 OK', sock.recv(100).decode()), 'second' 435 sock.close() 436 437 @pytest.mark.skip('not yet') 438 def test_proxy_content_length(self): 439 assert 'success' in self.conf( 440 [ 441 { 442 "action": { 443 "proxy": "http://127.0.0.1:" + str(self.SERVER_PORT) 444 } 445 } 446 ], 447 'routes', 448 ), 'proxy backend configure' 449 450 resp = self.get_http10() 451 assert len(resp['body']) == 0, 'body lt Content-Length 0' 452 453 resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '5'}) 454 assert len(resp['body']) == 5, 'body lt Content-Length 5' 455 456 resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '9'}) 457 assert len(resp['body']) == 9, 'body lt Content-Length 9' 458 459 resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '11'}) 460 assert len(resp['body']) == 10, 'body gt Content-Length 11' 461 462 resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '15'}) 463 assert len(resp['body']) == 10, 'body gt Content-Length 15' 464 465 def test_proxy_invalid(self): 466 def check_proxy(proxy): 467 assert 'error' in self.conf( 468 [{"action": {"proxy": proxy}}], 'routes' 469 ), 'proxy invalid' 470 471 check_proxy('blah') 472 check_proxy('/blah') 473 check_proxy('unix:/blah') 474 check_proxy('http://blah') 475 check_proxy('http://127.0.0.1') 476 check_proxy('http://127.0.0.1:') 477 check_proxy('http://127.0.0.1:blah') 478 check_proxy('http://127.0.0.1:-1') 479 check_proxy('http://127.0.0.1:7080b') 480 check_proxy('http://[]') 481 check_proxy('http://[]:7080') 482 check_proxy('http://[:]:7080') 483 check_proxy('http://[::7080') 484 485 @pytest.mark.skip('not yet') 486 def test_proxy_loop(self, skip_alert): 487 skip_alert( 488 r'socket.*failed', 489 r'accept.*failed', 490 r'new connections are not accepted', 491 ) 492 assert 'success' in self.conf( 493 { 494 "listeners": { 495 "*:7080": {"pass": "routes"}, 496 "*:7081": {"pass": "applications/mirror"}, 497 "*:7082": {"pass": "routes"}, 498 }, 499 "routes": [{"action": {"proxy": "http://127.0.0.1:7082"}}], 500 "applications": { 501 "mirror": { 502 "type": "python", 503 "processes": {"spare": 0}, 504 "path": option.test_dir + "/python/mirror", 505 "working_directory": option.test_dir 506 + "/python/mirror", 507 "module": "wsgi", 508 }, 509 }, 510 } 511 ) 512 513 self.get_http10(no_recv=True) 514 self.get_http10(read_timeout=1) 515