1 /* =========================================================================
2 zbeacon - LAN discovery and presence
3
4 Copyright (c) the Contributors as noted in the AUTHORS file.
5 This file is part of CZMQ, the high-level C binding for 0MQ:
6 http://czmq.zeromq.org.
7
8 This Source Code Form is subject to the terms of the Mozilla Public
9 License, v. 2.0. If a copy of the MPL was not distributed with this
10 file, You can obtain one at http://mozilla.org/MPL/2.0/.
11 =========================================================================
12 */
13
14 /*
15 @header
16 The zbeacon class implements a peer-to-peer discovery service for local
17 networks. A beacon can broadcast and/or capture service announcements
18 using UDP messages on the local area network. This implementation uses
19 IPv4 UDP broadcasts. You can define the format of your outgoing beacons,
20 and set a filter that validates incoming beacons. Beacons are sent and
21 received asynchronously in the background.
22 @discuss
23 This class replaces zbeacon_v2, and is meant for applications that use
24 the CZMQ v3 API (meaning, zsock).
25 @end
26 */
27
28 #include "czmq_classes.h"
29
30 // Constants
31 #define INTERVAL_DFLT 1000 // Default interval = 1 second
32
33 // --------------------------------------------------------------------------
34 // The self_t structure holds the state for one actor instance
35
36 typedef struct {
37 zsock_t *pipe; // Actor command pipe
38 SOCKET udpsock; // UDP socket for send/recv
39 SOCKET udpsock_send; // UDP socket for IPv6 send
40 char port_nbr [7]; // UDP port number we work on
41 int interval; // Beacon broadcast interval
42 int64_t ping_at; // Next broadcast time
43 zframe_t *transmit; // Beacon transmit data
44 zframe_t *filter; // Beacon filter data
45 inaddr_storage_t broadcast; // Our broadcast address
46 bool terminated; // Did caller ask us to quit?
47 bool verbose; // Verbose logging enabled?
48 char hostname [NI_MAXHOST]; // Saved host name
49 } self_t;
50
51 static void
s_self_destroy(self_t ** self_p)52 s_self_destroy (self_t **self_p)
53 {
54 assert (self_p);
55 if (*self_p) {
56 self_t *self = *self_p;
57 zframe_destroy (&self->transmit);
58 zframe_destroy (&self->filter);
59 if (self->udpsock) // don't close STDIN
60 zsys_udp_close (self->udpsock);
61 freen (self);
62 *self_p = NULL;
63 }
64 }
65
66 static self_t *
s_self_new(zsock_t * pipe)67 s_self_new (zsock_t *pipe)
68 {
69 self_t *self = (self_t *) zmalloc (sizeof (self_t));
70 assert (self);
71 self->pipe = pipe;
72 return self;
73 }
74
75
76 // --------------------------------------------------------------------------
77 // Prepare beacon to work on specified UPD port.
78
79 static void
s_self_prepare_udp(self_t * self)80 s_self_prepare_udp (self_t *self)
81 {
82 // Create our UDP socket
83 if (self->udpsock)
84 zsys_udp_close (self->udpsock);
85 if (self->udpsock_send)
86 zsys_udp_close (self->udpsock_send);
87
88 self->hostname [0] = 0;
89 // For IPv6 we need two sockets. At least on Linux, IPv6 multicast packets
90 // are NOT received despite joining the group and setting the interface
91 // option UNLESS the socket is bound to in6_addrany, which means the kernel
92 // will select an arbitrary IP address as the source when sending beacons
93 // out. This breaks zbeacon as the protocol uses the source address of a
94 // beacon to find the endpoint of a peer, which is then random and
95 // useless (could even be associated with a different interface, eg: a
96 // virtual bridge).
97 // As a workaround, use a different socket to send packets. So the socket
98 // that receives can be bound to in6_addrany, and the socket that sends
99 // can be bound to the actual intended host address.
100 self->udpsock = zsys_udp_new (false);
101 if (self->udpsock == INVALID_SOCKET) {
102 self->udpsock_send = INVALID_SOCKET;
103 return;
104 }
105 self->udpsock_send = zsys_udp_new (false);
106 if (self->udpsock_send == INVALID_SOCKET) {
107 zsys_udp_close (self->udpsock);
108 self->udpsock = INVALID_SOCKET;
109 return;
110 }
111
112 // Get the network interface fro ZSYS_INTERFACE or else use first
113 // broadcast interface defined on system. ZSYS_INTERFACE=* means
114 // use INADDR_ANY + INADDR_BROADCAST.
115 const char *iface = zsys_interface ();
116 struct addrinfo *bind_to = NULL;
117 struct addrinfo *send_to = NULL;
118 struct addrinfo hint;
119 memset (&hint, 0, sizeof(struct addrinfo));
120 hint.ai_flags = AI_NUMERICHOST;
121 #if !defined (CZMQ_HAVE_ANDROID) && !defined (CZMQ_HAVE_FREEBSD)
122 hint.ai_flags |= AI_V4MAPPED;
123 #endif
124 hint.ai_socktype = SOCK_DGRAM;
125 hint.ai_protocol = IPPROTO_UDP;
126 hint.ai_family = zsys_ipv6 () ? AF_INET6 : AF_INET;
127 int rc;
128 int found_iface = 0;
129 unsigned int if_index = 0;
130
131 if (streq (iface, "*")) {
132 // Wildcard means bind to INADDR_ANY and send to INADDR_BROADCAST
133 // IE - getaddrinfo with NULL as first parameter or 255.255.255.255
134 // (or IPv6 multicast link-local all-node group ff02::1
135 hint.ai_flags = hint.ai_flags | AI_PASSIVE;
136 rc = getaddrinfo (NULL, self->port_nbr, &hint, &bind_to);
137 assert (rc == 0);
138
139 if (zsys_ipv6()) {
140 // Default is link-local all-node multicast group
141 rc = getaddrinfo (zsys_ipv6_mcast_address (), self->port_nbr, &hint,
142 &send_to);
143 assert (rc == 0);
144 }
145 else {
146 rc = getaddrinfo ("255.255.255.255", self->port_nbr, &hint,
147 &send_to);
148 assert (rc == 0);
149 }
150
151 found_iface = 1;
152 }
153 // if ZSYS_INTERFACE is a single digit, use the corresponding interface in
154 // the interface list
155 else if (strlen (iface) == 1 && iface[0] >= '0' && iface[0] <= '9')
156 {
157 int if_number = atoi (iface);
158 ziflist_t *iflist = ziflist_new_ipv6 ();
159 assert (iflist);
160 const char *name = ziflist_first (iflist);
161 int idx = -1;
162 while (name) {
163 idx++;
164 if (idx == if_number &&
165 ((ziflist_is_ipv6 (iflist) && zsys_ipv6 ()) ||
166 (!ziflist_is_ipv6 (iflist) && !zsys_ipv6 ()))) {
167 // Using inet_addr instead of inet_aton or inet_atop
168 // because these are not supported in Win XP
169 rc = getaddrinfo (ziflist_address (iflist), self->port_nbr,
170 &hint, &bind_to);
171 assert (rc == 0);
172 rc = getaddrinfo (ziflist_broadcast (iflist), self->port_nbr,
173 &hint, &send_to);
174 assert (rc == 0);
175 if_index = if_nametoindex (name);
176
177 if (self->verbose)
178 zsys_info ("zbeacon: interface=%s address=%s broadcast=%s",
179 name, ziflist_address (iflist), ziflist_broadcast (iflist));
180 found_iface = 1;
181 break; // iface is known, so allow it
182 }
183 name = ziflist_next (iflist);
184 }
185 ziflist_destroy (&iflist);
186 }
187 else if (zsys_ipv6 () && strneq("", zsys_ipv6_address ()) && strneq (iface, "")) {
188 rc = getaddrinfo (zsys_ipv6_address (), self->port_nbr,
189 &hint, &bind_to);
190 assert (rc == 0);
191 // A user might set a link-local address without appending %iface
192 if (IN6_IS_ADDR_LINKLOCAL (&((in6addr_t *)bind_to->ai_addr)->sin6_addr) &&
193 !strchr (zsys_ipv6_address (), '%')) {
194 char address_and_iface [NI_MAXHOST] = {0};
195 strcat (address_and_iface, zsys_ipv6_address ());
196 strcat (address_and_iface, "%");
197 strcat (address_and_iface, iface);
198 rc = getaddrinfo (address_and_iface, self->port_nbr, &hint, &bind_to);
199 assert (rc == 0);
200 }
201 rc = getaddrinfo (zsys_ipv6_mcast_address (), self->port_nbr,
202 &hint, &send_to);
203 assert (rc == 0);
204 if_index = if_nametoindex (iface);
205
206 if (self->verbose)
207 zsys_info ("zbeacon: interface=%s address=%s broadcast=%s",
208 iface, zsys_ipv6_address (), zsys_ipv6_mcast_address ());
209 found_iface = 1;
210 }
211 else {
212 // Look for matching interface, or first ziflist item
213 ziflist_t *iflist = ziflist_new_ipv6 ();
214 assert (iflist);
215 const char *name = ziflist_first (iflist);
216 while (name) {
217 // If IPv6 is not enabled ignore IPv6 interfaces.
218 if ((streq (iface, name) || streq (iface, "")) &&
219 ((ziflist_is_ipv6 (iflist) && zsys_ipv6 ()) ||
220 (!ziflist_is_ipv6 (iflist) && !zsys_ipv6 ()))) {
221 rc = getaddrinfo (ziflist_address (iflist), self->port_nbr,
222 &hint, &bind_to);
223 assert (rc == 0);
224 rc = getaddrinfo (ziflist_broadcast (iflist), self->port_nbr,
225 &hint, &send_to);
226 assert (rc == 0);
227 if_index = if_nametoindex (name);
228
229 if (self->verbose)
230 zsys_info ("zbeacon: interface=%s address=%s broadcast=%s",
231 name, ziflist_address (iflist), ziflist_broadcast (iflist));
232 found_iface = 1;
233 break; // iface is known, so allow it
234 }
235 name = ziflist_next (iflist);
236 }
237 ziflist_destroy (&iflist);
238 }
239 if (found_iface) {
240 inaddr_storage_t bind_address;
241
242 // On Windows we bind to the host address
243 // On *NIX we bind to INADDR_ANY or in6addr_any, otherwise multicast
244 // packets will be filtered out despite joining the group
245 #if (defined (__WINDOWS__))
246 memcpy (&bind_address, bind_to->ai_addr, bind_to->ai_addrlen);
247 #else
248 memcpy (&bind_address, send_to->ai_addr, send_to->ai_addrlen);
249 if (zsys_ipv6 ())
250 bind_address.__inaddr_u.__addr6.sin6_addr = in6addr_any;
251 else
252 bind_address.__inaddr_u.__addr.sin_addr.s_addr = htonl (INADDR_ANY);
253 #endif
254 memcpy (&self->broadcast, send_to->ai_addr, send_to->ai_addrlen);
255
256 if (zsys_ipv6()) {
257 struct ipv6_mreq mreq;
258 mreq.ipv6mr_interface = if_index;
259 memcpy (&mreq.ipv6mr_multiaddr,
260 &(((in6addr_t *)(send_to->ai_addr))->sin6_addr),
261 sizeof (struct in6_addr));
262
263 if (setsockopt (self->udpsock, IPPROTO_IPV6, IPV6_JOIN_GROUP,
264 (char *)&mreq, sizeof (mreq)))
265 zsys_socket_error ("zbeacon: setsockopt IPV6_JOIN_GROUP failed");
266
267 if (setsockopt (self->udpsock, IPPROTO_IPV6, IPV6_MULTICAST_IF,
268 (char *)&if_index, sizeof (if_index)))
269 zsys_socket_error ("zbeacon: setsockopt IPV6_MULTICAST_IF failed");
270
271 if (setsockopt (self->udpsock_send, IPPROTO_IPV6, IPV6_JOIN_GROUP,
272 (char *)&mreq, sizeof (mreq)))
273 zsys_socket_error ("zbeacon: setsockopt IPV6_JOIN_GROUP failed");
274
275 if (setsockopt (self->udpsock_send, IPPROTO_IPV6, IPV6_MULTICAST_IF,
276 (char *)&if_index, sizeof (if_index)))
277 zsys_socket_error ("zbeacon: setsockopt IPV6_MULTICAST_IF failed");
278 }
279
280 // If bind fails, we close the socket for opening again later (next poll interval)
281 if (bind (self->udpsock_send, bind_to->ai_addr, bind_to->ai_addrlen) ||
282 bind (self->udpsock, (struct sockaddr *)&bind_address,
283 zsys_ipv6 () ? sizeof (in6addr_t) : sizeof (inaddr_t))) {
284 zsys_debug ("zbeacon: Unable to bind to broadcast address, reason=%s", strerror (errno));
285 zsys_udp_close (self->udpsock);
286 self->udpsock = INVALID_SOCKET;
287 zsys_udp_close (self->udpsock_send);
288 self->udpsock_send = INVALID_SOCKET;
289 }
290 else if (streq (iface, "*")) {
291 strcpy(self->hostname, "*");
292 if (self->verbose)
293 zsys_info ("zbeacon: configured, hostname=%s", self->hostname);
294 }
295 else if (getnameinfo (bind_to->ai_addr, bind_to->ai_addrlen,
296 self->hostname, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) {
297 if (self->verbose)
298 zsys_info ("zbeacon: configured, hostname=%s", self->hostname);
299 }
300 }
301 else
302 {
303 // No valid interface. Close the socket so that we can try again later
304 zsys_udp_close(self->udpsock);
305 self->udpsock = INVALID_SOCKET;
306 zsys_udp_close (self->udpsock_send);
307 self->udpsock_send = INVALID_SOCKET;
308 }
309
310 freeaddrinfo (bind_to);
311 freeaddrinfo (send_to);
312 }
313
314
315 // --------------------------------------------------------------------------
316 // Prepare beacon to work on specified UPD port, reply hostname to
317 // pipe (or "" if this failed)
318
319 static void
s_self_configure(self_t * self,int port_nbr)320 s_self_configure (self_t *self, int port_nbr)
321 {
322 assert (port_nbr);
323 snprintf (self->port_nbr, 7, "%d", port_nbr);
324 s_self_prepare_udp (self);
325 zstr_send (self->pipe, self->hostname);
326 if (streq (self->hostname, ""))
327 zsys_error ("No broadcast interface found, (ZSYS_INTERFACE=%s)", zsys_interface ());
328 }
329
330
331 // --------------------------------------------------------------------------
332 // Handle a command from calling application
333
334 static int
s_self_handle_pipe(self_t * self)335 s_self_handle_pipe (self_t *self)
336 {
337 // Get just the command off the pipe
338 char *command = zstr_recv (self->pipe);
339 if (!command)
340 return -1; // Interrupted
341
342 if (self->verbose)
343 zsys_info ("zbeacon: API command=%s", command);
344
345 if (streq (command, "VERBOSE"))
346 self->verbose = true;
347 else
348 if (streq (command, "CONFIGURE")) {
349 int port;
350 int rc = zsock_recv (self->pipe, "i", &port);
351 assert (rc == 0);
352 s_self_configure (self, port);
353 }
354 else
355 if (streq (command, "PUBLISH")) {
356 zframe_destroy (&self->transmit);
357 zsock_recv (self->pipe, "fi", &self->transmit, &self->interval);
358 assert (zframe_size (self->transmit) <= UDP_FRAME_MAX);
359 if (self->interval == 0)
360 self->interval = INTERVAL_DFLT;
361 // Start broadcasting immediately
362 self->ping_at = zclock_mono ();
363 }
364 else
365 if (streq (command, "SILENCE"))
366 zframe_destroy (&self->transmit);
367 else
368 if (streq (command, "SUBSCRIBE")) {
369 zframe_destroy (&self->filter);
370 self->filter = zframe_recv (self->pipe);
371 assert (zframe_size (self->filter) <= UDP_FRAME_MAX);
372 }
373 else
374 if (streq (command, "UNSUBSCRIBE"))
375 zframe_destroy (&self->filter);
376 else
377 if (streq (command, "$TERM"))
378 self->terminated = true;
379 else {
380 zsys_error ("zbeacon: - invalid command: %s", command);
381 assert (false);
382 }
383 zstr_free (&command);
384 return 0;
385 }
386
387
388 // --------------------------------------------------------------------------
389 // Receive and filter the waiting beacon
390
391 static void
s_self_handle_udp(self_t * self)392 s_self_handle_udp (self_t *self)
393 {
394 assert (self);
395
396 char peername [NI_MAXHOST];
397 zframe_t *frame = zsys_udp_recv (self->udpsock, peername, NI_MAXHOST);
398 if (!frame)
399 return;
400
401 // If filter is set, check that beacon matches it
402 bool is_valid = false;
403 if (self->filter) {
404 byte *filter_data = zframe_data (self->filter);
405 size_t filter_size = zframe_size (self->filter);
406 if (zframe_size (frame) >= filter_size
407 && memcmp (zframe_data (frame), filter_data, filter_size) == 0)
408 is_valid = true;
409 }
410 // If valid, discard our own broadcasts, which UDP echoes to us
411 if (is_valid && self->transmit) {
412 byte *transmit_data = zframe_data (self->transmit);
413 size_t transmit_size = zframe_size (self->transmit);
414 if (zframe_size (frame) == transmit_size
415 && memcmp (zframe_data (frame), transmit_data, transmit_size) == 0)
416 is_valid = false;
417 }
418 // If still a valid beacon, send on to the API
419 if (is_valid) {
420 zmsg_t *msg = zmsg_new ();
421 assert (msg);
422 zmsg_addstr (msg, peername);
423 zmsg_append (msg, &frame);
424 if (zmsg_send (&msg, self->pipe) < 0)
425 zmsg_destroy (&msg);
426 }
427 else
428 zframe_destroy (&frame);
429 }
430
431
432 // --------------------------------------------------------------------------
433 // Send the beacon over UDP
434
435 static int
s_emit_beacon(self_t * self)436 s_emit_beacon (self_t *self)
437 {
438 #if defined (__WINDOWS__)
439 // Windows doesn't broadcast on all interfaces when using INADDR_BROADCAST
440 // only the interface with the highest metric (as seen in `route print`)
441 // so send a packet per interface to each broadcast address
442 if (streq (zsys_interface (), "*") && !zsys_ipv6 ()) {
443 INTERFACE_INFO interface_list [64];
444 DWORD bytes_received = 0;
445
446 int rc = WSAIoctl (self->udpsock, SIO_GET_INTERFACE_LIST, 0, 0,
447 &interface_list, sizeof (interface_list), &bytes_received, NULL, NULL);
448 assert (rc != SOCKET_ERROR);
449
450 int num_interfaces = bytes_received / sizeof (INTERFACE_INFO);
451
452 // iiBroadcastAddress is always 255.255.255.255 need to calculate the specific broadcast address using the netmask
453 // keep the same parameters as self->broadcast but just replace the address for each interface
454 inaddr_t addr;
455 memcpy(&addr, &self->broadcast, sizeof (inaddr_t));
456
457 for (int i = 0; i < num_interfaces; ++i)
458 {
459 addr.sin_addr.S_un.S_addr = (interface_list[i].iiAddress.AddressIn.sin_addr.S_un.S_addr
460 | ~(interface_list[i].iiNetmask.AddressIn.sin_addr.S_un.S_addr));
461
462 if (zsys_udp_send (self->udpsock_send, self->transmit,
463 (inaddr_t *)&addr, sizeof (inaddr_t))) {
464 // Send failed, cause zbeacon to re-init socket
465 return -1;
466 }
467 }
468 return 0;
469 }
470 #endif
471
472 return zsys_udp_send (self->udpsock_send, self->transmit,
473 (inaddr_t *)&self->broadcast,
474 zsys_ipv6 () ? sizeof (in6addr_t) : sizeof (inaddr_t));
475 }
476
477
478 // --------------------------------------------------------------------------
479 // zbeacon() implements the zbeacon actor interface
480
481 void
zbeacon(zsock_t * pipe,void * args)482 zbeacon (zsock_t *pipe, void *args)
483 {
484 self_t *self = s_self_new (pipe);
485 assert (self);
486 // Signal successful initialization
487 zsock_signal (pipe, 0);
488
489 while (!self->terminated) {
490 // Poll on API pipe and on UDP socket
491 zmq_pollitem_t pollitems [] = {
492 { zsock_resolve (self->pipe), 0, ZMQ_POLLIN, 0 },
493 { NULL, self->udpsock, ZMQ_POLLIN, 0 }
494 };
495 long timeout = -1;
496 if (self->transmit) {
497 timeout = (long) (self->ping_at - zclock_mono ());
498 if (timeout < 0)
499 timeout = 0;
500 }
501 int pollset_size = (self->udpsock && self->udpsock != INVALID_SOCKET) ? 2: 1;
502 if (zmq_poll (pollitems, pollset_size, timeout * ZMQ_POLL_MSEC) == -1)
503 break; // Interrupted
504
505 if (pollitems [0].revents & ZMQ_POLLIN)
506 s_self_handle_pipe (self);
507 if (pollitems [1].revents & ZMQ_POLLIN)
508 s_self_handle_udp (self);
509
510 if (self->transmit
511 && zclock_mono () >= self->ping_at) {
512 // Send beacon to any listening peers
513 if (!self->udpsock_send || self->udpsock_send == INVALID_SOCKET || s_emit_beacon(self))
514 {
515 const char *reason = (!self->udpsock_send || self->udpsock_send == INVALID_SOCKET) ? "invalid socket" : strerror (errno);
516 zsys_debug ("zbeacon: failed to transmit, attempting reconnection. reason=%s", reason);
517 // Try to recreate UDP socket on interface
518 s_self_prepare_udp (self);
519 }
520 self->ping_at = zclock_mono () + self->interval;
521 }
522 }
523 s_self_destroy (&self);
524 }
525
526
527 // --------------------------------------------------------------------------
528 // Selftest
529
530 void
zbeacon_test(bool verbose)531 zbeacon_test (bool verbose)
532 {
533 printf (" * zbeacon: ");
534 if (verbose)
535 printf ("\n");
536
537 // @selftest
538 // Test 1 - two beacons, one speaking, one listening
539 // Create speaker beacon to broadcast our service
540 zactor_t *speaker = zactor_new (zbeacon, NULL);
541 assert (speaker);
542 if (verbose)
543 zstr_sendx (speaker, "VERBOSE", NULL);
544
545 zsock_send (speaker, "si", "CONFIGURE", 9999);
546 char *hostname = zstr_recv (speaker);
547 if (!*hostname) {
548 printf ("OK (skipping test, no UDP broadcasting)\n");
549 zactor_destroy (&speaker);
550 freen (hostname);
551 return;
552 }
553 freen (hostname);
554
555 // Create listener beacon on port 9999 to lookup service
556 zactor_t *listener = zactor_new (zbeacon, NULL);
557 assert (listener);
558 if (verbose)
559 zstr_sendx (listener, "VERBOSE", NULL);
560 zsock_send (listener, "si", "CONFIGURE", 9999);
561 hostname = zstr_recv (listener);
562 assert (*hostname);
563 freen (hostname);
564
565 // We will broadcast the magic value 0xCAFE
566 byte announcement [2] = { 0xCA, 0xFE };
567 zsock_send (speaker, "sbi", "PUBLISH", announcement, 2, 100);
568 // We will listen to anything (empty subscription)
569 zsock_send (listener, "sb", "SUBSCRIBE", "", 0);
570
571 // Wait for at most 1/2 second if there's no broadcasting
572 zsock_set_rcvtimeo (listener, 500);
573 char *ipaddress = zstr_recv (listener);
574 if (ipaddress) {
575 zframe_t *content = zframe_recv (listener);
576 assert (zframe_size (content) == 2);
577 assert (zframe_data (content) [0] == 0xCA);
578 assert (zframe_data (content) [1] == 0xFE);
579 zframe_destroy (&content);
580 zstr_free (&ipaddress);
581 zstr_sendx (speaker, "SILENCE", NULL);
582 }
583 zactor_destroy (&listener);
584 zactor_destroy (&speaker);
585
586 // Test subscription filter using a 3-node setup
587 zactor_t *node1 = zactor_new (zbeacon, NULL);
588 assert (node1);
589 zsock_send (node1, "si", "CONFIGURE", 5670);
590 hostname = zstr_recv (node1);
591 assert (*hostname);
592 freen (hostname);
593
594 zactor_t *node2 = zactor_new (zbeacon, NULL);
595 assert (node2);
596 zsock_send (node2, "si", "CONFIGURE", 5670);
597 hostname = zstr_recv (node2);
598 assert (*hostname);
599 freen (hostname);
600
601 zactor_t *node3 = zactor_new (zbeacon, NULL);
602 assert (node3);
603 zsock_send (node3, "si", "CONFIGURE", 5670);
604 hostname = zstr_recv (node3);
605 assert (*hostname);
606 freen (hostname);
607
608 zsock_send (node1, "sbi", "PUBLISH", "NODE/1", 6, 250);
609 zsock_send (node2, "sbi", "PUBLISH", "NODE/2", 6, 250);
610 zsock_send (node3, "sbi", "PUBLISH", "RANDOM", 6, 250);
611 zsock_send (node1, "sb", "SUBSCRIBE", "NODE", 4);
612
613 // Poll on three API sockets at once
614 zpoller_t *poller = zpoller_new (node1, node2, node3, NULL);
615 assert (poller);
616 int64_t stop_at = zclock_mono () + 1000;
617 while (zclock_mono () < stop_at) {
618 long timeout = (long) (stop_at - zclock_mono ());
619 if (timeout < 0)
620 timeout = 0;
621 void *which = zpoller_wait (poller, timeout * ZMQ_POLL_MSEC);
622 if (which) {
623 assert (which == node1);
624 char *ipaddress, *received;
625 zstr_recvx (node1, &ipaddress, &received, NULL);
626 assert (streq (received, "NODE/2"));
627 zstr_free (&ipaddress);
628 zstr_free (&received);
629 }
630 }
631 zpoller_destroy (&poller);
632
633 // Stop listening
634 zstr_sendx (node1, "UNSUBSCRIBE", NULL);
635
636 // Stop all node broadcasts
637 zstr_sendx (node1, "SILENCE", NULL);
638 zstr_sendx (node2, "SILENCE", NULL);
639 zstr_sendx (node3, "SILENCE", NULL);
640
641 // Destroy the test nodes
642 zactor_destroy (&node1);
643 zactor_destroy (&node2);
644 zactor_destroy (&node3);
645
646 #if defined (__WINDOWS__)
647 zsys_shutdown();
648 #endif
649 // @end
650 printf ("OK\n");
651 }
652