1 /* =========================================================================
2 zloop - event-driven reactor
3
4 Copyright (c) the Contributors as noted in the AUTHORS file.
5 This file is part of CZMQ, the high-level C binding for 0MQ:
6 http://czmq.zeromq.org.
7
8 This Source Code Form is subject to the terms of the Mozilla Public
9 License, v. 2.0. If a copy of the MPL was not distributed with this
10 file, You can obtain one at http://mozilla.org/MPL/2.0/.
11 =========================================================================
12 */
13
14 /*
15 @header
16 The zloop class provides an event-driven reactor pattern. The reactor
17 handles zmq_pollitem_t items (pollers or writers, sockets or fds), and
18 once-off or repeated timers. Its resolution is 1 msec. It uses a tickless
19 timer to reduce CPU interrupts in inactive processes.
20 @discuss
21 @end
22 */
23
24 #include "czmq_classes.h"
25
26 typedef struct _s_reader_t s_reader_t;
27 typedef struct _s_poller_t s_poller_t;
28 typedef struct _s_timer_t s_timer_t;
29 typedef struct _s_ticket_t s_ticket_t;
30
31 // Structure of our class
32
33 struct _zloop_t {
34 zlistx_t *readers; // List of socket readers
35 zlistx_t *pollers; // List of poll items
36 zlistx_t *timers; // List of timers
37 zlistx_t *tickets; // List of tickets
38 int last_timer_id; // Most recent timer id
39 size_t max_timers; // Limit on number of timers
40 size_t ticket_delay; // Ticket delay value
41 size_t poll_size; // Size of poll set
42 zmq_pollitem_t *pollset; // zmq_poll set
43 s_reader_t *readact; // Readers for this poll set
44 s_poller_t *pollact; // Pollers for this poll set
45 bool need_rebuild; // True if pollset needs rebuilding
46 bool verbose; // True if verbose tracing wanted
47 bool terminated; // True when stopped running
48 bool nonstop; // Don't stop running on Ctrl-C
49 zlistx_t *zombies; // List of timers to kill
50 };
51
52 // Reactor elements are held as structures of their own
53
54 struct _s_reader_t {
55 void *list_handle; // Handle into list
56 zsock_t *sock; // Socket to read from
57 zloop_reader_fn *handler; // Function to execute
58 void *arg; // Application argument to poll item
59 int errors; // If too many errors, kill reader
60 bool tolerant; // Unless configured as tolerant
61 };
62
63 struct _s_poller_t {
64 void *list_handle; // Handle into list
65 zmq_pollitem_t item; // ZeroMQ socket or file descriptor
66 zloop_fn *handler; // Function to execute
67 void *arg; // Application argument to poll item
68 int errors; // If too many errors, kill poller
69 bool tolerant; // Unless configured as tolerant
70 };
71
72 struct _s_timer_t {
73 void *list_handle; // Handle into list
74 int timer_id; // Unique timer id, used to cancel timer
75 zloop_timer_fn *handler; // Function to execute
76 size_t delay; // Delay (ms) between executing
77 size_t times; // Number of times to repeat, 0 for forever
78 void *arg; // Application argument to timer
79 int64_t when; // Clock time when alarm goes off
80 };
81
82 // As we pass void * to/from the caller for working with tickets, we
83 // check validity using an object tag. This value is unique in CZMQ.
84 #define TICKET_TAG 0xcafe0007
85
86 struct _s_ticket_t {
87 uint32_t tag; // Object tag for runtime detection
88 void *list_handle; // Handle into list
89 size_t delay; // Delay (ms) before executing
90 int64_t when; // Clock time to invoke the ticket
91 zloop_timer_fn *handler; // Function to execute (use timer fn)
92 void *arg; // Application argument to function
93 bool deleted; // Flag as deleted (to clean up later)
94 };
95
96 static int
s_next_timer_id(zloop_t * self)97 s_next_timer_id (zloop_t *self)
98 {
99 return ++self->last_timer_id;
100 }
101
102 static s_reader_t *
s_reader_new(zsock_t * sock,zloop_reader_fn handler,void * arg)103 s_reader_new (zsock_t *sock, zloop_reader_fn handler, void *arg)
104 {
105 s_reader_t *self = (s_reader_t *) zmalloc (sizeof (s_reader_t));
106 assert (self);
107 self->sock = sock;
108 self->handler = handler;
109 self->arg = arg;
110 self->tolerant = false; // By default, errors are bad
111 return self;
112 }
113
114 static void
s_reader_destroy(s_reader_t ** self_p)115 s_reader_destroy (s_reader_t **self_p)
116 {
117 assert (self_p);
118 s_reader_t *self = *self_p;
119 if (self) {
120 freen (self);
121 *self_p = NULL;
122 }
123 }
124
125 static s_poller_t *
s_poller_new(zmq_pollitem_t * item,zloop_fn handler,void * arg)126 s_poller_new (zmq_pollitem_t *item, zloop_fn handler, void *arg)
127 {
128 s_poller_t *self = (s_poller_t *) zmalloc (sizeof (s_poller_t));
129 assert (self);
130 self->item = *item;
131 self->handler = handler;
132 self->arg = arg;
133 self->tolerant = false; // By default, errors are bad
134 return self;
135 }
136
137 static void
s_poller_destroy(s_poller_t ** self_p)138 s_poller_destroy (s_poller_t **self_p)
139 {
140 assert (self_p);
141 s_poller_t *self = *self_p;
142 if (self) {
143 freen (self);
144 *self_p = NULL;
145 }
146 }
147
148
149 static s_timer_t *
s_timer_new(int timer_id,size_t delay,size_t times,zloop_timer_fn handler,void * arg)150 s_timer_new (int timer_id, size_t delay, size_t times, zloop_timer_fn handler, void *arg)
151 {
152 s_timer_t *self = (s_timer_t *) zmalloc (sizeof (s_timer_t));
153 assert (self);
154 self->timer_id = timer_id;
155 self->delay = delay;
156 self->times = times;
157 self->when = zclock_mono () + delay;
158 self->handler = handler;
159 self->arg = arg;
160 return self;
161 }
162
163 static void
s_timer_destroy(s_timer_t ** self_p)164 s_timer_destroy (s_timer_t **self_p)
165 {
166 assert (self_p);
167 s_timer_t *self = *self_p;
168 if (self) {
169 freen (self);
170 *self_p = NULL;
171 }
172 }
173
174 static int
s_timer_comparator(s_timer_t * first,s_timer_t * second)175 s_timer_comparator (s_timer_t *first, s_timer_t *second)
176 {
177 if (first->when > second->when)
178 return 1;
179 else
180 if (first->when < second->when)
181 return -1;
182 else
183 return 0;
184 }
185
186 static s_ticket_t *
s_ticket_new(size_t delay,zloop_timer_fn handler,void * arg)187 s_ticket_new (size_t delay, zloop_timer_fn handler, void *arg)
188 {
189 s_ticket_t *self = (s_ticket_t *) zmalloc (sizeof (s_ticket_t));
190 assert (self);
191 self->tag = TICKET_TAG;
192 self->delay = delay;
193 self->when = zclock_mono () + delay;
194 self->handler = handler;
195 self->arg = arg;
196 return self;
197 }
198
199 static void
s_ticket_destroy(s_ticket_t ** self_p)200 s_ticket_destroy (s_ticket_t **self_p)
201 {
202 assert (self_p);
203 s_ticket_t *self = *self_p;
204 if (self) {
205 self->tag = 0xDeadBeef;
206 freen (self);
207 *self_p = NULL;
208 }
209 }
210
211 static int
s_ticket_comparator(s_ticket_t * first,s_ticket_t * second)212 s_ticket_comparator (s_ticket_t *first, s_ticket_t *second)
213 {
214 if (first->when > second->when)
215 return 1;
216 else
217 if (first->when < second->when)
218 return -1;
219 else
220 return 0;
221 }
222
223 // Remove timer with specified id, if it exists
224
225 static void
s_timer_remove(zloop_t * self,int timer_id)226 s_timer_remove (zloop_t *self, int timer_id)
227 {
228 s_timer_t *timer = (s_timer_t *) zlistx_first (self->timers);
229 while (timer) {
230 if (timer->timer_id == timer_id) {
231 zlistx_delete (self->timers, timer->list_handle);
232 break;
233 }
234 timer = (s_timer_t *) zlistx_next (self->timers);
235 }
236 }
237
238
239 // We hold an array of pollers that matches the pollset, so we can
240 // register/cancel pollers orthogonally to executing the pollset
241 // activity on pollers. Returns 0 on success, -1 on failure.
242
243 static int
s_rebuild_pollset(zloop_t * self)244 s_rebuild_pollset (zloop_t *self)
245 {
246 self->poll_size = zlistx_size (self->readers) + zlistx_size (self->pollers);
247
248 freen (self->pollset);
249 self->pollset = (zmq_pollitem_t *) zmalloc (self->poll_size * sizeof (zmq_pollitem_t));
250 assert (self->pollset);
251
252 freen (self->readact);
253 self->readact = (s_reader_t *) zmalloc (self->poll_size * sizeof (s_reader_t));
254 assert (self->readact);
255
256 freen (self->pollact);
257 self->pollact = (s_poller_t *) zmalloc (self->poll_size * sizeof (s_poller_t));
258 assert (self->pollact);
259
260 s_reader_t *reader = (s_reader_t *) zlistx_first (self->readers);
261 uint item_nbr = 0;
262 while (reader) {
263 zmq_pollitem_t poll_item = { zsock_resolve (reader->sock), 0, ZMQ_POLLIN, 0 };
264 self->pollset [item_nbr] = poll_item;
265 self->readact [item_nbr] = *reader;
266 item_nbr++;
267 reader = (s_reader_t *) zlistx_next (self->readers);
268 }
269 s_poller_t *poller = (s_poller_t *) zlistx_first (self->pollers);
270 while (poller) {
271 self->pollset [item_nbr] = poller->item;
272 self->pollact [item_nbr] = *poller;
273 item_nbr++;
274 poller = (s_poller_t *) zlistx_next (self->pollers);
275 }
276 self->need_rebuild = false;
277 return 0;
278 }
279
280 static long
s_tickless(zloop_t * self)281 s_tickless (zloop_t *self)
282 {
283 // Calculate tickless timer, up to 1 hour
284 int64_t tickless = zclock_mono () + 1000 * 3600;
285
286 // Scan timers, which are not sorted
287 // TODO: sort timers properly on insertion
288 s_timer_t *timer = (s_timer_t *) zlistx_first (self->timers);
289 while (timer) {
290 // Find earliest timer
291 if (tickless > timer->when)
292 tickless = timer->when;
293 timer = (s_timer_t *) zlistx_next (self->timers);
294 }
295 // Tickets are sorted, so check first ticket
296 s_ticket_t *ticket = (s_ticket_t *) zlistx_first (self->tickets);
297 if (ticket && tickless > ticket->when)
298 tickless = ticket->when;
299
300 long timeout = (long) (tickless - zclock_mono ());
301 if (timeout < 0)
302 timeout = 0;
303 if (self->verbose)
304 zsys_debug ("zloop polling for %d msec", (int) timeout);
305
306 return timeout * ZMQ_POLL_MSEC;
307 }
308
309
310 // --------------------------------------------------------------------------
311 // Constructor
312
313 zloop_t *
zloop_new(void)314 zloop_new (void)
315 {
316 zloop_t *self = (zloop_t *) zmalloc (sizeof (zloop_t));
317 assert (self);
318
319 self->readers = zlistx_new ();
320 assert (self->readers);
321
322 self->pollers = zlistx_new ();
323 assert (self->pollers);
324
325 self->timers = zlistx_new ();
326 assert (self->timers);
327
328 self->zombies = zlistx_new ();
329 assert (self->zombies);
330
331 self->tickets = zlistx_new ();
332 assert (self->tickets);
333 self->last_timer_id = 0;
334
335 zlistx_set_destructor (self->readers, (czmq_destructor *) s_reader_destroy);
336 zlistx_set_destructor (self->pollers, (czmq_destructor *) s_poller_destroy);
337 zlistx_set_destructor (self->timers, (czmq_destructor *) s_timer_destroy);
338 zlistx_set_comparator (self->timers, (czmq_comparator *) s_timer_comparator);
339 zlistx_set_destructor (self->tickets, (czmq_destructor *) s_ticket_destroy);
340 zlistx_set_comparator (self->tickets, (czmq_comparator *) s_ticket_comparator);
341
342 return self;
343 }
344
345
346 // --------------------------------------------------------------------------
347 // Destructor
348
349 void
zloop_destroy(zloop_t ** self_p)350 zloop_destroy (zloop_t **self_p)
351 {
352 assert (self_p);
353 if (*self_p) {
354 zloop_t *self = *self_p;
355
356 // If we never started the loop, yet manipulated timers, we'll have
357 // a zombie list
358 while (zlistx_first (self->zombies)) {
359 // Get timer_id back from pointer
360 ptrdiff_t timer_id = (byte *) zlistx_detach (self->zombies, NULL) - (byte *) NULL;
361 s_timer_remove (self, (int) timer_id);
362 }
363 zlistx_destroy (&self->zombies);
364 zlistx_destroy (&self->readers);
365 zlistx_destroy (&self->pollers);
366 zlistx_destroy (&self->timers);
367 zlistx_destroy (&self->tickets);
368 freen (self->pollset);
369 freen (self->readact);
370 freen (self->pollact);
371 freen (self);
372 *self_p = NULL;
373 }
374 }
375
376
377 // --------------------------------------------------------------------------
378 // Register socket reader with the reactor. When the reader has messages,
379 // the reactor will call the handler, passing the arg. Returns 0 if OK, -1
380 // if there was an error. If you register the same socket more than once,
381 // each instance will invoke its corresponding handler.
382
383 int
zloop_reader(zloop_t * self,zsock_t * sock,zloop_reader_fn handler,void * arg)384 zloop_reader (zloop_t *self, zsock_t *sock, zloop_reader_fn handler, void *arg)
385 {
386 assert (self);
387 assert (sock);
388
389 s_reader_t *reader = s_reader_new (sock, handler, arg);
390 if (reader) {
391 reader->list_handle = zlistx_add_end (self->readers, reader);
392 assert (reader->list_handle);
393 self->need_rebuild = true;
394 if (self->verbose)
395 zsys_debug ("zloop: register %s reader", zsock_type_str (sock));
396 return 0;
397 }
398 else
399 return -1;
400 }
401
402
403 // --------------------------------------------------------------------------
404 // Cancel a socket reader from the reactor. If multiple readers exist for
405 // same socket, cancels ALL of them.
406
407 void
zloop_reader_end(zloop_t * self,zsock_t * sock)408 zloop_reader_end (zloop_t *self, zsock_t *sock)
409 {
410 assert (self);
411 assert (sock);
412
413 s_reader_t *reader = (s_reader_t *) zlistx_first (self->readers);
414 while (reader) {
415 if (reader->sock == sock) {
416 zlistx_delete (self->readers, reader->list_handle);
417 self->need_rebuild = true;
418 }
419 reader = (s_reader_t *) zlistx_next (self->readers);
420 }
421 if (self->verbose)
422 zsys_debug ("zloop: cancel %s reader", zsock_type_str (sock));
423 }
424
425
426 // --------------------------------------------------------------------------
427 // Configure a registered reader to ignore errors. If you do not set this,
428 // then reader that have errors are removed from the reactor silently.
429
430 void
zloop_reader_set_tolerant(zloop_t * self,zsock_t * sock)431 zloop_reader_set_tolerant (zloop_t *self, zsock_t *sock)
432 {
433 assert (self);
434 assert (sock);
435
436 s_reader_t *reader = (s_reader_t *) zlistx_first (self->readers);
437 while (reader) {
438 if (reader->sock == sock)
439 reader->tolerant = true;
440 reader = (s_reader_t *) zlistx_next (self->readers);
441 }
442 }
443
444
445 // --------------------------------------------------------------------------
446 // Register low-level libzmq pollitem with the reactor. When the pollitem
447 // is ready, will call the handler, passing the arg. Returns 0 if OK, -1
448 // if there was an error. If you register the pollitem more than once, each
449 // instance will invoke its corresponding handler. A pollitem with
450 // socket=NULL and fd=0 means 'poll on FD zero'.
451
452 int
zloop_poller(zloop_t * self,zmq_pollitem_t * item,zloop_fn handler,void * arg)453 zloop_poller (zloop_t *self, zmq_pollitem_t *item, zloop_fn handler, void *arg)
454 {
455 assert (self);
456
457 if (item->socket
458 && streq (zsys_sockname (zsock_type (item->socket)), "UNKNOWN"))
459 return -1;
460
461 s_poller_t *poller = s_poller_new (item, handler, arg);
462 assert (poller);
463
464 poller->list_handle = zlistx_add_end (self->pollers, poller);
465 assert (poller->list_handle);
466 self->need_rebuild = true;
467 if (self->verbose)
468 zsys_debug ("zloop: register %s poller (%p, %d)",
469 item->socket? zsys_sockname (zsock_type (item->socket)): "FD",
470 item->socket, item->fd);
471 return 0;
472 }
473
474
475 // --------------------------------------------------------------------------
476 // Cancel a pollitem from the reactor, specified by socket or FD. If both
477 // are specified, uses only socket. If multiple poll items exist for same
478 // socket/FD, cancels ALL of them.
479
480 void
zloop_poller_end(zloop_t * self,zmq_pollitem_t * item)481 zloop_poller_end (zloop_t *self, zmq_pollitem_t *item)
482 {
483 assert (self);
484
485 s_poller_t *poller = (s_poller_t *) zlistx_first (self->pollers);
486 while (poller) {
487 bool match = false;
488 if (item->socket) {
489 if (item->socket == poller->item.socket)
490 match = true;
491 }
492 else {
493 if (item->fd == poller->item.fd)
494 match = true;
495 }
496 if (match) {
497 zlistx_delete (self->pollers, poller->list_handle);
498 // Force rebuild to avoid reading from freed poller
499 self->need_rebuild = true;
500 }
501 poller = (s_poller_t *) zlistx_next (self->pollers);
502 }
503 if (self->verbose)
504 zsys_debug ("zloop: cancel %s poller (%p, %d)",
505 item->socket? zsys_sockname (zsock_type (item->socket)): "FD",
506 item->socket, item->fd);
507 }
508
509
510 // --------------------------------------------------------------------------
511 // Configure a registered poller to ignore errors. If you do not set this,
512 // then poller that have errors are removed from the reactor silently.
513
514 void
zloop_poller_set_tolerant(zloop_t * self,zmq_pollitem_t * item)515 zloop_poller_set_tolerant (zloop_t *self, zmq_pollitem_t *item)
516 {
517 assert (self);
518
519 // Find matching poller(s) and mark as tolerant
520 s_poller_t *poller = (s_poller_t *) zlistx_first (self->pollers);
521 while (poller) {
522 bool match = false;
523 if (item->socket) {
524 if (item->socket == poller->item.socket)
525 match = true;
526 }
527 else {
528 if (item->fd == poller->item.fd)
529 match = true;
530 }
531 if (match)
532 poller->tolerant = true;
533
534 poller = (s_poller_t *) zlistx_next (self->pollers);
535 }
536 }
537
538
539 // --------------------------------------------------------------------------
540 // Register a timer that expires after some delay and repeats some number of
541 // times. At each expiry, will call the handler, passing the arg. To run a
542 // timer forever, use 0 times. Returns a timer_id that is used to cancel the
543 // timer in the future. Returns -1 if there was an error.
544
545 int
zloop_timer(zloop_t * self,size_t delay,size_t times,zloop_timer_fn handler,void * arg)546 zloop_timer (zloop_t *self, size_t delay, size_t times, zloop_timer_fn handler, void *arg)
547 {
548 assert (self);
549 // Catch excessive use of timers
550 if (self->max_timers && zlistx_size (self->timers) == self->max_timers) {
551 zsys_error ("zloop: timer limit reached (max=%d)", self->max_timers);
552 return -1;
553 }
554 int timer_id = s_next_timer_id (self);
555 s_timer_t *timer = s_timer_new (timer_id, delay, times, handler, arg);
556 if (timer) {
557 timer->list_handle = zlistx_add_end (self->timers, timer);
558 assert (timer->list_handle);
559 if (self->verbose)
560 zsys_debug ("zloop: register timer id=%d delay=%d times=%d",
561 timer_id, (int) delay, (int) times);
562 return timer_id;
563 }
564 else
565 return -1;
566 }
567
568
569 // --------------------------------------------------------------------------
570 // Cancel a timer by timer id (as returned by zloop_timer()).
571 // Returns 0 on success.
572
573 int
zloop_timer_end(zloop_t * self,int timer_id)574 zloop_timer_end (zloop_t *self, int timer_id)
575 {
576 assert (self);
577
578 if (self->terminated)
579 s_timer_remove (self, timer_id);
580 else
581 // We cannot touch self->timers because we may be executing that
582 // from inside the poll loop. So, we hold the arg on the zombie
583 // list, and process that list when we're done executing timers.
584 // This hack lets us store an integer timer ID as a pointer
585 zlistx_add_end (self->zombies, (byte *) NULL + timer_id);
586
587 if (self->verbose)
588 zsys_debug ("zloop: cancel timer id=%d", timer_id);
589
590 return 0;
591 }
592
593
594 // --------------------------------------------------------------------------
595 // Register a ticket timer. Ticket timers are very fast in the case where
596 // you use a lot of timers (thousands), and frequently remove and add them.
597 // The main use case is expiry timers for servers that handle many clients,
598 // and which reset the expiry timer for each message received from a client.
599 // Whereas normal timers perform poorly as the number of clients grows, the
600 // cost of ticket timers is constant, no matter the number of clients. You
601 // must set the ticket delay using zloop_set_ticket_delay before creating a
602 // ticket. Returns a handle to the timer that you should use in
603 // zloop_ticket_reset and zloop_ticket_delete.
604
605 void *
zloop_ticket(zloop_t * self,zloop_timer_fn handler,void * arg)606 zloop_ticket (zloop_t *self, zloop_timer_fn handler, void *arg)
607 {
608 assert (self);
609 assert (self->ticket_delay > 0);
610 s_ticket_t *ticket = s_ticket_new (self->ticket_delay, handler, arg);
611 if (ticket) {
612 ticket->list_handle = zlistx_add_end (self->tickets, ticket);
613 assert (ticket->list_handle);
614 }
615 return ticket;
616 }
617
618
619 // --------------------------------------------------------------------------
620 // Reset a ticket timer, which moves it to the end of the ticket list and
621 // resets its execution time. This is a very fast operation.
622
623 void
zloop_ticket_reset(zloop_t * self,void * handle)624 zloop_ticket_reset (zloop_t *self, void *handle)
625 {
626 s_ticket_t *ticket = (s_ticket_t *) handle;
627 assert (ticket->tag == TICKET_TAG);
628 ticket->when = zclock_mono () + ticket->delay;
629 zlistx_move_end (self->tickets, ticket->list_handle);
630 }
631
632
633 // --------------------------------------------------------------------------
634 // Delete a ticket timer. We do not actually delete the ticket here, as
635 // other code may still refer to the ticket. We mark as deleted, and remove
636 // later and safely.
637
638 void
zloop_ticket_delete(zloop_t * self,void * handle)639 zloop_ticket_delete (zloop_t *self, void *handle)
640 {
641 s_ticket_t *ticket = (s_ticket_t *) handle;
642 assert (ticket->tag == TICKET_TAG);
643 ticket->deleted = true;
644 // Move deleted tickets to end of list for fast cleanup
645 zlistx_move_end (self->tickets, ticket->list_handle);
646 }
647
648
649 // --------------------------------------------------------------------------
650 // Set the ticket delay, which applies to all tickets. If you lower the
651 // delay and there are already tickets created, the results are undefined.
652
653 void
zloop_set_ticket_delay(zloop_t * self,size_t ticket_delay)654 zloop_set_ticket_delay (zloop_t *self, size_t ticket_delay)
655 {
656 assert (self);
657 self->ticket_delay = ticket_delay;
658 }
659
660
661 // --------------------------------------------------------------------------
662 // Set verbose tracing of reactor on/off
663
664 void
zloop_set_verbose(zloop_t * self,bool verbose)665 zloop_set_verbose (zloop_t *self, bool verbose)
666 {
667 assert (self);
668 self->verbose = verbose;
669 }
670
671 // --------------------------------------------------------------------------
672 // By default the reactor stops if the process receives a SIGINT or SIGTERM
673 // signal. This makes it impossible to shut-down message based architectures
674 // like zactors. This method lets you switch off break handling. The default
675 // nonstop setting is off (false).
676
677 void
zloop_set_nonstop(zloop_t * self,bool nonstop)678 zloop_set_nonstop (zloop_t *self, bool nonstop)
679 {
680 assert (self);
681 self->nonstop = nonstop;
682 }
683
684
685 // --------------------------------------------------------------------------
686 // Set hard limit on number of timers allowed. Setting more than a small
687 // number of timers (10-100) can have a dramatic impact on the performance
688 // of the reactor. For high-volume cases, use ticket timers. If the hard
689 // limit is reached, the reactor stops creating new timers and logs an
690 // error.
691
692 void
zloop_set_max_timers(zloop_t * self,size_t max_timers)693 zloop_set_max_timers (zloop_t *self, size_t max_timers)
694 {
695 assert (self);
696 self->max_timers = max_timers;
697 }
698
699
700 // --------------------------------------------------------------------------
701 // Start the reactor. Takes control of the thread and returns when the 0MQ
702 // context is terminated or the process is interrupted, or any event handler
703 // returns -1. Event handlers may register new sockets and timers, and
704 // cancel sockets. Returns 0 if interrupted, -1 if canceled by a handler,
705 // positive on internal error
706
707 int
zloop_start(zloop_t * self)708 zloop_start (zloop_t *self)
709 {
710 assert (self);
711 int rc = 0;
712
713 // Main reactor loop
714 while (!zsys_interrupted || self->nonstop) {
715 if (rc == -1) // somebody wanted us to quit
716 break;
717 if (self->need_rebuild) {
718 // If s_rebuild_pollset() fails, break out of the loop and
719 // return its error
720 rc = s_rebuild_pollset (self);
721 if (rc)
722 break;
723 }
724 rc = zmq_poll (self->pollset, (int) self->poll_size, s_tickless (self));
725 if (rc == -1 || (zsys_interrupted && !self->nonstop)) {
726 if (self->verbose) {
727 if (rc == -1)
728 zsys_debug ("zloop: interrupted: %s", strerror (errno));
729 else
730 zsys_debug ("zloop: zsys_interrupted");
731 }
732 rc = 0;
733 break; // Context has been shut down
734 }
735
736 // Handle any timers that have now expired
737 int64_t time_now = zclock_mono ();
738 s_timer_t *timer = (s_timer_t *) zlistx_first (self->timers);
739 while (timer) {
740 if (time_now >= timer->when) {
741 if (self->verbose)
742 zsys_debug ("zloop: call timer handler id=%d", timer->timer_id);
743 rc = timer->handler (self, timer->timer_id, timer->arg);
744 if (timer->times && --timer->times == 0)
745 zlistx_delete (self->timers, timer->list_handle);
746 else
747 timer->when += timer->delay;
748 if (rc == -1)
749 break; // Timer handler signaled break
750 }
751 timer = (s_timer_t *) zlistx_next (self->timers);
752 }
753
754 // Handle any tickets that have now expired
755 s_ticket_t *ticket = (s_ticket_t *) zlistx_first (self->tickets);
756 while (ticket && time_now >= ticket->when) {
757 if (self->verbose)
758 zsys_debug ("zloop: call ticket handler");
759 if (!ticket->deleted
760 && ticket->handler (self, 0, ticket->arg) == -1) {
761 rc = -1; // Trigger exit from zloop_start
762 break; // Ticket handler signaled break
763 }
764 zlistx_delete (self->tickets, ticket->list_handle);
765 ticket = (s_ticket_t *) zlistx_next (self->tickets);
766 }
767
768 // Handle any tickets that were flagged for deletion
769 ticket = (s_ticket_t *) zlistx_last (self->tickets);
770 while (ticket && ticket->deleted) {
771 zlistx_delete (self->tickets, ticket->list_handle);
772 ticket = (s_ticket_t *) zlistx_last (self->tickets);
773 }
774
775 // Check if timers changed pollset
776 if (self->need_rebuild)
777 continue;
778
779 // Handle any readers and pollers that are ready
780 size_t item_nbr;
781 for (item_nbr = 0; item_nbr < self->poll_size && rc >= 0; item_nbr++) {
782 s_reader_t *reader = &self->readact [item_nbr];
783 if (reader->handler) {
784 if ((self->pollset [item_nbr].revents & ZMQ_POLLERR)
785 && !reader->tolerant) {
786 if (self->verbose)
787 zsys_warning ("zloop: can't read %s socket: %s",
788 zsock_type_str (reader->sock),
789 zmq_strerror (zmq_errno ()));
790 // Give handler one chance to handle error, then kill
791 // reader because it'll disrupt the reactor otherwise.
792 if (reader->errors++) {
793 zloop_reader_end (self, reader->sock);
794 self->pollset [item_nbr].revents = 0;
795 }
796 }
797 else
798 reader->errors = 0; // A non-error happened
799
800 if (self->pollset [item_nbr].revents) {
801 if (self->verbose)
802 zsys_debug ("zloop: call %s socket handler",
803 zsock_type_str (reader->sock));
804 rc = reader->handler (self, reader->sock, reader->arg);
805 if (rc == -1 || self->need_rebuild)
806 break;
807 }
808 }
809 else {
810 s_poller_t *poller = &self->pollact [item_nbr];
811 assert (self->pollset [item_nbr].socket == poller->item.socket);
812
813 if ((self->pollset [item_nbr].revents & ZMQ_POLLERR)
814 && !poller->tolerant) {
815 if (self->verbose)
816 zsys_warning ("zloop: can't poll %s socket (%p, %d): %s",
817 poller->item.socket?
818 zsys_sockname (zsock_type (poller->item.socket)): "FD",
819 poller->item.socket, poller->item.fd,
820 zmq_strerror (zmq_errno ()));
821 // Give handler one chance to handle error, then kill
822 // poller because it'll disrupt the reactor otherwise.
823 if (poller->errors++) {
824 zloop_poller_end (self, &poller->item);
825 self->pollset [item_nbr].revents = 0;
826 }
827 }
828 else
829 poller->errors = 0; // A non-error happened
830
831 if (self->pollset [item_nbr].revents) {
832 if (self->verbose)
833 zsys_debug ("zloop: call %s socket handler (%p, %d)",
834 poller->item.socket?
835 zsys_sockname (zsock_type (poller->item.socket)): "FD",
836 poller->item.socket, poller->item.fd);
837 rc = poller->handler (self, &self->pollset [item_nbr], poller->arg);
838 if (rc == -1 || self->need_rebuild)
839 break;
840 }
841 }
842 }
843 // Now handle any timer zombies
844 // This is going to be slow if we have many timers; we might use
845 // a faster lookup on the timer list.
846 while (zlistx_first (self->zombies)) {
847 // Get timer_id back from pointer
848 ptrdiff_t timer_id = (byte *) zlistx_detach (self->zombies, NULL) - (byte *) NULL;
849 s_timer_remove (self, (int) timer_id);
850 }
851 }
852 self->terminated = true;
853 return rc;
854 }
855
856
857 // --------------------------------------------------------------------------
858 // Selftest
859
860 static int
s_cancel_timer_event(zloop_t * loop,int timer_id,void * arg)861 s_cancel_timer_event (zloop_t *loop, int timer_id, void *arg)
862 {
863 // We are handling timer 2, and will cancel timer 1
864 int cancel_timer_id = *((int *) arg);
865 return zloop_timer_end (loop, cancel_timer_id);
866 }
867
868
869 static int
s_timer_event(zloop_t * loop,int timer_id,void * output)870 s_timer_event (zloop_t *loop, int timer_id, void *output)
871 {
872 zstr_send (output, "PING");
873 return 0;
874 }
875
876 static int
s_socket_event(zloop_t * loop,zsock_t * handle,void * arg)877 s_socket_event (zloop_t *loop, zsock_t *handle, void *arg)
878 {
879 // Just end the reactor
880 return -1;
881 }
882
883 static int
s_timer_event3(zloop_t * loop,int timer_id,void * called)884 s_timer_event3 (zloop_t *loop, int timer_id, void *called)
885 {
886 *((bool*) called) = true;
887 // end the reactor
888 return -1;
889 }
890
891 static int
s_socket_event1(zloop_t * loop,zsock_t * reader,void * called)892 s_socket_event1 (zloop_t *loop, zsock_t *reader, void *called)
893 {
894 *((bool*) called) = true;
895 // end the reactor
896 return -1;
897 }
898
899 static int
s_timer_event4(zloop_t * loop,int timer_id,void * arg)900 s_timer_event4 (zloop_t *loop, int timer_id, void *arg)
901 {
902 // Just end the looper
903 return -1;
904 }
905
906 static int
s_timer_event5(zloop_t * loop,int timer_id,void * arg)907 s_timer_event5 (zloop_t *loop, int timer_id, void *arg)
908 {
909 // remove reader from loop
910 zloop_reader_end(loop, (zsock_t *) arg);
911
912 // end reactor on next run
913 zloop_timer(loop, 1, 1, s_timer_event4, NULL);
914
915 return 0;
916 }
917
918 void
zloop_test(bool verbose)919 zloop_test (bool verbose)
920 {
921 printf (" * zloop: ");
922 int rc = 0;
923 // @selftest
924 // Create two PAIR sockets and connect over inproc
925 zsock_t *output = zsock_new (ZMQ_PAIR);
926 assert (output);
927 zsock_bind (output, "inproc://zloop.test");
928
929 zsock_t *input = zsock_new (ZMQ_PAIR);
930 assert (input);
931 zsock_connect (input, "inproc://zloop.test");
932
933 zloop_t *loop = zloop_new ();
934 assert (loop);
935 zloop_set_verbose (loop, verbose);
936
937 // Create a timer that will be cancelled
938 int timer_id = zloop_timer (loop, 1000, 1, s_timer_event, NULL);
939 zloop_timer (loop, 5, 1, s_cancel_timer_event, &timer_id);
940
941 // After 20 msecs, send a ping message to output3
942 zloop_timer (loop, 20, 1, s_timer_event, output);
943
944 // Set up some tickets that will never expire
945 zloop_set_ticket_delay (loop, 10000);
946 void *ticket1 = zloop_ticket (loop, s_timer_event, NULL);
947 void *ticket2 = zloop_ticket (loop, s_timer_event, NULL);
948 void *ticket3 = zloop_ticket (loop, s_timer_event, NULL);
949
950 // When we get the ping message, end the reactor
951 rc = zloop_reader (loop, input, s_socket_event, NULL);
952 assert (rc == 0);
953 zloop_reader_set_tolerant (loop, input);
954 zloop_start (loop);
955
956 zloop_ticket_delete (loop, ticket1);
957 zloop_ticket_delete (loop, ticket2);
958 zloop_ticket_delete (loop, ticket3);
959
960 // Check whether loop properly ignores zsys_interrupted flag
961 // when asked to
962 zloop_destroy (&loop);
963 loop = zloop_new ();
964
965 bool timer_event_called = false;
966 zloop_timer (loop, 1, 1, s_timer_event3, &timer_event_called);
967
968 zsys_interrupted = 1;
969 zloop_start (loop);
970 // zloop returns immediately without giving any handler a chance to run
971 assert (!timer_event_called);
972
973 zloop_set_nonstop (loop, true);
974 zloop_start (loop);
975 // zloop runs the handler which will terminate the loop
976 assert (timer_event_called);
977 zsys_interrupted = 0;
978
979 // Check if reader removed in timer is not called
980 zloop_destroy (&loop);
981 loop = zloop_new ();
982
983 bool socket_event_called = false;
984 zloop_reader (loop, output, s_socket_event1, &socket_event_called);
985 zloop_timer (loop, 0, 1, s_timer_event5, output);
986
987 zstr_send (input, "PING");
988
989 zloop_start (loop);
990 assert (!socket_event_called);
991
992 // cleanup
993 zloop_destroy (&loop);
994 assert (loop == NULL);
995
996 zsock_destroy (&input);
997 zsock_destroy (&output);
998
999 #if defined (__WINDOWS__)
1000 zsys_shutdown();
1001 #endif
1002 // @end
1003 printf ("OK\n");
1004 }
1005