1# Test TCP server with client issuing TCP RST part way through read
2
3try:
4    import uasyncio as asyncio
5except ImportError:
6    try:
7        import asyncio
8    except ImportError:
9        print("SKIP")
10        raise SystemExit
11
12import struct, time, socket
13
14PORT = 8000
15
16
# Server-side connection handler: read the first bytes of the request, pause,
# then try a second read and report either its data or the OSError errno.
async def handle_connection(reader, writer):
    print(await reader.read(10))  # should succeed
    # Pause so the client has time to drop the connection before the next read.
    await asyncio.sleep(0.2)
    try:
        print(await reader.read(100))
        writer.close()
        await writer.wait_closed()
    except OSError as exc:
        # Covers an OSError from the read as well as from closing the writer.
        print("OSError", exc.errno)
    # Wake up main(), which is waiting on this event.
    ev.set()
29
30
# Start the TCP server on PORT and keep it open until handle_connection sets
# the `ev` event, or until the 10 second wait expires.
async def main():
    global ev
    ev = asyncio.Event()  # set by handle_connection when it has finished
    srv = await asyncio.start_server(handle_connection, "0.0.0.0", PORT)
    # NOTE(review): presumably signals the other instance that the server is
    # ready to accept connections -- confirm against the multitest runner.
    multitest.next()
    async with srv:
        handler_done = ev.wait()
        await asyncio.wait_for(handler_done, 10)
38
39
# Server instance: register this instance's network IP under the name IP via
# the multitest runner, then run the asyncio server until it completes.
def instance0():
    # instance1 reads a global named IP; presumably multitest.globals() is
    # what makes it available there -- confirm against the multitest runner.
    server_ip = multitest.get_network_ip()
    multitest.globals(IP=server_ip)
    asyncio.run(main())
43
44
# Client instance: connect to the server, send a request, wait briefly, then
# close the socket with SO_LINGER enabled and a zero linger time.
def instance1():
    # The test depends on SO_LINGER; without it, hand control to
    # multitest.skip() (presumably this aborts the instance -- confirm).
    if not hasattr(socket, "SO_LINGER"):
        multitest.skip()
    multitest.next()
    s = socket.socket()
    try:
        s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
        # Linger on with a timeout of 0, packed as two ints for setsockopt.
        lgr_onoff = 1
        lgr_linger = 0
        s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", lgr_onoff, lgr_linger))
        s.send(b"GET / HTTP/1.0\r\n\r\n")
        time.sleep(0.1)
    finally:
        # Always release the socket, even if connect/setsockopt/send raised.
        # On the normal path this issues a TCP RST since the linger option
        # has been set.
        s.close()
57