1 /*  =========================================================================
2     zpoller - trivial socket poller class
3 
4     Copyright (c) the Contributors as noted in the AUTHORS file.
5     This file is part of CZMQ, the high-level C binding for 0MQ:
6     http://czmq.zeromq.org.
7 
8     This Source Code Form is subject to the terms of the Mozilla Public
9     License, v. 2.0. If a copy of the MPL was not distributed with this
10     file, You can obtain one at http://mozilla.org/MPL/2.0/.
11     =========================================================================*/
12 
13 /*
14 @header
15     The zpoller class provides a minimalist interface to ZeroMQ's zmq_poll
16     API, for the very common case of reading from a number of sockets.
    It does not provide polling for output; if you need that, use the
    zmq_poll API directly. Besides sockets and actors, a reader may also be
    a pointer to a file handle (SOCKET), which is polled for input.
19 @discuss
20     The class implements the poller using the zmq_poller API if that exists,
21     else does the work itself.
22 @end
23 */
24 
25 #include "czmq_classes.h"
26 
27 //  Structure of our class
28 
29 struct _zpoller_t {
30 #ifdef ZMQ_HAVE_POLLER
31     void *zmq_poller;           //  ZMQ poller structure
32 #else
33     zlist_t *reader_list;       //  List of sockets to read from
34     zmq_pollitem_t *poll_set;   //  Current zmq_poll set
35     void **poll_readers;        //  Matching table of socket readers
36     size_t poll_size;           //  Size of poll set
37     bool need_rebuild;          //  Does pollset need rebuilding?
38 #endif
39     bool expired;               //  Did poll timer expire?
40     bool terminated;            //  Did poll call end with EINTR?
41     bool nonstop;               //  Don't stop running on Ctrl-C
42 };
43 
44 
45 #ifndef ZMQ_HAVE_POLLER
46 static int
s_rebuild_poll_set(zpoller_t * self)47 s_rebuild_poll_set (zpoller_t *self)
48 {
49     freen (self->poll_set);
50     self->poll_set = NULL;
51     freen (self->poll_readers);
52     self->poll_readers = NULL;
53 
54     self->poll_size = zlist_size (self->reader_list);
55     self->poll_set = (zmq_pollitem_t *)
56                       zmalloc (self->poll_size * sizeof (zmq_pollitem_t));
57     self->poll_readers = (void **) zmalloc (self->poll_size * sizeof (void *));
58     if (!self->poll_set || !self->poll_readers)
59         return -1;
60 
61     uint reader_nbr = 0;
62     void *reader = zlist_first (self->reader_list);
63     while (reader) {
64         self->poll_readers [reader_nbr] = reader;
65         void *socket = zsock_resolve (reader);
66         if (socket == NULL) {
67             self->poll_set [reader_nbr].socket = NULL;
68             self->poll_set [reader_nbr].fd = *(SOCKET *) reader;
69         }
70         else
71             self->poll_set [reader_nbr].socket = socket;
72         self->poll_set [reader_nbr].events = ZMQ_POLLIN;
73 
74         reader_nbr++;
75         reader = zlist_next (self->reader_list);
76     }
77     self->need_rebuild = false;
78     return 0;
79 }
80 #endif
81 
82 
83 //  --------------------------------------------------------------------------
84 //  Create new poller, specifying zero or more readers. The list of
85 //  readers ends in a NULL. Each reader can be a zsock_t instance, a
86 //  zactor_t instance, a libzmq socket (void *), or a file handle.
87 
88 zpoller_t *
zpoller_new(void * reader,...)89 zpoller_new (void *reader, ...)
90 {
91     zpoller_t *self = (zpoller_t *) zmalloc (sizeof (zpoller_t));
92     assert (self);
93 #ifdef ZMQ_HAVE_POLLER
94     self->zmq_poller = zmq_poller_new ();
95     assert (self->zmq_poller);
96 #else
97     self->reader_list = zlist_new ();
98     assert (self->reader_list);
99 #endif
100     va_list args;
101     va_start (args, reader);
102     while (reader) {
103         if (zpoller_add (self, reader)) {
104             zpoller_destroy (&self);
105             break;
106         }
107         reader = va_arg (args, void *);
108     }
109     va_end (args);
110     return self;
111 }
112 
113 
114 //  --------------------------------------------------------------------------
115 //  Destroy a poller
116 
117 void
zpoller_destroy(zpoller_t ** self_p)118 zpoller_destroy (zpoller_t **self_p)
119 {
120     assert (self_p);
121     if (*self_p) {
122         zpoller_t *self = *self_p;
123 #ifdef ZMQ_HAVE_POLLER
124         zmq_poller_destroy (&self->zmq_poller);
125 #else
126         zlist_destroy (&self->reader_list);
127         freen (self->poll_readers);
128         freen (self->poll_set);
129 #endif
130         freen (self);
131         *self_p = NULL;
132     }
133 }
134 
135 
136 //  --------------------------------------------------------------------------
137 //  Add a reader to be polled. Returns 0 if OK, -1 on failure. The reader may
138 //  be a libzmq void * socket, a zsock_t instance, or a zactor_t instance.
139 
140 int
zpoller_add(zpoller_t * self,void * reader)141 zpoller_add (zpoller_t *self, void *reader)
142 {
143     assert (self);
144     assert (reader);
145     int rc = 0;
146 #ifdef ZMQ_HAVE_POLLER
147     void *socket = zsock_resolve (reader);
148     if (socket)
149         rc = zmq_poller_add (self->zmq_poller, socket, reader, ZMQ_POLLIN);
150     else
151         rc = zmq_poller_add_fd (self->zmq_poller, *(SOCKET *) reader, reader, ZMQ_POLLIN);
152 #else
153     zlist_append (self->reader_list, reader);
154     self->need_rebuild = true;
155 #endif
156     return rc;
157 }
158 
159 
160 //  --------------------------------------------------------------------------
161 //  Remove a reader from the poller; returns 0 if OK, -1 on failure. The reader
162 //  must have been passed during construction, or in an zpoller_add () call.
163 
164 int
zpoller_remove(zpoller_t * self,void * reader)165 zpoller_remove (zpoller_t *self, void *reader)
166 {
167     assert (self);
168     assert (reader);
169     int rc = 0;
170 #ifdef ZMQ_HAVE_POLLER
171     void *socket = zsock_resolve (reader);
172     if (socket)
173         rc = zmq_poller_remove (self->zmq_poller, socket);
174     else
175         rc = zmq_poller_remove_fd (self->zmq_poller, *(SOCKET *) reader);
176 #else
177     size_t num_readers_before = zlist_size (self->reader_list);
178     zlist_remove (self->reader_list, reader); // won't fail with non-existent reader
179     size_t num_readers_after = zlist_size (self->reader_list);
180     if (num_readers_before != num_readers_after)
181         self->need_rebuild = true;
182     else {
183         errno = EINVAL;
184         rc    = -1;
185     }
186 #endif
187     return rc;
188 }
189 
190 
191 //  --------------------------------------------------------------------------
192 //  By default the poller stops if the process receives a SIGINT or SIGTERM
193 //  signal. This makes it impossible to shut-down message based architectures
194 //  like zactors. This method lets you switch off break handling. The default
195 //  nonstop setting is off (false).
196 
197 void
zpoller_set_nonstop(zpoller_t * self,bool nonstop)198 zpoller_set_nonstop (zpoller_t *self, bool nonstop)
199 {
200     assert (self);
201     self->nonstop = nonstop;
202 }
203 
204 
205 //  --------------------------------------------------------------------------
206 //  Poll the registered readers for I/O, return first reader that has input.
207 //  The reader will be a libzmq void * socket, or a zsock_t or zactor_t
208 //  instance as specified in zpoller_new/zpoller_add. The timeout should be
209 //  zero or greater, or -1 to wait indefinitely. Socket priority is defined
210 //  by their order in the poll list. If you need a balanced poll, use the low
211 //  level zmq_poll method directly. If the poll call was interrupted (SIGINT),
212 //  or the ZMQ context was destroyed, or the timeout expired, returns NULL.
213 //  You can test the actual exit condition by calling zpoller_expired () and
214 //  zpoller_terminated (). The timeout is in msec.
215 
216 void *
zpoller_wait(zpoller_t * self,int timeout)217 zpoller_wait (zpoller_t *self, int timeout)
218 {
219     assert (self);
220     self->expired = false;
221     if (zsys_interrupted && !self->nonstop) {
222         self->terminated = true;
223         return NULL;
224     }
225     else
226         self->terminated = false;
227 
228 #ifdef ZMQ_HAVE_POLLER
229     zmq_poller_event_t event;
230     if (!zmq_poller_wait (self->zmq_poller, &event, timeout))
231         return event.user_data;
232     else
233     if (errno == ETIMEDOUT || errno == EAGAIN)
234         self->expired = true;
235     else
236     if (zsys_interrupted && !self->nonstop)
237         self->terminated = true;
238 
239     return NULL;
240 #else
241     if (self->need_rebuild)
242         s_rebuild_poll_set (self);
243     int rc = zmq_poll (self->poll_set, (int) self->poll_size, timeout * ZMQ_POLL_MSEC);
244     if (rc > 0) {
245         uint reader = 0;
246         for (reader = 0; reader < self->poll_size; reader++)
247             if (self->poll_set [reader].revents & ZMQ_POLLIN)
248                 return self->poll_readers [reader];
249     }
250     else
251     if (rc == -1 || (zsys_interrupted && !self->nonstop))
252         self->terminated = true;
253     else
254     if (rc == 0)
255         self->expired = true;
256 
257     return NULL;
258 #endif
259 }
260 
261 
262 //  --------------------------------------------------------------------------
263 //  Return true if the last zpoller_wait () call ended because the timeout
264 //  expired, without any error.
265 
266 bool
zpoller_expired(zpoller_t * self)267 zpoller_expired (zpoller_t *self)
268 {
269     assert (self);
270     return self->expired;
271 }
272 
273 
274 //  --------------------------------------------------------------------------
275 //  Return true if the last zpoller_wait () call ended because the process
276 //  was interrupted, or the parent context was destroyed.
277 
278 bool
zpoller_terminated(zpoller_t * self)279 zpoller_terminated (zpoller_t *self)
280 {
281     assert (self);
282     return self->terminated;
283 }
284 
285 
286 //  --------------------------------------------------------------------------
287 //  Self test of this class
288 
289 void
zpoller_test(bool verbose)290 zpoller_test (bool verbose)
291 {
292     printf (" * zpoller: ");
293 
294     //  @selftest
295     //  Create a few sockets
296     zsock_t *vent = zsock_new (ZMQ_PUSH);
297     assert (vent);
298     int port_nbr = zsock_bind (vent, "tcp://127.0.0.1:*");
299     assert (port_nbr != -1);
300     zsock_t *sink = zsock_new (ZMQ_PULL);
301     assert (sink);
302     int rc = zsock_connect (sink, "tcp://127.0.0.1:%d", port_nbr);
303     assert (rc != -1);
304     zsock_t *bowl = zsock_new (ZMQ_PULL);
305     assert (bowl);
306     zsock_t *dish = zsock_new (ZMQ_PULL);
307     assert (dish);
308 
309     //  Set up poller
310     zpoller_t *poller = zpoller_new (bowl, dish, NULL);
311     assert (poller);
312 
313     // Add a reader to the existing poller
314     rc = zpoller_add (poller, sink);
315     assert (rc == 0);
316 
317     zstr_send (vent, "Hello, World");
318 
319     //  We expect a message only on the sink
320     zsock_t *which = (zsock_t *) zpoller_wait (poller, -1);
321     assert (which == sink);
322     assert (zpoller_expired (poller) == false);
323     assert (zpoller_terminated (poller) == false);
324     char *message = zstr_recv (which);
325     assert (streq (message, "Hello, World"));
326     zstr_free (&message);
327 
328     //  Stop polling reader
329     rc = zpoller_remove (poller, sink);
330     assert (rc == 0);
331 
332     // Removing a non-existent reader shall fail
333     rc = zpoller_remove (poller, sink);
334     assert (rc == -1);
335     assert (errno == EINVAL);
336 
337     //  Check we can poll an FD
338     rc = zsock_connect (bowl, "tcp://127.0.0.1:%d", port_nbr);
339     assert (rc != -1);
340     SOCKET fd = zsock_fd (bowl);
341     rc = zpoller_add (poller, (void *) &fd);
342     assert (rc != -1);
343     zstr_send (vent, "Hello again, world");
344     assert (zpoller_wait (poller, 500) == &fd);
345 
346     // Check zpoller_set_nonstop ()
347     zsys_interrupted = 1;
348     zpoller_wait (poller, 0);
349     assert (zpoller_terminated (poller));
350     zpoller_set_nonstop (poller, true);
351     zpoller_wait (poller, 0);
352     assert (!zpoller_terminated (poller));
353     zsys_interrupted = 0;
354 
355     zpoller_destroy (&poller);
356     zsock_destroy (&vent);
357     zsock_destroy (&sink);
358     zsock_destroy (&bowl);
359     zsock_destroy (&dish);
360 
361 #ifdef ZMQ_SERVER
362     //  Check thread safe sockets
363     zpoller_destroy (&poller);
364     zsock_t *client = zsock_new (ZMQ_CLIENT);
365     assert (client);
366     zsock_t *server = zsock_new (ZMQ_SERVER);
367     assert (server);
368     poller = zpoller_new (client, server, NULL);
369     assert (poller);
370     port_nbr = zsock_bind (server, "tcp://127.0.0.1:*");
371     assert (port_nbr != -1);
372     rc = zsock_connect (client, "tcp://127.0.0.1:%d", port_nbr);
373     assert (rc != -1);
374 
375     zstr_send (client, "Hello, World");
376 
377     //  We expect a message only on the server
378     which = (zsock_t *) zpoller_wait (poller, -1);
379     assert (which == server);
380     assert (zpoller_expired (poller) == false);
381     assert (zpoller_terminated (poller) == false);
382     message = zstr_recv (which);
383     assert (streq (message, "Hello, World"));
384     zstr_free (&message);
385 
386     zpoller_destroy (&poller);
387     zsock_destroy (&client);
388     zsock_destroy (&server);
389 #endif
390 
391 #if defined (__WINDOWS__)
392     zsys_shutdown();
393 #endif
394     //  @end
395 
396     printf ("OK\n");
397 }
398