1#!/usr/bin/env python
2
3
4import pytest
5
6
7import select
8import os.path
9from socket import error as SocketError
10from ssl import wrap_socket as sslsocket
11from socket import EAI_NODATA, EAI_NONAME
12from socket import socket, AF_INET, AF_INET6, SOCK_STREAM, has_ipv6
13
14
15from circuits import Manager, Debugger
16from circuits.net.events import close, connect, write
17from circuits.core.pollers import Select, Poll, EPoll, KQueue
18from circuits.net.sockets import TCPServer, TCP6Server, TCPClient, TCP6Client
19
20
21from .client import Client
22from .server import Server
23from tests.conftest import WaitEvent
24
25
# Path to the ``cert.pem`` file that sits in the same directory as this
# module; it is handed to the secure TCP servers as their ``certfile``.
CERT_FILE = os.path.join(os.path.split(__file__)[0], "cert.pem")
27
28
class TestClient(object):
    """Minimal blocking SSL client used by the secure-TCP tests.

    A plain stream socket (IPv4 or IPv6) is created and wrapped with
    ``sslsocket``; the methods below expose just the connect / send / recv /
    disconnect operations the tests need.
    """

    # The ``Test`` name prefix would make pytest try to collect this helper
    # as a test class; opt out explicitly.
    __test__ = False

    def __init__(self, ipv6=False):
        """Create the underlying socket and wrap it for SSL.

        :param ipv6: use ``AF_INET6`` when true, ``AF_INET`` otherwise.
        """
        self._sockname = None  # set by connect()

        family = AF_INET6 if ipv6 else AF_INET
        self.sock = socket(family, SOCK_STREAM)
        self.ssock = sslsocket(self.sock)

    @property
    def sockname(self):
        """Local address recorded by the last ``connect()``, else ``None``."""
        return self._sockname

    def connect(self, host, port):
        """Connect to ``(host, port)`` and record the local socket name.

        The return code of ``connect_ex`` is not checked.
        """
        self.ssock.connect_ex((host, port))
        self._sockname = self.ssock.getsockname()

    def send(self, data):
        """Send ``data`` over the SSL socket."""
        self.ssock.send(data)

    # NOTE(review): 4069 looks like a transposition of 4096; the default is
    # kept unchanged so existing callers see the same behaviour.
    def recv(self, buflen=4069):
        """Receive and return up to ``buflen`` bytes from the SSL socket."""
        return self.ssock.recv(buflen)

    def disconnect(self):
        """Shut down and close the SSL socket, ignoring socket errors.

        Both steps are best-effort so that calling this on a socket that was
        never connected, or was already closed, does not raise.
        """
        try:
            self.ssock.shutdown(2)  # 2 == SHUT_RDWR
        except SocketError:
            pass

        try:
            self.ssock.close()
        except SocketError:  # was misspelled, turning errors into NameError
            pass
66
67
@pytest.fixture
def client(request, ipv6):
    """Provide a ``TestClient`` for the requested address family.

    The client's ``disconnect`` is registered as a finalizer so the socket is
    torn down when the requesting test finishes.
    """
    test_client = TestClient(ipv6=ipv6)
    request.addfinalizer(test_client.disconnect)
    return test_client
78
79
def wait_host(server):
    """Assert that ``server`` ends up with truthy ``host`` and ``port``.

    The waiting itself is delegated to ``pytest.wait_for``, which is given a
    custom checker that requires every listed attribute to be truthy.
    """
    def checker(obj, attr):
        # Presumably invoked by wait_for with the server and the tuple of
        # attribute names passed below -- verify against conftest.
        for name in attr:
            if not getattr(obj, name):
                return False
        return True

    assert pytest.wait_for(server, ("host", "port"), checker)
84
85
def _pytest_generate_tests(metafunc, ipv6):
    """Add one test call per poller usable on this platform.

    ``Select`` is always added; ``Poll``, ``EPoll`` and ``KQueue`` are added
    only when the ``select`` module exposes the matching attribute. Every
    call carries the given ``ipv6`` flag alongside the poller class.
    """
    optional_pollers = (
        ("poll", Poll),
        ("epoll", EPoll),
        ("kqueue", KQueue),
    )

    available = [Select]
    available.extend(
        poller_cls
        for select_attr, poller_cls in optional_pollers
        if hasattr(select, select_attr)
    )

    for poller_cls in available:
        metafunc.addcall(funcargs={"Poller": poller_cls, "ipv6": ipv6})
