1 /*
2  * This file is part of the Sofia-SIP package
3  *
4  * Copyright (C) 2005 Nokia Corporation.
5  *
6  * Contact: Pekka Pessi <pekka.pessi@nokia.com>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public License
10  * as published by the Free Software Foundation; either version 2.1 of
11  * the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
21  * 02110-1301 USA
22  *
23  */
24 
25 /**@ingroup su_wait
26  * @CFILE su_osx_runloop.c
27  *
 * OS-Independent Socket Synchronization Interface.
29  *
30  * This looks like nth reincarnation of "reactor".  It implements the
31  * poll/select/WaitForMultipleObjects and message passing functionality.
32  *
33  * @author Pekka Pessi <Pekka.Pessi@nokia.com>
34  * @author Martti Mela <martti.mela@nokia.com>
35  *
36  * @date Created: Tue Sep 14 15:51:04 1999 ppessi
37  */
38 
39 #include "config.h"
40 
41 #include <stdlib.h>
42 #include <assert.h>
43 #include <stdarg.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <limits.h>
47 #include <errno.h>
48 
49 #define su_port_s su_osx_port_s
50 
51 #include "su_port.h"
52 #include "sofia-sip/su_osx_runloop.h"
53 #include "sofia-sip/su_alloc.h"
54 #include "sofia-sip/su_debug.h"
55 
56 #if HAVE_FUNC
57 #define enter (void)SU_DEBUG_9(("%s: entering\n", __func__))
58 #elif HAVE_FUNCTION
59 #define enter (void)SU_DEBUG_9(("%s: entering\n", __FUNCTION__))
60 #else
61 #define enter (void)0
62 #endif
63 
64 static su_port_t *su_osx_runloop_create(void) __attribute__((__malloc__));
65 
66 /* Callback for CFObserver and CFSocket */
67 static void cf_observer_cb(CFRunLoopObserverRef observer,
68 			   CFRunLoopActivity activity,
69 			   void *info);
70 
71 static void su_osx_port_socket_cb(CFSocketRef s,
72 				  CFSocketCallBackType callbackType,
73 				  CFDataRef address,
74 				  const void *data,
75 				  void *info);
76 
77 static void su_osx_port_deinit(void *arg);
78 
/** @internal Release a reference to the port.
 *
 * Adapter for the vtable slot: the arguments are handed unchanged to
 * su_base_port_decref() and its return value is discarded.
 */
static void su_osx_port_decref(su_port_t *self, int blocking, char const *who)
{
  (void) su_base_port_decref(self, blocking, who);
}
83 
84 
85 static CFSocketCallBackType map_poll_event_to_cf_event(int events);
86 
87 static int su_osx_port_send(su_port_t *self, su_msg_r rmsg);
88 
89 static int su_osx_port_register(su_port_t *self,
90 			    su_root_t *root,
91 			    su_wait_t *wait,
92 			    su_wakeup_f callback,
93 			    su_wakeup_arg_t *arg,
94 			    int priority);
95 static int su_osx_port_unregister(su_port_t *port,
96 			      su_root_t *root,
97 			      su_wait_t *wait,
98 			      su_wakeup_f callback,
99 			      su_wakeup_arg_t *arg);
100 
101 static int su_osx_port_deregister(su_port_t *self, int i);
102 
103 static int su_osx_port_unregister_all(su_port_t *self,
104 			   su_root_t *root);
105 
106 static int su_osx_port_eventmask(su_port_t *, int , int, int );
107 static void su_osx_port_run(su_port_t *self);
108 static void su_osx_port_break(su_port_t *self);
109 static su_duration_t su_osx_port_step(su_port_t *self, su_duration_t tout);
110 
111 static int su_osx_port_multishot(su_port_t *port, int multishot);
112 
113 static int su_osx_port_wait_events(su_port_t *self, su_duration_t tout);
114 
/** @internal Return the name of this port implementation.
 *
 * @param self  pointer to port object (not examined)
 *
 * @return The constant string "CFRunLoop".
 */
