1import contextlib
2
3try:
4    from eventlet.green import zmq
5except ImportError:
6    zmq = {}    # for systems lacking zmq, skips tests instead of barfing
7else:
8    RECV_ON_CLOSED_SOCKET_ERRNOS = (zmq.ENOTSUP, zmq.ENOTSOCK)
9
10import eventlet
11import tests
12
13
14def zmq_supported(_):
15    try:
16        import zmq
17    except ImportError:
18        return False
19    return not tests.using_pyevent(_)
20
21
22class TestUpstreamDownStream(tests.LimitedTestCase):
23    TEST_TIMEOUT = 2
24
25    @tests.skip_unless(zmq_supported)
26    def setUp(self):
27        super(TestUpstreamDownStream, self).setUp()
28        self.context = zmq.Context()
29        self.sockets = []
30
31    @tests.skip_unless(zmq_supported)
32    def tearDown(self):
33        self.clear_up_sockets()
34        super(TestUpstreamDownStream, self).tearDown()
35
36    def create_bound_pair(self, type1, type2, interface='tcp://127.0.0.1'):
37        """Create a bound socket pair using a random port."""
38        s1 = self.context.socket(type1)
39        port = s1.bind_to_random_port(interface)
40        s2 = self.context.socket(type2)
41        s2.connect('%s:%s' % (interface, port))
42        self.sockets.append(s1)
43        self.sockets.append(s2)
44        return s1, s2, port
45
46    def clear_up_sockets(self):
47        for sock in self.sockets:
48            sock.close()
49        self.sockets = None
50        self.context.destroy(0)
51
52    def assertRaisesErrno(self, errnos, func, *args):
53        try:
54            func(*args)
55        except zmq.ZMQError as e:
56            if not hasattr(errnos, '__iter__'):
57                errnos = (errnos,)
58
59            if e.errno not in errnos:
60                raise AssertionError(
61                    "wrong error raised, expected one of ['%s'], got '%s'" % (
62                        ", ".join("%s" % zmq.ZMQError(errno) for errno in errnos),
63                        zmq.ZMQError(e.errno)
64                    ),
65                )
66        else:
67            self.fail("Function did not raise any error")
68
69    @tests.skip_unless(zmq_supported)
70    def test_close_linger(self):
71        """Socket.close() must support linger argument.
72
73        https://github.com/eventlet/eventlet/issues/9
74        """
75        sock1, sock2, _ = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
76        sock1.close(1)
77        sock2.close(linger=0)
78
79    @tests.skip_unless(zmq_supported)
80    def test_recv_spawned_before_send_is_non_blocking(self):
81        req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
82#       req.connect(ipc)
83#       rep.bind(ipc)
84        eventlet.sleep()
85        msg = dict(res=None)
86        done = eventlet.Event()
87
88        def rx():
89            msg['res'] = rep.recv()
90            done.send('done')
91
92        eventlet.spawn(rx)
93        req.send(b'test')
94        done.wait()
95        self.assertEqual(msg['res'], b'test')
96
97    @tests.skip_unless(zmq_supported)
98    def test_close_socket_raises_enotsup(self):
99        req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
100
101        rep.close()
102        req.close()
103        self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv)
104        self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test')
105
106    @tests.skip_unless(zmq_supported)
107    def test_close_xsocket_raises_enotsup(self):
108        req, rep, port = self.create_bound_pair(zmq.XREQ, zmq.XREP)
109
110        rep.close()
111        req.close()
112        self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, rep.recv)
113        self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, req.send, b'test')
114
115    @tests.skip_unless(zmq_supported)
116    def test_send_1k_req_rep(self):
117        req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
118        eventlet.sleep()
119        done = eventlet.Event()
120
121        def tx():
122            tx_i = 0
123            req.send(str(tx_i).encode())
124            while req.recv() != b'done':
125                tx_i += 1
126                req.send(str(tx_i).encode())
127            done.send(0)
128
129        def rx():
130            while True:
131                rx_i = rep.recv()
132                if rx_i == b"1000":
133                    rep.send(b'done')
134                    break
135                rep.send(b'i')
136        eventlet.spawn(tx)
137        eventlet.spawn(rx)
138        final_i = done.wait()
139        self.assertEqual(final_i, 0)
140
141    @tests.skip_unless(zmq_supported)
142    def test_send_1k_push_pull(self):
143        down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
144        eventlet.sleep()
145
146        done = eventlet.Event()
147
148        def tx():
149            tx_i = 0
150            while tx_i <= 1000:
151                tx_i += 1
152                down.send(str(tx_i).encode())
153
154        def rx():
155            while True:
156                rx_i = up.recv()
157                if rx_i == b"1000":
158                    done.send(0)
159                    break
160        eventlet.spawn(tx)
161        eventlet.spawn(rx)
162        final_i = done.wait()
163        self.assertEqual(final_i, 0)
164
165    @tests.skip_unless(zmq_supported)
166    def test_send_1k_pub_sub(self):
167        pub, sub_all, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
168        sub1 = self.context.socket(zmq.SUB)
169        sub2 = self.context.socket(zmq.SUB)
170        self.sockets.extend([sub1, sub2])
171        addr = 'tcp://127.0.0.1:%s' % port
172        sub1.connect(addr)
173        sub2.connect(addr)
174        sub_all.setsockopt(zmq.SUBSCRIBE, b'')
175        sub1.setsockopt(zmq.SUBSCRIBE, b'sub1')
176        sub2.setsockopt(zmq.SUBSCRIBE, b'sub2')
177
178        sub_all_done = eventlet.Event()
179        sub1_done = eventlet.Event()
180        sub2_done = eventlet.Event()
181
182        eventlet.sleep(0.2)
183
184        def rx(sock, done_evt, msg_count=10000):
185            count = 0
186            while count < msg_count:
187                msg = sock.recv()
188                eventlet.sleep()
189                if b'LAST' in msg:
190                    break
191                count += 1
192
193            done_evt.send(count)
194
195        def tx(sock):
196            for i in range(1, 1001):
197                msg = ("sub%s %s" % ([2, 1][i % 2], i)).encode()
198                sock.send(msg)
199                eventlet.sleep()
200            sock.send(b'sub1 LAST')
201            sock.send(b'sub2 LAST')
202
203        eventlet.spawn(rx, sub_all, sub_all_done)
204        eventlet.spawn(rx, sub1, sub1_done)
205        eventlet.spawn(rx, sub2, sub2_done)
206        eventlet.spawn(tx, pub)
207        sub1_count = sub1_done.wait()
208        sub2_count = sub2_done.wait()
209        sub_all_count = sub_all_done.wait()
210        self.assertEqual(sub1_count, 500)
211        self.assertEqual(sub2_count, 500)
212        self.assertEqual(sub_all_count, 1000)
213
214    @tests.skip_unless(zmq_supported)
215    def test_change_subscription(self):
216        # FIXME: Extensive testing showed this particular test is the root cause
217        # of sporadic failures on Travis.
218        pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
219        sub.setsockopt(zmq.SUBSCRIBE, b'test')
220        eventlet.sleep(0)
221        sub_ready = eventlet.Event()
222        sub_last = eventlet.Event()
223        sub_done = eventlet.Event()
224
225        def rx():
226            while sub.recv() != b'test BEGIN':
227                eventlet.sleep(0)
228            sub_ready.send()
229            count = 0
230            while True:
231                msg = sub.recv()
232                if msg == b'test BEGIN':
233                    # BEGIN may come many times
234                    continue
235                if msg == b'test LAST':
236                    sub.setsockopt(zmq.SUBSCRIBE, b'done')
237                    sub.setsockopt(zmq.UNSUBSCRIBE, b'test')
238                    eventlet.sleep(0)
239                    # In real application you should either sync
240                    # or tolerate loss of messages.
241                    sub_last.send()
242                if msg == b'done DONE':
243                    break
244                count += 1
245            sub_done.send(count)
246
247        def tx():
248            # Sync receiver ready to avoid loss of first packets
249            while not sub_ready.ready():
250                pub.send(b'test BEGIN')
251                eventlet.sleep(0.005)
252            for i in range(1, 101):
253                msg = 'test {0}'.format(i).encode()
254                if i != 50:
255                    pub.send(msg)
256                else:
257                    pub.send(b'test LAST')
258                    sub_last.wait()
259                # XXX: putting a real delay of 1ms here fixes sporadic failures on Travis
260                # just yield eventlet.sleep(0) doesn't cut it
261                eventlet.sleep(0.001)
262            pub.send(b'done DONE')
263
264        eventlet.spawn(rx)
265        eventlet.spawn(tx)
266        rx_count = sub_done.wait()
267        self.assertEqual(rx_count, 50)
268
269    @tests.skip_unless(zmq_supported)
270    def test_recv_multipart_bug68(self):
271        req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
272        msg = [b'']
273        req.send_multipart(msg)
274        recieved_msg = rep.recv_multipart()
275        self.assertEqual(recieved_msg, msg)
276
277        # Send a message back the other way
278        msg2 = [b""]
279        rep.send_multipart(msg2, copy=False)
280        # When receiving a copy it's a zmq.core.message.Message you get back
281        recieved_msg = req.recv_multipart(copy=False)
282        # So it needs to be converted to a string
283        # I'm calling str(m) consciously here; Message has a .data attribute
284        # but it's private __str__ appears to be the way to go
285        self.assertEqual([m.bytes for m in recieved_msg], msg2)
286
287    @tests.skip_unless(zmq_supported)
288    def test_recv_noblock_bug76(self):
289        req, rep, port = self.create_bound_pair(zmq.REQ, zmq.REP)
290        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
291        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK, True)
292
293    @tests.skip_unless(zmq_supported)
294    def test_send_during_recv(self):
295        sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
296        eventlet.sleep()
297
298        num_recvs = 30
299        done_evts = [eventlet.Event() for _ in range(num_recvs)]
300
301        def slow_rx(done, msg):
302            self.assertEqual(sender.recv(), msg)
303            done.send(0)
304
305        def tx():
306            tx_i = 0
307            while tx_i <= 1000:
308                sender.send(str(tx_i).encode())
309                tx_i += 1
310
311        def rx():
312            while True:
313                rx_i = receiver.recv()
314                if rx_i == b"1000":
315                    for i in range(num_recvs):
316                        receiver.send(('done%d' % i).encode())
317                    eventlet.sleep()
318                    return
319
320        for i in range(num_recvs):
321            eventlet.spawn(slow_rx, done_evts[i], ("done%d" % i).encode())
322
323        eventlet.spawn(tx)
324        eventlet.spawn(rx)
325        for evt in done_evts:
326            self.assertEqual(evt.wait(), 0)
327
328    @tests.skip_unless(zmq_supported)
329    def test_send_during_recv_multipart(self):
330        sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
331        eventlet.sleep()
332
333        num_recvs = 30
334        done_evts = [eventlet.Event() for _ in range(num_recvs)]
335
336        def slow_rx(done, msg):
337            self.assertEqual(sender.recv_multipart(), msg)
338            done.send(0)
339
340        def tx():
341            tx_i = 0
342            while tx_i <= 1000:
343                sender.send_multipart([str(tx_i).encode(), b'1', b'2', b'3'])
344                tx_i += 1
345
346        def rx():
347            while True:
348                rx_i = receiver.recv_multipart()
349                if rx_i == [b"1000", b'1', b'2', b'3']:
350                    for i in range(num_recvs):
351                        receiver.send_multipart([
352                            ('done%d' % i).encode(), b'a', b'b', b'c'])
353                    eventlet.sleep()
354                    return
355
356        for i in range(num_recvs):
357            eventlet.spawn(slow_rx, done_evts[i], [
358                ("done%d" % i).encode(), b'a', b'b', b'c'])
359
360        eventlet.spawn(tx)
361        eventlet.spawn(rx)
362        for i in range(num_recvs):
363            final_i = done_evts[i].wait()
364            self.assertEqual(final_i, 0)
365
366    # Need someway to ensure a thread is blocked on send... This isn't working
367    @tests.skip_unless(zmq_supported)
368    def test_recv_during_send(self):
369        sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
370        eventlet.sleep()
371
372        done = eventlet.Event()
373
374        try:
375            SNDHWM = zmq.SNDHWM
376        except AttributeError:
377            # ZeroMQ <3.0
378            SNDHWM = zmq.HWM
379
380        sender.setsockopt(SNDHWM, 10)
381        sender.setsockopt(zmq.SNDBUF, 10)
382
383        receiver.setsockopt(zmq.RCVBUF, 10)
384
385        def tx():
386            tx_i = 0
387            while tx_i <= 1000:
388                sender.send(str(tx_i).encode())
389                tx_i += 1
390            done.send(0)
391
392        eventlet.spawn(tx)
393        final_i = done.wait()
394        self.assertEqual(final_i, 0)
395
396    @tests.skip_unless(zmq_supported)
397    def test_close_during_recv(self):
398        sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
399        eventlet.sleep()
400        done1 = eventlet.Event()
401        done2 = eventlet.Event()
402
403        def rx(e):
404            self.assertRaisesErrno(RECV_ON_CLOSED_SOCKET_ERRNOS, receiver.recv)
405            e.send()
406
407        eventlet.spawn(rx, done1)
408        eventlet.spawn(rx, done2)
409
410        eventlet.sleep()
411        receiver.close()
412
413        done1.wait()
414        done2.wait()
415
416    @tests.skip_unless(zmq_supported)
417    def test_getsockopt_events(self):
418        sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
419        eventlet.sleep()
420        poll_out = zmq.Poller()
421        poll_out.register(sock1, zmq.POLLOUT)
422        sock_map = poll_out.poll(100)
423        self.assertEqual(len(sock_map), 1)
424        events = sock1.getsockopt(zmq.EVENTS)
425        self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT)
426        sock1.send(b'')
427
428        poll_in = zmq.Poller()
429        poll_in.register(sock2, zmq.POLLIN)
430        sock_map = poll_in.poll(100)
431        self.assertEqual(len(sock_map), 1)
432        events = sock2.getsockopt(zmq.EVENTS)
433        self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
434
435    @tests.skip_unless(zmq_supported)
436    def test_cpu_usage_after_bind(self):
437        """zmq eats CPU after PUB socket .bind()
438
439        https://bitbucket.org/eventlet/eventlet/issue/128
440
441        According to the ZeroMQ documentation, the socket file descriptor
442        can be readable without any pending messages. So we need to ensure
443        that Eventlet wraps around ZeroMQ sockets do not create busy loops.
444
445        A naive way to test it is to measure resource usage. This will require
446        some tuning to set appropriate acceptable limits.
447        """
448        sock = self.context.socket(zmq.PUB)
449        self.sockets.append(sock)
450        sock.bind_to_random_port("tcp://127.0.0.1")
451        eventlet.sleep()
452        tests.check_idle_cpu_usage(0.2, 0.1)
453
454    @tests.skip_unless(zmq_supported)
455    def test_cpu_usage_after_pub_send_or_dealer_recv(self):
456        """zmq eats CPU after PUB send or DEALER recv.
457
458        Same https://bitbucket.org/eventlet/eventlet/issue/128
459        """
460        pub, sub, _port = self.create_bound_pair(zmq.PUB, zmq.SUB)
461        sub.setsockopt(zmq.SUBSCRIBE, b"")
462        eventlet.sleep()
463        pub.send(b'test_send')
464        tests.check_idle_cpu_usage(0.2, 0.1)
465
466        sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
467        eventlet.sleep()
468        sender.send(b'test_recv')
469        msg = receiver.recv()
470        self.assertEqual(msg, b'test_recv')
471        tests.check_idle_cpu_usage(0.2, 0.1)
472
473
474class TestQueueLock(tests.LimitedTestCase):
475    @tests.skip_unless(zmq_supported)
476    def test_queue_lock_order(self):
477        q = zmq._QueueLock()
478        s = eventlet.Semaphore(0)
479        results = []
480
481        def lock(x):
482            with q:
483                results.append(x)
484            s.release()
485
486        q.acquire()
487
488        eventlet.spawn(lock, 1)
489        eventlet.sleep()
490        eventlet.spawn(lock, 2)
491        eventlet.sleep()
492        eventlet.spawn(lock, 3)
493        eventlet.sleep()
494
495        self.assertEqual(results, [])
496        q.release()
497        s.acquire()
498        s.acquire()
499        s.acquire()
500        self.assertEqual(results, [1, 2, 3])
501
502    @tests.skip_unless(zmq_supported)
503    def test_count(self):
504        q = zmq._QueueLock()
505        self.assertFalse(q)
506        q.acquire()
507        self.assertTrue(q)
508        q.release()
509        self.assertFalse(q)
510
511        with q:
512            self.assertTrue(q)
513        self.assertFalse(q)
514
515    @tests.skip_unless(zmq_supported)
516    def test_errors(self):
517        q = zmq._QueueLock()
518
519        self.assertRaises(zmq.LockReleaseError, q.release)
520
521        q.acquire()
522        q.release()
523
524        self.assertRaises(zmq.LockReleaseError, q.release)
525
526    @tests.skip_unless(zmq_supported)
527    def test_nested_acquire(self):
528        q = zmq._QueueLock()
529        self.assertFalse(q)
530        q.acquire()
531        q.acquire()
532
533        s = eventlet.Semaphore(0)
534        results = []
535
536        def lock(x):
537            with q:
538                results.append(x)
539            s.release()
540
541        eventlet.spawn(lock, 1)
542        eventlet.sleep()
543        self.assertEqual(results, [])
544        q.release()
545        eventlet.sleep()
546        self.assertEqual(results, [])
547        self.assertTrue(q)
548        q.release()
549
550        s.acquire()
551        self.assertEqual(results, [1])
552
553
554class TestBlockedThread(tests.LimitedTestCase):
555    @tests.skip_unless(zmq_supported)
556    def test_block(self):
557        e = zmq._BlockedThread()
558        done = eventlet.Event()
559        self.assertFalse(e)
560
561        def block():
562            e.block()
563            done.send(1)
564
565        eventlet.spawn(block)
566        eventlet.sleep()
567
568        self.assertFalse(done.has_result())
569        e.wake()
570        done.wait()
571
572
573@contextlib.contextmanager
574def clean_context():
575    ctx = zmq.Context()
576    eventlet.sleep()
577    yield ctx
578    ctx.destroy()
579
580
581@contextlib.contextmanager
582def clean_pair(type1, type2, interface='tcp://127.0.0.1'):
583    with clean_context() as ctx:
584        s1 = ctx.socket(type1)
585        port = s1.bind_to_random_port(interface)
586        s2 = ctx.socket(type2)
587        s2.connect('{0}:{1}'.format(interface, port))
588        eventlet.sleep()
589        yield (s1, s2, port)
590        s1.close()
591        s2.close()
592
593
594@tests.skip_unless(zmq_supported)
595def test_recv_json_no_args():
596    # https://github.com/eventlet/eventlet/issues/376
597    with clean_pair(zmq.REQ, zmq.REP) as (s1, s2, _):
598        eventlet.spawn(s1.send_json, {})
599        s2.recv_json()
600
601
602@tests.skip_unless(zmq_supported)
603def test_recv_timeout():
604    # https://github.com/eventlet/eventlet/issues/282
605    with clean_pair(zmq.PUB, zmq.SUB) as (_, sub, _):
606        sub.setsockopt(zmq.RCVTIMEO, 100)
607        try:
608            with eventlet.Timeout(1, False):
609                sub.recv()
610            assert False
611        except zmq.ZMQError as e:
612            assert eventlet.is_timeout(e)
613