1import contextlib 2 3try: 4 from eventlet.green import zmq 5except ImportError: 6 zmq = {} # for systems lacking zmq, skips tests instead of barfing 7else: 8 RECV_ON_CLOSED_SOCKET_ERRNOS = (zmq.ENOTSUP, zmq.ENOTSOCK) 9 10import eventlet 11import tests 12 13 14def zmq_supported(_): 15 try: 16 import zmq 17 except ImportError: 18 return False 19 return not tests.using_pyevent(_) 20 21 22class TestUpstreamDownStream(tests.LimitedTestCase): 23 TEST_TIMEOUT = 2 24 25 @tests.skip_unless(zmq_supported) 26 def setUp(self): 27 super(TestUpstreamDownStream, self).setUp() 28 self.context = zmq.Context() 29 self.sockets = [] 30 31 @tests.skip_unless(zmq_supported) 32 def tearDown(self): 33 self.clear_up_sockets() 34 super(TestUpstreamDownStream, self).tearDown() 35 36 def create_bound_pair(self, type1, type2, interface='tcp://127.0.0.1'): 37 """Create a bound socket pair using a random port.""" 38 s1 = self.context.socket(type1) 39 port = s1.bind_to_random_port(interface) 40 s2 = self.context.socket(type2) 41 s2.connect('%s:%s' % (interface, port)) 42 self.sockets.append(s1) 43 self.sockets.append(s2) 44 return s1, s2, port 45 46 def clear_up_sockets(self): 47 for sock in self.sockets: 48 sock.close() 49 self.sockets = None 50 self.context.destroy(0) 51 52 def assertRaisesErrno(self, errnos, func, *args): 53 try: 54 func(*args) 55 except zmq.ZMQError as e: 56 if not hasattr(errnos, '__iter__'): 57 errnos = (errnos,) 58 59 if e.errno not in errnos: 60 raise AssertionError( 61 "wrong error raised, expected one of ['%s'], got '%s'" % ( 62 ", ".join("%s" % zmq.ZMQError(errno) for errno in errnos), 63 zmq.ZMQError(e.errno) 64 ), 65 ) 66 else: 67 self.fail("Function did not raise any error") 68 69 @tests.skip_unless(zmq_supported) 70 def test_close_linger(self): 71 """Socket.close() must support linger argument. 72 73 https://github.com/eventlet/eventlet/issues/9 74 """ 75 sock1, sock2, _ = self.create_bound_pair(zmq.PAIR, zmq.PAIR) 76 sock1.close(1) 77 sock2.close(linger=0) 78 79 @tests.skip_unless(zmq_supported) 80 def test_recv_spawned_before_send_is_non_blocking(self): 81 req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR) 82# req.connect(ipc) 83# rep.bind(ipc) 84 eventlet.sleep() 85 msg = dict(res=None) 86 done = eventlet.Event() 87 88 def rx(): 89 msg['res'] = rep.recv() 90 done.send('done') 91 92 eventlet.spawn(rx) 93 req.send(b'test') 94 done.wait() 95 self.assertEqual(msg['res'], b'test') 96 97 @tests.skip_unless(zmq_supported) 98 def test_close_socket_raises_enotsup(self): 99 req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR) 100 101 rep.close() 102 req.close() 103 self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv) 104 self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test') 105 106 @tests.skip_unless(zmq_supported) 107 def test_close_xsocket_raises_enotsup(self): 108 req, rep, port = self.create_bound_pair(zmq.XREQ, zmq.XREP) 109 110 rep.close() 111 req.close() 112 self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv) 113 self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test') 114 115 @tests.skip_unless(zmq_supported) 116 def test_send_1k_req_rep(self): 117 req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP) 118 eventlet.sleep() 119 done = eventlet.Event() 120 121 def tx(): 122 tx_i = 0 123 req.send(str(tx_i).encode()) 124 while req.recv() != b'done': 125 tx_i += 1 126 req.send(str(tx_i).encode()) 127 done.send(0) 128 129 def rx(): 130 while True: 131 rx_i = rep.recv() 132 if rx_i == b"1000": 133 rep.send(b'done') 134 break 135 rep.send(b'i') 136 eventlet.spawn(tx) 137 eventlet.spawn(rx) 138 final_i = done.wait() 139 self.assertEqual(final_i, 0) 140 141 @tests.skip_unless(zmq_supported) 142 def test_send_1k_push_pull(self): 143 down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL) 144 eventlet.sleep() 145 146 done = eventlet.Event() 147 148 def tx(): 149 tx_i = 0 150 while tx_i <= 1000: 151 tx_i += 1 152 down.send(str(tx_i).encode()) 153 154 def rx(): 155 while True: 156 rx_i = up.recv() 157 if rx_i == b"1000": 158 done.send(0) 159 break 160 eventlet.spawn(tx) 161 eventlet.spawn(rx) 162 final_i = done.wait() 163 self.assertEqual(final_i, 0) 164 165 @tests.skip_unless(zmq_supported) 166 def test_send_1k_pub_sub(self): 167 pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB) 168 sub1 = self.context.socket(zmq.SUB) 169 sub2 = self.context.socket(zmq.SUB) 170 self.sockets.extend([sub1, sub2]) 171 addr = 'tcp://127.0.0.1:%s' % port 172 sub1.connect(addr) 173 sub2.connect(addr) 174 sub_all.setsockopt(zmq.SUBSCRIBE, b'') 175 sub1.setsockopt(zmq.SUBSCRIBE, b'sub1') 176 sub2.setsockopt(zmq.SUBSCRIBE, b'sub2') 177 178 sub_all_done = eventlet.Event() 179 sub1_done = eventlet.Event() 180 sub2_done = eventlet.Event() 181 182 eventlet.sleep(0.2) 183 184 def rx(sock, done_evt, msg_count=10000): 185 count = 0 186 while count < msg_count: 187 msg = sock.recv() 188 eventlet.sleep() 189 if b'LAST' in msg: 190 break 191 count += 1 192 193 done_evt.send(count) 194 195 def tx(sock): 196 for i in range(1, 1001): 197 msg = ("sub%s %s" % ([2, 1][i % 2], i)).encode() 198 sock.send(msg) 199 eventlet.sleep() 200 sock.send(b'sub1 LAST') 201 sock.send(b'sub2 LAST') 202 203 eventlet.spawn(rx, sub_all, sub_all_done) 204 eventlet.spawn(rx, sub1, sub1_done) 205 eventlet.spawn(rx, sub2, sub2_done) 206 eventlet.spawn(tx, pub) 207 sub1_count = sub1_done.wait() 208 sub2_count = sub2_done.wait() 209 sub_all_count = sub_all_done.wait() 210 self.assertEqual(sub1_count, 500) 211 self.assertEqual(sub2_count, 500) 212 self.assertEqual(sub_all_count, 1000) 213 214 @tests.skip_unless(zmq_supported) 215 def test_change_subscription(self): 216 # FIXME: Extensive testing showed this particular test is the root cause 217 # of sporadic failures on Travis. 218 pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB) 219 sub.setsockopt(zmq.SUBSCRIBE, b'test') 220 eventlet.sleep(0) 221 sub_ready = eventlet.Event() 222 sub_last = eventlet.Event() 223 sub_done = eventlet.Event() 224 225 def rx(): 226 while sub.recv() != b'test BEGIN': 227 eventlet.sleep(0) 228 sub_ready.send() 229 count = 0 230 while True: 231 msg = sub.recv() 232 if msg == b'test BEGIN': 233 # BEGIN may come many times 234 continue 235 if msg == b'test LAST': 236 sub.setsockopt(zmq.SUBSCRIBE, b'done') 237 sub.setsockopt(zmq.UNSUBSCRIBE, b'test') 238 eventlet.sleep(0) 239 # In real application you should either sync 240 # or tolerate loss of messages. 241 sub_last.send() 242 if msg == b'done DONE': 243 break 244 count += 1 245 sub_done.send(count) 246 247 def tx(): 248 # Sync receiver ready to avoid loss of first packets 249 while not sub_ready.ready(): 250 pub.send(b'test BEGIN') 251 eventlet.sleep(0.005) 252 for i in range(1, 101): 253 msg = 'test {0}'.format(i).encode() 254 if i != 50: 255 pub.send(msg) 256 else: 257 pub.send(b'test LAST') 258 sub_last.wait() 259 # XXX: putting a real delay of 1ms here fixes sporadic failures on Travis 260 # just yield eventlet.sleep(0) doesn't cut it 261 eventlet.sleep(0.001) 262 pub.send(b'done DONE') 263 264 eventlet.spawn(rx) 265 eventlet.spawn(tx) 266 rx_count = sub_done.wait() 267 self.assertEqual(rx_count, 50) 268 269 @tests.skip_unless(zmq_supported) 270 def test_recv_multipart_bug68(self): 271 req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP) 272 msg = [b''] 273 req.send_multipart(msg) 274 recieved_msg = rep.recv_multipart() 275 self.assertEqual(recieved_msg, msg) 276 277 # Send a message back the other way 278 msg2 = [b""] 279 rep.send_multipart(msg2, copy=False) 280 # When receiving a copy it's a zmq.core.message.Message you get back 281 recieved_msg = req.recv_multipart(copy=False) 282 # So it needs to be converted to a string 283 # I'm calling str(m) consciously here; Message has a .data attribute 284 # but it's private __str__ appears to be the way to go 285 self.assertEqual([m.bytes for m in recieved_msg], msg2) 286 287 @tests.skip_unless(zmq_supported) 288 def test_recv_noblock_bug76(self): 289 req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP) 290 self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK) 291 self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True) 292 293 @tests.skip_unless(zmq_supported) 294 def test_send_during_recv(self): 295 sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ) 296 eventlet.sleep() 297 298 num_recvs = 30 299 done_evts = [eventlet.Event() for _ in range(num_recvs)] 300 301 def slow_rx(done, msg): 302 self.assertEqual(sender.recv(), msg) 303 done.send(0) 304 305 def tx(): 306 tx_i = 0 307 while tx_i <= 1000: 308 sender.send(str(tx_i).encode()) 309 tx_i += 1 310 311 def rx(): 312 while True: 313 rx_i = receiver.recv() 314 if rx_i == b"1000": 315 for i in range(num_recvs): 316 receiver.send(('done%d' % i).encode()) 317 eventlet.sleep() 318 return 319 320 for i in range(num_recvs): 321 eventlet.spawn(slow_rx, done_evts[i], ("done%d" % i).encode()) 322 323 eventlet.spawn(tx) 324 eventlet.spawn(rx) 325 for evt in done_evts: 326 self.assertEqual(evt.wait(), 0) 327 328 @tests.skip_unless(zmq_supported) 329 def test_send_during_recv_multipart(self): 330 sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ) 331 eventlet.sleep() 332 333 num_recvs = 30 334 done_evts = [eventlet.Event() for _ in range(num_recvs)] 335 336 def slow_rx(done, msg): 337 self.assertEqual(sender.recv_multipart(), msg) 338 done.send(0) 339 340 def tx(): 341 tx_i = 0 342 while tx_i <= 1000: 343 sender.send_multipart([str(tx_i).encode(), b'1', b'2', b'3']) 344 tx_i += 1 345 346 def rx(): 347 while True: 348 rx_i = receiver.recv_multipart() 349 if rx_i == [b"1000", b'1', b'2', b'3']: 350 for i in range(num_recvs): 351 receiver.send_multipart([ 352 ('done%d' % i).encode(), b'a', b'b', b'c']) 353 eventlet.sleep() 354 return 355 356 for i in range(num_recvs): 357 eventlet.spawn(slow_rx, done_evts[i], [ 358 ("done%d" % i).encode(), b'a', b'b', b'c']) 359 360 eventlet.spawn(tx) 361 eventlet.spawn(rx) 362 for i in range(num_recvs): 363 final_i = done_evts[i].wait() 364 self.assertEqual(final_i, 0) 365 366 # Need someway to ensure a thread is blocked on send... This isn't working 367 @tests.skip_unless(zmq_supported) 368 def test_recv_during_send(self): 369 sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ) 370 eventlet.sleep() 371 372 done = eventlet.Event() 373 374 try: 375 SNDHWM = zmq.SNDHWM 376 except AttributeError: 377 # ZeroMQ <3.0 378 SNDHWM = zmq.HWM 379 380 sender.setsockopt(SNDHWM, 10) 381 sender.setsockopt(zmq.SNDBUF, 10) 382 383 receiver.setsockopt(zmq.RCVBUF, 10) 384 385 def tx(): 386 tx_i = 0 387 while tx_i <= 1000: 388 sender.send(str(tx_i).encode()) 389 tx_i += 1 390 done.send(0) 391 392 eventlet.spawn(tx) 393 final_i = done.wait() 394 self.assertEqual(final_i, 0) 395 396 @tests.skip_unless(zmq_supported) 397 def test_close_during_recv(self): 398 sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ) 399 eventlet.sleep() 400 done1 = eventlet.Event() 401 done2 = eventlet.Event() 402 403 def rx(e): 404 self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, receiver.recv) 405 e.send() 406 407 eventlet.spawn(rx, done1) 408 eventlet.spawn(rx, done2) 409 410 eventlet.sleep() 411 receiver.close() 412 413 done1.wait() 414 done2.wait() 415 416 @tests.skip_unless(zmq_supported) 417 def test_getsockopt_events(self): 418 sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER) 419 eventlet.sleep() 420 poll_out = zmq.Poller() 421 poll_out.register(sock1, zmq.POLLOUT) 422 sock_map = poll_out.poll(100) 423 self.assertEqual(len(sock_map), 1) 424 events = sock1.getsockopt(zmq.EVENTS) 425 self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT) 426 sock1.send(b'') 427 428 poll_in = zmq.Poller() 429 poll_in.register(sock2, zmq.POLLIN) 430 sock_map = poll_in.poll(100) 431 self.assertEqual(len(sock_map), 1) 432 events = sock2.getsockopt(zmq.EVENTS) 433 self.assertEqual(events & zmq.POLLIN, zmq.POLLIN) 434 435 @tests.skip_unless(zmq_supported) 436 def test_cpu_usage_after_bind(self): 437 """zmq eats CPU after PUB socket .bind() 438 439 https://bitbucket.org/eventlet/eventlet/issue/128 440 441 According to the ZeroMQ documentation, the socket file descriptor 442 can be readable without any pending messages. So we need to ensure 443 that Eventlet wraps around ZeroMQ sockets do not create busy loops. 444 445 A naive way to test it is to measure resource usage. This will require 446 some tuning to set appropriate acceptable limits. 447 """ 448 sock = self.context.socket(zmq.PUB) 449 self.sockets.append(sock) 450 sock.bind_to_random_port("tcp://127.0.0.1") 451 eventlet.sleep() 452 tests.check_idle_cpu_usage(0.2, 0.1) 453 454 @tests.skip_unless(zmq_supported) 455 def test_cpu_usage_after_pub_send_or_dealer_recv(self): 456 """zmq eats CPU after PUB send or DEALER recv. 457 458 Same https://bitbucket.org/eventlet/eventlet/issue/128 459 """ 460 pub, sub, _port = self.create_bound_pair(zmq.PUB, zmq.SUB) 461 sub.setsockopt(zmq.SUBSCRIBE, b"") 462 eventlet.sleep() 463 pub.send(b'test_send') 464 tests.check_idle_cpu_usage(0.2, 0.1) 465 466 sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER) 467 eventlet.sleep() 468 sender.send(b'test_recv') 469 msg = receiver.recv() 470 self.assertEqual(msg, b'test_recv') 471 tests.check_idle_cpu_usage(0.2, 0.1) 472 473 474class TestQueueLock(tests.LimitedTestCase): 475 @tests.skip_unless(zmq_supported) 476 def test_queue_lock_order(self): 477 q = zmq._QueueLock() 478 s = eventlet.Semaphore(0) 479 results = [] 480 481 def lock(x): 482 with q: 483 results.append(x) 484 s.release() 485 486 q.acquire() 487 488 eventlet.spawn(lock, 1) 489 eventlet.sleep() 490 eventlet.spawn(lock, 2) 491 eventlet.sleep() 492 eventlet.spawn(lock, 3) 493 eventlet.sleep() 494 495 self.assertEqual(results, []) 496 q.release() 497 s.acquire() 498 s.acquire() 499 s.acquire() 500 self.assertEqual(results, [1, 2, 3]) 501 502 @tests.skip_unless(zmq_supported) 503 def test_count(self): 504 q = zmq._QueueLock() 505 self.assertFalse(q) 506 q.acquire() 507 self.assertTrue(q) 508 q.release() 509 self.assertFalse(q) 510 511 with q: 512 self.assertTrue(q) 513 self.assertFalse(q) 514 515 @tests.skip_unless(zmq_supported) 516 def test_errors(self): 517 q = zmq._QueueLock() 518 519 self.assertRaises(zmq.LockReleaseError, q.release) 520 521 q.acquire() 522 q.release() 523 524 self.assertRaises(zmq.LockReleaseError, q.release) 525 526 @tests.skip_unless(zmq_supported) 527 def test_nested_acquire(self): 528 q = zmq._QueueLock() 529 self.assertFalse(q) 530 q.acquire() 531 q.acquire() 532 533 s = eventlet.Semaphore(0) 534 results = [] 535 536 def lock(x): 537 with q: 538 results.append(x) 539 s.release() 540 541 eventlet.spawn(lock, 1) 542 eventlet.sleep() 543 self.assertEqual(results, []) 544 q.release() 545 eventlet.sleep() 546 self.assertEqual(results, []) 547 self.assertTrue(q) 548 q.release() 549 550 s.acquire() 551 self.assertEqual(results, [1]) 552 553 554class TestBlockedThread(tests.LimitedTestCase): 555 @tests.skip_unless(zmq_supported) 556 def test_block(self): 557 e = zmq._BlockedThread() 558 done = eventlet.Event() 559 self.assertFalse(e) 560 561 def block(): 562 e.block() 563 done.send(1) 564 565 eventlet.spawn(block) 566 eventlet.sleep() 567 568 self.assertFalse(done.has_result()) 569 e.wake() 570 done.wait() 571 572 573@contextlib.contextmanager 574def clean_context(): 575 ctx = zmq.Context() 576 eventlet.sleep() 577 yield ctx 578 ctx.destroy() 579 580 581@contextlib.contextmanager 582def clean_pair(type1, type2, interface='tcp://127.0.0.1'): 583 with clean_context() as ctx: 584 s1 = ctx.socket(type1) 585 port = s1.bind_to_random_port(interface) 586 s2 = ctx.socket(type2) 587 s2.connect('{0}:{1}'.format(interface, port)) 588 eventlet.sleep() 589 yield (s1, s2, port) 590 s1.close() 591 s2.close() 592 593 594@tests.skip_unless(zmq_supported) 595def test_recv_json_no_args(): 596 # https://github.com/eventlet/eventlet/issues/376 597 with clean_pair(zmq.REQ, zmq.REP) as (s1, s2, _): 598 eventlet.spawn(s1.send_json, {}) 599 s2.recv_json() 600 601 602@tests.skip_unless(zmq_supported) 603def test_recv_timeout(): 604 # https://github.com/eventlet/eventlet/issues/282 605 with clean_pair(zmq.PUB, zmq.SUB) as (_, sub, _): 606 sub.setsockopt(zmq.RCVTIMEO, 100) 607 try: 608 with eventlet.Timeout(1, False): 609 sub.recv() 610 assert False 611 except zmq.ZMQError as e: 612 assert eventlet.is_timeout(e) 613