1 /*  =========================================================================
2     zmonitor - socket event monitor
3 
4     Copyright (c) the Contributors as noted in the AUTHORS file.
5     This file is part of CZMQ, the high-level C binding for 0MQ:
6     http://czmq.zeromq.org.
7 
8     This Source Code Form is subject to the terms of the Mozilla Public
9     License, v. 2.0. If a copy of the MPL was not distributed with this
10     file, You can obtain one at http://mozilla.org/MPL/2.0/.
11     =========================================================================
12 */
13 
14 /*
15 @header
16     The zmonitor actor provides an API for obtaining socket events such as
17     connected, listen, disconnected, etc. Socket events are only available
18     for sockets connecting or bound to ipc:// and tcp:// endpoints.
19 @discuss
20     This class wraps the ZMQ socket monitor API, see zmq_socket_monitor for
21     details. Works on all versions of libzmq from 3.2 onwards. This class
    replaces zmonitor_v2, and is meant for applications that use the CZMQ v3
23     API (meaning, zsock).
24 @end
25 */
26 
27 #include "czmq_classes.h"
28 
//  --------------------------------------------------------------------------
//  The self_t structure holds the state for one actor instance. It is
//  created by s_self_new and released by s_self_destroy.

typedef struct {
    zsock_t *pipe;              //  Actor command pipe; commands arrive here
                                //  and decoded events are sent back on it
    zpoller_t *poller;          //  Socket poller; watches the pipe and, once
                                //  monitoring has started, the sink
    void *monitored;            //  Monitored libzmq socket
    zsock_t *sink;              //  Sink for monitor events; a PAIR socket
                                //  created when START is handled
    int events;                 //  Monitored event mask, built up by LISTEN
    bool terminated;            //  Did caller ask us to quit?
    bool verbose;               //  Verbose logging enabled?
} self_t;
41 
42 static void
s_self_destroy(self_t ** self_p)43 s_self_destroy (self_t **self_p)
44 {
45     assert (self_p);
46     if (*self_p) {
47         self_t *self = *self_p;
48 #if defined (ZMQ_EVENT_ALL)
49         zmq_socket_monitor (self->monitored, NULL, 0);
50 #endif
51         zpoller_destroy (&self->poller);
52         zsock_destroy (&self->sink);
53         freen (self);
54         *self_p = NULL;
55     }
56 }
57 
58 static self_t *
s_self_new(zsock_t * pipe,void * sock)59 s_self_new (zsock_t *pipe, void *sock)
60 {
61     self_t *self = (self_t *) zmalloc (sizeof (self_t));
62     assert (self);
63     self->pipe = pipe;
64     self->monitored = zsock_resolve (sock);
65     self->poller = zpoller_new (self->pipe, NULL);
66     assert (self->poller);
67     return self;
68 }
69 
70 
71 //  --------------------------------------------------------------------------
72 //  Add listener for specified event
73 
74 static void
s_self_listen(self_t * self,const char * event)75 s_self_listen (self_t *self, const char *event)
76 {
77 #if defined (ZMQ_EVENT_ALL)
78     if (streq (event, "CONNECTED"))
79         self->events |= ZMQ_EVENT_CONNECTED;
80     else
81     if (streq (event, "CONNECT_DELAYED"))
82         self->events |= ZMQ_EVENT_CONNECT_DELAYED;
83     else
84     if (streq (event, "CONNECT_RETRIED"))
85         self->events |= ZMQ_EVENT_CONNECT_RETRIED;
86     else
87     if (streq (event, "LISTENING"))
88         self->events |= ZMQ_EVENT_LISTENING;
89     else
90     if (streq (event, "BIND_FAILED"))
91         self->events |= ZMQ_EVENT_BIND_FAILED;
92     else
93     if (streq (event, "ACCEPTED"))
94         self->events |= ZMQ_EVENT_ACCEPTED;
95     else
96     if (streq (event, "ACCEPT_FAILED"))
97         self->events |= ZMQ_EVENT_ACCEPT_FAILED;
98     else
99     if (streq (event, "CLOSED"))
100         self->events |= ZMQ_EVENT_CLOSED;
101     else
102     if (streq (event, "CLOSE_FAILED"))
103         self->events |= ZMQ_EVENT_CLOSE_FAILED;
104     else
105     if (streq (event, "DISCONNECTED"))
106         self->events |= ZMQ_EVENT_DISCONNECTED;
107     else
108 #if defined (ZMQ_EVENT_MONITOR_STOPPED)
109     if (streq (event, "MONITOR_STOPPED"))
110         self->events |= ZMQ_EVENT_MONITOR_STOPPED;
111     else
112 #endif
113 #if defined (ZMQ_EVENT_HANDSHAKE_FAILED)
114     if (streq (event, "HANDSHAKE_FAILED"))
115         self->events |= ZMQ_EVENT_HANDSHAKE_FAILED;
116     else
117 #endif
118 #if defined (ZMQ_EVENT_HANDSHAKE_SUCCEED)
119     if (streq (event, "HANDSHAKE_SUCCEED"))
120         self->events |= ZMQ_EVENT_HANDSHAKE_SUCCEED;
121     else
122 #endif
123 #if defined (ZMQ_EVENT_HANDSHAKE_SUCCEEDED)
124     if (streq (event, "HANDSHAKE_SUCCEEDED"))
125         self->events |= ZMQ_EVENT_HANDSHAKE_SUCCEEDED;
126     else
127 #endif
128 #if defined (ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL)
129     if (streq (event, "HANDSHAKE_FAILED_NO_DETAIL"))
130         self->events |= ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL;
131     else
132 #endif
133 #if defined (ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL)
134     if (streq (event, "HANDSHAKE_FAILED_PROTOCOL"))
135         self->events |= ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL;
136     else
137 #endif
138 #if defined (ZMQ_EVENT_HANDSHAKE_FAILED_AUTH)
139     if (streq (event, "HANDSHAKE_FAILED_AUTH"))
140         self->events |= ZMQ_EVENT_HANDSHAKE_FAILED_AUTH;
141     else
142 #endif
143     if (streq (event, "ALL"))
144         self->events |= ZMQ_EVENT_ALL;
145     else
146         zsys_warning ("zmonitor: - invalid listen event=%s", event);
147 #endif
148 }
149 
150 
151 //  --------------------------------------------------------------------------
152 //  Start monitoring
153 
154 static void
s_self_start(self_t * self)155 s_self_start (self_t *self)
156 {
157     assert (!self->sink);
158     char *endpoint = zsys_sprintf ("inproc://zmonitor-%p", self->monitored);
159     assert (endpoint);
160     int rc;
161 #if defined (ZMQ_EVENT_ALL)
162     rc = zmq_socket_monitor (self->monitored, endpoint, self->events);
163     assert (rc == 0);
164 #endif
165     self->sink = zsock_new (ZMQ_PAIR);
166     assert (self->sink);
167     rc = zsock_connect (self->sink, "%s", endpoint);
168     assert (rc == 0);
169     zpoller_add (self->poller, self->sink);
170     freen (endpoint);
171 }
172 
173 
174 //  --------------------------------------------------------------------------
175 //  Handle a command from calling application
176 
177 static int
s_self_handle_pipe(self_t * self)178 s_self_handle_pipe (self_t *self)
179 {
180     //  Get the whole message off the pipe in one go
181     zmsg_t *request = zmsg_recv (self->pipe);
182     if (!request)
183         return -1;                  //  Interrupted
184 
185     char *command = zmsg_popstr (request);
186     if (!command) {
187         s_self_destroy (&self);
188         return -1;
189     }
190     if (self->verbose)
191         zsys_info ("zmonitor: API command=%s", command);
192 
193     if (streq (command, "LISTEN")) {
194         char *event = zmsg_popstr (request);
195         while (event) {
196             if (self->verbose)
197                 zsys_info ("zmonitor: - listening to event=%s", event);
198             s_self_listen (self, event);
199             zstr_free (&event);
200             event = zmsg_popstr (request);
201         }
202     }
203     else
204     if (streq (command, "START")) {
205         s_self_start (self);
206         zsock_signal (self->pipe, 0);
207     }
208     else
209     if (streq (command, "VERBOSE"))
210         self->verbose = true;
211     else
212     if (streq (command, "$TERM"))
213         self->terminated = true;
214     else {
215         zsys_error ("zmonitor: - invalid command: %s", command);
216         assert (false);
217     }
218     zstr_free (&command);
219     zmsg_destroy (&request);
220     return 0;
221 }
222 
223 
224 //  Handle event from socket monitor
225 
226 static void
s_self_handle_sink(self_t * self)227 s_self_handle_sink (self_t *self)
228 {
229 #if defined (ZMQ_EVENT_ALL)
230 #if (ZMQ_VERSION_MAJOR >= 4)
231     //  First frame is event number and value
232     zframe_t *frame = zframe_recv (self->sink);
233     int event = *(uint16_t *) (zframe_data (frame));
234     int value = *(uint32_t *) (zframe_data (frame) + 2);
235     //  Address is in second message frame
236     char *address = zstr_recv (self->sink);
237     zframe_destroy (&frame);
238 
239 #elif (ZMQ_VERSION_MAJOR == 3 && ZMQ_VERSION_MINOR == 2)
240     //  zmq_event_t is passed as-is in the frame
241     zframe_t *frame = zframe_recv (self->sink);
242     zmq_event_t *eptr = (zmq_event_t *) zframe_data (frame);
243     int event = eptr->event;
244     int value = eptr->data.listening.fd;
245     char *address = strdup (eptr->data.listening.addr);
246     assert (address);
247     zframe_destroy (&frame);
248 
249 #else
250     //  We can't plausibly be here with other versions of libzmq
251     assert (false);
252 #endif
253 
254     //  Now map event to text equivalent
255     char *name;
256     switch (event) {
257         case ZMQ_EVENT_ACCEPTED:
258             name = "ACCEPTED";
259             break;
260         case ZMQ_EVENT_ACCEPT_FAILED:
261             name = "ACCEPT_FAILED";
262             break;
263         case ZMQ_EVENT_BIND_FAILED:
264             name = "BIND_FAILED";
265             break;
266         case ZMQ_EVENT_CLOSED:
267             name = "CLOSED";
268             break;
269         case ZMQ_EVENT_CLOSE_FAILED:
270             name = "CLOSE_FAILED";
271             break;
272         case ZMQ_EVENT_DISCONNECTED:
273             name = "DISCONNECTED";
274             break;
275         case ZMQ_EVENT_CONNECTED:
276             name = "CONNECTED";
277             break;
278         case ZMQ_EVENT_CONNECT_DELAYED:
279             name = "CONNECT_DELAYED";
280             break;
281         case ZMQ_EVENT_CONNECT_RETRIED:
282             name = "CONNECT_RETRIED";
283             break;
284         case ZMQ_EVENT_LISTENING:
285             name = "LISTENING";
286             break;
287 #if defined (ZMQ_EVENT_MONITOR_STOPPED)
288         case ZMQ_EVENT_MONITOR_STOPPED:
289             name = "MONITOR_STOPPED";
290             break;
291 #endif
292 #if defined (ZMQ_EVENT_HANDSHAKE_FAILED)
293         case ZMQ_EVENT_HANDSHAKE_FAILED:
294             name = "HANDSHAKE_FAILED";
295             break;
296 #endif
297 #if defined (ZMQ_EVENT_HANDSHAKE_SUCCEED)
298         case ZMQ_EVENT_HANDSHAKE_SUCCEED:
299             name = "HANDSHAKE_SUCCEED";
300             break;
301 #endif
302 #if defined (ZMQ_EVENT_HANDSHAKE_SUCCEEDED)
303         case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
304             name = "HANDSHAKE_SUCCEEDED";
305             break;
306 #endif
307 #if defined (ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL)
308         case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
309             name = "HANDSHAKE_FAILED_NO_DETAIL";
310             break;
311 #endif
312 #if defined (ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL)
313         case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
314             name = "HANDSHAKE_FAILED_PROTOCOL";
315             break;
316 #endif
317 #if defined (ZMQ_EVENT_HANDSHAKE_FAILED_AUTH)
318         case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
319             name = "HANDSHAKE_FAILED_AUTH";
320             break;
321 #endif
322         default:
323             zsys_error ("illegal socket monitor event: %d", event);
324             name = "UNKNOWN";
325             break;
326     }
327     if (self->verbose)
328         zsys_info ("zmonitor: %s - %s", name, address);
329 
330     zstr_sendfm (self->pipe, "%s", name);
331     zstr_sendfm (self->pipe, "%d", value);
332     zstr_send (self->pipe, address);
333     freen (address);
334 #endif
335 }
336 
337 
338 //  --------------------------------------------------------------------------
339 //  zmonitor() implements the zmonitor actor interface
340 
341 void
zmonitor(zsock_t * pipe,void * sock)342 zmonitor (zsock_t *pipe, void *sock)
343 {
344     self_t *self = s_self_new (pipe, sock);
345     assert (self);
346     //  Signal successful initialization
347     zsock_signal (pipe, 0);
348 
349     while (!self->terminated) {
350         zsock_t *which = (zsock_t *) zpoller_wait (self->poller, -1);
351         if (which == self->pipe)
352             s_self_handle_pipe (self);
353         else
354         if (which == self->sink)
355             s_self_handle_sink (self);
356         else
357         if (zpoller_terminated (self->poller))
358             break;          //  Interrupted
359     }
360     s_self_destroy (&self);
361 }
362 
363 
364 //  --------------------------------------------------------------------------
365 //  Selftest
366 
#if defined (ZMQ_EVENT_ALL)
//  Selftest helper: receive the next message from a monitor actor and
//  assert that its first frame, the event name, equals the expected name.
//  Blocks until a message arrives.