static char const *su_osx_port_name(su_port_t const *self)
{
  char const *name = "CFRunLoop";

  return name;
}
119 
120 /*
121  * Port is a per-thread reactor.
122  *
123  * Multiple root objects executed by single thread share a su_port_t object.
124  */
125 struct su_osx_port_s {
126   su_socket_port_t sup_socket[1];
127 
128 #define sup_pthread sup_socket->sup_base
129 #define sup_base sup_socket->sup_base->sup_base
130 #define sup_home sup_socket->sup_base->sup_base->sup_home
131 
132   unsigned         sup_source_fired;
133 
134   CFRunLoopRef        sup_main_loop;
135   CFRunLoopSourceRef *sup_sources;
136   CFSocketRef        *sup_sockets;
137 
138   CFRunLoopObserverRef sup_observer;
139   CFRunLoopObserverContext sup_observer_cntx[1];
140   /* Struct for CFSocket callbacks; contains current CFSource index */
141   struct osx_magic {
142     su_port_t *o_port;
143     int        o_current;
144     int        o_count;
145   } osx_magic[1];
146 
147   unsigned         sup_multishot; /**< Multishot operation? */
148 
149   unsigned         sup_registers; /** Counter incremented by
150 				      su_port_register() or
151 				      su_port_unregister()
152 				   */
153   int              sup_n_waits; /**< Active su_wait_t in su_waits */
154   int              sup_size_waits; /**< Size of allocate su_waits */
155 
156   int              sup_pri_offset; /**< Offset to prioritized waits */
157 
158 #define INDEX_MAX (0x7fffffff)
159 
160   /** Indices from index returned by su_root_register() to tables below.
161    *
162    * Free elements are negative. Free elements form a list, value of free
163    * element is (0 - index of next free element).
164    *
165    * First element sup_indices[0] points to first free element.
166    */
167   int             *sup_indices;
168 
169   int             *sup_reverses; /** Reverse index */
170   su_wakeup_f     *sup_wait_cbs;
171   su_wakeup_arg_t**sup_wait_args;
172   su_root_t      **sup_wait_roots;
173 
174   su_wait_t       *sup_waits;
175 };
176 
177 
178 su_port_vtable_t const su_osx_port_vtable[1] =
179   {{
180       /* su_vtable_size: */ sizeof su_osx_port_vtable,
181       su_pthread_port_lock,
182       su_pthread_port_unlock,
183       su_base_port_incref,
184       su_osx_port_decref,
185       su_base_port_gsource,
186       su_osx_port_send,
187       su_osx_port_register,
188       su_osx_port_unregister,
189       su_osx_port_deregister,
190       su_osx_port_unregister_all,
191       su_osx_port_eventmask,
192       su_osx_port_run,
193       su_osx_port_break,
194       su_osx_port_step,
195       su_pthread_port_thread,
196       su_base_port_add_prepoll,
197       su_base_port_remove_prepoll,
198       su_base_port_timers,
199       su_osx_port_multishot,
200       su_osx_port_wait_events,
201       su_base_port_getmsgs,
202       su_base_port_getmsgs_from,
203       su_osx_port_name,
204       su_base_port_start_shared,
205       su_pthread_port_wait,
206       su_pthread_port_execute,
207       su_base_port_deferrable,
208       su_base_port_max_defer,
209       su_socket_port_wakeup,
210       su_base_port_is_running,
211     }};
212 
213 /* XXX - mela static void su_osx_port_destroy(su_port_t *self); */
214 
215 /** Create a reactor object.
216  *
217  * Allocate and initialize the instance of su_root_t.
218  *
219  * @param magic     pointer to user data
220  *
221  * @return A pointer to allocated su_root_t instance, NULL on error.
222  *
223  * @NEW_1_12_4.
224  */
su_root_osx_runloop_create(su_root_magic_t * magic)225 su_root_t *su_root_osx_runloop_create(su_root_magic_t *magic)
226 {
227   return su_root_create_with_port(magic, su_osx_runloop_create());
228 }
229 
/** CFSocket-style callback that services the port.
 *
 * While the port is marked running, this calls the prepoll callback (when
 * one is set), fetches queued messages (when the queue is non-empty) and
 * expires timers (when any exist). The current run loop is woken up in
 * every case.
 *
 * @param s        not used
 * @param type     not used
 * @param address  not used
 * @param data     not used
 * @param info     pointer to the struct osx_magic identifying the port
 */
