1 /* =========================================================================
2 zpoller - trivial socket poller class
3
4 Copyright (c) the Contributors as noted in the AUTHORS file.
5 This file is part of CZMQ, the high-level C binding for 0MQ:
6 http://czmq.zeromq.org.
7
8 This Source Code Form is subject to the terms of the Mozilla Public
9 License, v. 2.0. If a copy of the MPL was not distributed with this
10 file, You can obtain one at http://mozilla.org/MPL/2.0/.
11 =========================================================================*/
12
13 /*
14 @header
15 The zpoller class provides a minimalist interface to ZeroMQ's zmq_poll
16 API, for the very common case of reading from a number of sockets.
17 It does not provide polling for output, nor polling on file handles.
18 If you need either of these, use the zmq_poll API directly.
19 @discuss
20 The class implements the poller using the zmq_poller API if that exists,
21 else does the work itself.
22 @end
23 */
24
25 #include "czmq_classes.h"
26
27 // Structure of our class
28
29 struct _zpoller_t {
30 #ifdef ZMQ_HAVE_POLLER
31 void *zmq_poller; // ZMQ poller structure
32 #else
33 zlist_t *reader_list; // List of sockets to read from
34 zmq_pollitem_t *poll_set; // Current zmq_poll set
35 void **poll_readers; // Matching table of socket readers
36 size_t poll_size; // Size of poll set
37 bool need_rebuild; // Does pollset need rebuilding?
38 #endif
39 bool expired; // Did poll timer expire?
40 bool terminated; // Did poll call end with EINTR?
41 bool nonstop; // Don't stop running on Ctrl-C
42 };
43
44
45 #ifndef ZMQ_HAVE_POLLER
46 static int
s_rebuild_poll_set(zpoller_t * self)47 s_rebuild_poll_set (zpoller_t *self)
48 {
49 freen (self->poll_set);
50 self->poll_set = NULL;
51 freen (self->poll_readers);
52 self->poll_readers = NULL;
53
54 self->poll_size = zlist_size (self->reader_list);
55 self->poll_set = (zmq_pollitem_t *)
56 zmalloc (self->poll_size * sizeof (zmq_pollitem_t));
57 self->poll_readers = (void **) zmalloc (self->poll_size * sizeof (void *));
58 if (!self->poll_set || !self->poll_readers)
59 return -1;
60
61 uint reader_nbr = 0;
62 void *reader = zlist_first (self->reader_list);
63 while (reader) {
64 self->poll_readers [reader_nbr] = reader;
65 void *socket = zsock_resolve (reader);
66 if (socket == NULL) {
67 self->poll_set [reader_nbr].socket = NULL;
68 self->poll_set [reader_nbr].fd = *(SOCKET *) reader;
69 }
70 else
71 self->poll_set [reader_nbr].socket = socket;
72 self->poll_set [reader_nbr].events = ZMQ_POLLIN;
73
74 reader_nbr++;
75 reader = zlist_next (self->reader_list);
76 }
77 self->need_rebuild = false;
78 return 0;
79 }
80 #endif
81
82
83 // --------------------------------------------------------------------------
84 // Create new poller, specifying zero or more readers. The list of
85 // readers ends in a NULL. Each reader can be a zsock_t instance, a
86 // zactor_t instance, a libzmq socket (void *), or a file handle.
87
88 zpoller_t *
zpoller_new(void * reader,...)89 zpoller_new (void *reader, ...)
90 {
91 zpoller_t *self = (zpoller_t *) zmalloc (sizeof (zpoller_t));
92 assert (self);
93 #ifdef ZMQ_HAVE_POLLER
94 self->zmq_poller = zmq_poller_new ();
95 assert (self->zmq_poller);
96 #else
97 self->reader_list = zlist_new ();
98 assert (self->reader_list);
99 #endif
100 va_list args;
101 va_start (args, reader);
102 while (reader) {
103 if (zpoller_add (self, reader)) {
104 zpoller_destroy (&self);
105 break;
106 }
107 reader = va_arg (args, void *);
108 }
109 va_end (args);
110 return self;
111 }
112
113
114 // --------------------------------------------------------------------------
115 // Destroy a poller
116
117 void
zpoller_destroy(zpoller_t ** self_p)118 zpoller_destroy (zpoller_t **self_p)
119 {
120 assert (self_p);
121 if (*self_p) {
122 zpoller_t *self = *self_p;
123 #ifdef ZMQ_HAVE_POLLER
124 zmq_poller_destroy (&self->zmq_poller);
125 #else
126 zlist_destroy (&self->reader_list);
127 freen (self->poll_readers);
128 freen (self->poll_set);
129 #endif
130 freen (self);
131 *self_p = NULL;
132 }
133 }
134
135
136 // --------------------------------------------------------------------------
137 // Add a reader to be polled. Returns 0 if OK, -1 on failure. The reader may
138 // be a libzmq void * socket, a zsock_t instance, or a zactor_t instance.
139
140 int
zpoller_add(zpoller_t * self,void * reader)141 zpoller_add (zpoller_t *self, void *reader)
142 {
143 assert (self);
144 assert (reader);
145 int rc = 0;
146 #ifdef ZMQ_HAVE_POLLER
147 void *socket = zsock_resolve (reader);
148 if (socket)
149 rc = zmq_poller_add (self->zmq_poller, socket, reader, ZMQ_POLLIN);
150 else
151 rc = zmq_poller_add_fd (self->zmq_poller, *(SOCKET *) reader, reader, ZMQ_POLLIN);
152 #else
153 zlist_append (self->reader_list, reader);
154 self->need_rebuild = true;
155 #endif
156 return rc;
157 }
158
159
160 // --------------------------------------------------------------------------
161 // Remove a reader from the poller; returns 0 if OK, -1 on failure. The reader
162 // must have been passed during construction, or in an zpoller_add () call.
163
164 int
zpoller_remove(zpoller_t * self,void * reader)165 zpoller_remove (zpoller_t *self, void *reader)
166 {
167 assert (self);
168 assert (reader);
169 int rc = 0;
170 #ifdef ZMQ_HAVE_POLLER
171 void *socket = zsock_resolve (reader);
172 if (socket)
173 rc = zmq_poller_remove (self->zmq_poller, socket);
174 else
175 rc = zmq_poller_remove_fd (self->zmq_poller, *(SOCKET *) reader);
176 #else
177 size_t num_readers_before = zlist_size (self->reader_list);
178 zlist_remove (self->reader_list, reader); // won't fail with non-existent reader
179 size_t num_readers_after = zlist_size (self->reader_list);
180 if (num_readers_before != num_readers_after)
181 self->need_rebuild = true;
182 else {
183 errno = EINVAL;
184 rc = -1;
185 }
186 #endif
187 return rc;
188 }
189
190
191 // --------------------------------------------------------------------------
192 // By default the poller stops if the process receives a SIGINT or SIGTERM
193 // signal. This makes it impossible to shut-down message based architectures
194 // like zactors. This method lets you switch off break handling. The default
195 // nonstop setting is off (false).
196
197 void
zpoller_set_nonstop(zpoller_t * self,bool nonstop)198 zpoller_set_nonstop (zpoller_t *self, bool nonstop)
199 {
200 assert (self);
201 self->nonstop = nonstop;
202 }
203
204
205 // --------------------------------------------------------------------------
206 // Poll the registered readers for I/O, return first reader that has input.
207 // The reader will be a libzmq void * socket, or a zsock_t or zactor_t
208 // instance as specified in zpoller_new/zpoller_add. The timeout should be
209 // zero or greater, or -1 to wait indefinitely. Socket priority is defined
210 // by their order in the poll list. If you need a balanced poll, use the low
211 // level zmq_poll method directly. If the poll call was interrupted (SIGINT),
212 // or the ZMQ context was destroyed, or the timeout expired, returns NULL.
213 // You can test the actual exit condition by calling zpoller_expired () and
214 // zpoller_terminated (). The timeout is in msec.
215
216 void *
zpoller_wait(zpoller_t * self,int timeout)217 zpoller_wait (zpoller_t *self, int timeout)
218 {
219 assert (self);
220 self->expired = false;
221 if (zsys_interrupted && !self->nonstop) {
222 self->terminated = true;
223 return NULL;
224 }
225 else
226 self->terminated = false;
227
228 #ifdef ZMQ_HAVE_POLLER
229 zmq_poller_event_t event;
230 if (!zmq_poller_wait (self->zmq_poller, &event, timeout))
231 return event.user_data;
232 else
233 if (errno == ETIMEDOUT || errno == EAGAIN)
234 self->expired = true;
235 else
236 if (zsys_interrupted && !self->nonstop)
237 self->terminated = true;
238
239 return NULL;
240 #else
241 if (self->need_rebuild)
242 s_rebuild_poll_set (self);
243 int rc = zmq_poll (self->poll_set, (int) self->poll_size, timeout * ZMQ_POLL_MSEC);
244 if (rc > 0) {
245 uint reader = 0;
246 for (reader = 0; reader < self->poll_size; reader++)
247 if (self->poll_set [reader].revents & ZMQ_POLLIN)
248 return self->poll_readers [reader];
249 }
250 else
251 if (rc == -1 || (zsys_interrupted && !self->nonstop))
252 self->terminated = true;
253 else
254 if (rc == 0)
255 self->expired = true;
256
257 return NULL;
258 #endif
259 }
260
261
262 // --------------------------------------------------------------------------
263 // Return true if the last zpoller_wait () call ended because the timeout
264 // expired, without any error.
265
266 bool
zpoller_expired(zpoller_t * self)267 zpoller_expired (zpoller_t *self)
268 {
269 assert (self);
270 return self->expired;
271 }
272
273
274 // --------------------------------------------------------------------------
275 // Return true if the last zpoller_wait () call ended because the process
276 // was interrupted, or the parent context was destroyed.
277
278 bool
zpoller_terminated(zpoller_t * self)279 zpoller_terminated (zpoller_t *self)
280 {
281 assert (self);
282 return self->terminated;
283 }
284
285
286 // --------------------------------------------------------------------------
287 // Self test of this class
288
289 void
zpoller_test(bool verbose)290 zpoller_test (bool verbose)
291 {
292 printf (" * zpoller: ");
293
294 // @selftest
295 // Create a few sockets
296 zsock_t *vent = zsock_new (ZMQ_PUSH);
297 assert (vent);
298 int port_nbr = zsock_bind (vent, "tcp://127.0.0.1:*");
299 assert (port_nbr != -1);
300 zsock_t *sink = zsock_new (ZMQ_PULL);
301 assert (sink);
302 int rc = zsock_connect (sink, "tcp://127.0.0.1:%d", port_nbr);
303 assert (rc != -1);
304 zsock_t *bowl = zsock_new (ZMQ_PULL);
305 assert (bowl);
306 zsock_t *dish = zsock_new (ZMQ_PULL);
307 assert (dish);
308
309 // Set up poller
310 zpoller_t *poller = zpoller_new (bowl, dish, NULL);
311 assert (poller);
312
313 // Add a reader to the existing poller
314 rc = zpoller_add (poller, sink);
315 assert (rc == 0);
316
317 zstr_send (vent, "Hello, World");
318
319 // We expect a message only on the sink
320 zsock_t *which = (zsock_t *) zpoller_wait (poller, -1);
321 assert (which == sink);
322 assert (zpoller_expired (poller) == false);
323 assert (zpoller_terminated (poller) == false);
324 char *message = zstr_recv (which);
325 assert (streq (message, "Hello, World"));
326 zstr_free (&message);
327
328 // Stop polling reader
329 rc = zpoller_remove (poller, sink);
330 assert (rc == 0);
331
332 // Removing a non-existent reader shall fail
333 rc = zpoller_remove (poller, sink);
334 assert (rc == -1);
335 assert (errno == EINVAL);
336
337 // Check we can poll an FD
338 rc = zsock_connect (bowl, "tcp://127.0.0.1:%d", port_nbr);
339 assert (rc != -1);
340 SOCKET fd = zsock_fd (bowl);
341 rc = zpoller_add (poller, (void *) &fd);
342 assert (rc != -1);
343 zstr_send (vent, "Hello again, world");
344 assert (zpoller_wait (poller, 500) == &fd);
345
346 // Check zpoller_set_nonstop ()
347 zsys_interrupted = 1;
348 zpoller_wait (poller, 0);
349 assert (zpoller_terminated (poller));
350 zpoller_set_nonstop (poller, true);
351 zpoller_wait (poller, 0);
352 assert (!zpoller_terminated (poller));
353 zsys_interrupted = 0;
354
355 zpoller_destroy (&poller);
356 zsock_destroy (&vent);
357 zsock_destroy (&sink);
358 zsock_destroy (&bowl);
359 zsock_destroy (&dish);
360
361 #ifdef ZMQ_SERVER
362 // Check thread safe sockets
363 zpoller_destroy (&poller);
364 zsock_t *client = zsock_new (ZMQ_CLIENT);
365 assert (client);
366 zsock_t *server = zsock_new (ZMQ_SERVER);
367 assert (server);
368 poller = zpoller_new (client, server, NULL);
369 assert (poller);
370 port_nbr = zsock_bind (server, "tcp://127.0.0.1:*");
371 assert (port_nbr != -1);
372 rc = zsock_connect (client, "tcp://127.0.0.1:%d", port_nbr);
373 assert (rc != -1);
374
375 zstr_send (client, "Hello, World");
376
377 // We expect a message only on the server
378 which = (zsock_t *) zpoller_wait (poller, -1);
379 assert (which == server);
380 assert (zpoller_expired (poller) == false);
381 assert (zpoller_terminated (poller) == false);
382 message = zstr_recv (which);
383 assert (streq (message, "Hello, World"));
384 zstr_free (&message);
385
386 zpoller_destroy (&poller);
387 zsock_destroy (&client);
388 zsock_destroy (&server);
389 #endif
390
391 #if defined (__WINDOWS__)
392 zsys_shutdown();
393 #endif
394 // @end
395
396 printf ("OK\n");
397 }
398