97
98
def pytest_generate_tests(metafunc):
    """Generate the poller variants for IPv4 and, when supported, IPv6."""
    ipv6_flags = (False, True) if has_ipv6 else (False,)
    for flag in ipv6_flags:
        _pytest_generate_tests(metafunc, ipv6=flag)
103
104
def test_tcp_basic(Poller, ipv6):
    """Connect a TCP client to a TCP server and exchange one message.

    After connecting, the client must receive ``b"Ready"``; after the client
    writes ``b"foo"`` both the server and the client must see ``b"foo"``.
    Finally both sides are closed and the manager is stopped.
    """
    manager = Manager() + Poller()

    tcp_server = TCP6Server(("::1", 0)) if ipv6 else TCPServer(0)
    tcp_client = TCP6Client() if ipv6 else TCPClient()

    server = Server() + tcp_server
    client = Client() + tcp_client

    for component in (server, client):
        component.register(manager)

    manager.start()

    try:
        # Both ends must be ready, and the server must expose its bound
        # host/port, before a connection is attempted.
        for component in (client, server):
            assert pytest.wait_for(component, "ready")
        wait_host(server)

        # Connect and wait for the greeting.
        client.fire(connect(server.host, server.port))
        for component in (client, server):
            assert pytest.wait_for(component, "connected")
        assert pytest.wait_for(client, "data", b"Ready")

        # Send a payload and check what each side observes.
        client.fire(write(b"foo"))
        for component in (server, client):
            assert pytest.wait_for(component, "data", b"foo")

        # Tear down: client first, then the listening server.
        client.fire(close())
        for component in (client, server):
            assert pytest.wait_for(component, "disconnected")

        server.fire(close())
        assert pytest.wait_for(server, "closed")
    finally:
        manager.stop()
144
145
def test_tcps_basic(manager, watcher, client, Poller, ipv6):
    """Exchange one message with a secure TCP server via the SSL client.

    The server is created with ``secure=True`` and ``CERT_FILE``; the
    ``client`` fixture connects to it, expects ``b"Ready"``, sends ``b"foo"``
    and expects ``b"foo"`` back before both sides are shut down.
    """
    poller = Poller().register(manager)

    secure_options = {"secure": True, "certfile": CERT_FILE}
    if ipv6:
        tcp_server = TCP6Server(("::1", 0), **secure_options)
    else:
        tcp_server = TCPServer(0, **secure_options)

    server = Server() + tcp_server
    server.register(manager)

    try:
        # NOTE(review): unlike the waits below, this result is not asserted.
        watcher.wait("ready", "server")

        # Connect and read the greeting.
        client.connect(server.host, server.port)
        assert watcher.wait("connect", "server")
        greeting = client.recv()
        assert greeting == b"Ready"

        # Send a payload and read the reply.
        client.send(b"foo")
        assert watcher.wait("read", "server")
        reply = client.recv()
        assert reply == b"foo"

        # Tear down: client first, then the listening server.
        client.disconnect()
        assert watcher.wait("disconnect", "server")

        server.fire(close())
        assert watcher.wait("closed", "server")
    finally:
        poller.unregister()
        server.unregister()
177
178
def test_tcp_reconnect(Poller, ipv6):
    """Check that a TCP client can connect, close and connect again.

    The same connect / greeting / write sequence is run twice against one
    server, with a client-side close in between.
    """
    # XXX: Apparently this doesn't work on Windows either?
    # XXX: UPDATE: Apparently broken on Windows + Python 3.2.
    # TODO: Need to look into this and find out why.
    on_windows = pytest.PLATFORM == "win32"
    if on_windows and pytest.PYVER[:2] >= (3, 2):
        pytest.skip("Broken on Windows on Python 3.2")

    manager = Manager() + Poller()

    tcp_server = TCP6Server(("::1", 0)) if ipv6 else TCPServer(0)
    tcp_client = TCP6Client() if ipv6 else TCPClient()

    server = Server() + tcp_server
    client = Client() + tcp_client

    for component in (server, client):
        component.register(manager)

    manager.start()

    def connect_and_write():
        # One full round: connect, wait for the greeting, send a payload.
        client.fire(connect(server.host, server.port))
        assert pytest.wait_for(client, "connected")
        assert pytest.wait_for(server, "connected")
        assert pytest.wait_for(client, "data", b"Ready")

        client.fire(write(b"foo"))
        assert pytest.wait_for(server, "data", b"foo")

    try:
        for component in (client, server):
            assert pytest.wait_for(component, "ready")
        wait_host(server)

        # First connection.
        connect_and_write()

        # Disconnect; only the client side is waited on here.
        client.fire(close())
        assert pytest.wait_for(client, "disconnected")

        # Second connection with the same client.
        connect_and_write()

        client.fire(close())
        for component in (client, server):
            assert pytest.wait_for(component, "disconnected")

        server.fire(close())
        assert pytest.wait_for(server, "closed")
    finally:
        manager.stop()
