1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "testutil.hpp"
31 #include "testutil_unity.hpp"
32
33 #include <string.h>
34
35 #ifndef _WIN32
36 #include <sys/socket.h>
37 #include <netinet/in.h>
38 #include <arpa/inet.h>
39 #include <unistd.h>
40 #endif
41
42 // Helper macro to define the v4/v6 function pairs
43 #define MAKE_TEST_V4V6(_test) \
44 static void _test##_ipv4 () { _test (false); } \
45 \
46 static void _test##_ipv6 () \
47 { \
48 if (!is_ipv6_available ()) { \
49 TEST_IGNORE_MESSAGE ("ipv6 is not available"); \
50 } \
51 _test (true); \
52 }
53
54 SETUP_TEARDOWN_TESTCONTEXT
55
msg_send_expect_success(void * s_,const char * group_,const char * body_)56 void msg_send_expect_success (void *s_, const char *group_, const char *body_)
57 {
58 zmq_msg_t msg;
59 const size_t len = strlen (body_);
60 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, len));
61
62 memcpy (zmq_msg_data (&msg), body_, len);
63
64 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_set_group (&msg, group_));
65
66 int rc = zmq_msg_send (&msg, s_, 0);
67 TEST_ASSERT_EQUAL_INT ((int) len, rc);
68
69 // TODO isn't the msg closed by zmq_msg_send?
70 zmq_msg_close (&msg);
71 }
72
msg_recv_cmp(void * s_,const char * group_,const char * body_)73 void msg_recv_cmp (void *s_, const char *group_, const char *body_)
74 {
75 zmq_msg_t msg;
76 const size_t len = strlen (body_);
77 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
78
79 int recv_rc = TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, s_, 0));
80 TEST_ASSERT_EQUAL_INT (len, recv_rc);
81
82 TEST_ASSERT_EQUAL_STRING (group_, zmq_msg_group (&msg));
83
84 TEST_ASSERT_EQUAL_STRING_LEN (body_, zmq_msg_data (&msg), len);
85
86 zmq_msg_close (&msg);
87 }
88
test_leave_unjoined_fails()89 void test_leave_unjoined_fails ()
90 {
91 void *dish = test_context_socket (ZMQ_DISH);
92
93 // Leaving a group which we didn't join
94 TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_leave (dish, "Movies"));
95
96 test_context_socket_close (dish);
97 }
98
test_long_group()99 void test_long_group ()
100 {
101 size_t len = MAX_SOCKET_STRING;
102 char my_endpoint[MAX_SOCKET_STRING];
103
104 void *radio = test_context_socket (ZMQ_RADIO);
105 bind_loopback (radio, false, my_endpoint, len);
106
107 void *dish = test_context_socket (ZMQ_DISH);
108
109 // Joining to a long group, over 14 chars
110 char group[19] = "0123456789ABCDEFGH";
111 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, group));
112
113 // Connecting
114 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));
115
116 msleep (SETTLE_TIME);
117
118 // This is going to be sent to the dish
119 msg_send_expect_success (radio, group, "HELLO");
120
121 // Check the correct message arrived
122 msg_recv_cmp (dish, group, "HELLO");
123
124 test_context_socket_close (dish);
125 test_context_socket_close (radio);
126 }
127
test_join_too_long_fails()128 void test_join_too_long_fails ()
129 {
130 void *dish = test_context_socket (ZMQ_DISH);
131
132 // Joining too long group
133 char too_long_group[ZMQ_GROUP_MAX_LENGTH + 2];
134 for (int index = 0; index < ZMQ_GROUP_MAX_LENGTH + 2; index++)
135 too_long_group[index] = 'A';
136 too_long_group[ZMQ_GROUP_MAX_LENGTH + 1] = '\0';
137 TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, too_long_group));
138
139 test_context_socket_close (dish);
140 }
141
test_join_twice_fails()142 void test_join_twice_fails ()
143 {
144 void *dish = test_context_socket (ZMQ_DISH);
145
146 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
147
148 // Duplicate Joining
149 TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, "Movies"));
150
151 test_context_socket_close (dish);
152 }
153
test_radio_dish_tcp_poll(int ipv6_)154 void test_radio_dish_tcp_poll (int ipv6_)
155 {
156 size_t len = MAX_SOCKET_STRING;
157 char my_endpoint[MAX_SOCKET_STRING];
158
159 void *radio = test_context_socket (ZMQ_RADIO);
160 bind_loopback (radio, ipv6_, my_endpoint, len);
161
162 void *dish = test_context_socket (ZMQ_DISH);
163
164 TEST_ASSERT_SUCCESS_ERRNO (
165 zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
166
167 // Joining
168 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
169
170 // Connecting
171 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));
172
173 msleep (SETTLE_TIME);
174
175 // This is not going to be sent as dish only subscribe to "Movies"
176 msg_send_expect_success (radio, "TV", "Friends");
177
178 // This is going to be sent to the dish
179 msg_send_expect_success (radio, "Movies", "Godfather");
180
181 // Check the correct message arrived
182 msg_recv_cmp (dish, "Movies", "Godfather");
183
184 // Join group during connection optvallen
185 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
186
187 zmq_sleep (1);
188
189 // This should arrive now as we joined the group
190 msg_send_expect_success (radio, "TV", "Friends");
191
192 // Check the correct message arrived
193 msg_recv_cmp (dish, "TV", "Friends");
194
195 // Leaving group
196 TEST_ASSERT_SUCCESS_ERRNO (zmq_leave (dish, "TV"));
197
198 zmq_sleep (1);
199
200 // This is not going to be sent as dish only subscribe to "Movies"
201 msg_send_expect_success (radio, "TV", "Friends");
202
203 // This is going to be sent to the dish
204 msg_send_expect_success (radio, "Movies", "Godfather");
205
206 // test zmq_poll with dish
207 zmq_pollitem_t items[] = {
208 {radio, 0, ZMQ_POLLIN, 0}, // read publications
209 {dish, 0, ZMQ_POLLIN, 0}, // read subscriptions
210 };
211 int rc = zmq_poll (items, 2, 2000);
212 TEST_ASSERT_EQUAL_INT (1, rc);
213 TEST_ASSERT_EQUAL_INT (ZMQ_POLLIN, items[1].revents);
214
215 // Check the correct message arrived
216 msg_recv_cmp (dish, "Movies", "Godfather");
217
218 test_context_socket_close (dish);
219 test_context_socket_close (radio);
220 }
MAKE_TEST_V4V6(test_radio_dish_tcp_poll)221 MAKE_TEST_V4V6 (test_radio_dish_tcp_poll)
222
223 void test_dish_connect_fails (int ipv6_)
224 {
225 void *dish = test_context_socket (ZMQ_DISH);
226
227 TEST_ASSERT_SUCCESS_ERRNO (
228 zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
229
230 const char *url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";
231
232 // Connecting dish should fail
233 TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO, zmq_connect (dish, url));
234
235 test_context_socket_close (dish);
236 }
MAKE_TEST_V4V6(test_dish_connect_fails)237 MAKE_TEST_V4V6 (test_dish_connect_fails)
238
239 void test_radio_bind_fails (int ipv6_)
240 {
241 void *radio = test_context_socket (ZMQ_RADIO);
242
243 TEST_ASSERT_SUCCESS_ERRNO (
244 zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
245
246 // Connecting dish should fail
247 // Bind radio should fail
248 TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO,
249 zmq_bind (radio, "udp://*:5556"));
250
251 test_context_socket_close (radio);
252 }
MAKE_TEST_V4V6(test_radio_bind_fails)253 MAKE_TEST_V4V6 (test_radio_bind_fails)
254
255 void test_radio_dish_udp (int ipv6_)
256 {
257 void *radio = test_context_socket (ZMQ_RADIO);
258 void *dish = test_context_socket (ZMQ_DISH);
259
260 TEST_ASSERT_SUCCESS_ERRNO (
261 zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
262 TEST_ASSERT_SUCCESS_ERRNO (
263 zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
264
265 const char *radio_url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";
266
267 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, "udp://*:5556"));
268 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, radio_url));
269
270 msleep (SETTLE_TIME);
271
272 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
273
274 msg_send_expect_success (radio, "TV", "Friends");
275 msg_recv_cmp (dish, "TV", "Friends");
276
277 test_context_socket_close (dish);
278 test_context_socket_close (radio);
279 }
MAKE_TEST_V4V6(test_radio_dish_udp) const280 MAKE_TEST_V4V6 (test_radio_dish_udp)
281
282 #define MCAST_IPV4 "226.8.5.5"
283 #define MCAST_IPV6 "ff02::7a65:726f:6df1:0a01"
284
285 static const char *mcast_url (int ipv6_)
286 {
287 if (ipv6_) {
288 return "udp://[" MCAST_IPV6 "]:5555";
289 }
290 return "udp://" MCAST_IPV4 ":5555";
291 }
292
293 // OSX uses a different name for this socket option
294 #ifndef IPV6_ADD_MEMBERSHIP
295 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
296 #endif
297
298 union sa_u
299 {
300 struct sockaddr generic;
301 struct sockaddr_in ipv4;
302 struct sockaddr_in6 ipv6;
303 };
304
305 // Test if multicast is available on this machine by attempting to
306 // send a receive a multicast datagram
is_multicast_available(int ipv6_)307 static bool is_multicast_available (int ipv6_)
308 {
309 int family = ipv6_ ? AF_INET6 : AF_INET;
310 fd_t bind_sock = retired_fd;
311 fd_t send_sock = retired_fd;
312 int port = 5555;
313 bool success = false;
314 const char *msg = "it works";
315 char buf[32];
316 union sa_u any;
317 union sa_u mcast;
318 socklen_t sl;
319 int rc;
320
321 if (ipv6_) {
322 struct sockaddr_in6 *any_ipv6 = &any.ipv6;
323 struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6;
324
325 any_ipv6->sin6_family = AF_INET6;
326 any_ipv6->sin6_port = htons (port);
327 any_ipv6->sin6_flowinfo = 0;
328 any_ipv6->sin6_scope_id = 0;
329
330 rc = test_inet_pton (AF_INET6, "::", &any_ipv6->sin6_addr);
331 if (rc == 0) {
332 goto out;
333 }
334
335 *mcast_ipv6 = *any_ipv6;
336
337 rc = test_inet_pton (AF_INET6, MCAST_IPV6, &mcast_ipv6->sin6_addr);
338 if (rc == 0) {
339 goto out;
340 }
341
342 sl = sizeof (*any_ipv6);
343 } else {
344 struct sockaddr_in *any_ipv4 = &any.ipv4;
345 struct sockaddr_in *mcast_ipv4 = &mcast.ipv4;
346
347 any_ipv4->sin_family = AF_INET;
348 any_ipv4->sin_port = htons (5555);
349
350 rc = test_inet_pton (AF_INET, "0.0.0.0", &any_ipv4->sin_addr);
351 if (rc == 0) {
352 goto out;
353 }
354
355 *mcast_ipv4 = *any_ipv4;
356
357 rc = test_inet_pton (AF_INET, MCAST_IPV4, &mcast_ipv4->sin_addr);
358 if (rc == 0) {
359 goto out;
360 }
361
362 sl = sizeof (*any_ipv4);
363 }
364
365 bind_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
366 if (bind_sock < 0) {
367 goto out;
368 }
369
370 send_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
371 if (bind_sock < 0) {
372 goto out;
373 }
374
375 rc = bind (bind_sock, &any.generic, sl);
376 if (rc < 0) {
377 goto out;
378 }
379
380 if (ipv6_) {
381 struct ipv6_mreq mreq;
382 const sockaddr_in6 *const mcast_ipv6 = &mcast.ipv6;
383
384 mreq.ipv6mr_multiaddr = mcast_ipv6->sin6_addr;
385 mreq.ipv6mr_interface = 0;
386
387 rc = setsockopt (bind_sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
388 as_setsockopt_opt_t (&mreq), sizeof (mreq));
389 if (rc < 0) {
390 goto out;
391 }
392
393 int loop = 1;
394 rc = setsockopt (send_sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
395 as_setsockopt_opt_t (&loop), sizeof (loop));
396 if (rc < 0) {
397 goto out;
398 }
399 } else {
400 struct ip_mreq mreq;
401 const sockaddr_in *const mcast_ipv4 = &mcast.ipv4;
402
403 mreq.imr_multiaddr = mcast_ipv4->sin_addr;
404 mreq.imr_interface.s_addr = htonl (INADDR_ANY);
405
406 rc = setsockopt (bind_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
407 as_setsockopt_opt_t (&mreq), sizeof (mreq));
408 if (rc < 0) {
409 goto out;
410 }
411
412 int loop = 1;
413 rc = setsockopt (send_sock, IPPROTO_IP, IP_MULTICAST_LOOP,
414 as_setsockopt_opt_t (&loop), sizeof (loop));
415 if (rc < 0) {
416 goto out;
417 }
418 }
419
420 msleep (SETTLE_TIME);
421
422 rc = sendto (send_sock, msg, static_cast<socklen_t> (strlen (msg)), 0,
423 &mcast.generic, sl);
424 if (rc < 0) {
425 goto out;
426 }
427
428 msleep (SETTLE_TIME);
429
430 rc = recvfrom (bind_sock, buf, sizeof (buf) - 1, 0, NULL, 0);
431 if (rc < 0) {
432 goto out;
433 }
434
435 buf[rc] = '\0';
436
437 success = (strcmp (msg, buf) == 0);
438
439 out:
440 if (bind_sock >= 0) {
441 close (bind_sock);
442 }
443
444 if (send_sock >= 0) {
445 close (send_sock);
446 }
447
448 return success;
449 }
450
ignore_if_unavailable(int ipv6_)451 static void ignore_if_unavailable (int ipv6_)
452 {
453 if (ipv6_ && !is_ipv6_available ())
454 TEST_IGNORE_MESSAGE ("No IPV6 available");
455 if (!is_multicast_available (ipv6_))
456 TEST_IGNORE_MESSAGE ("No multicast available");
457 }
458
test_radio_dish_mcast(int ipv6_)459 static void test_radio_dish_mcast (int ipv6_)
460 {
461 ignore_if_unavailable (ipv6_);
462
463 void *radio = test_context_socket (ZMQ_RADIO);
464 void *dish = test_context_socket (ZMQ_DISH);
465
466 TEST_ASSERT_SUCCESS_ERRNO (
467 zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
468 TEST_ASSERT_SUCCESS_ERRNO (
469 zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
470
471 const char *url = mcast_url (ipv6_);
472
473 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
474 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));
475
476 msleep (SETTLE_TIME);
477
478 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
479
480 msg_send_expect_success (radio, "TV", "Friends");
481 msg_recv_cmp (dish, "TV", "Friends");
482
483 test_context_socket_close (dish);
484 test_context_socket_close (radio);
485 }
MAKE_TEST_V4V6(test_radio_dish_mcast)486 MAKE_TEST_V4V6 (test_radio_dish_mcast)
487
488 static void test_radio_dish_no_loop (int ipv6_)
489 {
490 #ifdef _WIN32
491 TEST_IGNORE_MESSAGE (
492 "ZMQ_MULTICAST_LOOP=false does not appear to work on Windows (TODO)");
493 #endif
494 ignore_if_unavailable (ipv6_);
495
496 void *radio = test_context_socket (ZMQ_RADIO);
497 void *dish = test_context_socket (ZMQ_DISH);
498
499 TEST_ASSERT_SUCCESS_ERRNO (
500 zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
501 TEST_ASSERT_SUCCESS_ERRNO (
502 zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
503
504 // Disable multicast loop for radio
505 int loop = 0;
506 TEST_ASSERT_SUCCESS_ERRNO (
507 zmq_setsockopt (radio, ZMQ_MULTICAST_LOOP, &loop, sizeof (int)));
508
509 const char *url = mcast_url (ipv6_);
510
511 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
512 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));
513
514 msleep (SETTLE_TIME);
515
516 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
517
518 msg_send_expect_success (radio, "TV", "Friends");
519
520 // Looping is disabled, we shouldn't receive anything
521 msleep (SETTLE_TIME);
522
523 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (dish, NULL, 0, ZMQ_DONTWAIT));
524
525 test_context_socket_close (dish);
526 test_context_socket_close (radio);
527 }
MAKE_TEST_V4V6(test_radio_dish_no_loop)528 MAKE_TEST_V4V6 (test_radio_dish_no_loop)
529
530 int main (void)
531 {
532 setup_test_environment ();
533
534 UNITY_BEGIN ();
535 RUN_TEST (test_leave_unjoined_fails);
536 RUN_TEST (test_join_too_long_fails);
537 RUN_TEST (test_long_group);
538 RUN_TEST (test_join_twice_fails);
539 RUN_TEST (test_radio_bind_fails_ipv4);
540 RUN_TEST (test_radio_bind_fails_ipv6);
541 RUN_TEST (test_dish_connect_fails_ipv4);
542 RUN_TEST (test_dish_connect_fails_ipv6);
543 RUN_TEST (test_radio_dish_tcp_poll_ipv4);
544 RUN_TEST (test_radio_dish_tcp_poll_ipv6);
545 RUN_TEST (test_radio_dish_udp_ipv4);
546 RUN_TEST (test_radio_dish_udp_ipv6);
547
548 RUN_TEST (test_radio_dish_mcast_ipv4);
549 RUN_TEST (test_radio_dish_no_loop_ipv4);
550
551 RUN_TEST (test_radio_dish_mcast_ipv6);
552 RUN_TEST (test_radio_dish_no_loop_ipv6);
553
554 return UNITY_END ();
555 }
556