1 /*  =========================================================================
2     zbeacon - LAN discovery and presence
3 
4     Copyright (c) the Contributors as noted in the AUTHORS file.
5     This file is part of CZMQ, the high-level C binding for 0MQ:
6     http://czmq.zeromq.org.
7 
8     This Source Code Form is subject to the terms of the Mozilla Public
9     License, v. 2.0. If a copy of the MPL was not distributed with this
10     file, You can obtain one at http://mozilla.org/MPL/2.0/.
11     =========================================================================
12 */
13 
14 /*
15 @header
16     The zbeacon class implements a peer-to-peer discovery service for local
17     networks. A beacon can broadcast and/or capture service announcements
18     using UDP messages on the local area network. This implementation uses
19     IPv4 UDP broadcasts. You can define the format of your outgoing beacons,
20     and set a filter that validates incoming beacons. Beacons are sent and
21     received asynchronously in the background.
22 @discuss
23     This class replaces zbeacon_v2, and is meant for applications that use
24     the CZMQ v3 API (meaning, zsock).
25 @end
26 */
27 
28 #include "czmq_classes.h"
29 
30 //  Constants
31 #define INTERVAL_DFLT  1000         //  Default interval = 1 second
32 
33 //  --------------------------------------------------------------------------
34 //  The self_t structure holds the state for one actor instance
35 
36 typedef struct {
37     zsock_t *pipe;              //  Actor command pipe
38     SOCKET udpsock;             //  UDP socket for send/recv
39     SOCKET udpsock_send;        //  UDP socket for IPv6 send
40     char port_nbr [7];          //  UDP port number we work on
41     int interval;               //  Beacon broadcast interval
42     int64_t ping_at;            //  Next broadcast time
43     zframe_t *transmit;         //  Beacon transmit data
44     zframe_t *filter;           //  Beacon filter data
45     inaddr_storage_t broadcast; //  Our broadcast address
46     bool terminated;            //  Did caller ask us to quit?
47     bool verbose;               //  Verbose logging enabled?
48     char hostname [NI_MAXHOST]; //  Saved host name
49 } self_t;
50 
51 static void
s_self_destroy(self_t ** self_p)52 s_self_destroy (self_t **self_p)
53 {
54     assert (self_p);
55     if (*self_p) {
56         self_t *self = *self_p;
57         zframe_destroy (&self->transmit);
58         zframe_destroy (&self->filter);
59         if (self->udpsock) // don't close STDIN
60             zsys_udp_close (self->udpsock);
61         freen (self);
62         *self_p = NULL;
63     }
64 }
65 
66 static self_t *
s_self_new(zsock_t * pipe)67 s_self_new (zsock_t *pipe)
68 {
69     self_t *self = (self_t *) zmalloc (sizeof (self_t));
70     assert (self);
71     self->pipe = pipe;
72     return self;
73 }
74 
75 
76 //  --------------------------------------------------------------------------
//  Prepare beacon to work on specified UDP port.
78 
79 static void
s_self_prepare_udp(self_t * self)80 s_self_prepare_udp (self_t *self)
81 {
82     //  Create our UDP socket
83     if (self->udpsock)
84         zsys_udp_close (self->udpsock);
85     if (self->udpsock_send)
86         zsys_udp_close (self->udpsock_send);
87 
88     self->hostname [0] = 0;
89     //  For IPv6 we need two sockets. At least on Linux, IPv6 multicast packets
90     //  are NOT received despite joining the group and setting the interface
91     //  option UNLESS the socket is bound to in6_addrany, which means the kernel
92     //  will select an arbitrary IP address as the source when sending beacons
93     //  out. This breaks zbeacon as the protocol uses the source address of a
94     //  beacon to find the endpoint of a peer, which is then random and
95     //  useless (could even be associated with a different interface, eg: a
96     //  virtual bridge).
97     //  As a workaround, use a different socket to send packets. So the socket
98     //  that receives can be bound to in6_addrany, and the socket that sends
99     //  can be bound to the actual intended host address.
100     self->udpsock = zsys_udp_new (false);
101     if (self->udpsock == INVALID_SOCKET) {
102         self->udpsock_send = INVALID_SOCKET;
103         return;
104     }
105     self->udpsock_send = zsys_udp_new (false);
106     if (self->udpsock_send == INVALID_SOCKET) {
107         zsys_udp_close (self->udpsock);
108         self->udpsock = INVALID_SOCKET;
109         return;
110     }
111 
112     //  Get the network interface fro ZSYS_INTERFACE or else use first
113     //  broadcast interface defined on system. ZSYS_INTERFACE=* means
114     //  use INADDR_ANY + INADDR_BROADCAST.
115     const char *iface = zsys_interface ();
116     struct addrinfo *bind_to = NULL;
117     struct addrinfo *send_to = NULL;
118     struct addrinfo hint;
119     memset (&hint, 0, sizeof(struct addrinfo));
120     hint.ai_flags = AI_NUMERICHOST;
121 #if !defined (CZMQ_HAVE_ANDROID) && !defined (CZMQ_HAVE_FREEBSD)
122     hint.ai_flags |= AI_V4MAPPED;
123 #endif
124     hint.ai_socktype = SOCK_DGRAM;
125     hint.ai_protocol = IPPROTO_UDP;
126     hint.ai_family = zsys_ipv6 () ? AF_INET6 : AF_INET;
127     int rc;
128     int found_iface = 0;
129     unsigned int if_index = 0;
130 
131     if (streq (iface, "*")) {
132         //  Wildcard means bind to INADDR_ANY and send to INADDR_BROADCAST
133         //  IE - getaddrinfo with NULL as first parameter or 255.255.255.255
134         //  (or IPv6 multicast link-local all-node group ff02::1
135         hint.ai_flags = hint.ai_flags | AI_PASSIVE;
136         rc = getaddrinfo (NULL, self->port_nbr, &hint, &bind_to);
137         assert (rc == 0);
138 
139         if (zsys_ipv6()) {
140             //  Default is link-local all-node multicast group
141             rc = getaddrinfo (zsys_ipv6_mcast_address (), self->port_nbr, &hint,
142                     &send_to);
143             assert (rc == 0);
144         }
145         else {
146             rc = getaddrinfo ("255.255.255.255", self->port_nbr, &hint,
147                     &send_to);
148             assert (rc == 0);
149         }
150 
151         found_iface = 1;
152     }
153     // if ZSYS_INTERFACE is a single digit, use the corresponding interface in
154     // the interface list
155     else if (strlen (iface) == 1 && iface[0] >= '0' && iface[0] <= '9')
156     {
157         int if_number = atoi (iface);
158         ziflist_t *iflist = ziflist_new_ipv6 ();
159         assert (iflist);
160         const char *name = ziflist_first (iflist);
161         int idx = -1;
162         while (name) {
163             idx++;
164             if (idx == if_number &&
165                     ((ziflist_is_ipv6 (iflist) && zsys_ipv6 ()) ||
166                             (!ziflist_is_ipv6 (iflist) && !zsys_ipv6 ()))) {
167                 //  Using inet_addr instead of inet_aton or inet_atop
168                 //  because these are not supported in Win XP
169                 rc = getaddrinfo (ziflist_address (iflist), self->port_nbr,
170                         &hint, &bind_to);
171                 assert (rc == 0);
172                 rc = getaddrinfo (ziflist_broadcast (iflist), self->port_nbr,
173                         &hint, &send_to);
174                 assert (rc == 0);
175                 if_index = if_nametoindex (name);
176 
177                 if (self->verbose)
178                     zsys_info ("zbeacon: interface=%s address=%s broadcast=%s",
179                             name, ziflist_address (iflist), ziflist_broadcast (iflist));
180                 found_iface = 1;
181                 break;      //  iface is known, so allow it
182             }
183             name = ziflist_next (iflist);
184         }
185         ziflist_destroy (&iflist);
186     }
187     else if (zsys_ipv6 () && strneq("", zsys_ipv6_address ()) && strneq (iface, "")) {
188         rc = getaddrinfo (zsys_ipv6_address (), self->port_nbr,
189                 &hint, &bind_to);
190         assert (rc == 0);
191         //  A user might set a link-local address without appending %iface
192         if (IN6_IS_ADDR_LINKLOCAL (&((in6addr_t *)bind_to->ai_addr)->sin6_addr) &&
193                 !strchr (zsys_ipv6_address (), '%')) {
194             char address_and_iface [NI_MAXHOST] = {0};
195             strcat (address_and_iface, zsys_ipv6_address ());
196             strcat (address_and_iface, "%");
197             strcat (address_and_iface, iface);
198             rc = getaddrinfo (address_and_iface, self->port_nbr, &hint, &bind_to);
199             assert (rc == 0);
200         }
201         rc = getaddrinfo (zsys_ipv6_mcast_address (), self->port_nbr,
202                 &hint, &send_to);
203         assert (rc == 0);
204         if_index = if_nametoindex (iface);
205 
206         if (self->verbose)
207             zsys_info ("zbeacon: interface=%s address=%s broadcast=%s",
208                     iface, zsys_ipv6_address (), zsys_ipv6_mcast_address ());
209         found_iface = 1;
210     }
211     else {
212         //  Look for matching interface, or first ziflist item
213         ziflist_t *iflist = ziflist_new_ipv6 ();
214         assert (iflist);
215         const char *name = ziflist_first (iflist);
216         while (name) {
217             //  If IPv6 is not enabled ignore IPv6 interfaces.
218             if ((streq (iface, name) || streq (iface, "")) &&
219                     ((ziflist_is_ipv6 (iflist) && zsys_ipv6 ()) ||
220                             (!ziflist_is_ipv6 (iflist) && !zsys_ipv6 ()))) {
221                 rc = getaddrinfo (ziflist_address (iflist), self->port_nbr,
222                         &hint, &bind_to);
223                 assert (rc == 0);
224                 rc = getaddrinfo (ziflist_broadcast (iflist), self->port_nbr,
225                         &hint, &send_to);
226                 assert (rc == 0);
227                 if_index = if_nametoindex (name);
228 
229                 if (self->verbose)
230                     zsys_info ("zbeacon: interface=%s address=%s broadcast=%s",
231                                name, ziflist_address (iflist), ziflist_broadcast (iflist));
232                 found_iface = 1;
233                 break;      //  iface is known, so allow it
234             }
235             name = ziflist_next (iflist);
236         }
237         ziflist_destroy (&iflist);
238     }
239     if (found_iface) {
240         inaddr_storage_t bind_address;
241 
242         //  On Windows we bind to the host address
243         //  On *NIX we bind to INADDR_ANY or in6addr_any, otherwise multicast
244         //  packets will be filtered out despite joining the group
245 #if (defined (__WINDOWS__))
246         memcpy (&bind_address, bind_to->ai_addr, bind_to->ai_addrlen);
247 #else
248         memcpy (&bind_address, send_to->ai_addr, send_to->ai_addrlen);
249         if (zsys_ipv6 ())
250             bind_address.__inaddr_u.__addr6.sin6_addr = in6addr_any;
251         else
252             bind_address.__inaddr_u.__addr.sin_addr.s_addr = htonl (INADDR_ANY);
253 #endif
254         memcpy (&self->broadcast, send_to->ai_addr, send_to->ai_addrlen);
255 
256         if (zsys_ipv6()) {
257             struct ipv6_mreq mreq;
258             mreq.ipv6mr_interface = if_index;
259             memcpy (&mreq.ipv6mr_multiaddr,
260                     &(((in6addr_t *)(send_to->ai_addr))->sin6_addr),
261                     sizeof (struct in6_addr));
262 
263             if (setsockopt (self->udpsock, IPPROTO_IPV6, IPV6_JOIN_GROUP,
264                     (char *)&mreq, sizeof (mreq)))
265                 zsys_socket_error ("zbeacon: setsockopt IPV6_JOIN_GROUP failed");
266 
267             if (setsockopt (self->udpsock, IPPROTO_IPV6, IPV6_MULTICAST_IF,
268                     (char *)&if_index, sizeof (if_index)))
269                 zsys_socket_error ("zbeacon: setsockopt IPV6_MULTICAST_IF failed");
270 
271             if (setsockopt (self->udpsock_send, IPPROTO_IPV6, IPV6_JOIN_GROUP,
272                     (char *)&mreq, sizeof (mreq)))
273                 zsys_socket_error ("zbeacon: setsockopt IPV6_JOIN_GROUP failed");
274 
275             if (setsockopt (self->udpsock_send, IPPROTO_IPV6, IPV6_MULTICAST_IF,
276                     (char *)&if_index, sizeof (if_index)))
277                 zsys_socket_error ("zbeacon: setsockopt IPV6_MULTICAST_IF failed");
278         }
279 
280         //  If bind fails, we close the socket for opening again later (next poll interval)
281         if (bind (self->udpsock_send, bind_to->ai_addr, bind_to->ai_addrlen) ||
282                 bind (self->udpsock, (struct sockaddr *)&bind_address,
283                 zsys_ipv6 () ? sizeof (in6addr_t) : sizeof (inaddr_t))) {
284             zsys_debug ("zbeacon: Unable to bind to broadcast address, reason=%s", strerror (errno));
285             zsys_udp_close (self->udpsock);
286             self->udpsock = INVALID_SOCKET;
287             zsys_udp_close (self->udpsock_send);
288             self->udpsock_send = INVALID_SOCKET;
289         }
290         else if (streq (iface, "*")) {
291             strcpy(self->hostname, "*");
292             if (self->verbose)
293                 zsys_info ("zbeacon: configured, hostname=%s", self->hostname);
294         }
295         else if (getnameinfo (bind_to->ai_addr, bind_to->ai_addrlen,
296                         self->hostname, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) {
297             if (self->verbose)
298                 zsys_info ("zbeacon: configured, hostname=%s", self->hostname);
299         }
300     }
301     else
302     {
303         //  No valid interface. Close the socket so that we can try again later
304         zsys_udp_close(self->udpsock);
305         self->udpsock = INVALID_SOCKET;
306         zsys_udp_close (self->udpsock_send);
307         self->udpsock_send = INVALID_SOCKET;
308     }
309 
310     freeaddrinfo (bind_to);
311     freeaddrinfo (send_to);
312 }
313 
314 
315 //  --------------------------------------------------------------------------
//  Prepare beacon to work on specified UDP port, reply hostname to
//  pipe (or "" if this failed)
318 
319 static void
s_self_configure(self_t * self,int port_nbr)320 s_self_configure (self_t *self, int port_nbr)
321 {
322     assert (port_nbr);
323     snprintf (self->port_nbr, 7, "%d", port_nbr);
324     s_self_prepare_udp (self);
325     zstr_send (self->pipe, self->hostname);
326     if (streq (self->hostname, ""))
327         zsys_error ("No broadcast interface found, (ZSYS_INTERFACE=%s)", zsys_interface ());
328 }
329 
330 
331 //  --------------------------------------------------------------------------
332 //  Handle a command from calling application
333 
334 static int
s_self_handle_pipe(self_t * self)335 s_self_handle_pipe (self_t *self)
336 {
337     //  Get just the command off the pipe
338     char *command = zstr_recv (self->pipe);
339     if (!command)
340         return -1;                  //  Interrupted
341 
342     if (self->verbose)
343         zsys_info ("zbeacon: API command=%s", command);
344 
345     if (streq (command, "VERBOSE"))
346         self->verbose = true;
347     else
348     if (streq (command, "CONFIGURE")) {
349         int port;
350         int rc = zsock_recv (self->pipe, "i", &port);
351         assert (rc == 0);
352         s_self_configure (self, port);
353     }
354     else
355     if (streq (command, "PUBLISH")) {
356         zframe_destroy (&self->transmit);
357         zsock_recv (self->pipe, "fi", &self->transmit, &self->interval);
358         assert (zframe_size (self->transmit) <= UDP_FRAME_MAX);
359         if (self->interval == 0)
360             self->interval = INTERVAL_DFLT;
361         //  Start broadcasting immediately
362         self->ping_at = zclock_mono ();
363     }
364     else
365     if (streq (command, "SILENCE"))
366         zframe_destroy (&self->transmit);
367     else
368     if (streq (command, "SUBSCRIBE")) {
369         zframe_destroy (&self->filter);
370         self->filter = zframe_recv (self->pipe);
371         assert (zframe_size (self->filter) <= UDP_FRAME_MAX);
372     }
373     else
374     if (streq (command, "UNSUBSCRIBE"))
375         zframe_destroy (&self->filter);
376     else
377     if (streq (command, "$TERM"))
378         self->terminated = true;
379     else {
380         zsys_error ("zbeacon: - invalid command: %s", command);
381         assert (false);
382     }
383     zstr_free (&command);
384     return 0;
385 }
386 
387 
388 //  --------------------------------------------------------------------------
389 //  Receive and filter the waiting beacon
390 
391 static void
s_self_handle_udp(self_t * self)392 s_self_handle_udp (self_t *self)
393 {
394     assert (self);
395 
396     char peername [NI_MAXHOST];
397     zframe_t *frame = zsys_udp_recv (self->udpsock, peername, NI_MAXHOST);
398     if (!frame)
399         return;
400 
401     //  If filter is set, check that beacon matches it
402     bool is_valid = false;
403     if (self->filter) {
404         byte  *filter_data = zframe_data (self->filter);
405         size_t filter_size = zframe_size (self->filter);
406         if (zframe_size (frame) >= filter_size
407         && memcmp (zframe_data (frame), filter_data, filter_size) == 0)
408             is_valid = true;
409     }
410     //  If valid, discard our own broadcasts, which UDP echoes to us
411     if (is_valid && self->transmit) {
412         byte  *transmit_data = zframe_data (self->transmit);
413         size_t transmit_size = zframe_size (self->transmit);
414         if (zframe_size (frame) == transmit_size
415         && memcmp (zframe_data (frame), transmit_data, transmit_size) == 0)
416             is_valid = false;
417     }
418     //  If still a valid beacon, send on to the API
419     if (is_valid) {
420         zmsg_t *msg = zmsg_new ();
421         assert (msg);
422         zmsg_addstr (msg, peername);
423         zmsg_append (msg, &frame);
424         if (zmsg_send (&msg, self->pipe) < 0)
425             zmsg_destroy (&msg);
426     }
427     else
428         zframe_destroy (&frame);
429 }
430 
431 
432 //  --------------------------------------------------------------------------
433 //  Send the beacon over UDP
434 
435 static int
s_emit_beacon(self_t * self)436 s_emit_beacon (self_t *self)
437 {
438 #if defined (__WINDOWS__)
439     // Windows doesn't broadcast on all interfaces when using INADDR_BROADCAST
440     // only the interface with the highest metric (as seen in `route print`)
441     // so send a packet per interface to each broadcast address
442     if (streq (zsys_interface (), "*") && !zsys_ipv6 ()) {
443         INTERFACE_INFO interface_list [64];
444         DWORD bytes_received = 0;
445 
446         int rc = WSAIoctl (self->udpsock, SIO_GET_INTERFACE_LIST, 0, 0,
447             &interface_list, sizeof (interface_list), &bytes_received, NULL, NULL);
448         assert (rc != SOCKET_ERROR);
449 
450         int num_interfaces = bytes_received / sizeof (INTERFACE_INFO);
451 
452         // iiBroadcastAddress is always 255.255.255.255 need to calculate the specific broadcast address using the netmask
453         // keep the same parameters as self->broadcast but just replace the address for each interface
454         inaddr_t addr;
455         memcpy(&addr, &self->broadcast, sizeof (inaddr_t));
456 
457         for (int i = 0; i < num_interfaces; ++i)
458         {
459             addr.sin_addr.S_un.S_addr = (interface_list[i].iiAddress.AddressIn.sin_addr.S_un.S_addr
460                 | ~(interface_list[i].iiNetmask.AddressIn.sin_addr.S_un.S_addr));
461 
462             if (zsys_udp_send (self->udpsock_send, self->transmit,
463                 (inaddr_t *)&addr, sizeof (inaddr_t))) {
464                 // Send failed, cause zbeacon to re-init socket
465                 return -1;
466             }
467         }
468         return 0;
469     }
470 #endif
471 
472     return zsys_udp_send (self->udpsock_send, self->transmit,
473         (inaddr_t *)&self->broadcast,
474         zsys_ipv6 () ? sizeof (in6addr_t) : sizeof (inaddr_t));
475 }
476 
477 
478 //  --------------------------------------------------------------------------
479 //  zbeacon() implements the zbeacon actor interface
480 
481 void
zbeacon(zsock_t * pipe,void * args)482 zbeacon (zsock_t *pipe, void *args)
483 {
484     self_t *self = s_self_new (pipe);
485     assert (self);
486     //  Signal successful initialization
487     zsock_signal (pipe, 0);
488 
489     while (!self->terminated) {
490         //  Poll on API pipe and on UDP socket
491         zmq_pollitem_t pollitems [] = {
492             { zsock_resolve (self->pipe), 0, ZMQ_POLLIN, 0 },
493             { NULL, self->udpsock, ZMQ_POLLIN, 0 }
494         };
495         long timeout = -1;
496         if (self->transmit) {
497             timeout = (long) (self->ping_at - zclock_mono ());
498             if (timeout < 0)
499                 timeout = 0;
500         }
501         int pollset_size = (self->udpsock && self->udpsock != INVALID_SOCKET) ? 2: 1;
502         if (zmq_poll (pollitems, pollset_size, timeout * ZMQ_POLL_MSEC) == -1)
503             break;              //  Interrupted
504 
505         if (pollitems [0].revents & ZMQ_POLLIN)
506             s_self_handle_pipe (self);
507         if (pollitems [1].revents & ZMQ_POLLIN)
508             s_self_handle_udp (self);
509 
510         if (self->transmit
511         &&  zclock_mono () >= self->ping_at) {
512             //  Send beacon to any listening peers
513             if (!self->udpsock_send || self->udpsock_send == INVALID_SOCKET || s_emit_beacon(self))
514             {
515                 const char *reason = (!self->udpsock_send || self->udpsock_send == INVALID_SOCKET) ? "invalid socket" : strerror (errno);
516                 zsys_debug ("zbeacon: failed to transmit, attempting reconnection. reason=%s", reason);
517                 //  Try to recreate UDP socket on interface
518                 s_self_prepare_udp (self);
519             }
520             self->ping_at = zclock_mono () + self->interval;
521         }
522     }
523     s_self_destroy (&self);
524 }
525 
526 
527 //  --------------------------------------------------------------------------
528 //  Selftest
529 
530 void
zbeacon_test(bool verbose)531 zbeacon_test (bool verbose)
532 {
533     printf (" * zbeacon: ");
534     if (verbose)
535         printf ("\n");
536 
537     //  @selftest
538     //  Test 1 - two beacons, one speaking, one listening
539     //  Create speaker beacon to broadcast our service
540     zactor_t *speaker = zactor_new (zbeacon, NULL);
541     assert (speaker);
542     if (verbose)
543         zstr_sendx (speaker, "VERBOSE", NULL);
544 
545     zsock_send (speaker, "si", "CONFIGURE", 9999);
546     char *hostname = zstr_recv (speaker);
547     if (!*hostname) {
548         printf ("OK (skipping test, no UDP broadcasting)\n");
549         zactor_destroy (&speaker);
550         freen (hostname);
551         return;
552     }
553     freen (hostname);
554 
555     //  Create listener beacon on port 9999 to lookup service
556     zactor_t *listener = zactor_new (zbeacon, NULL);
557     assert (listener);
558     if (verbose)
559         zstr_sendx (listener, "VERBOSE", NULL);
560     zsock_send (listener, "si", "CONFIGURE", 9999);
561     hostname = zstr_recv (listener);
562     assert (*hostname);
563     freen (hostname);
564 
565     //  We will broadcast the magic value 0xCAFE
566     byte announcement [2] = { 0xCA, 0xFE };
567     zsock_send (speaker, "sbi", "PUBLISH", announcement, 2, 100);
568     //  We will listen to anything (empty subscription)
569     zsock_send (listener, "sb", "SUBSCRIBE", "", 0);
570 
571     //  Wait for at most 1/2 second if there's no broadcasting
572     zsock_set_rcvtimeo (listener, 500);
573     char *ipaddress = zstr_recv (listener);
574     if (ipaddress) {
575         zframe_t *content = zframe_recv (listener);
576         assert (zframe_size (content) == 2);
577         assert (zframe_data (content) [0] == 0xCA);
578         assert (zframe_data (content) [1] == 0xFE);
579         zframe_destroy (&content);
580         zstr_free (&ipaddress);
581         zstr_sendx (speaker, "SILENCE", NULL);
582     }
583     zactor_destroy (&listener);
584     zactor_destroy (&speaker);
585 
586     //  Test subscription filter using a 3-node setup
587     zactor_t *node1 = zactor_new (zbeacon, NULL);
588     assert (node1);
589     zsock_send (node1, "si", "CONFIGURE", 5670);
590     hostname = zstr_recv (node1);
591     assert (*hostname);
592     freen (hostname);
593 
594     zactor_t *node2 = zactor_new (zbeacon, NULL);
595     assert (node2);
596     zsock_send (node2, "si", "CONFIGURE", 5670);
597     hostname = zstr_recv (node2);
598     assert (*hostname);
599     freen (hostname);
600 
601     zactor_t *node3 = zactor_new (zbeacon, NULL);
602     assert (node3);
603     zsock_send (node3, "si", "CONFIGURE", 5670);
604     hostname = zstr_recv (node3);
605     assert (*hostname);
606     freen (hostname);
607 
608     zsock_send (node1, "sbi", "PUBLISH", "NODE/1", 6, 250);
609     zsock_send (node2, "sbi", "PUBLISH", "NODE/2", 6, 250);
610     zsock_send (node3, "sbi", "PUBLISH", "RANDOM", 6, 250);
611     zsock_send (node1, "sb", "SUBSCRIBE", "NODE", 4);
612 
613     //  Poll on three API sockets at once
614     zpoller_t *poller = zpoller_new (node1, node2, node3, NULL);
615     assert (poller);
616     int64_t stop_at = zclock_mono () + 1000;
617     while (zclock_mono () < stop_at) {
618         long timeout = (long) (stop_at - zclock_mono ());
619         if (timeout < 0)
620             timeout = 0;
621         void *which = zpoller_wait (poller, timeout * ZMQ_POLL_MSEC);
622         if (which) {
623             assert (which == node1);
624             char *ipaddress, *received;
625             zstr_recvx (node1, &ipaddress, &received, NULL);
626             assert (streq (received, "NODE/2"));
627             zstr_free (&ipaddress);
628             zstr_free (&received);
629         }
630     }
631     zpoller_destroy (&poller);
632 
633     //  Stop listening
634     zstr_sendx (node1, "UNSUBSCRIBE", NULL);
635 
636     //  Stop all node broadcasts
637     zstr_sendx (node1, "SILENCE", NULL);
638     zstr_sendx (node2, "SILENCE", NULL);
639     zstr_sendx (node3, "SILENCE", NULL);
640 
641     //  Destroy the test nodes
642     zactor_destroy (&node1);
643     zactor_destroy (&node2);
644     zactor_destroy (&node3);
645 
646 #if defined (__WINDOWS__)
647     zsys_shutdown();
648 #endif
649     //  @end
650     printf ("OK\n");
651 }
652