void osx_enabler_cb(CFSocketRef s,
		    CFSocketCallBackType type,
		    CFDataRef address,
		    const void *data,
		    void *info)
{
  struct osx_magic *magic = (struct osx_magic *) info;
  su_port_t *self = magic->o_port;
  su_duration_t tout = 0;
  su_time_t now = su_now();
  CFRunLoopRef rl = CFRunLoopGetCurrent();

  if (!self->sup_base->sup_running) {
    CFRunLoopWakeUp(rl);
    return;
  }

  if (self->sup_base->sup_prepoll)
    self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic,
				self->sup_base->sup_pp_root);

  if (self->sup_base->sup_head)
    su_base_port_getmsgs(self);

  if (self->sup_base->sup_timers)
    su_timer_expire(&self->sup_base->sup_timers, &tout, now);

  CFRunLoopWakeUp(rl);
}
258 
259 
260 /**@internal
261  *
262  * Allocates and initializes a message port.
263  *
264  * @return
265  *   If successful a pointer to the new message port is returned, otherwise
266  *   NULL is returned.
267  */
su_osx_runloop_create(void)268 su_port_t *su_osx_runloop_create(void)
269 {
270   su_port_t *self = su_home_new(sizeof *self);
271 
272   if (!self)
273     return self;
274 
275   enter;
276 
277   if (su_home_destructor(su_port_home(self), su_osx_port_deinit) < 0)
278     return su_home_unref(su_port_home(self)), NULL;
279 
280   self->sup_multishot = SU_ENABLE_MULTISHOT_POLL;
281 
282   if (su_socket_port_init(self->sup_base, su_osx_port_vtable) == 0) {
283     self->osx_magic->o_port = self;
284     self->sup_observer_cntx->info = self->osx_magic;
285     self->sup_observer =
286       CFRunLoopObserverCreate(NULL,
287 			      kCFRunLoopAfterWaiting | kCFRunLoopBeforeWaiting,
288 			      TRUE, 0, cf_observer_cb, self->sup_observer_cntx);
289 #if 0
290     CFRunLoopAddObserver(CFRunLoopGetCurrent(),
291 			 self->sup_observer,
292 			 kCFRunLoopDefaultMode);
293 #endif
294   }
295   else
296     return su_home_unref(su_port_home(self)), NULL;
297 
298   return self;
299 }
300 
301 static
cf_observer_cb(CFRunLoopObserverRef observer,CFRunLoopActivity activity,void * info)302 void cf_observer_cb(CFRunLoopObserverRef observer,
303 		    CFRunLoopActivity activity,
304 		    void *info)
305 {
306   CFRunLoopRef  rl;
307   struct osx_magic  *magic = (struct osx_magic *) info;
308   su_port_t    *self = magic->o_port;
309   su_duration_t tout = 0;
310   su_time_t     now = su_now();
311 
312   rl = CFRunLoopGetCurrent();
313 
314   if (self->sup_base->sup_running) {
315 
316     if (self->sup_base->sup_prepoll)
317       self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic, self->sup_base->sup_pp_root);
318 
319     if (self->sup_base->sup_head)
320       su_port_getmsgs(self);
321 
322     if (self->sup_base->sup_timers)
323       su_timer_expire(&self->sup_base->sup_timers, &tout, now);
324   } else
325     SU_DEBUG_9(("cf_observer_cb(): PORT IS NOT RUNNING!\n"));
326 
327   CFRunLoopWakeUp(rl);
328 
329   return;
330 }
331 
332 /** @internal Destroy a port. */
su_osx_port_deinit(void * arg)333 static void su_osx_port_deinit(void *arg)
334 {
335   su_port_t *self = arg;
336 
337   SU_DEBUG_9(("%s(%p) called\n", "su_osx_port_deinit", (void *)self));
338 
339   su_socket_port_deinit(self->sup_base);
340 }
341 
342 static
map_poll_event_to_cf_event(int events)343 CFSocketCallBackType map_poll_event_to_cf_event(int events)
344 {
345   CFSocketCallBackType type = 0;
346 
347   if (events & SU_WAIT_IN)
348     type |= kCFSocketReadCallBack;
349 
350   if (events & SU_WAIT_OUT)
351     type |= kCFSocketWriteCallBack;
352 
353 #if 0
354   if (events & SU_WAIT_CONNECT)
355     type |= kCFSocketConnectCallBack;
356 
357   if (events & SU_WAIT_ACCEPT)
358     type |= kCFSocketAcceptCallBack;
359 #endif
360 
361   return type;
362 }
363 
364 
365 #if 0
366 static
367 int map_cf_event_to_poll_event(CFSocketCallBackType type)
368 {
369   int event = 0;
370 
371   if (type & kCFSocketReadCallBack)
372     event |= SU_WAIT_IN;
373 
374   if (type & kCFSocketWriteCallBack)
375     event |= SU_WAIT_OUT;
376 
377   if (type & kCFSocketConnectCallBack)
378     event |= SU_WAIT_CONNECT;
379 
380   if (type & kCFSocketAcceptCallBack)
381     event |= SU_WAIT_ACCEPT;
382 
383   return event;
384 }
385 #endif
386 
387 static
su_osx_port_socket_cb(CFSocketRef s,CFSocketCallBackType type,CFDataRef address,const void * data,void * info)388 void su_osx_port_socket_cb(CFSocketRef s,
389 			   CFSocketCallBackType type,
390 			   CFDataRef address,
391 			   const void *data,
392 			   void *info)
393 {
394   struct osx_magic *magic = (struct osx_magic *) info;
395   su_port_t        *self = magic->o_port;
396   int               curr = magic->o_current;
397   su_duration_t tout = 0;
398 
399 #if SU_HAVE_POLL
400   {
401     su_root_t *root;
402     su_wait_t *waits = self->sup_waits;
403     int n = self->sup_indices[curr];
404 
405     assert(self->sup_reverses[n] == curr);
406 
407     SU_DEBUG_9(("socket_cb(%p): count %u index %d\n", self->sup_sources[n], magic->o_count, curr));
408 
409     waits[n].revents = map_poll_event_to_cf_event(type);
410 
411     root = self->sup_wait_roots[n];
412     self->sup_wait_cbs[n](root ? su_root_magic(root) : NULL,
413 			  &waits[n],
414 			  self->sup_wait_args[n]);
415 
416     if (self->sup_base->sup_running) {
417       su_port_getmsgs(self);
418 
419       if (self->sup_base->sup_timers)
420 	su_timer_expire(&self->sup_base->sup_timers, &tout, su_now());
421 
422       if (self->sup_base->sup_head)
423 	tout = 0;
424 
425       /* CFRunLoopWakeUp(CFRunLoopGetCurrent()); */
426     }
427 
428     /* Tell to run loop an su socket fired */
429     self->sup_source_fired = 1;
430   }
431 #endif
432 
433 }
434 
435 /** @internal Send a message to the port. */
int su_osx_port_send(su_port_t *self, su_msg_r rmsg)
{
  int wakeup;

  /* Without a port the message cannot be delivered: destroy it */
  if (!self) {
    su_msg_destroy(rmsg);
    return -1;
  }

  /* NOTE(review): the queue is modified without taking the port lock; the
   * lock/unlock calls were already disabled in the original code. */

  /* The mailbox needs a kick only when the queue was empty */
  wakeup = self->sup_base->sup_head == NULL;

  /* Append the message to the queue; ownership moves from rmsg to the port */
  *self->sup_base->sup_tail = rmsg[0];
  rmsg[0] = NULL;
  self->sup_base->sup_tail = &(*self->sup_base->sup_tail)->sum_next;

#if SU_HAVE_MBOX
  if (wakeup) {
    assert(self->sup_mbox[MBOX_SEND] != INVALID_SOCKET);

    if (send(self->sup_mbox[MBOX_SEND], "X", 1, 0) == -1) {
#if HAVE_SOCKETPAIR
      if (su_errno() != EWOULDBLOCK)
#endif
	su_perror("su_msg_send: send()");
    }
  }
#endif

  /* Wakes the run loop of the calling thread */
  CFRunLoopWakeUp(CFRunLoopGetCurrent());

  return 0;
}
477 static int o_count;
478 
479 /** @internal
480  *
481  *  Register a @c su_wait_t object. The wait object, a callback function and
482  *  a argument pointer is stored in the port object.  The callback function
483  *  will be called when the wait object is signaled.
484  *
485  *  Please note if identical wait objects are inserted, only first one is
486  *  ever signalled.
487  *
488  * @param self	     pointer to port
489  * @param root	     pointer to root object
490  * @param waits	     pointer to wait object
491  * @param callback   callback function pointer
492  * @param arg	     argument given to callback function when it is invoked
493  * @param priority   relative priority of the wait object
494  *              (0 is normal, 1 important, 2 realtime)
495  *
496  * @return
497  *   The function @su_osx_port_register returns nonzero index of the wait object,
498  *   or -1 upon an error.  */
su_osx_port_register(su_port_t * self,su_root_t * root,su_wait_t * wait,su_wakeup_f callback,su_wakeup_arg_t * arg,int priority)499 int su_osx_port_register(su_port_t *self,
500 			 su_root_t *root,
501 			 su_wait_t *wait,
502 			 su_wakeup_f callback,
503 			 su_wakeup_arg_t *arg,
504 			 int priority)
505 {
506   int i, j, n;
507   CFRunLoopRef rl;
508   CFRunLoopSourceRef *sources, source;
509   CFSocketRef cf_socket, *sockets;
510   int events = 0;
511   struct osx_magic *osx_magic = NULL;
512   CFSocketContext cf_socket_cntx[1] = {{0, NULL, NULL, NULL, NULL}};
513   CFOptionFlags flags = 0;
514 
515   // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));
516 
517   n = self->sup_n_waits;
518 
519   if (n >= SU_WAIT_MAX)
520     return su_seterrno(ENOMEM);
521 
522   if (n >= self->sup_size_waits) {
523     /* Reallocate size arrays */
524     int size;
525     int *indices;
526     int *reverses;
527     su_wait_t *waits;
528     su_wakeup_f *wait_cbs;
529     su_wakeup_arg_t **wait_args;
530     su_root_t **wait_tasks;
531 
532     if (self->sup_size_waits == 0)
533       size = su_root_size_hint;
534     else
535       size = 2 * self->sup_size_waits;
536 
537     if (size < SU_WAIT_MIN)
538       size = SU_WAIT_MIN;
539 
540     /* Too large */
541     if (-3 - size > 0)
542       return (errno = ENOMEM), -1;
543 
544     indices = realloc(self->sup_indices, (size + 1) * sizeof(*indices));
545     if (indices) {
546       self->sup_indices = indices;
547 
548       for (i = self->sup_size_waits; i <= size; i++)
549 	indices[i] = -1 - i;
550     }
551 
552     reverses = realloc(self->sup_reverses, size * sizeof(*waits));
553     if (reverses) {
554       for (i = self->sup_size_waits; i < size; i++)
555 	reverses[i] = -1;
556       self->sup_reverses = reverses;
557     }
558 
559     sources = realloc(self->sup_sources, size * sizeof(*sources));
560     if (sources)
561       self->sup_sources = sources;
562 
563     sockets = realloc(self->sup_sockets, size * sizeof(*sockets));
564     if (sockets)
565       self->sup_sockets = sockets;
566 
567     waits = realloc(self->sup_waits, size * sizeof(*waits));
568     if (waits)
569       self->sup_waits = waits;
570 
571     wait_cbs = realloc(self->sup_wait_cbs, size * sizeof(*wait_cbs));
572     if (wait_cbs)
573       self->sup_wait_cbs = wait_cbs;
574 
575     wait_args = realloc(self->sup_wait_args, size * sizeof(*wait_args));
576     if (wait_args)
577       self->sup_wait_args = wait_args;
578 
579     /* Add sup_wait_roots array, if needed */
580     wait_tasks = realloc(self->sup_wait_roots, size * sizeof(*wait_tasks));
581     if (wait_tasks)
582       self->sup_wait_roots = wait_tasks;
583 
584     if (!(indices &&
585 	  reverses && sources && sockets && waits && wait_cbs && wait_args && wait_tasks)) {
586       return -1;
587     }
588 
589     self->sup_size_waits = size;
590   }
591 
592   i = -self->sup_indices[0]; assert(i <= self->sup_size_waits);
593 
594   if (priority > 0) {
595     /* Insert */
596     for (n = self->sup_n_waits; n > 0; n--) {
597       j = self->sup_reverses[n-1]; assert(self->sup_indices[j] == n - 1);
598       self->sup_indices[j] = n;
599       self->sup_reverses[n] = self->sup_reverses[n-1];
600       self->sup_sources[n] = self->sup_sources[n-1];
601       self->sup_sockets[n] = self->sup_sockets[n-1];
602       self->sup_waits[n] = self->sup_waits[n-1];
603       self->sup_wait_cbs[n] = self->sup_wait_cbs[n-1];
604       self->sup_wait_args[n] = self->sup_wait_args[n-1];
605       self->sup_wait_roots[n] = self->sup_wait_roots[n-1];
606     }
607 
608     self->sup_pri_offset++;
609   }
610   else {
611     /* Append - no need to move anything */
612     n = self->sup_n_waits;
613   }
614 
615   self->sup_n_waits++;
616 
617   self->sup_indices[0] = self->sup_indices[i];  /* Free index */
618   self->sup_indices[i] = n;
619 
620   self->sup_reverses[n] = i;
621   self->sup_waits[n] = *wait;
622   self->sup_wait_cbs[n] = callback;
623   self->sup_wait_args[n] = arg;
624   self->sup_wait_roots[n] = root;
625 
626   self->sup_registers++;
627 
628   /* XXX -- mela: leak, leak -- free() somewheeeere */
629   osx_magic = calloc(1, sizeof(*osx_magic));
630   osx_magic->o_port = self;
631   osx_magic->o_current = i;
632   osx_magic->o_count = ++o_count;
633   cf_socket_cntx->info = osx_magic;
634 
635   events = map_poll_event_to_cf_event(wait->events);
636 
637   cf_socket = CFSocketCreateWithNative(NULL,
638 				       (CFSocketNativeHandle) su_wait_socket(wait),
639 				       events, su_osx_port_socket_cb, cf_socket_cntx);
640 
641   flags = CFSocketGetSocketFlags(cf_socket);
642   flags &= ~kCFSocketCloseOnInvalidate;
643 
644   CFSocketSetSocketFlags(cf_socket, flags);
645 
646   CFRetain(cf_socket);
647   source = CFSocketCreateRunLoopSource(NULL, cf_socket, 0);
648 
649   SU_DEBUG_9(("source(%p): count %u index %d\n", source, o_count, i));
650 
651   rl = CFRunLoopGetCurrent();
652 
653   CFRunLoopAddSource(rl, source, kCFRunLoopDefaultMode);
654 
655   CFRetain(source);
656   self->sup_sources[n] = source;
657   self->sup_sockets[n] = cf_socket;
658 
659   CFRunLoopWakeUp(rl);
660 
661   /* Just like epoll, we return -1 or positive integer */
662 
663   return i;
664 }
665 
666 /** Deregister a su_wait_t object. */
667 static
su_osx_port_deregister0(su_port_t * self,int i)668 int su_osx_port_deregister0(su_port_t *self, int i)
669 {
670   CFRunLoopRef rl;
671   int n, N, *indices, *reverses;
672 
673   indices = self->sup_indices;
674   reverses = self->sup_reverses;
675 
676   n = indices[i]; assert(n >= 0); assert(i == reverses[n]);
677 
678   N = --self->sup_n_waits;
679 
680   rl = CFRunLoopGetCurrent();
681   CFSocketInvalidate(self->sup_sockets[n]);
682   CFRelease(self->sup_sockets[n]);
683   CFRunLoopRemoveSource(rl, self->sup_sources[n], kCFRunLoopDefaultMode);
684   CFRelease(self->sup_sources[n]);
685 
686   CFRunLoopWakeUp(rl);
687 
688   if (n < self->sup_pri_offset) {
689     int j = --self->sup_pri_offset;
690     if (n != j) {
691       assert(reverses[j] > 0);
692       assert(indices[reverses[j]] == j);
693       indices[reverses[j]] = n;
694       reverses[n] = reverses[j];
695 
696       self->sup_sources[n] = self->sup_sources[j];
697       self->sup_sockets[n] = self->sup_sockets[j];
698       self->sup_waits[n] = self->sup_waits[j];
699       self->sup_wait_cbs[n] = self->sup_wait_cbs[j];
700       self->sup_wait_args[n] = self->sup_wait_args[j];
701       self->sup_wait_roots[n] = self->sup_wait_roots[j];
702       n = j;
703     }
704   }
705 
706   if (n < N) {
707     assert(reverses[N] > 0);
708     assert(indices[reverses[N]] == N);
709 
710     indices[reverses[N]] = n;
711     reverses[n] = reverses[N];
712 
713     self->sup_sources[n] = self->sup_sources[N];
714     self->sup_sockets[n] = self->sup_sockets[N];
715     self->sup_waits[n] = self->sup_waits[N];
716     self->sup_wait_cbs[n] = self->sup_wait_cbs[N];
717     self->sup_wait_args[n] = self->sup_wait_args[N];
718     self->sup_wait_roots[n] = self->sup_wait_roots[N];
719     n = N;
720   }
721 
722 
723   reverses[n] = -1;
724   memset(&self->sup_waits[n], 0, sizeof self->sup_waits[n]);
725   self->sup_sources[n] = NULL;
726   self->sup_sockets[n] = NULL;
727   self->sup_wait_cbs[n] = NULL;
728   self->sup_wait_args[n] = NULL;
729   self->sup_wait_roots[n] = NULL;
730 
731   indices[i] = indices[0];
732   indices[0] = -i;
733 
734   self->sup_registers++;
735 
736   return i;
737 }
738 
739 
740 /** Unregister a su_wait_t object.
741  *
742  *  The function su_osx_port_unregister() unregisters a su_wait_t object. The
743  *  wait object, a callback function and a argument are removed from the
744  *  port object.
745  *
746  * @param self     - pointer to port object
747  * @param root     - pointer to root object
748  * @param wait     - pointer to wait object
749  * @param callback - callback function pointer (may be NULL)
750  * @param arg      - argument given to callback function when it is invoked
751  *                   (may be NULL)
752  *
753  * @return Nonzero index of the wait object, or -1 upon an error.
754  */
int su_osx_port_unregister(su_port_t *self,
		       su_root_t *root,
		       su_wait_t *wait,
		       su_wakeup_f callback, /* XXX - ignored */
		       su_wakeup_arg_t *arg)
{
  int pos, count;

  assert(self);
  // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));

  count = self->sup_n_waits;

  /* Linear search; only the first matching wait object is removed */
  for (pos = 0; pos < count; pos++) {
    if (SU_WAIT_CMP(wait[0], self->sup_waits[pos]) != 0)
      continue;

    return su_osx_port_deregister0(self, self->sup_reverses[pos]);
  }

  /* No such wait object registered */
  su_seterrno(ENOENT);

  return -1;
}
778 
779 /** Deregister a su_wait_t object.
780  *
 *  The function su_osx_port_deregister() deregisters a su_wait_t registration.
782  *  The wait object, a callback function and a argument are removed from the
783  *  port object.
784  *
785  * @param self     - pointer to port object
786  * @param i        - registration index
787  *
788  * @return Index of the wait object, or -1 upon an error.
789  */
int su_osx_port_deregister(su_port_t *self, int i)
{
  su_wait_t wait[1] = { SU_WAIT_INIT };
  int result;

  assert(self);
  // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));

  /* Reject indices outside the table as well as indices that are free */
  if (i <= 0 || i > self->sup_size_waits || self->sup_indices[i] < 0)
    return su_seterrno(EBADF);

  result = su_osx_port_deregister0(self, i);

  /* The local wait object is never filled in; it is destroyed as is */
  su_wait_destroy(wait);

  return result;
}
810 
811 
812 /** @internal
813  * Unregister all su_wait_t objects.
814  *
815  * The function su_osx_port_unregister_all() unregisters all su_wait_t objects
816  * and destroys all queued timers associated with given root object.
817  *
818  * @param  self     - pointer to port object
819  * @param  root     - pointer to root object
820  *
821  * @return Number of wait objects removed.
822  */
int su_osx_port_unregister_all(su_port_t *self,
			   su_root_t *root)
{
  int i, j, index, N, pri_offset;
  int                *indices, *reverses;
  su_wait_t          *waits;
  su_wakeup_f        *wait_cbs;
  su_wakeup_arg_t   **wait_args;
  su_root_t         **wait_roots;
  CFRunLoopRef        rl;
  CFRunLoopSourceRef *sources;
  CFSocketRef        *sockets;

  // XXX - assert(SU_OSX_PORT_OWN_THREAD(self));

  N          = self->sup_n_waits;
  indices    = self->sup_indices;
  reverses   = self->sup_reverses;
  sources    = self->sup_sources;
  sockets    = self->sup_sockets;
  waits      = self->sup_waits;
  wait_cbs   = self->sup_wait_cbs;
  wait_args  = self->sup_wait_args;
  wait_roots = self->sup_wait_roots;

  /* Prioritized waits occupy positions [0, pri_offset) of the layout as it
   * is on entry. The loop below walks that layout, so it must compare
   * against the value on entry and not against the field it decrements. */
  pri_offset = self->sup_pri_offset;

  rl = CFRunLoopGetCurrent();

  /* Compact the tables: i walks all entries, j is the next position to keep */
  for (i = j = 0; i < N; i++) {
    index = reverses[i]; assert(index > 0 && indices[index] == i);

    if (wait_roots[i] == root) {
      /* Tear down the CF objects of the removed registration here, exactly
       * once and in the same way as a single deregistration does. Entries
       * that are kept are only moved; their CF objects are not touched. */
      CFSocketInvalidate(sockets[i]);
      CFRelease(sockets[i]);
      CFRunLoopRemoveSource(rl, sources[i], kCFRunLoopDefaultMode);
      CFRelease(sources[i]);

      if (i < pri_offset)
	self->sup_pri_offset--;

      /* Put the registration index back on the free list */
      indices[index] = indices[0];
      indices[0] = -index;
      continue;
    }

    if (i != j) {
      indices[index] = j;

      reverses[j]   = reverses[i];
      sources[j]    = sources[i];
      sockets[j]    = sockets[i];
      waits[j]      = waits[i];
      wait_cbs[j]   = wait_cbs[i];
      wait_args[j]  = wait_args[i];
      wait_roots[j] = wait_roots[i];
    }

    j++;
  }

  /* Positions j..N-1 hold only stale copies now: wipe them */
  for (i = j; i < N; i++) {
    reverses[i] = -1;

    sources[i] = NULL;
    sockets[i] = NULL;
    wait_cbs[i] = NULL;
    wait_args[i] = NULL;
    wait_roots[i] = NULL;
  }

  /* Nothing to clear (and possibly no table at all) when nothing was removed */
  if (j < N)
    memset(&waits[j], 0, (char *)&waits[N] - (char *)&waits[j]);

  /* Tell run loop things have changed */
  CFRunLoopWakeUp(rl);

  self->sup_n_waits = j;
  self->sup_registers++;

  return N - j;
}
907 
908 /**Set mask for a registered event. @internal
909  *
910  * The function su_osx_port_eventmask() sets the mask describing events that can
911  * signal the registered callback.
912  *
913  * @param port   pointer to port object
914  * @param index  registration index
915  * @param socket socket
916  * @param events new event mask
917  *
918  * @retval 0 when successful,
919  * @retval -1 upon an error.
920  */
int su_osx_port_eventmask(su_port_t *self, int index, int socket, int events)
{
  int slot;
  int result;

  assert(self);
  // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));

  /* Index must lie within the table ... */
  if (index <= 0 || index > self->sup_size_waits)
    return su_seterrno(EBADF);

  /* ... and must refer to a live registration (free entries are negative) */
  slot = self->sup_indices[index];
  if (slot < 0)
    return su_seterrno(EBADF);

  result = su_wait_mask(&self->sup_waits[slot], socket, events);

  /* NOTE(review): callback-type bits are passed to the socket *flags*
   * setter here -- confirm that this is the intended CFSocket call. */
  CFSocketSetSocketFlags(self->sup_sockets[slot],
			 map_poll_event_to_cf_event(events));

  return result;
}
941 
942 /** @internal
943  *
944  *  Copies the su_wait_t objects from the port. The number of wait objects
945  *  can be found out by calling su_osx_port_query() with @a n_waits as zero.
946  *
947  * @note This function is called only by friends.
948  *
949  * @param self     - pointer to port object
950  * @param waits    - pointer to array to which wait objects are copied
951  * @param n_waits  - number of wait objects fitting in array waits
952  *
953  * @return Number of wait objects, or 0 upon an error.
954  */
unsigned su_osx_port_query(su_port_t *self, su_wait_t *waits, unsigned n_waits)
{
  unsigned count = self->sup_n_waits;

  // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));

  /* Size query only: nothing is copied */
  if (n_waits == 0)
    return count;

  /* No destination, or destination too small for all wait objects */
  if (!waits || n_waits < count)
    return 0;

  memcpy(waits, self->sup_waits, count * sizeof(*waits));

  return count;
}
972 
973 /** @internal Enable multishot mode.
974  *
975  * The function su_osx_port_multishot() enables, disables or queries the
976  * multishot mode for the port. The multishot mode determines how the events
977  * are scheduled by port. If multishot mode is enabled, port serves all the
978  * sockets that have received network events. If it is disables, only first
979  * socket event is served.
980  *
981  * @param self      pointer to port object
982  * @param multishot multishot mode (0 => disables, 1 => enables, -1 => query)
983  *
984  * @retval 0 multishot mode is disabled
985  * @retval 1 multishot mode is enabled
986  * @retval -1 an error occurred
987  */
int su_osx_port_multishot(su_port_t *self, int multishot)
{
  /* Negative value: query only */
  if (multishot < 0)
    return self->sup_multishot;

  /* Anything other than 0 or 1 is rejected */
  if (multishot > 1)
    return (errno = EINVAL), -1;

  return self->sup_multishot = multishot;
}
997 
998 #if 0
999 /** @internal Enable threadsafe operation. */
1000 static
1001 int su_osx_port_threadsafe(su_port_t *port)
1002 {
1003   return su_home_threadsafe(port->sup_home);
1004 }
1005 #endif
1006 
1007 /** Prepare root to be run on OSX Run Loop.
1008  *
1009  * Sets #su_root_t object to be callable by the application's run loop. This
1010  * function is to be used instead of su_root_run() for OSX applications
1011  * using Core Foundation's Run Loop.
1012  *
 * The function su_root_osx_prepare_run() returns immediately.
1014  *
1015  * @param root     pointer to root object
1016  *
1017  * @NEW_1_12_4.
1018  */
void su_root_osx_prepare_run(su_root_t *root)
{
  su_port_t *self = root->sur_task->sut_port;
  su_duration_t tout = 0;
  CFRunLoopRef rl;

  // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));

  enter;

  self->sup_base->sup_running = 1;
  rl = CFRunLoopGetCurrent();

  /* One round of housekeeping before handing control to the run loop */
  if (self->sup_base->sup_prepoll)
    self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic,
				self->sup_base->sup_pp_root);

  if (self->sup_base->sup_head)
    su_port_getmsgs(self);

  if (self->sup_base->sup_timers)
    su_timer_expire(&self->sup_base->sup_timers, &tout, su_now());

  /* The running flag is re-checked after the housekeeping; the run loop is
   * recorded only if it is still set */
  if (self->sup_base->sup_running) {
    CFRetain(rl);
    self->sup_main_loop = rl;
  }
}
1049 
1050 /** @internal Main loop.
1051  *
1052  * The function @c su_osx_port_run() waits for wait objects and the timers
1053  * associated with the port object.  When any wait object is signaled or
1054  * timer is expired, it invokes the callbacks, and returns waiting.
1055  *
1056  * The function @c su_osx_port_run() runs until @c su_osx_port_break() is called
1057  * from a callback.
1058  *
1059  * @param self     pointer to port object
1060  *
1061  */
su_osx_port_run(su_port_t * self)1062 void su_osx_port_run(su_port_t *self)
1063 {
1064   CFRunLoopRef rl;
1065   su_duration_t tout = 0;
1066 
1067   // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));
1068 
1069   enter;
1070 
1071   self->sup_base->sup_running = 1;
1072   rl = CFRunLoopGetCurrent();
1073 
1074   if (self->sup_base->sup_prepoll)
1075     self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic, self->sup_base->sup_pp_root);
1076 
1077   if (self->sup_base->sup_head)
1078     su_port_getmsgs(self);
1079 
1080   if (self->sup_base->sup_timers)
1081     su_timer_expire(&self->sup_base->sup_timers, &tout, su_now());
1082 
1083   if (!self->sup_base->sup_running)
1084     return;
1085 
1086   CFRetain(rl);
1087   self->sup_main_loop = rl;
1088 
1089   /* if there are messages do a quick wait */
1090   if (self->sup_base->sup_head)
1091     tout = 0;
1092 
1093   CFRunLoopRun();
1094 
1095   self->sup_main_loop = NULL;
1096 
1097 }
1098 
#if tuning
/* This version can help tuning...
 *
 * Polling variant of the main loop: on every iteration it runs the prepoll
 * hook, delivers queued messages, expires timers, waits for events with
 * su_osx_port_wait_events() and logs how many messages, timers and events
 * were handled together with the time spent waiting.
 */
