1 /*  =========================================================================
2     zloop - event-driven reactor
3 
4     Copyright (c) the Contributors as noted in the AUTHORS file.
5     This file is part of CZMQ, the high-level C binding for 0MQ:
6     http://czmq.zeromq.org.
7 
8     This Source Code Form is subject to the terms of the Mozilla Public
9     License, v. 2.0. If a copy of the MPL was not distributed with this
10     file, You can obtain one at http://mozilla.org/MPL/2.0/.
11     =========================================================================
12 */
13 
14 /*
15 @header
16     The zloop class provides an event-driven reactor pattern. The reactor
17     handles zmq_pollitem_t items (pollers or writers, sockets or fds), and
18     once-off or repeated timers. Its resolution is 1 msec. It uses a tickless
19     timer to reduce CPU interrupts in inactive processes.
20 @discuss
21 @end
22 */
23 
24 #include "czmq_classes.h"
25 
26 typedef struct _s_reader_t s_reader_t;
27 typedef struct _s_poller_t s_poller_t;
28 typedef struct _s_timer_t s_timer_t;
29 typedef struct _s_ticket_t s_ticket_t;
30 
31 //  Structure of our class
32 
33 struct _zloop_t {
34     zlistx_t *readers;          //  List of socket readers
35     zlistx_t *pollers;          //  List of poll items
36     zlistx_t *timers;           //  List of timers
37     zlistx_t *tickets;          //  List of tickets
38     int last_timer_id;          //  Most recent timer id
39     size_t max_timers;          //  Limit on number of timers
40     size_t ticket_delay;        //  Ticket delay value
41     size_t poll_size;           //  Size of poll set
42     zmq_pollitem_t *pollset;    //  zmq_poll set
43     s_reader_t *readact;        //  Readers for this poll set
44     s_poller_t *pollact;        //  Pollers for this poll set
45     bool need_rebuild;          //  True if pollset needs rebuilding
46     bool verbose;               //  True if verbose tracing wanted
47     bool terminated;            //  True when stopped running
48     bool nonstop;               //  Don't stop running on Ctrl-C
49     zlistx_t *zombies;          //  List of timers to kill
50 };
51 
52 //  Reactor elements are held as structures of their own
53 
54 struct _s_reader_t {
55     void *list_handle;          //  Handle into list
56     zsock_t *sock;              //  Socket to read from
57     zloop_reader_fn *handler;   //  Function to execute
58     void *arg;                  //  Application argument to poll item
59     int errors;                 //  If too many errors, kill reader
60     bool tolerant;              //  Unless configured as tolerant
61 };
62 
63 struct _s_poller_t {
64     void *list_handle;          //  Handle into list
65     zmq_pollitem_t item;        //  ZeroMQ socket or file descriptor
66     zloop_fn *handler;          //  Function to execute
67     void *arg;                  //  Application argument to poll item
68     int errors;                 //  If too many errors, kill poller
69     bool tolerant;              //  Unless configured as tolerant
70 };
71 
72 struct _s_timer_t {
73     void *list_handle;          //  Handle into list
74     int timer_id;               //  Unique timer id, used to cancel timer
75     zloop_timer_fn *handler;    //  Function to execute
76     size_t delay;               //  Delay (ms) between executing
77     size_t times;               //  Number of times to repeat, 0 for forever
78     void *arg;                  //  Application argument to timer
79     int64_t when;               //  Clock time when alarm goes off
80 };
81 
82 //  As we pass void * to/from the caller for working with tickets, we
83 //  check validity using an object tag. This value is unique in CZMQ.
84 #define TICKET_TAG              0xcafe0007
85 
86 struct _s_ticket_t {
87     uint32_t tag;               //  Object tag for runtime detection
88     void *list_handle;          //  Handle into list
89     size_t delay;               //  Delay (ms) before executing
90     int64_t when;               //  Clock time to invoke the ticket
91     zloop_timer_fn *handler;    //  Function to execute (use timer fn)
92     void *arg;                  //  Application argument to function
93     bool deleted;               //  Flag as deleted (to clean up later)
94 };
95 
96 static int
s_next_timer_id(zloop_t * self)97 s_next_timer_id (zloop_t *self)
98 {
99     return ++self->last_timer_id;
100 }
101 
102 static s_reader_t *
s_reader_new(zsock_t * sock,zloop_reader_fn handler,void * arg)103 s_reader_new (zsock_t *sock, zloop_reader_fn handler, void *arg)
104 {
105     s_reader_t *self = (s_reader_t *) zmalloc (sizeof (s_reader_t));
106     assert (self);
107     self->sock = sock;
108     self->handler = handler;
109     self->arg = arg;
110     self->tolerant = false;     //  By default, errors are bad
111     return self;
112 }
113 
114 static void
s_reader_destroy(s_reader_t ** self_p)115 s_reader_destroy (s_reader_t **self_p)
116 {
117     assert (self_p);
118     s_reader_t *self = *self_p;
119     if (self) {
120         freen (self);
121         *self_p = NULL;
122     }
123 }
124 
125 static s_poller_t *
s_poller_new(zmq_pollitem_t * item,zloop_fn handler,void * arg)126 s_poller_new (zmq_pollitem_t *item, zloop_fn handler, void *arg)
127 {
128     s_poller_t *self = (s_poller_t *) zmalloc (sizeof (s_poller_t));
129     assert (self);
130     self->item = *item;
131     self->handler = handler;
132     self->arg = arg;
133     self->tolerant = false;     //  By default, errors are bad
134     return self;
135 }
136 
137 static void
s_poller_destroy(s_poller_t ** self_p)138 s_poller_destroy (s_poller_t **self_p)
139 {
140     assert (self_p);
141     s_poller_t *self = *self_p;
142     if (self) {
143         freen (self);
144         *self_p = NULL;
145     }
146 }
147 
148 
149 static s_timer_t *
s_timer_new(int timer_id,size_t delay,size_t times,zloop_timer_fn handler,void * arg)150 s_timer_new (int timer_id, size_t delay, size_t times, zloop_timer_fn handler, void *arg)
151 {
152     s_timer_t *self = (s_timer_t *) zmalloc (sizeof (s_timer_t));
153     assert (self);
154     self->timer_id = timer_id;
155     self->delay = delay;
156     self->times = times;
157     self->when = zclock_mono () + delay;
158     self->handler = handler;
159     self->arg = arg;
160     return self;
161 }
162 
163 static void
s_timer_destroy(s_timer_t ** self_p)164 s_timer_destroy (s_timer_t **self_p)
165 {
166     assert (self_p);
167     s_timer_t *self = *self_p;
168     if (self) {
169         freen (self);
170         *self_p = NULL;
171     }
172 }
173 
174 static int
s_timer_comparator(s_timer_t * first,s_timer_t * second)175 s_timer_comparator (s_timer_t *first, s_timer_t *second)
176 {
177     if (first->when > second->when)
178         return 1;
179     else
180     if (first->when < second->when)
181         return -1;
182     else
183         return 0;
184 }
185 
186 static s_ticket_t *
s_ticket_new(size_t delay,zloop_timer_fn handler,void * arg)187 s_ticket_new (size_t delay, zloop_timer_fn handler, void *arg)
188 {
189     s_ticket_t *self = (s_ticket_t *) zmalloc (sizeof (s_ticket_t));
190     assert (self);
191     self->tag = TICKET_TAG;
192     self->delay = delay;
193     self->when = zclock_mono () + delay;
194     self->handler = handler;
195     self->arg = arg;
196     return self;
197 }
198 
199 static void
s_ticket_destroy(s_ticket_t ** self_p)200 s_ticket_destroy (s_ticket_t **self_p)
201 {
202     assert (self_p);
203     s_ticket_t *self = *self_p;
204     if (self) {
205         self->tag = 0xDeadBeef;
206         freen (self);
207         *self_p = NULL;
208     }
209 }
210 
211 static int
s_ticket_comparator(s_ticket_t * first,s_ticket_t * second)212 s_ticket_comparator (s_ticket_t *first, s_ticket_t *second)
213 {
214     if (first->when > second->when)
215         return 1;
216     else
217     if (first->when < second->when)
218         return -1;
219     else
220         return 0;
221 }
222 
223 //  Remove timer with specified id, if it exists
224 
225 static void
s_timer_remove(zloop_t * self,int timer_id)226 s_timer_remove (zloop_t *self, int timer_id)
227 {
228     s_timer_t *timer = (s_timer_t *) zlistx_first (self->timers);
229     while (timer) {
230         if (timer->timer_id == timer_id) {
231             zlistx_delete (self->timers, timer->list_handle);
232             break;
233         }
234         timer = (s_timer_t *) zlistx_next (self->timers);
235     }
236 }
237 
238 
239 //  We hold an array of pollers that matches the pollset, so we can
240 //  register/cancel pollers orthogonally to executing the pollset
241 //  activity on pollers. Returns 0 on success, -1 on failure.
242 
243 static int
s_rebuild_pollset(zloop_t * self)244 s_rebuild_pollset (zloop_t *self)
245 {
246     self->poll_size = zlistx_size (self->readers) + zlistx_size (self->pollers);
247 
248     freen (self->pollset);
249     self->pollset = (zmq_pollitem_t *) zmalloc (self->poll_size * sizeof (zmq_pollitem_t));
250     assert (self->pollset);
251 
252     freen (self->readact);
253     self->readact = (s_reader_t *) zmalloc (self->poll_size * sizeof (s_reader_t));
254     assert (self->readact);
255 
256     freen (self->pollact);
257     self->pollact = (s_poller_t *) zmalloc (self->poll_size * sizeof (s_poller_t));
258     assert (self->pollact);
259 
260     s_reader_t *reader = (s_reader_t *) zlistx_first (self->readers);
261     uint item_nbr = 0;
262     while (reader) {
263         zmq_pollitem_t poll_item = { zsock_resolve (reader->sock), 0, ZMQ_POLLIN, 0 };
264         self->pollset [item_nbr] = poll_item;
265         self->readact [item_nbr] = *reader;
266         item_nbr++;
267         reader = (s_reader_t *) zlistx_next (self->readers);
268     }
269     s_poller_t *poller = (s_poller_t *) zlistx_first (self->pollers);
270     while (poller) {
271         self->pollset [item_nbr] = poller->item;
272         self->pollact [item_nbr] = *poller;
273         item_nbr++;
274         poller = (s_poller_t *) zlistx_next (self->pollers);
275     }
276     self->need_rebuild = false;
277     return 0;
278 }
279 
280 static long
s_tickless(zloop_t * self)281 s_tickless (zloop_t *self)
282 {
283     //  Calculate tickless timer, up to 1 hour
284     int64_t tickless = zclock_mono () + 1000 * 3600;
285 
286     //  Scan timers, which are not sorted
287     //  TODO: sort timers properly on insertion
288     s_timer_t *timer = (s_timer_t *) zlistx_first (self->timers);
289     while (timer) {
290         //  Find earliest timer
291         if (tickless > timer->when)
292             tickless = timer->when;
293         timer = (s_timer_t *) zlistx_next (self->timers);
294     }
295     //  Tickets are sorted, so check first ticket
296     s_ticket_t *ticket = (s_ticket_t *) zlistx_first (self->tickets);
297     if (ticket && tickless > ticket->when)
298         tickless = ticket->when;
299 
300     long timeout = (long) (tickless - zclock_mono ());
301     if (timeout < 0)
302         timeout = 0;
303     if (self->verbose)
304         zsys_debug ("zloop polling for %d msec", (int) timeout);
305 
306     return timeout * ZMQ_POLL_MSEC;
307 }
308 
309 
310 //  --------------------------------------------------------------------------
311 //  Constructor
312 
313 zloop_t *
zloop_new(void)314 zloop_new (void)
315 {
316     zloop_t *self = (zloop_t *) zmalloc (sizeof (zloop_t));
317     assert (self);
318 
319     self->readers = zlistx_new ();
320     assert (self->readers);
321 
322     self->pollers = zlistx_new ();
323     assert (self->pollers);
324 
325     self->timers = zlistx_new ();
326     assert (self->timers);
327 
328     self->zombies = zlistx_new ();
329     assert (self->zombies);
330 
331     self->tickets = zlistx_new ();
332     assert (self->tickets);
333     self->last_timer_id = 0;
334 
335     zlistx_set_destructor (self->readers, (czmq_destructor *) s_reader_destroy);
336     zlistx_set_destructor (self->pollers, (czmq_destructor *) s_poller_destroy);
337     zlistx_set_destructor (self->timers, (czmq_destructor *) s_timer_destroy);
338     zlistx_set_comparator (self->timers, (czmq_comparator *) s_timer_comparator);
339     zlistx_set_destructor (self->tickets, (czmq_destructor *) s_ticket_destroy);
340     zlistx_set_comparator (self->tickets, (czmq_comparator *) s_ticket_comparator);
341 
342     return self;
343 }
344 
345 
346 //  --------------------------------------------------------------------------
347 //  Destructor
348 
349 void
zloop_destroy(zloop_t ** self_p)350 zloop_destroy (zloop_t **self_p)
351 {
352     assert (self_p);
353     if (*self_p) {
354         zloop_t *self = *self_p;
355 
356         //  If we never started the loop, yet manipulated timers, we'll have
357         //  a zombie list
358         while (zlistx_first (self->zombies)) {
359             //  Get timer_id back from pointer
360             ptrdiff_t timer_id = (byte *) zlistx_detach (self->zombies, NULL) - (byte *) NULL;
361             s_timer_remove (self, (int) timer_id);
362         }
363         zlistx_destroy (&self->zombies);
364         zlistx_destroy (&self->readers);
365         zlistx_destroy (&self->pollers);
366         zlistx_destroy (&self->timers);
367         zlistx_destroy (&self->tickets);
368         freen (self->pollset);
369         freen (self->readact);
370         freen (self->pollact);
371         freen (self);
372         *self_p = NULL;
373     }
374 }
375 
376 
377 //  --------------------------------------------------------------------------
378 //  Register socket reader with the reactor. When the reader has messages,
379 //  the reactor will call the handler, passing the arg. Returns 0 if OK, -1
380 //  if there was an error. If you register the same socket more than once,
381 //  each instance will invoke its corresponding handler.
382 
383 int
zloop_reader(zloop_t * self,zsock_t * sock,zloop_reader_fn handler,void * arg)384 zloop_reader (zloop_t *self, zsock_t *sock, zloop_reader_fn handler, void *arg)
385 {
386     assert (self);
387     assert (sock);
388 
389     s_reader_t *reader = s_reader_new (sock, handler, arg);
390     if (reader) {
391         reader->list_handle = zlistx_add_end (self->readers, reader);
392         assert (reader->list_handle);
393         self->need_rebuild = true;
394         if (self->verbose)
395             zsys_debug ("zloop: register %s reader", zsock_type_str (sock));
396         return 0;
397     }
398     else
399         return -1;
400 }
401 
402 
403 //  --------------------------------------------------------------------------
404 //  Cancel a socket reader from the reactor. If multiple readers exist for
405 //  same socket, cancels ALL of them.
406 
407 void
zloop_reader_end(zloop_t * self,zsock_t * sock)408 zloop_reader_end (zloop_t *self, zsock_t *sock)
409 {
410     assert (self);
411     assert (sock);
412 
413     s_reader_t *reader = (s_reader_t *) zlistx_first (self->readers);
414     while (reader) {
415         if (reader->sock == sock) {
416             zlistx_delete (self->readers, reader->list_handle);
417             self->need_rebuild = true;
418         }
419         reader = (s_reader_t *) zlistx_next (self->readers);
420     }
421     if (self->verbose)
422         zsys_debug ("zloop: cancel %s reader", zsock_type_str (sock));
423 }
424 
425 
426 //  --------------------------------------------------------------------------
427 //  Configure a registered reader to ignore errors. If you do not set this,
428 //  then reader that have errors are removed from the reactor silently.
429 
430 void
zloop_reader_set_tolerant(zloop_t * self,zsock_t * sock)431 zloop_reader_set_tolerant (zloop_t *self, zsock_t *sock)
432 {
433     assert (self);
434     assert (sock);
435 
436     s_reader_t *reader = (s_reader_t *) zlistx_first (self->readers);
437     while (reader) {
438         if (reader->sock == sock)
439             reader->tolerant = true;
440         reader = (s_reader_t *) zlistx_next (self->readers);
441     }
442 }
443 
444 
445 //  --------------------------------------------------------------------------
446 //  Register low-level libzmq pollitem with the reactor. When the pollitem
447 //  is ready, will call the handler, passing the arg. Returns 0 if OK, -1
448 //  if there was an error. If you register the pollitem more than once, each
449 //  instance will invoke its corresponding handler. A pollitem with
450 //  socket=NULL and fd=0 means 'poll on FD zero'.
451 
452 int
zloop_poller(zloop_t * self,zmq_pollitem_t * item,zloop_fn handler,void * arg)453 zloop_poller (zloop_t *self, zmq_pollitem_t *item, zloop_fn handler, void *arg)
454 {
455     assert (self);
456 
457     if (item->socket
458     &&  streq (zsys_sockname (zsock_type (item->socket)), "UNKNOWN"))
459         return -1;
460 
461     s_poller_t *poller = s_poller_new (item, handler, arg);
462     assert (poller);
463 
464     poller->list_handle = zlistx_add_end (self->pollers, poller);
465     assert (poller->list_handle);
466     self->need_rebuild = true;
467     if (self->verbose)
468         zsys_debug ("zloop: register %s poller (%p, %d)",
469                     item->socket? zsys_sockname (zsock_type (item->socket)): "FD",
470                     item->socket, item->fd);
471     return 0;
472 }
473 
474 
475 //  --------------------------------------------------------------------------
476 //  Cancel a pollitem from the reactor, specified by socket or FD. If both
477 //  are specified, uses only socket. If multiple poll items exist for same
478 //  socket/FD, cancels ALL of them.
479 
480 void
zloop_poller_end(zloop_t * self,zmq_pollitem_t * item)481 zloop_poller_end (zloop_t *self, zmq_pollitem_t *item)
482 {
483     assert (self);
484 
485     s_poller_t *poller = (s_poller_t *) zlistx_first (self->pollers);
486     while (poller) {
487         bool match = false;
488         if (item->socket) {
489             if (item->socket == poller->item.socket)
490                 match = true;
491         }
492         else {
493             if (item->fd == poller->item.fd)
494                 match = true;
495         }
496         if (match) {
497             zlistx_delete (self->pollers, poller->list_handle);
498             //  Force rebuild to avoid reading from freed poller
499             self->need_rebuild = true;
500         }
501         poller = (s_poller_t *) zlistx_next (self->pollers);
502     }
503     if (self->verbose)
504         zsys_debug ("zloop: cancel %s poller (%p, %d)",
505                     item->socket? zsys_sockname (zsock_type (item->socket)): "FD",
506                     item->socket, item->fd);
507 }
508 
509 
510 //  --------------------------------------------------------------------------
511 //  Configure a registered poller to ignore errors. If you do not set this,
512 //  then poller that have errors are removed from the reactor silently.
513 
514 void
zloop_poller_set_tolerant(zloop_t * self,zmq_pollitem_t * item)515 zloop_poller_set_tolerant (zloop_t *self, zmq_pollitem_t *item)
516 {
517     assert (self);
518 
519     //  Find matching poller(s) and mark as tolerant
520     s_poller_t *poller = (s_poller_t *) zlistx_first (self->pollers);
521     while (poller) {
522         bool match = false;
523         if (item->socket) {
524             if (item->socket == poller->item.socket)
525                 match = true;
526         }
527         else {
528             if (item->fd == poller->item.fd)
529                 match = true;
530         }
531         if (match)
532             poller->tolerant = true;
533 
534         poller = (s_poller_t *) zlistx_next (self->pollers);
535     }
536 }
537 
538 
539 //  --------------------------------------------------------------------------
540 //  Register a timer that expires after some delay and repeats some number of
541 //  times. At each expiry, will call the handler, passing the arg. To run a
542 //  timer forever, use 0 times. Returns a timer_id that is used to cancel the
543 //  timer in the future. Returns -1 if there was an error.
544 
545 int
zloop_timer(zloop_t * self,size_t delay,size_t times,zloop_timer_fn handler,void * arg)546 zloop_timer (zloop_t *self, size_t delay, size_t times, zloop_timer_fn handler, void *arg)
547 {
548     assert (self);
549     //  Catch excessive use of timers
550     if (self->max_timers && zlistx_size (self->timers) == self->max_timers) {
551         zsys_error ("zloop: timer limit reached (max=%d)", self->max_timers);
552         return -1;
553     }
554     int timer_id = s_next_timer_id (self);
555     s_timer_t *timer = s_timer_new (timer_id, delay, times, handler, arg);
556     if (timer) {
557         timer->list_handle = zlistx_add_end (self->timers, timer);
558         assert (timer->list_handle);
559         if (self->verbose)
560             zsys_debug ("zloop: register timer id=%d delay=%d times=%d",
561                         timer_id, (int) delay, (int) times);
562         return timer_id;
563     }
564     else
565         return -1;
566 }
567 
568 
569 //  --------------------------------------------------------------------------
570 //  Cancel a timer by timer id (as returned by zloop_timer()).
571 //  Returns 0 on success.
572 
573 int
zloop_timer_end(zloop_t * self,int timer_id)574 zloop_timer_end (zloop_t *self, int timer_id)
575 {
576     assert (self);
577 
578     if (self->terminated)
579         s_timer_remove (self, timer_id);
580     else
581         //  We cannot touch self->timers because we may be executing that
582         //  from inside the poll loop. So, we hold the arg on the zombie
583         //  list, and process that list when we're done executing timers.
584         //  This hack lets us store an integer timer ID as a pointer
585         zlistx_add_end (self->zombies, (byte *) NULL + timer_id);
586 
587     if (self->verbose)
588         zsys_debug ("zloop: cancel timer id=%d", timer_id);
589 
590     return 0;
591 }
592 
593 
594 //  --------------------------------------------------------------------------
595 //  Register a ticket timer. Ticket timers are very fast in the case where
596 //  you use a lot of timers (thousands), and frequently remove and add them.
597 //  The main use case is expiry timers for servers that handle many clients,
598 //  and which reset the expiry timer for each message received from a client.
599 //  Whereas normal timers perform poorly as the number of clients grows, the
600 //  cost of ticket timers is constant, no matter the number of clients. You
601 //  must set the ticket delay using zloop_set_ticket_delay before creating a
602 //  ticket. Returns a handle to the timer that you should use in
603 //  zloop_ticket_reset and zloop_ticket_delete.
604 
605 void *
zloop_ticket(zloop_t * self,zloop_timer_fn handler,void * arg)606 zloop_ticket (zloop_t *self, zloop_timer_fn handler, void *arg)
607 {
608     assert (self);
609     assert (self->ticket_delay > 0);
610     s_ticket_t *ticket = s_ticket_new (self->ticket_delay, handler, arg);
611     if (ticket) {
612         ticket->list_handle = zlistx_add_end (self->tickets, ticket);
613         assert (ticket->list_handle);
614     }
615     return ticket;
616 }
617 
618 
619 //  --------------------------------------------------------------------------
620 //  Reset a ticket timer, which moves it to the end of the ticket list and
621 //  resets its execution time. This is a very fast operation.
622 
623 void
zloop_ticket_reset(zloop_t * self,void * handle)624 zloop_ticket_reset (zloop_t *self, void *handle)
625 {
626     s_ticket_t *ticket = (s_ticket_t *) handle;
627     assert (ticket->tag == TICKET_TAG);
628     ticket->when = zclock_mono () + ticket->delay;
629     zlistx_move_end (self->tickets, ticket->list_handle);
630 }
631 
632 
633 //  --------------------------------------------------------------------------
634 //  Delete a ticket timer. We do not actually delete the ticket here, as
635 //  other code may still refer to the ticket. We mark as deleted, and remove
636 //  later and safely.
637 
638 void
zloop_ticket_delete(zloop_t * self,void * handle)639 zloop_ticket_delete (zloop_t *self, void *handle)
640 {
641     s_ticket_t *ticket = (s_ticket_t *) handle;
642     assert (ticket->tag == TICKET_TAG);
643     ticket->deleted = true;
644     //  Move deleted tickets to end of list for fast cleanup
645     zlistx_move_end (self->tickets, ticket->list_handle);
646 }
647 
648 
649 //  --------------------------------------------------------------------------
650 //  Set the ticket delay, which applies to all tickets. If you lower the
651 //  delay and there are already tickets created, the results are undefined.
652 
653 void
zloop_set_ticket_delay(zloop_t * self,size_t ticket_delay)654 zloop_set_ticket_delay (zloop_t *self, size_t ticket_delay)
655 {
656     assert (self);
657     self->ticket_delay = ticket_delay;
658 }
659 
660 
661 //  --------------------------------------------------------------------------
662 //  Set verbose tracing of reactor on/off
663 
664 void
zloop_set_verbose(zloop_t * self,bool verbose)665 zloop_set_verbose (zloop_t *self, bool verbose)
666 {
667     assert (self);
668     self->verbose = verbose;
669 }
670 
671 //  --------------------------------------------------------------------------
672 //  By default the reactor stops if the process receives a SIGINT or SIGTERM
673 //  signal. This makes it impossible to shut-down message based architectures
674 //  like zactors. This method lets you switch off break handling. The default
675 //  nonstop setting is off (false).
676 
677 void
zloop_set_nonstop(zloop_t * self,bool nonstop)678 zloop_set_nonstop (zloop_t *self, bool nonstop)
679 {
680     assert (self);
681     self->nonstop = nonstop;
682 }
683 
684 
685 //  --------------------------------------------------------------------------
686 //  Set hard limit on number of timers allowed. Setting more than a small
687 //  number of timers (10-100) can have a dramatic impact on the performance
688 //  of the reactor. For high-volume cases, use ticket timers. If the hard
689 //  limit is reached, the reactor stops creating new timers and logs an
690 //  error.
691 
692 void
zloop_set_max_timers(zloop_t * self,size_t max_timers)693 zloop_set_max_timers (zloop_t *self, size_t max_timers)
694 {
695     assert (self);
696     self->max_timers = max_timers;
697 }
698 
699 
700 //  --------------------------------------------------------------------------
701 //  Start the reactor. Takes control of the thread and returns when the 0MQ
702 //  context is terminated or the process is interrupted, or any event handler
703 //  returns -1. Event handlers may register new sockets and timers, and
704 //  cancel sockets. Returns 0 if interrupted, -1 if canceled by a handler,
705 //  positive on internal error
706 
707 int
zloop_start(zloop_t * self)708 zloop_start (zloop_t *self)
709 {
710     assert (self);
711     int rc = 0;
712 
713     //  Main reactor loop
714     while (!zsys_interrupted || self->nonstop) {
715         if (rc == -1)      // somebody wanted us to quit
716             break;
717         if (self->need_rebuild) {
718             //  If s_rebuild_pollset() fails, break out of the loop and
719             //  return its error
720             rc = s_rebuild_pollset (self);
721             if (rc)
722                 break;
723         }
724         rc = zmq_poll (self->pollset, (int) self->poll_size, s_tickless (self));
725         if (rc == -1 || (zsys_interrupted && !self->nonstop)) {
726             if (self->verbose) {
727                 if (rc == -1)
728                     zsys_debug ("zloop: interrupted: %s", strerror (errno));
729                 else
730                     zsys_debug ("zloop: zsys_interrupted");
731             }
732             rc = 0;
733             break;              //  Context has been shut down
734         }
735 
736         //  Handle any timers that have now expired
737         int64_t time_now = zclock_mono ();
738         s_timer_t *timer = (s_timer_t *) zlistx_first (self->timers);
739         while (timer) {
740             if (time_now >= timer->when) {
741                 if (self->verbose)
742                     zsys_debug ("zloop: call timer handler id=%d", timer->timer_id);
743                 rc = timer->handler (self, timer->timer_id, timer->arg);
744                 if (timer->times && --timer->times == 0)
745                     zlistx_delete (self->timers, timer->list_handle);
746                 else
747                     timer->when += timer->delay;
748                 if (rc == -1)
749                     break;      //  Timer handler signaled break
750             }
751             timer = (s_timer_t *) zlistx_next (self->timers);
752         }
753 
754         //  Handle any tickets that have now expired
755         s_ticket_t *ticket = (s_ticket_t *) zlistx_first (self->tickets);
756         while (ticket && time_now >= ticket->when) {
757             if (self->verbose)
758                 zsys_debug ("zloop: call ticket handler");
759             if (!ticket->deleted
760             && ticket->handler (self, 0, ticket->arg) == -1) {
761                 rc = -1;    //  Trigger exit from zloop_start
762                 break;      //  Ticket handler signaled break
763             }
764             zlistx_delete (self->tickets, ticket->list_handle);
765             ticket = (s_ticket_t *) zlistx_next (self->tickets);
766         }
767 
768         //  Handle any tickets that were flagged for deletion
769         ticket = (s_ticket_t *) zlistx_last (self->tickets);
770         while (ticket && ticket->deleted) {
771             zlistx_delete (self->tickets, ticket->list_handle);
772             ticket = (s_ticket_t *) zlistx_last (self->tickets);
773         }
774 
775         //  Check if timers changed pollset
776         if (self->need_rebuild)
777             continue;
778 
779         //  Handle any readers and pollers that are ready
780         size_t item_nbr;
781         for (item_nbr = 0; item_nbr < self->poll_size && rc >= 0; item_nbr++) {
782             s_reader_t *reader = &self->readact [item_nbr];
783             if (reader->handler) {
784                 if ((self->pollset [item_nbr].revents & ZMQ_POLLERR)
785                 && !reader->tolerant) {
786                     if (self->verbose)
787                         zsys_warning ("zloop: can't read %s socket: %s",
788                                       zsock_type_str (reader->sock),
789                                       zmq_strerror (zmq_errno ()));
790                     //  Give handler one chance to handle error, then kill
791                     //  reader because it'll disrupt the reactor otherwise.
792                     if (reader->errors++) {
793                         zloop_reader_end (self, reader->sock);
794                         self->pollset [item_nbr].revents = 0;
795                     }
796                 }
797                 else
798                     reader->errors = 0;     //  A non-error happened
799 
800                 if (self->pollset [item_nbr].revents) {
801                     if (self->verbose)
802                         zsys_debug ("zloop: call %s socket handler",
803                                     zsock_type_str (reader->sock));
804                     rc = reader->handler (self, reader->sock, reader->arg);
805                     if (rc == -1 || self->need_rebuild)
806                         break;
807                 }
808             }
809             else {
810                 s_poller_t *poller = &self->pollact [item_nbr];
811                 assert (self->pollset [item_nbr].socket == poller->item.socket);
812 
813                 if ((self->pollset [item_nbr].revents & ZMQ_POLLERR)
814                 && !poller->tolerant) {
815                     if (self->verbose)
816                         zsys_warning ("zloop: can't poll %s socket (%p, %d): %s",
817                                       poller->item.socket?
818                                       zsys_sockname (zsock_type (poller->item.socket)): "FD",
819                                       poller->item.socket, poller->item.fd,
820                                       zmq_strerror (zmq_errno ()));
821                     //  Give handler one chance to handle error, then kill
822                     //  poller because it'll disrupt the reactor otherwise.
823                     if (poller->errors++) {
824                         zloop_poller_end (self, &poller->item);
825                         self->pollset [item_nbr].revents = 0;
826                     }
827                 }
828                 else
829                     poller->errors = 0;     //  A non-error happened
830 
831                 if (self->pollset [item_nbr].revents) {
832                     if (self->verbose)
833                         zsys_debug ("zloop: call %s socket handler (%p, %d)",
834                                     poller->item.socket?
835                                     zsys_sockname (zsock_type (poller->item.socket)): "FD",
836                                     poller->item.socket, poller->item.fd);
837                     rc = poller->handler (self, &self->pollset [item_nbr], poller->arg);
838                     if (rc == -1 || self->need_rebuild)
839                         break;
840                 }
841             }
842         }
843         //  Now handle any timer zombies
844         //  This is going to be slow if we have many timers; we might use
845         //  a faster lookup on the timer list.
846         while (zlistx_first (self->zombies)) {
847             //  Get timer_id back from pointer
848             ptrdiff_t timer_id = (byte *) zlistx_detach (self->zombies, NULL) - (byte *) NULL;
849             s_timer_remove (self, (int) timer_id);
850         }
851     }
852     self->terminated = true;
853     return rc;
854 }
855 
856 
857 //  --------------------------------------------------------------------------
858 //  Selftest
859 
860 static int
s_cancel_timer_event(zloop_t * loop,int timer_id,void * arg)861 s_cancel_timer_event (zloop_t *loop, int timer_id, void *arg)
862 {
863     //  We are handling timer 2, and will cancel timer 1
864     int cancel_timer_id = *((int *) arg);
865     return zloop_timer_end (loop, cancel_timer_id);
866 }
867 
868 
869 static int
s_timer_event(zloop_t * loop,int timer_id,void * output)870 s_timer_event (zloop_t *loop, int timer_id, void *output)
871 {
872     zstr_send (output, "PING");
873     return 0;
874 }
875 
876 static int
s_socket_event(zloop_t * loop,zsock_t * handle,void * arg)877 s_socket_event (zloop_t *loop, zsock_t *handle, void *arg)
878 {
879     //  Just end the reactor
880     return -1;
881 }
882 
883 static int
s_timer_event3(zloop_t * loop,int timer_id,void * called)884 s_timer_event3 (zloop_t *loop, int timer_id, void *called)
885 {
886     *((bool*) called) = true;
887     //  end the reactor
888     return -1;
889 }
890 
891 static int
s_socket_event1(zloop_t * loop,zsock_t * reader,void * called)892 s_socket_event1 (zloop_t *loop, zsock_t *reader, void *called)
893 {
894     *((bool*) called) = true;
895     //  end the reactor
896     return -1;
897 }
898 
899 static int
s_timer_event4(zloop_t * loop,int timer_id,void * arg)900 s_timer_event4 (zloop_t *loop, int timer_id, void *arg)
901 {
902     //  Just end the looper
903     return -1;
904 }
905 
906 static int
s_timer_event5(zloop_t * loop,int timer_id,void * arg)907 s_timer_event5 (zloop_t *loop, int timer_id, void *arg)
908 {
909     //  remove reader from loop
910     zloop_reader_end(loop, (zsock_t *) arg);
911 
912     //  end reactor on next run
913     zloop_timer(loop, 1, 1, s_timer_event4, NULL);
914 
915     return 0;
916 }
917 
918 void
zloop_test(bool verbose)919 zloop_test (bool verbose)
920 {
921     printf (" * zloop: ");
922     int rc = 0;
923     //  @selftest
924     //  Create two PAIR sockets and connect over inproc
925     zsock_t *output = zsock_new (ZMQ_PAIR);
926     assert (output);
927     zsock_bind (output, "inproc://zloop.test");
928 
929     zsock_t *input = zsock_new (ZMQ_PAIR);
930     assert (input);
931     zsock_connect (input, "inproc://zloop.test");
932 
933     zloop_t *loop = zloop_new ();
934     assert (loop);
935     zloop_set_verbose (loop, verbose);
936 
937     //  Create a timer that will be cancelled
938     int timer_id = zloop_timer (loop, 1000, 1, s_timer_event, NULL);
939     zloop_timer (loop, 5, 1, s_cancel_timer_event, &timer_id);
940 
941     //  After 20 msecs, send a ping message to output3
942     zloop_timer (loop, 20, 1, s_timer_event, output);
943 
944     //  Set up some tickets that will never expire
945     zloop_set_ticket_delay (loop, 10000);
946     void *ticket1 = zloop_ticket (loop, s_timer_event, NULL);
947     void *ticket2 = zloop_ticket (loop, s_timer_event, NULL);
948     void *ticket3 = zloop_ticket (loop, s_timer_event, NULL);
949 
950     //  When we get the ping message, end the reactor
951     rc = zloop_reader (loop, input, s_socket_event, NULL);
952     assert (rc == 0);
953     zloop_reader_set_tolerant (loop, input);
954     zloop_start (loop);
955 
956     zloop_ticket_delete (loop, ticket1);
957     zloop_ticket_delete (loop, ticket2);
958     zloop_ticket_delete (loop, ticket3);
959 
960     //  Check whether loop properly ignores zsys_interrupted flag
961     //  when asked to
962     zloop_destroy (&loop);
963     loop = zloop_new ();
964 
965     bool timer_event_called = false;
966     zloop_timer (loop, 1, 1, s_timer_event3, &timer_event_called);
967 
968     zsys_interrupted = 1;
969     zloop_start (loop);
970     //  zloop returns immediately without giving any handler a chance to run
971     assert (!timer_event_called);
972 
973     zloop_set_nonstop (loop, true);
974     zloop_start (loop);
975     //  zloop runs the handler which will terminate the loop
976     assert (timer_event_called);
977     zsys_interrupted = 0;
978 
979     //  Check if reader removed in timer is not called
980     zloop_destroy (&loop);
981     loop = zloop_new ();
982 
983     bool socket_event_called = false;
984     zloop_reader (loop, output, s_socket_event1, &socket_event_called);
985     zloop_timer (loop, 0, 1, s_timer_event5, output);
986 
987     zstr_send (input, "PING");
988 
989     zloop_start (loop);
990     assert (!socket_event_called);
991 
992     //  cleanup
993     zloop_destroy (&loop);
994     assert (loop == NULL);
995 
996     zsock_destroy (&input);
997     zsock_destroy (&output);
998 
999 #if defined (__WINDOWS__)
1000     zsys_shutdown();
1001 #endif
1002     //  @end
1003     printf ("OK\n");
1004 }
1005