238
239
def test_tcp_connect_closed_port(Poller, ipv6):
    """Expect an ``unreachable`` event when connecting to a closed port.

    The server's listening socket is closed directly before the client
    (created with ``connect_timeout=1``) tries to connect to its address.
    """
    if pytest.PLATFORM == "win32":
        pytest.skip("Broken on Windows")

    manager = Manager() + Poller() + Debugger()

    tcp_server = TCP6Server(("::1", 0)) if ipv6 else TCPServer(0)
    client_cls = TCP6Client if ipv6 else TCPClient
    tcp_client = client_cls(connect_timeout=1)

    server = Server() + tcp_server
    client = Client() + tcp_client

    for component in (server, client):
        component.register(manager)

    manager.start()

    try:
        for component in (client, server):
            assert pytest.wait_for(component, "ready")
        wait_host(server)

        # Remember the address, then close the listening socket underneath
        # the server so nothing accepts on that port any more.
        target = (server.host, server.port)
        tcp_server._sock.close()

        # Connection attempt against the now-closed port.
        client.fire(connect(*target))
        unreachable = WaitEvent(manager, "unreachable", channel='client')
        assert unreachable.wait()
    finally:
        server.unregister()
        client.unregister()
        manager.stop()
277
278
def test_tcp_bind(Poller, ipv6):
    """Connect a TCP client to a server after probing for a free port.

    A throw-away socket is bound to port 0 to obtain a port number and then
    closed; the assertion that compared that port against the server's view
    of the client is currently disabled (see the TODO below). The remainder
    runs the usual connect / greeting / write / close sequence.
    """
    m = Manager() + Poller()

    if ipv6:
        family, probe_address = AF_INET6, ("::1", 0)
    else:
        family, probe_address = AF_INET, ("", 0)

    sock = socket(family, SOCK_STREAM)
    try:
        sock.bind(probe_address)
        sock.listen(5)
        # Index 1 of getsockname() is the port for both the 2-tuple IPv4
        # and the 4-tuple IPv6 address forms.
        bind_port = sock.getsockname()[1]  # unused while the check is off
    finally:
        # Always release the probe socket, even if bind/listen failed.
        sock.close()

    if ipv6:
        server = Server() + TCP6Server(("::1", 0))
        client = Client() + TCP6Client()
    else:
        server = Server() + TCPServer(0)
        client = Client() + TCPClient()

    server.register(m)
    client.register(m)

    m.start()

    try:
        assert pytest.wait_for(client, "ready")
        assert pytest.wait_for(server, "ready")
        wait_host(server)

        client.fire(connect(server.host, server.port))
        assert pytest.wait_for(client, "connected")
        assert pytest.wait_for(server, "connected")
        assert pytest.wait_for(client, "data", b"Ready")

        # TODO: disabled check, kept for reference:
        # assert server.client[1] == bind_port

        client.fire(write(b"foo"))
        assert pytest.wait_for(server, "data", b"foo")

        client.fire(close())
        assert pytest.wait_for(client, "disconnected")
        assert pytest.wait_for(server, "disconnected")

        server.fire(close())
        assert pytest.wait_for(server, "closed")
    finally:
        m.stop()
327
328
def test_tcp_lookup_failure(manager, watcher, Poller, ipv6):
    """Expect an ``error`` event when connecting to the host name ``"foo"``.

    The errno stored on the client must be 11004 on win32, and one of
    ``EAI_NODATA`` / ``EAI_NONAME`` everywhere else.
    """
    poller = Poller().register(manager)

    tcp_client = TCP6Client() if ipv6 else TCPClient()

    client = Client() + tcp_client
    client.register(manager)

    try:
        assert watcher.wait("ready", "client")

        client.fire(connect("foo", 1234))
        assert watcher.wait("error", "client")

        # The conditional keeps the EAI_* constants out of the win32 path.
        expected_errnos = (
            (11004,) if pytest.PLATFORM == "win32"
            else (EAI_NODATA, EAI_NONAME)
        )
        assert client.error.errno in expected_errnos
    finally:
        poller.unregister()
        client.unregister()
353