void su_osx_port_run_tune(su_port_t *self)
{
  int timers = 0, messages = 0, events = 0;
  su_duration_t tout = 0, tout0;
  su_time_t started = su_now(), woken = started, bedtime = woken;

  // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));

  for (self->sup_base->sup_running = 1; self->sup_base->sup_running;) {
    /* tout0 keeps the timeout of the previous round; it is what gets logged */
    tout0 = tout, tout = 2000;

    timers = 0, messages = 0;

    if (self->sup_base->sup_prepoll)
      self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic, self->sup_base->sup_pp_root);

    if (self->sup_base->sup_head)
      messages = su_port_getmsgs(self);

    if (self->sup_base->sup_timers)
      timers = su_timer_expire(&self->sup_base->sup_timers, &tout, su_now());

    if (!self->sup_base->sup_running)
      break;

    if (self->sup_base->sup_head)      /* if there are messages do a quick wait */
      tout = 0;

    bedtime = su_now();

    events = su_osx_port_wait_events(self, tout);

    woken = su_now();

    /* The counters are plain ints, so print them with %d */
    if (messages || timers || events)
      SU_DEBUG_1(("su_osx_port_run(%p): %.6f: %d messages %d timers %d "
                  "events slept %.6f/%.3f\n",
                  (void *)self, su_time_diff(woken, started),
                  messages, timers, events,
                  su_time_diff(woken, bedtime), tout0 * 1e-3));

    /* The loop condition re-checks sup_running before the next round. */
  }
}
#endif
1147 
1148 /** @internal
1149  * The function @c su_osx_port_break() is used to terminate execution of @c
1150  * su_osx_port_run(). It can be called from a callback function.
1151  *
1152  * @param self     pointer to port
1153  *
1154  */
su_osx_port_break(su_port_t * self)1155 void su_osx_port_break(su_port_t *self)
1156 {
1157   if (self->sup_main_loop)
1158     CFRunLoopStop(self->sup_main_loop);
1159 
1160   self->sup_base->sup_running = 0;
1161 }
1162 
1163 /** @internal
1164  * The function @c su_osx_port_wait_events() is used to poll() for wait objects
1165  *
1166  * @param self     pointer to port
1167  * @param tout     timeout in milliseconds
1168  *
1169  * @return number of events handled
1170  */
1171 static
su_osx_port_wait_events(su_port_t * self,su_duration_t tout)1172 int su_osx_port_wait_events(su_port_t *self, su_duration_t tout)
1173 {
1174   int i, events = 0;
1175   su_wait_t *waits = self->sup_waits;
1176   unsigned n = self->sup_n_waits;
1177 #if HAVE_POLL
1178   unsigned version = self->sup_registers;
1179 #endif
1180   su_root_t *root;
1181 
1182   i = su_wait(waits, n, tout);
1183 
1184   if (i >= 0 && (unsigned)i < n) {
1185 #if HAVE_POLL
1186     /* poll() can return events for multiple wait objects */
1187     if (self->sup_multishot) {
1188       for (; i < n; i++) {
1189         if (waits[i].revents) {
1190           root = self->sup_wait_roots[i];
1191           self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
1192                                 &waits[i],
1193                                 self->sup_wait_args[i]);
1194           events++;
1195           /* Callback function used su_register()/su_deregister() */
1196           if (version != self->sup_registers)
1197             break;
1198         }
1199       }
1200     }
1201 #else /* !HAVE_POLL */
1202     if (0) {
1203     }
1204 #endif
1205     else {
1206       root = self->sup_wait_roots[i];
1207       self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
1208                             &self->sup_waits[i],
1209                             self->sup_wait_args[i]);
1210       events++;
1211     }
1212   }
1213 
1214   return events;
1215 }
1216 
1217 /** @internal Block until wait object is signaled or timeout.
1218  *
1219  * This function waits for wait objects and the timers associated with
1220  * the root object.  When any wait object is signaled or timer is
1221  * expired, it invokes the callbacks.
1222  *
1223  *   This function returns when a callback has been invoked or @c tout
1224  *   milliseconds is elapsed.
1225  *
1226  * @param self     pointer to port
1227  * @param tout     timeout in milliseconds
1228  *
1229  * @return
1230  *   Milliseconds to the next invocation of timer, or @c SU_WAIT_FOREVER if
1231  *   there are no active timers.
1232  */
su_osx_port_step(su_port_t * self,su_duration_t tout)1233 su_duration_t su_osx_port_step(su_port_t *self, su_duration_t tout)
1234 {
1235   CFRunLoopRef rl;
1236   su_time_t now = su_now();
1237   CFAbsoluteTime start;
1238   int ret, timeout = tout > INT32_MAX ? INT32_MAX : tout;
1239 
1240   rl = CFRunLoopGetCurrent();
1241 
1242   if (!rl)
1243     return -1;
1244 
1245   CFRunLoopWakeUp(rl);
1246 
1247   if (tout < timeout)
1248     timeout = tout;
1249 
1250   if (self->sup_base->sup_prepoll)
1251     self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic, self->sup_base->sup_pp_root);
1252 
1253   if (self->sup_base->sup_head)
1254     su_base_port_getmsgs(self);
1255 
1256   if (self->sup_base->sup_timers)
1257     su_timer_expire(&self->sup_base->sup_timers, &tout, now);
1258 
1259   /* if there are messages do a quick wait */
1260   if (self->sup_base->sup_head)
1261     tout = 0;
1262 
1263   ret = CFRunLoopRunInMode(kCFRunLoopDefaultMode,
1264 			   tout/1000000.0,
1265 			   true);
1266 
1267   CFRunLoopWakeUp(rl);
1268 
1269   if (self->sup_base->sup_head)
1270     su_base_port_getmsgs(self);
1271 
1272   if (self->sup_base->sup_timers)
1273     su_timer_expire(&self->sup_base->sup_timers, &tout, su_now());
1274 
1275   if (self->sup_base->sup_head)
1276     tout = 0;
1277 
1278   return tout;
1279 }
1280