# Test TCP server with client issuing TCP RST part way through read
#
# instance0 runs an asyncio TCP server; instance1 is a plain-socket client
# that sends a request and then closes with SO_LINGER set to (on, 0 seconds).
#
# NOTE(review): `multitest` and `IP` are not defined in this file; presumably
# they are injected by the multi-instance test runner -- confirm against it.

try:
    import uasyncio as asyncio
except ImportError:
    try:
        import asyncio
    except ImportError:
        print("SKIP")
        raise SystemExit

import struct
import time
import socket

PORT = 8000


async def handle_connection(reader, writer):
    """Serve one client: read twice, printing the data or the OSError errno.

    The first read is expected to succeed.  The second read happens after the
    client has had time to drop the connection, so it may raise OSError, in
    which case only the errno is printed.  The global event `ev` is always set
    on exit so that main() stops waiting.
    """
    try:
        data = await reader.read(10)  # should succeed
        print(data)
        await asyncio.sleep(0.2)  # wait for client to drop connection
        try:
            data = await reader.read(100)
            print(data)
            writer.close()
            await writer.wait_closed()
        except OSError as er:
            print("OSError", er.errno)
    finally:
        # Signal completion even if something unexpected was raised above,
        # otherwise main() would sit in wait_for() until its timeout expires.
        ev.set()


async def main():
    """Start the server and wait (up to 10 seconds) for one handled client."""
    global ev
    ev = asyncio.Event()
    server = await asyncio.start_server(handle_connection, "0.0.0.0", PORT)
    # The server has been started at this point; let the test sequence advance.
    multitest.next()
    async with server:
        await asyncio.wait_for(ev.wait(), 10)


def instance0():
    """Server side: publish this instance's network IP and run the server."""
    multitest.globals(IP=multitest.get_network_ip())
    asyncio.run(main())


def instance1():
    """Client side: connect, send a request, then close to issue a TCP RST."""
    if not hasattr(socket, "SO_LINGER"):
        multitest.skip()
    multitest.next()
    s = socket.socket()
    try:
        s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
        lgr_onoff = 1
        lgr_linger = 0
        linger_opt = struct.pack("ii", lgr_onoff, lgr_linger)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger_opt)
        s.send(b"GET / HTTP/1.0\r\n\r\n")
        time.sleep(0.1)
    finally:
        # Always release the socket, including when connect/send fails.
        s.close()  # This issues a TCP RST since we've set the linger option