1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 #include "testutil_unity.hpp"
32 
33 #include <string.h>
34 
35 #ifndef _WIN32
36 #include <sys/socket.h>
37 #include <netinet/in.h>
38 #include <arpa/inet.h>
39 #include <unistd.h>
40 #endif
41 
42 // Helper macro to define the v4/v6 function pairs
43 #define MAKE_TEST_V4V6(_test)                                                  \
44     static void _test##_ipv4 () { _test (false); }                             \
45                                                                                \
46     static void _test##_ipv6 ()                                                \
47     {                                                                          \
48         if (!is_ipv6_available ()) {                                           \
49             TEST_IGNORE_MESSAGE ("ipv6 is not available");                     \
50         }                                                                      \
51         _test (true);                                                          \
52     }
53 
54 SETUP_TEARDOWN_TESTCONTEXT
55 
msg_send_expect_success(void * s_,const char * group_,const char * body_)56 void msg_send_expect_success (void *s_, const char *group_, const char *body_)
57 {
58     zmq_msg_t msg;
59     const size_t len = strlen (body_);
60     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, len));
61 
62     memcpy (zmq_msg_data (&msg), body_, len);
63 
64     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_set_group (&msg, group_));
65 
66     int rc = zmq_msg_send (&msg, s_, 0);
67     TEST_ASSERT_EQUAL_INT ((int) len, rc);
68 
69     // TODO isn't the msg closed by zmq_msg_send?
70     zmq_msg_close (&msg);
71 }
72 
msg_recv_cmp(void * s_,const char * group_,const char * body_)73 void msg_recv_cmp (void *s_, const char *group_, const char *body_)
74 {
75     zmq_msg_t msg;
76     const size_t len = strlen (body_);
77     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
78 
79     int recv_rc = TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, s_, 0));
80     TEST_ASSERT_EQUAL_INT (len, recv_rc);
81 
82     TEST_ASSERT_EQUAL_STRING (group_, zmq_msg_group (&msg));
83 
84     TEST_ASSERT_EQUAL_STRING_LEN (body_, zmq_msg_data (&msg), len);
85 
86     zmq_msg_close (&msg);
87 }
88 
test_leave_unjoined_fails()89 void test_leave_unjoined_fails ()
90 {
91     void *dish = test_context_socket (ZMQ_DISH);
92 
93     //  Leaving a group which we didn't join
94     TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_leave (dish, "Movies"));
95 
96     test_context_socket_close (dish);
97 }
98 
test_long_group()99 void test_long_group ()
100 {
101     size_t len = MAX_SOCKET_STRING;
102     char my_endpoint[MAX_SOCKET_STRING];
103 
104     void *radio = test_context_socket (ZMQ_RADIO);
105     bind_loopback (radio, false, my_endpoint, len);
106 
107     void *dish = test_context_socket (ZMQ_DISH);
108 
109     // Joining to a long group, over 14 chars
110     char group[19] = "0123456789ABCDEFGH";
111     TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, group));
112 
113     // Connecting
114     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));
115 
116     msleep (SETTLE_TIME);
117 
118     //  This is going to be sent to the dish
119     msg_send_expect_success (radio, group, "HELLO");
120 
121     //  Check the correct message arrived
122     msg_recv_cmp (dish, group, "HELLO");
123 
124     test_context_socket_close (dish);
125     test_context_socket_close (radio);
126 }
127 
test_join_too_long_fails()128 void test_join_too_long_fails ()
129 {
130     void *dish = test_context_socket (ZMQ_DISH);
131 
132     //  Joining too long group
133     char too_long_group[ZMQ_GROUP_MAX_LENGTH + 2];
134     for (int index = 0; index < ZMQ_GROUP_MAX_LENGTH + 2; index++)
135         too_long_group[index] = 'A';
136     too_long_group[ZMQ_GROUP_MAX_LENGTH + 1] = '\0';
137     TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, too_long_group));
138 
139     test_context_socket_close (dish);
140 }
141 
test_join_twice_fails()142 void test_join_twice_fails ()
143 {
144     void *dish = test_context_socket (ZMQ_DISH);
145 
146     TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
147 
148     // Duplicate Joining
149     TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, "Movies"));
150 
151     test_context_socket_close (dish);
152 }
153 
test_radio_dish_tcp_poll(int ipv6_)154 void test_radio_dish_tcp_poll (int ipv6_)
155 {
156     size_t len = MAX_SOCKET_STRING;
157     char my_endpoint[MAX_SOCKET_STRING];
158 
159     void *radio = test_context_socket (ZMQ_RADIO);
160     bind_loopback (radio, ipv6_, my_endpoint, len);
161 
162     void *dish = test_context_socket (ZMQ_DISH);
163 
164     TEST_ASSERT_SUCCESS_ERRNO (
165       zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
166 
167     // Joining
168     TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
169 
170     // Connecting
171     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));
172 
173     msleep (SETTLE_TIME);
174 
175     //  This is not going to be sent as dish only subscribe to "Movies"
176     msg_send_expect_success (radio, "TV", "Friends");
177 
178     //  This is going to be sent to the dish
179     msg_send_expect_success (radio, "Movies", "Godfather");
180 
181     //  Check the correct message arrived
182     msg_recv_cmp (dish, "Movies", "Godfather");
183 
184     //  Join group during connection optvallen
185     TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
186 
187     zmq_sleep (1);
188 
189     //  This should arrive now as we joined the group
190     msg_send_expect_success (radio, "TV", "Friends");
191 
192     //  Check the correct message arrived
193     msg_recv_cmp (dish, "TV", "Friends");
194 
195     //  Leaving group
196     TEST_ASSERT_SUCCESS_ERRNO (zmq_leave (dish, "TV"));
197 
198     zmq_sleep (1);
199 
200     //  This is not going to be sent as dish only subscribe to "Movies"
201     msg_send_expect_success (radio, "TV", "Friends");
202 
203     //  This is going to be sent to the dish
204     msg_send_expect_success (radio, "Movies", "Godfather");
205 
206     // test zmq_poll with dish
207     zmq_pollitem_t items[] = {
208       {radio, 0, ZMQ_POLLIN, 0}, // read publications
209       {dish, 0, ZMQ_POLLIN, 0},  // read subscriptions
210     };
211     int rc = zmq_poll (items, 2, 2000);
212     TEST_ASSERT_EQUAL_INT (1, rc);
213     TEST_ASSERT_EQUAL_INT (ZMQ_POLLIN, items[1].revents);
214 
215     //  Check the correct message arrived
216     msg_recv_cmp (dish, "Movies", "Godfather");
217 
218     test_context_socket_close (dish);
219     test_context_socket_close (radio);
220 }
MAKE_TEST_V4V6(test_radio_dish_tcp_poll)221 MAKE_TEST_V4V6 (test_radio_dish_tcp_poll)
222 
223 void test_dish_connect_fails (int ipv6_)
224 {
225     void *dish = test_context_socket (ZMQ_DISH);
226 
227     TEST_ASSERT_SUCCESS_ERRNO (
228       zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
229 
230     const char *url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";
231 
232     //  Connecting dish should fail
233     TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO, zmq_connect (dish, url));
234 
235     test_context_socket_close (dish);
236 }
MAKE_TEST_V4V6(test_dish_connect_fails)237 MAKE_TEST_V4V6 (test_dish_connect_fails)
238 
239 void test_radio_bind_fails (int ipv6_)
240 {
241     void *radio = test_context_socket (ZMQ_RADIO);
242 
243     TEST_ASSERT_SUCCESS_ERRNO (
244       zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
245 
246     //  Connecting dish should fail
247     //  Bind radio should fail
248     TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO,
249                                zmq_bind (radio, "udp://*:5556"));
250 
251     test_context_socket_close (radio);
252 }
MAKE_TEST_V4V6(test_radio_bind_fails)253 MAKE_TEST_V4V6 (test_radio_bind_fails)
254 
255 void test_radio_dish_udp (int ipv6_)
256 {
257     void *radio = test_context_socket (ZMQ_RADIO);
258     void *dish = test_context_socket (ZMQ_DISH);
259 
260     TEST_ASSERT_SUCCESS_ERRNO (
261       zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
262     TEST_ASSERT_SUCCESS_ERRNO (
263       zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
264 
265     const char *radio_url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";
266 
267     TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, "udp://*:5556"));
268     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, radio_url));
269 
270     msleep (SETTLE_TIME);
271 
272     TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
273 
274     msg_send_expect_success (radio, "TV", "Friends");
275     msg_recv_cmp (dish, "TV", "Friends");
276 
277     test_context_socket_close (dish);
278     test_context_socket_close (radio);
279 }
MAKE_TEST_V4V6(test_radio_dish_udp) const280 MAKE_TEST_V4V6 (test_radio_dish_udp)
281 
282 #define MCAST_IPV4 "226.8.5.5"
283 #define MCAST_IPV6 "ff02::7a65:726f:6df1:0a01"
284 
285 static const char *mcast_url (int ipv6_)
286 {
287     if (ipv6_) {
288         return "udp://[" MCAST_IPV6 "]:5555";
289     }
290     return "udp://" MCAST_IPV4 ":5555";
291 }
292 
293 //  OSX uses a different name for this socket option
294 #ifndef IPV6_ADD_MEMBERSHIP
295 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
296 #endif
297 
298 union sa_u
299 {
300     struct sockaddr generic;
301     struct sockaddr_in ipv4;
302     struct sockaddr_in6 ipv6;
303 };
304 
305 //  Test if multicast is available on this machine by attempting to
306 //  send a receive a multicast datagram
is_multicast_available(int ipv6_)307 static bool is_multicast_available (int ipv6_)
308 {
309     int family = ipv6_ ? AF_INET6 : AF_INET;
310     fd_t bind_sock = retired_fd;
311     fd_t send_sock = retired_fd;
312     int port = 5555;
313     bool success = false;
314     const char *msg = "it works";
315     char buf[32];
316     union sa_u any;
317     union sa_u mcast;
318     socklen_t sl;
319     int rc;
320 
321     if (ipv6_) {
322         struct sockaddr_in6 *any_ipv6 = &any.ipv6;
323         struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6;
324 
325         any_ipv6->sin6_family = AF_INET6;
326         any_ipv6->sin6_port = htons (port);
327         any_ipv6->sin6_flowinfo = 0;
328         any_ipv6->sin6_scope_id = 0;
329 
330         rc = test_inet_pton (AF_INET6, "::", &any_ipv6->sin6_addr);
331         if (rc == 0) {
332             goto out;
333         }
334 
335         *mcast_ipv6 = *any_ipv6;
336 
337         rc = test_inet_pton (AF_INET6, MCAST_IPV6, &mcast_ipv6->sin6_addr);
338         if (rc == 0) {
339             goto out;
340         }
341 
342         sl = sizeof (*any_ipv6);
343     } else {
344         struct sockaddr_in *any_ipv4 = &any.ipv4;
345         struct sockaddr_in *mcast_ipv4 = &mcast.ipv4;
346 
347         any_ipv4->sin_family = AF_INET;
348         any_ipv4->sin_port = htons (5555);
349 
350         rc = test_inet_pton (AF_INET, "0.0.0.0", &any_ipv4->sin_addr);
351         if (rc == 0) {
352             goto out;
353         }
354 
355         *mcast_ipv4 = *any_ipv4;
356 
357         rc = test_inet_pton (AF_INET, MCAST_IPV4, &mcast_ipv4->sin_addr);
358         if (rc == 0) {
359             goto out;
360         }
361 
362         sl = sizeof (*any_ipv4);
363     }
364 
365     bind_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
366     if (bind_sock < 0) {
367         goto out;
368     }
369 
370     send_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
371     if (bind_sock < 0) {
372         goto out;
373     }
374 
375     rc = bind (bind_sock, &any.generic, sl);
376     if (rc < 0) {
377         goto out;
378     }
379 
380     if (ipv6_) {
381         struct ipv6_mreq mreq;
382         const sockaddr_in6 *const mcast_ipv6 = &mcast.ipv6;
383 
384         mreq.ipv6mr_multiaddr = mcast_ipv6->sin6_addr;
385         mreq.ipv6mr_interface = 0;
386 
387         rc = setsockopt (bind_sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
388                          as_setsockopt_opt_t (&mreq), sizeof (mreq));
389         if (rc < 0) {
390             goto out;
391         }
392 
393         int loop = 1;
394         rc = setsockopt (send_sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
395                          as_setsockopt_opt_t (&loop), sizeof (loop));
396         if (rc < 0) {
397             goto out;
398         }
399     } else {
400         struct ip_mreq mreq;
401         const sockaddr_in *const mcast_ipv4 = &mcast.ipv4;
402 
403         mreq.imr_multiaddr = mcast_ipv4->sin_addr;
404         mreq.imr_interface.s_addr = htonl (INADDR_ANY);
405 
406         rc = setsockopt (bind_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
407                          as_setsockopt_opt_t (&mreq), sizeof (mreq));
408         if (rc < 0) {
409             goto out;
410         }
411 
412         int loop = 1;
413         rc = setsockopt (send_sock, IPPROTO_IP, IP_MULTICAST_LOOP,
414                          as_setsockopt_opt_t (&loop), sizeof (loop));
415         if (rc < 0) {
416             goto out;
417         }
418     }
419 
420     msleep (SETTLE_TIME);
421 
422     rc = sendto (send_sock, msg, static_cast<socklen_t> (strlen (msg)), 0,
423                  &mcast.generic, sl);
424     if (rc < 0) {
425         goto out;
426     }
427 
428     msleep (SETTLE_TIME);
429 
430     rc = recvfrom (bind_sock, buf, sizeof (buf) - 1, 0, NULL, 0);
431     if (rc < 0) {
432         goto out;
433     }
434 
435     buf[rc] = '\0';
436 
437     success = (strcmp (msg, buf) == 0);
438 
439 out:
440     if (bind_sock >= 0) {
441         close (bind_sock);
442     }
443 
444     if (send_sock >= 0) {
445         close (send_sock);
446     }
447 
448     return success;
449 }
450 
ignore_if_unavailable(int ipv6_)451 static void ignore_if_unavailable (int ipv6_)
452 {
453     if (ipv6_ && !is_ipv6_available ())
454         TEST_IGNORE_MESSAGE ("No IPV6 available");
455     if (!is_multicast_available (ipv6_))
456         TEST_IGNORE_MESSAGE ("No multicast available");
457 }
458 
test_radio_dish_mcast(int ipv6_)459 static void test_radio_dish_mcast (int ipv6_)
460 {
461     ignore_if_unavailable (ipv6_);
462 
463     void *radio = test_context_socket (ZMQ_RADIO);
464     void *dish = test_context_socket (ZMQ_DISH);
465 
466     TEST_ASSERT_SUCCESS_ERRNO (
467       zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
468     TEST_ASSERT_SUCCESS_ERRNO (
469       zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
470 
471     const char *url = mcast_url (ipv6_);
472 
473     TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
474     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));
475 
476     msleep (SETTLE_TIME);
477 
478     TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
479 
480     msg_send_expect_success (radio, "TV", "Friends");
481     msg_recv_cmp (dish, "TV", "Friends");
482 
483     test_context_socket_close (dish);
484     test_context_socket_close (radio);
485 }
MAKE_TEST_V4V6(test_radio_dish_mcast)486 MAKE_TEST_V4V6 (test_radio_dish_mcast)
487 
488 static void test_radio_dish_no_loop (int ipv6_)
489 {
490 #ifdef _WIN32
491     TEST_IGNORE_MESSAGE (
492       "ZMQ_MULTICAST_LOOP=false does not appear to work on Windows (TODO)");
493 #endif
494     ignore_if_unavailable (ipv6_);
495 
496     void *radio = test_context_socket (ZMQ_RADIO);
497     void *dish = test_context_socket (ZMQ_DISH);
498 
499     TEST_ASSERT_SUCCESS_ERRNO (
500       zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
501     TEST_ASSERT_SUCCESS_ERRNO (
502       zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
503 
504     //  Disable multicast loop for radio
505     int loop = 0;
506     TEST_ASSERT_SUCCESS_ERRNO (
507       zmq_setsockopt (radio, ZMQ_MULTICAST_LOOP, &loop, sizeof (int)));
508 
509     const char *url = mcast_url (ipv6_);
510 
511     TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
512     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));
513 
514     msleep (SETTLE_TIME);
515 
516     TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
517 
518     msg_send_expect_success (radio, "TV", "Friends");
519 
520     // Looping is disabled, we shouldn't receive anything
521     msleep (SETTLE_TIME);
522 
523     TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (dish, NULL, 0, ZMQ_DONTWAIT));
524 
525     test_context_socket_close (dish);
526     test_context_socket_close (radio);
527 }
MAKE_TEST_V4V6(test_radio_dish_no_loop)528 MAKE_TEST_V4V6 (test_radio_dish_no_loop)
529 
530 int main (void)
531 {
532     setup_test_environment ();
533 
534     UNITY_BEGIN ();
535     RUN_TEST (test_leave_unjoined_fails);
536     RUN_TEST (test_join_too_long_fails);
537     RUN_TEST (test_long_group);
538     RUN_TEST (test_join_twice_fails);
539     RUN_TEST (test_radio_bind_fails_ipv4);
540     RUN_TEST (test_radio_bind_fails_ipv6);
541     RUN_TEST (test_dish_connect_fails_ipv4);
542     RUN_TEST (test_dish_connect_fails_ipv6);
543     RUN_TEST (test_radio_dish_tcp_poll_ipv4);
544     RUN_TEST (test_radio_dish_tcp_poll_ipv6);
545     RUN_TEST (test_radio_dish_udp_ipv4);
546     RUN_TEST (test_radio_dish_udp_ipv6);
547 
548     RUN_TEST (test_radio_dish_mcast_ipv4);
549     RUN_TEST (test_radio_dish_no_loop_ipv4);
550 
551     RUN_TEST (test_radio_dish_mcast_ipv6);
552     RUN_TEST (test_radio_dish_no_loop_ipv6);
553 
554     return UNITY_END ();
555 }
556