static void
s_assert_event (zactor_t *self, const char *expected)
{
    zmsg_t *msg = zmsg_recv (self);
    assert (msg);
    char *event = zmsg_popstr (msg);
    //  Fail cleanly rather than compare against a missing frame
    assert (event);
    assert (streq (event, expected));
    freen (event);
    zmsg_destroy (&msg);
}
#endif
379 
380 void
zmonitor_test(bool verbose)381 zmonitor_test (bool verbose)
382 {
383     printf (" * zmonitor: ");
384     if (verbose)
385         printf ("\n");
386 
387 #if ZMQ_VERSION_MAJOR < 3
388     printf ("SKIPPED (on zmq pre-3)\n");
389     return;
390 #endif
391 
392     //  @selftest
393     zsock_t *client = zsock_new (ZMQ_DEALER);
394     assert (client);
395     zactor_t *clientmon = zactor_new (zmonitor, client);
396     assert (clientmon);
397     if (verbose)
398         zstr_sendx (clientmon, "VERBOSE", NULL);
399     zstr_sendx (clientmon, "LISTEN", "LISTENING", "ACCEPTED", NULL);
400 #if defined (ZMQ_EVENT_HANDSHAKE_SUCCEED)
401     zstr_sendx (clientmon, "LISTEN", "HANDSHAKE_SUCCEED", NULL);
402 #endif
403 #if defined (ZMQ_EVENT_HANDSHAKE_SUCCEEDED)
404     zstr_sendx (clientmon, "LISTEN", "HANDSHAKE_SUCCEEDED", NULL);
405 #endif
406     zstr_sendx (clientmon, "START", NULL);
407     zsock_wait (clientmon);
408 
409     zsock_t *server = zsock_new (ZMQ_DEALER);
410     assert (server);
411     zactor_t *servermon = zactor_new (zmonitor, server);
412     assert (servermon);
413     if (verbose)
414         zstr_sendx (servermon, "VERBOSE", NULL);
415     zstr_sendx (servermon, "LISTEN", "CONNECTED", "DISCONNECTED", NULL);
416     zstr_sendx (servermon, "START", NULL);
417     zsock_wait (servermon);
418 
419     //  Allow a brief time for the message to get there...
420     zmq_poll (NULL, 0, 200);
421 
422     //  Check client is now listening
423     int port_nbr = zsock_bind (client, "tcp://127.0.0.1:*");
424     assert (port_nbr != -1);
425     s_assert_event (clientmon, "LISTENING");
426 
427     //  Check server connected to client
428     zsock_connect (server, "tcp://127.0.0.1:%d", port_nbr);
429     s_assert_event (servermon, "CONNECTED");
430 
431     //  Check client accepted connection
432     s_assert_event (clientmon, "ACCEPTED");
433 #if defined (ZMQ_EVENT_HANDSHAKE_SUCCEED)
434     s_assert_event (clientmon, "HANDSHAKE_SUCCEED");
435 #endif
436 #if defined (ZMQ_EVENT_HANDSHAKE_SUCCEEDED)
437     s_assert_event (clientmon, "HANDSHAKE_SUCCEEDED");
438 #endif
439 
440     zactor_destroy (&clientmon);
441     zactor_destroy (&servermon);
442     zsock_destroy (&client);
443     zsock_destroy (&server);
444 
445 #if defined (__WINDOWS__)
446     zsys_shutdown();
447 #endif
448     //  @end
449     printf ("OK\n");
450 }
451