1#!/usr/bin/env python 2 3 4import pytest 5 6 7import select 8import os.path 9from socket import error as SocketError 10from ssl import wrap_socket as sslsocket 11from socket import EAI_NODATA, EAI_NONAME 12from socket import socket, AF_INET, AF_INET6, SOCK_STREAM, has_ipv6 13 14 15from circuits import Manager, Debugger 16from circuits.net.events import close, connect, write 17from circuits.core.pollers import Select, Poll, EPoll, KQueue 18from circuits.net.sockets import TCPServer, TCP6Server, TCPClient, TCP6Client 19 20 21from .client import Client 22from .server import Server 23from tests.conftest import WaitEvent 24 25 26CERT_FILE = os.path.join(os.path.dirname(__file__), "cert.pem") 27 28 29class TestClient(object): 30 31 def __init__(self, ipv6=False): 32 self._sockname = None 33 34 self.sock = socket( 35 AF_INET6 if ipv6 36 else AF_INET, 37 SOCK_STREAM 38 ) 39 40 self.ssock = sslsocket(self.sock) 41 42 @property 43 def sockname(self): 44 return self._sockname 45 46 def connect(self, host, port): 47 self.ssock.connect_ex((host, port)) 48 self._sockname = self.ssock.getsockname() 49 50 def send(self, data): 51 self.ssock.send(data) 52 53 def recv(self, buflen=4069): 54 return self.ssock.recv(buflen) 55 56 def disconnect(self): 57 try: 58 self.ssock.shutdown(2) 59 except SocketError: 60 pass 61 62 try: 63 self.ssock.close() 64 except SockerError: 65 pass 66 67 68@pytest.fixture 69def client(request, ipv6): 70 client = TestClient(ipv6=ipv6) 71 72 def finalizer(): 73 client.disconnect() 74 75 request.addfinalizer(finalizer) 76 77 return client 78 79 80def wait_host(server): 81 def checker(obj, attr): 82 return all((getattr(obj, a) for a in attr)) 83 assert pytest.wait_for(server, ("host", "port"), checker) 84 85 86def _pytest_generate_tests(metafunc, ipv6): 87 metafunc.addcall(funcargs={"Poller": Select, "ipv6": ipv6}) 88 89 if hasattr(select, "poll"): 90 metafunc.addcall(funcargs={"Poller": Poll, "ipv6": ipv6}) 91 92 if hasattr(select, "epoll"): 93 metafunc.addcall(funcargs={"Poller": EPoll, "ipv6": ipv6}) 94 95 if hasattr(select, "kqueue"): 96 metafunc.addcall(funcargs={"Poller": KQueue, "ipv6": ipv6}) 97 98 99def pytest_generate_tests(metafunc): 100 _pytest_generate_tests(metafunc, ipv6=False) 101 if has_ipv6: 102 _pytest_generate_tests(metafunc, ipv6=True) 103 104 105def test_tcp_basic(Poller, ipv6): 106 m = Manager() + Poller() 107 108 if ipv6: 109 tcp_server = TCP6Server(("::1", 0)) 110 tcp_client = TCP6Client() 111 else: 112 tcp_server = TCPServer(0) 113 tcp_client = TCPClient() 114 server = Server() + tcp_server 115 client = Client() + tcp_client 116 117 server.register(m) 118 client.register(m) 119 120 m.start() 121 122 try: 123 assert pytest.wait_for(client, "ready") 124 assert pytest.wait_for(server, "ready") 125 wait_host(server) 126 127 client.fire(connect(server.host, server.port)) 128 assert pytest.wait_for(client, "connected") 129 assert pytest.wait_for(server, "connected") 130 assert pytest.wait_for(client, "data", b"Ready") 131 132 client.fire(write(b"foo")) 133 assert pytest.wait_for(server, "data", b"foo") 134 assert pytest.wait_for(client, "data", b"foo") 135 136 client.fire(close()) 137 assert pytest.wait_for(client, "disconnected") 138 assert pytest.wait_for(server, "disconnected") 139 140 server.fire(close()) 141 assert pytest.wait_for(server, "closed") 142 finally: 143 m.stop() 144 145 146def test_tcps_basic(manager, watcher, client, Poller, ipv6): 147 poller = Poller().register(manager) 148 149 if ipv6: 150 tcp_server = TCP6Server(("::1", 0), secure=True, certfile=CERT_FILE) 151 else: 152 tcp_server = TCPServer(0, secure=True, certfile=CERT_FILE) 153 154 server = Server() + tcp_server 155 156 server.register(manager) 157 158 try: 159 watcher.wait("ready", "server") 160 161 client.connect(server.host, server.port) 162 assert watcher.wait("connect", "server") 163 assert client.recv() == b"Ready" 164 165 client.send(b"foo") 166 assert watcher.wait("read", "server") 167 assert client.recv() == b"foo" 168 169 client.disconnect() 170 assert watcher.wait("disconnect", "server") 171 172 server.fire(close()) 173 assert watcher.wait("closed", "server") 174 finally: 175 poller.unregister() 176 server.unregister() 177 178 179def test_tcp_reconnect(Poller, ipv6): 180 # XXX: Apparently this doesn't work on Windows either? 181 # XXX: UPDATE: Apparently Broken on Windows + Python 3.2 182 # TODO: Need to look into this. Find out why... 183 184 if pytest.PLATFORM == "win32" and pytest.PYVER[:2] >= (3, 2): 185 pytest.skip("Broken on Windows on Python 3.2") 186 187 m = Manager() + Poller() 188 189 if ipv6: 190 tcp_server = TCP6Server(("::1", 0)) 191 tcp_client = TCP6Client() 192 else: 193 tcp_server = TCPServer(0) 194 tcp_client = TCPClient() 195 server = Server() + tcp_server 196 client = Client() + tcp_client 197 198 server.register(m) 199 client.register(m) 200 201 m.start() 202 203 try: 204 assert pytest.wait_for(client, "ready") 205 assert pytest.wait_for(server, "ready") 206 wait_host(server) 207 208 # 1st connect 209 client.fire(connect(server.host, server.port)) 210 assert pytest.wait_for(client, "connected") 211 assert pytest.wait_for(server, "connected") 212 assert pytest.wait_for(client, "data", b"Ready") 213 214 client.fire(write(b"foo")) 215 assert pytest.wait_for(server, "data", b"foo") 216 217 # disconnect 218 client.fire(close()) 219 assert pytest.wait_for(client, "disconnected") 220 221 # 2nd reconnect 222 client.fire(connect(server.host, server.port)) 223 assert pytest.wait_for(client, "connected") 224 assert pytest.wait_for(server, "connected") 225 assert pytest.wait_for(client, "data", b"Ready") 226 227 client.fire(write(b"foo")) 228 assert pytest.wait_for(server, "data", b"foo") 229 230 client.fire(close()) 231 assert pytest.wait_for(client, "disconnected") 232 assert pytest.wait_for(server, "disconnected") 233 234 server.fire(close()) 235 assert pytest.wait_for(server, "closed") 236 finally: 237 m.stop() 238 239 240def test_tcp_connect_closed_port(Poller, ipv6): 241 242 if pytest.PLATFORM == "win32": 243 pytest.skip("Broken on Windows") 244 245 m = Manager() + Poller() + Debugger() 246 247 if ipv6: 248 tcp_server = TCP6Server(("::1", 0)) 249 tcp_client = TCP6Client(connect_timeout=1) 250 else: 251 tcp_server = TCPServer(0) 252 tcp_client = TCPClient(connect_timeout=1) 253 server = Server() + tcp_server 254 client = Client() + tcp_client 255 256 server.register(m) 257 client.register(m) 258 259 m.start() 260 261 try: 262 assert pytest.wait_for(client, "ready") 263 assert pytest.wait_for(server, "ready") 264 wait_host(server) 265 266 host, port = server.host, server.port 267 tcp_server._sock.close() 268 269 # 1st connect 270 client.fire(connect(host, port)) 271 waiter = WaitEvent(m, "unreachable", channel='client') 272 assert waiter.wait() 273 finally: 274 server.unregister() 275 client.unregister() 276 m.stop() 277 278 279def test_tcp_bind(Poller, ipv6): 280 m = Manager() + Poller() 281 282 if ipv6: 283 sock = socket(AF_INET6, SOCK_STREAM) 284 sock.bind(("::1", 0)) 285 sock.listen(5) 286 _, bind_port, _, _ = sock.getsockname() 287 sock.close() 288 server = Server() + TCP6Server(("::1", 0)) 289 client = Client() + TCP6Client() 290 else: 291 sock = socket(AF_INET, SOCK_STREAM) 292 sock.bind(("", 0)) 293 sock.listen(5) 294 _, bind_port = sock.getsockname() 295 sock.close() 296 server = Server() + TCPServer(0) 297 client = Client() + TCPClient() 298 299 server.register(m) 300 client.register(m) 301 302 m.start() 303 304 try: 305 assert pytest.wait_for(client, "ready") 306 assert pytest.wait_for(server, "ready") 307 wait_host(server) 308 309 client.fire(connect(server.host, server.port)) 310 assert pytest.wait_for(client, "connected") 311 assert pytest.wait_for(server, "connected") 312 assert pytest.wait_for(client, "data", b"Ready") 313 314 # assert server.client[1] == bind_port 315 316 client.fire(write(b"foo")) 317 assert pytest.wait_for(server, "data", b"foo") 318 319 client.fire(close()) 320 assert pytest.wait_for(client, "disconnected") 321 assert pytest.wait_for(server, "disconnected") 322 323 server.fire(close()) 324 assert pytest.wait_for(server, "closed") 325 finally: 326 m.stop() 327 328 329def test_tcp_lookup_failure(manager, watcher, Poller, ipv6): 330 poller = Poller().register(manager) 331 332 if ipv6: 333 tcp_client = TCP6Client() 334 else: 335 tcp_client = TCPClient() 336 337 client = Client() + tcp_client 338 client.register(manager) 339 340 try: 341 assert watcher.wait("ready", "client") 342 343 client.fire(connect("foo", 1234)) 344 assert watcher.wait("error", "client") 345 346 if pytest.PLATFORM == "win32": 347 assert client.error.errno == 11004 348 else: 349 assert client.error.errno in (EAI_NODATA, EAI_NONAME,) 350 finally: 351 poller.unregister() 352 client.unregister() 353