1 /*
2 * This file is part of the Sofia-SIP package
3 *
4 * Copyright (C) 2005 Nokia Corporation.
5 *
6 * Contact: Pekka Pessi <pekka.pessi@nokia.com>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public License
10 * as published by the Free Software Foundation; either version 2.1 of
11 * the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
21 * 02110-1301 USA
22 *
23 */
24
25 /**@ingroup su_wait
26 * @CFILE su_osx_runloop.c
27 *
 * OS-Independent Socket Synchronization Interface.
29 *
30 * This looks like nth reincarnation of "reactor". It implements the
31 * poll/select/WaitForMultipleObjects and message passing functionality.
32 *
33 * @author Pekka Pessi <Pekka.Pessi@nokia.com>
34 * @author Martti Mela <martti.mela@nokia.com>
35 *
36 * @date Created: Tue Sep 14 15:51:04 1999 ppessi
37 */
38
39 #include "config.h"
40
41 #include <stdlib.h>
42 #include <assert.h>
43 #include <stdarg.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <limits.h>
47 #include <errno.h>
48
49 #define su_port_s su_osx_port_s
50
51 #include "su_port.h"
52 #include "sofia-sip/su_osx_runloop.h"
53 #include "sofia-sip/su_alloc.h"
54 #include "sofia-sip/su_debug.h"
55
/* Trace helper: the statement "enter;" logs the name of the current
 * function through SU_DEBUG_9 when the compiler offers a function-name
 * identifier (HAVE_FUNC or HAVE_FUNCTION), and is a no-op otherwise. */
#if HAVE_FUNC
#define enter (void)SU_DEBUG_9(("%s: entering\n", __func__))
#elif HAVE_FUNCTION
#define enter (void)SU_DEBUG_9(("%s: entering\n", __FUNCTION__))
#else
#define enter (void)0
#endif

/* Port constructor, defined further down in this file. */
static su_port_t *su_osx_runloop_create(void) __attribute__((__malloc__));

/* Callbacks for the CFRunLoopObserver and for CFSocket sources */
static void cf_observer_cb(CFRunLoopObserverRef observer,
                           CFRunLoopActivity activity,
                           void *info);

static void su_osx_port_socket_cb(CFSocketRef s,
                                  CFSocketCallBackType callbackType,
                                  CFDataRef address,
                                  const void *data,
                                  void *info);

/* Destructor installed on the port's memory home by su_osx_runloop_create() */
static void su_osx_port_deinit(void *arg);
78
/* vtable decref entry: forwards to su_base_port_decref() and discards
 * its result, since this entry point returns void. */
static void su_osx_port_decref(su_port_t *self, int blocking, char const *who)
{
  (void)su_base_port_decref(self, blocking, who);
}
83
84
/* Translates su_wait event bits (SU_WAIT_IN/SU_WAIT_OUT) into CFSocket
 * callback types. */
static CFSocketCallBackType map_poll_event_to_cf_event(int events);

/* Port operations implemented in this file; they are installed in
 * su_osx_port_vtable below. */
static int su_osx_port_send(su_port_t *self, su_msg_r rmsg);

static int su_osx_port_register(su_port_t *self,
                                su_root_t *root,
                                su_wait_t *wait,
                                su_wakeup_f callback,
                                su_wakeup_arg_t *arg,
                                int priority);
static int su_osx_port_unregister(su_port_t *port,
                                  su_root_t *root,
                                  su_wait_t *wait,
                                  su_wakeup_f callback,
                                  su_wakeup_arg_t *arg);

static int su_osx_port_deregister(su_port_t *self, int i);

static int su_osx_port_unregister_all(su_port_t *self,
                                      su_root_t *root);

static int su_osx_port_eventmask(su_port_t *, int , int, int );
static void su_osx_port_run(su_port_t *self);
static void su_osx_port_break(su_port_t *self);
static su_duration_t su_osx_port_step(su_port_t *self, su_duration_t tout);

static int su_osx_port_multishot(su_port_t *port, int multishot);

static int su_osx_port_wait_events(su_port_t *self, su_duration_t tout);
114
/* vtable name entry: returns the constant string identifying this port
 * implementation. The port argument is not examined. */
static char const *su_osx_port_name(su_port_t const *self)
{
  return "CFRunLoop";
}
119
/*
 * Port is a per-thread reactor.
 *
 * Multiple root objects executed by a single thread share a su_port_t object.
 *
 * The wait tables at the end of the structure (sup_reverses, sup_sources,
 * sup_sockets, sup_waits, sup_wait_cbs, sup_wait_args, sup_wait_roots) are
 * parallel arrays indexed by slot number; sup_indices maps the registration
 * index handed out by su_osx_port_register() to the current slot.
 */
struct su_osx_port_s {
  su_socket_port_t sup_socket[1];

/* Shorthands for reaching the structures embedded in sup_socket */
#define sup_pthread sup_socket->sup_base
#define sup_base sup_socket->sup_base->sup_base
#define sup_home sup_socket->sup_base->sup_base->sup_home

  unsigned         sup_source_fired; /**< Set to 1 by su_osx_port_socket_cb() */

  CFRunLoopRef        sup_main_loop; /**< Run loop recorded by su_osx_port_run()
                                      *   or su_root_osx_prepare_run(); stopped
                                      *   by su_osx_port_break() */
  CFRunLoopSourceRef *sup_sources;   /**< Run loop source of each slot */
  CFSocketRef        *sup_sockets;   /**< CFSocket of each slot */

  CFRunLoopObserverRef sup_observer; /**< Created in su_osx_runloop_create() */
  CFRunLoopObserverContext sup_observer_cntx[1];
  /* Struct for CFSocket callbacks; contains current CFSource index */
  struct osx_magic {
    su_port_t *o_port;               /**< Owning port */
    int        o_current;            /**< Registration index of the source */
    int        o_count;              /**< Sequence number, used in debug output */
  } osx_magic[1];

  unsigned         sup_multishot; /**< Multishot operation? */

  unsigned         sup_registers; /**< Counter incremented whenever a wait is
                                   *   registered, deregistered or unregistered */
  int              sup_n_waits; /**< Number of active su_wait_t in sup_waits */
  int              sup_size_waits; /**< Allocated size of sup_waits */

  int              sup_pri_offset; /**< Offset to prioritized waits */

#define INDEX_MAX (0x7fffffff)

  /** Indices from index returned by su_root_register() to tables below.
   *
   * Free elements are negative. Free elements form a list, value of free
   * element is (0 - index of next free element).
   *
   * First element sup_indices[0] points to first free element.
   */
  int             *sup_indices;

  int             *sup_reverses; /**< Reverse index: slot to registration index */
  su_wakeup_f     *sup_wait_cbs;   /**< Callback of each slot */
  su_wakeup_arg_t**sup_wait_args;  /**< Callback argument of each slot */
  su_root_t      **sup_wait_roots; /**< Root that registered each slot */

  su_wait_t       *sup_waits;      /**< Wait object of each slot */
};
176
177
/* Virtual function table of the CFRunLoop port.
 *
 * The initializer is positional: entries must stay in the order of the
 * members of su_port_vtable_t (declared outside this file). Operations
 * without a su_osx_port_ implementation reuse the su_base_port_,
 * su_pthread_port_ and su_socket_port_ functions.
 */
su_port_vtable_t const su_osx_port_vtable[1] =
  {{
      /* su_vtable_size: */ sizeof su_osx_port_vtable,
      su_pthread_port_lock,
      su_pthread_port_unlock,
      su_base_port_incref,
      su_osx_port_decref,
      su_base_port_gsource,
      su_osx_port_send,
      su_osx_port_register,
      su_osx_port_unregister,
      su_osx_port_deregister,
      su_osx_port_unregister_all,
      su_osx_port_eventmask,
      su_osx_port_run,
      su_osx_port_break,
      su_osx_port_step,
      su_pthread_port_thread,
      su_base_port_add_prepoll,
      su_base_port_remove_prepoll,
      su_base_port_timers,
      su_osx_port_multishot,
      su_osx_port_wait_events,
      su_base_port_getmsgs,
      su_base_port_getmsgs_from,
      su_osx_port_name,
      su_base_port_start_shared,
      su_pthread_port_wait,
      su_pthread_port_execute,
      su_base_port_deferrable,
      su_base_port_max_defer,
      su_socket_port_wakeup,
      su_base_port_is_running,
    }};
212
213 /* XXX - mela static void su_osx_port_destroy(su_port_t *self); */
214
/** Create a reactor object.
 *
 * Allocates a new CFRunLoop port with su_osx_runloop_create() and hands it,
 * together with @a magic, to su_root_create_with_port().
 *
 * @param magic pointer to user data
 *
 * @return The result of su_root_create_with_port(): a pointer to the
 * allocated su_root_t instance, NULL on error.
 *
 * @NEW_1_12_4.
 */
su_root_t *su_root_osx_runloop_create(su_root_magic_t *magic)
{
  return su_root_create_with_port(magic, su_osx_runloop_create());
}
229
/* Callback with the CFSocket callback signature that services the port.
 *
 * @a info must point to a struct osx_magic. While the port is marked
 * running, the prepoll hook is invoked, queued messages are fetched with
 * su_base_port_getmsgs() and expired timers are run. The current run loop
 * is woken up in every case. The socket, type, address and data arguments
 * are not used.
 */
void osx_enabler_cb(CFSocketRef s,
                    CFSocketCallBackType type,
                    CFDataRef address,
                    const void *data,
                    void *info)
{
  struct osx_magic *ctx = (struct osx_magic *) info;
  su_port_t *port = ctx->o_port;
  su_duration_t next_timeout = 0;
  su_time_t const stamp = su_now();
  CFRunLoopRef const loop = CFRunLoopGetCurrent();

  if (port->sup_base->sup_running) {
    /* Give the registered prepoll hook a chance to run first */
    if (port->sup_base->sup_prepoll)
      port->sup_base->sup_prepoll(port->sup_base->sup_pp_magic,
                                  port->sup_base->sup_pp_root);

    /* Deliver pending messages, if any */
    if (port->sup_base->sup_head)
      su_base_port_getmsgs(port);

    /* Fire timers that are due at the time the callback was entered */
    if (port->sup_base->sup_timers)
      su_timer_expire(&port->sup_base->sup_timers, &next_timeout, stamp);
  }

  CFRunLoopWakeUp(loop);
}
258
259
260 /**@internal
261 *
262 * Allocates and initializes a message port.
263 *
264 * @return
265 * If successful a pointer to the new message port is returned, otherwise
266 * NULL is returned.
267 */
su_osx_runloop_create(void)268 su_port_t *su_osx_runloop_create(void)
269 {
270 su_port_t *self = su_home_new(sizeof *self);
271
272 if (!self)
273 return self;
274
275 enter;
276
277 if (su_home_destructor(su_port_home(self), su_osx_port_deinit) < 0)
278 return su_home_unref(su_port_home(self)), NULL;
279
280 self->sup_multishot = SU_ENABLE_MULTISHOT_POLL;
281
282 if (su_socket_port_init(self->sup_base, su_osx_port_vtable) == 0) {
283 self->osx_magic->o_port = self;
284 self->sup_observer_cntx->info = self->osx_magic;
285 self->sup_observer =
286 CFRunLoopObserverCreate(NULL,
287 kCFRunLoopAfterWaiting | kCFRunLoopBeforeWaiting,
288 TRUE, 0, cf_observer_cb, self->sup_observer_cntx);
289 #if 0
290 CFRunLoopAddObserver(CFRunLoopGetCurrent(),
291 self->sup_observer,
292 kCFRunLoopDefaultMode);
293 #endif
294 }
295 else
296 return su_home_unref(su_port_home(self)), NULL;
297
298 return self;
299 }
300
301 static
cf_observer_cb(CFRunLoopObserverRef observer,CFRunLoopActivity activity,void * info)302 void cf_observer_cb(CFRunLoopObserverRef observer,
303 CFRunLoopActivity activity,
304 void *info)
305 {
306 CFRunLoopRef rl;
307 struct osx_magic *magic = (struct osx_magic *) info;
308 su_port_t *self = magic->o_port;
309 su_duration_t tout = 0;
310 su_time_t now = su_now();
311
312 rl = CFRunLoopGetCurrent();
313
314 if (self->sup_base->sup_running) {
315
316 if (self->sup_base->sup_prepoll)
317 self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic, self->sup_base->sup_pp_root);
318
319 if (self->sup_base->sup_head)
320 su_port_getmsgs(self);
321
322 if (self->sup_base->sup_timers)
323 su_timer_expire(&self->sup_base->sup_timers, &tout, now);
324 } else
325 SU_DEBUG_9(("cf_observer_cb(): PORT IS NOT RUNNING!\n"));
326
327 CFRunLoopWakeUp(rl);
328
329 return;
330 }
331
332 /** @internal Destroy a port. */
su_osx_port_deinit(void * arg)333 static void su_osx_port_deinit(void *arg)
334 {
335 su_port_t *self = arg;
336
337 SU_DEBUG_9(("%s(%p) called\n", "su_osx_port_deinit", (void *)self));
338
339 su_socket_port_deinit(self->sup_base);
340 }
341
342 static
map_poll_event_to_cf_event(int events)343 CFSocketCallBackType map_poll_event_to_cf_event(int events)
344 {
345 CFSocketCallBackType type = 0;
346
347 if (events & SU_WAIT_IN)
348 type |= kCFSocketReadCallBack;
349
350 if (events & SU_WAIT_OUT)
351 type |= kCFSocketWriteCallBack;
352
353 #if 0
354 if (events & SU_WAIT_CONNECT)
355 type |= kCFSocketConnectCallBack;
356
357 if (events & SU_WAIT_ACCEPT)
358 type |= kCFSocketAcceptCallBack;
359 #endif
360
361 return type;
362 }
363
364
#if 0
/* Disabled: inverse of map_poll_event_to_cf_event(). Translates CFSocket
 * callback types back into su_wait event bits, including the connect and
 * accept events. Kept for reference only; it is not compiled. */
static
int map_cf_event_to_poll_event(CFSocketCallBackType type)
{
  int event = 0;

  if (type & kCFSocketReadCallBack)
    event |= SU_WAIT_IN;

  if (type & kCFSocketWriteCallBack)
    event |= SU_WAIT_OUT;

  if (type & kCFSocketConnectCallBack)
    event |= SU_WAIT_CONNECT;

  if (type & kCFSocketAcceptCallBack)
    event |= SU_WAIT_ACCEPT;

  return event;
}
#endif
386
387 static
su_osx_port_socket_cb(CFSocketRef s,CFSocketCallBackType type,CFDataRef address,const void * data,void * info)388 void su_osx_port_socket_cb(CFSocketRef s,
389 CFSocketCallBackType type,
390 CFDataRef address,
391 const void *data,
392 void *info)
393 {
394 struct osx_magic *magic = (struct osx_magic *) info;
395 su_port_t *self = magic->o_port;
396 int curr = magic->o_current;
397 su_duration_t tout = 0;
398
399 #if SU_HAVE_POLL
400 {
401 su_root_t *root;
402 su_wait_t *waits = self->sup_waits;
403 int n = self->sup_indices[curr];
404
405 assert(self->sup_reverses[n] == curr);
406
407 SU_DEBUG_9(("socket_cb(%p): count %u index %d\n", self->sup_sources[n], magic->o_count, curr));
408
409 waits[n].revents = map_poll_event_to_cf_event(type);
410
411 root = self->sup_wait_roots[n];
412 self->sup_wait_cbs[n](root ? su_root_magic(root) : NULL,
413 &waits[n],
414 self->sup_wait_args[n]);
415
416 if (self->sup_base->sup_running) {
417 su_port_getmsgs(self);
418
419 if (self->sup_base->sup_timers)
420 su_timer_expire(&self->sup_base->sup_timers, &tout, su_now());
421
422 if (self->sup_base->sup_head)
423 tout = 0;
424
425 /* CFRunLoopWakeUp(CFRunLoopGetCurrent()); */
426 }
427
428 /* Tell to run loop an su socket fired */
429 self->sup_source_fired = 1;
430 }
431 #endif
432
433 }
434
435 /** @internal Send a message to the port. */
su_osx_port_send(su_port_t * self,su_msg_r rmsg)436 int su_osx_port_send(su_port_t *self, su_msg_r rmsg)
437 {
438 CFRunLoopRef rl;
439
440 if (self) {
441 int wakeup;
442
443 //XXX - mela SU_OSX_PORT_LOCK(self, "su_osx_port_send");
444
445 wakeup = self->sup_base->sup_head == NULL;
446
447 *self->sup_base->sup_tail = rmsg[0]; rmsg[0] = NULL;
448 self->sup_base->sup_tail = &(*self->sup_base->sup_tail)->sum_next;
449
450 #if SU_HAVE_MBOX
451 /* if (!pthread_equal(pthread_self(), self->sup_tid)) */
452 if (wakeup)
453 {
454 assert(self->sup_mbox[MBOX_SEND] != INVALID_SOCKET);
455
456 if (send(self->sup_mbox[MBOX_SEND], "X", 1, 0) == -1) {
457 #if HAVE_SOCKETPAIR
458 if (su_errno() != EWOULDBLOCK)
459 #endif
460 su_perror("su_msg_send: send()");
461 }
462 }
463 #endif
464
465 //XXX - mela SU_OSX_PORT_UNLOCK(self, "su_osx_port_send");
466
467 rl = CFRunLoopGetCurrent();
468 CFRunLoopWakeUp(rl);
469
470 return 0;
471 }
472 else {
473 su_msg_destroy(rmsg);
474 return -1;
475 }
476 }
477 static int o_count;
478
479 /** @internal
480 *
481 * Register a @c su_wait_t object. The wait object, a callback function and
482 * a argument pointer is stored in the port object. The callback function
483 * will be called when the wait object is signaled.
484 *
485 * Please note if identical wait objects are inserted, only first one is
486 * ever signalled.
487 *
488 * @param self pointer to port
489 * @param root pointer to root object
490 * @param waits pointer to wait object
491 * @param callback callback function pointer
492 * @param arg argument given to callback function when it is invoked
493 * @param priority relative priority of the wait object
494 * (0 is normal, 1 important, 2 realtime)
495 *
496 * @return
497 * The function @su_osx_port_register returns nonzero index of the wait object,
498 * or -1 upon an error. */
su_osx_port_register(su_port_t * self,su_root_t * root,su_wait_t * wait,su_wakeup_f callback,su_wakeup_arg_t * arg,int priority)499 int su_osx_port_register(su_port_t *self,
500 su_root_t *root,
501 su_wait_t *wait,
502 su_wakeup_f callback,
503 su_wakeup_arg_t *arg,
504 int priority)
505 {
506 int i, j, n;
507 CFRunLoopRef rl;
508 CFRunLoopSourceRef *sources, source;
509 CFSocketRef cf_socket, *sockets;
510 int events = 0;
511 struct osx_magic *osx_magic = NULL;
512 CFSocketContext cf_socket_cntx[1] = {{0, NULL, NULL, NULL, NULL}};
513 CFOptionFlags flags = 0;
514
515 // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));
516
517 n = self->sup_n_waits;
518
519 if (n >= SU_WAIT_MAX)
520 return su_seterrno(ENOMEM);
521
522 if (n >= self->sup_size_waits) {
523 /* Reallocate size arrays */
524 int size;
525 int *indices;
526 int *reverses;
527 su_wait_t *waits;
528 su_wakeup_f *wait_cbs;
529 su_wakeup_arg_t **wait_args;
530 su_root_t **wait_tasks;
531
532 if (self->sup_size_waits == 0)
533 size = su_root_size_hint;
534 else
535 size = 2 * self->sup_size_waits;
536
537 if (size < SU_WAIT_MIN)
538 size = SU_WAIT_MIN;
539
540 /* Too large */
541 if (-3 - size > 0)
542 return (errno = ENOMEM), -1;
543
544 indices = realloc(self->sup_indices, (size + 1) * sizeof(*indices));
545 if (indices) {
546 self->sup_indices = indices;
547
548 for (i = self->sup_size_waits; i <= size; i++)
549 indices[i] = -1 - i;
550 }
551
552 reverses = realloc(self->sup_reverses, size * sizeof(*waits));
553 if (reverses) {
554 for (i = self->sup_size_waits; i < size; i++)
555 reverses[i] = -1;
556 self->sup_reverses = reverses;
557 }
558
559 sources = realloc(self->sup_sources, size * sizeof(*sources));
560 if (sources)
561 self->sup_sources = sources;
562
563 sockets = realloc(self->sup_sockets, size * sizeof(*sockets));
564 if (sockets)
565 self->sup_sockets = sockets;
566
567 waits = realloc(self->sup_waits, size * sizeof(*waits));
568 if (waits)
569 self->sup_waits = waits;
570
571 wait_cbs = realloc(self->sup_wait_cbs, size * sizeof(*wait_cbs));
572 if (wait_cbs)
573 self->sup_wait_cbs = wait_cbs;
574
575 wait_args = realloc(self->sup_wait_args, size * sizeof(*wait_args));
576 if (wait_args)
577 self->sup_wait_args = wait_args;
578
579 /* Add sup_wait_roots array, if needed */
580 wait_tasks = realloc(self->sup_wait_roots, size * sizeof(*wait_tasks));
581 if (wait_tasks)
582 self->sup_wait_roots = wait_tasks;
583
584 if (!(indices &&
585 reverses && sources && sockets && waits && wait_cbs && wait_args && wait_tasks)) {
586 return -1;
587 }
588
589 self->sup_size_waits = size;
590 }
591
592 i = -self->sup_indices[0]; assert(i <= self->sup_size_waits);
593
594 if (priority > 0) {
595 /* Insert */
596 for (n = self->sup_n_waits; n > 0; n--) {
597 j = self->sup_reverses[n-1]; assert(self->sup_indices[j] == n - 1);
598 self->sup_indices[j] = n;
599 self->sup_reverses[n] = self->sup_reverses[n-1];
600 self->sup_sources[n] = self->sup_sources[n-1];
601 self->sup_sockets[n] = self->sup_sockets[n-1];
602 self->sup_waits[n] = self->sup_waits[n-1];
603 self->sup_wait_cbs[n] = self->sup_wait_cbs[n-1];
604 self->sup_wait_args[n] = self->sup_wait_args[n-1];
605 self->sup_wait_roots[n] = self->sup_wait_roots[n-1];
606 }
607
608 self->sup_pri_offset++;
609 }
610 else {
611 /* Append - no need to move anything */
612 n = self->sup_n_waits;
613 }
614
615 self->sup_n_waits++;
616
617 self->sup_indices[0] = self->sup_indices[i]; /* Free index */
618 self->sup_indices[i] = n;
619
620 self->sup_reverses[n] = i;
621 self->sup_waits[n] = *wait;
622 self->sup_wait_cbs[n] = callback;
623 self->sup_wait_args[n] = arg;
624 self->sup_wait_roots[n] = root;
625
626 self->sup_registers++;
627
628 /* XXX -- mela: leak, leak -- free() somewheeeere */
629 osx_magic = calloc(1, sizeof(*osx_magic));
630 osx_magic->o_port = self;
631 osx_magic->o_current = i;
632 osx_magic->o_count = ++o_count;
633 cf_socket_cntx->info = osx_magic;
634
635 events = map_poll_event_to_cf_event(wait->events);
636
637 cf_socket = CFSocketCreateWithNative(NULL,
638 (CFSocketNativeHandle) su_wait_socket(wait),
639 events, su_osx_port_socket_cb, cf_socket_cntx);
640
641 flags = CFSocketGetSocketFlags(cf_socket);
642 flags &= ~kCFSocketCloseOnInvalidate;
643
644 CFSocketSetSocketFlags(cf_socket, flags);
645
646 CFRetain(cf_socket);
647 source = CFSocketCreateRunLoopSource(NULL, cf_socket, 0);
648
649 SU_DEBUG_9(("source(%p): count %u index %d\n", source, o_count, i));
650
651 rl = CFRunLoopGetCurrent();
652
653 CFRunLoopAddSource(rl, source, kCFRunLoopDefaultMode);
654
655 CFRetain(source);
656 self->sup_sources[n] = source;
657 self->sup_sockets[n] = cf_socket;
658
659 CFRunLoopWakeUp(rl);
660
661 /* Just like epoll, we return -1 or positive integer */
662
663 return i;
664 }
665
/** Deregister a su_wait_t object.
 *
 * Removes the registration with index @a i: its CFSocket is invalidated
 * and released, its run loop source is removed from the current run loop
 * and released, and the slot is recycled. To keep the tables dense, the
 * last prioritized entry and/or the last entry are moved into the hole.
 * The registration index is pushed onto the free list.
 *
 * The caller must have checked that @a i is a live registration index.
 *
 * @return The registration index @a i.
 */
static
int su_osx_port_deregister0(su_port_t *self, int i)
{
  CFRunLoopRef rl;
  int n, N, *indices, *reverses;

  indices = self->sup_indices;
  reverses = self->sup_reverses;

  /* n is the slot currently used by registration i */
  n = indices[i]; assert(n >= 0); assert(i == reverses[n]);

  /* N is the slot of the last entry before the removal */
  N = --self->sup_n_waits;

  rl = CFRunLoopGetCurrent();
  CFSocketInvalidate(self->sup_sockets[n]);
  CFRelease(self->sup_sockets[n]);
  CFRunLoopRemoveSource(rl, self->sup_sources[n], kCFRunLoopDefaultMode);
  CFRelease(self->sup_sources[n]);

  CFRunLoopWakeUp(rl);

  if (n < self->sup_pri_offset) {
    /* A prioritized entry is removed: move the last prioritized entry (j)
       into its slot, so that the hole ends up at the end of the
       prioritized region */
    int j = --self->sup_pri_offset;
    if (n != j) {
      assert(reverses[j] > 0);
      assert(indices[reverses[j]] == j);
      indices[reverses[j]] = n;
      reverses[n] = reverses[j];

      self->sup_sources[n] = self->sup_sources[j];
      self->sup_sockets[n] = self->sup_sockets[j];
      self->sup_waits[n] = self->sup_waits[j];
      self->sup_wait_cbs[n] = self->sup_wait_cbs[j];
      self->sup_wait_args[n] = self->sup_wait_args[j];
      self->sup_wait_roots[n] = self->sup_wait_roots[j];
      n = j;
    }
  }

  if (n < N) {
    /* Fill the hole with the last entry */
    assert(reverses[N] > 0);
    assert(indices[reverses[N]] == N);

    indices[reverses[N]] = n;
    reverses[n] = reverses[N];

    self->sup_sources[n] = self->sup_sources[N];
    self->sup_sockets[n] = self->sup_sockets[N];
    self->sup_waits[n] = self->sup_waits[N];
    self->sup_wait_cbs[n] = self->sup_wait_cbs[N];
    self->sup_wait_args[n] = self->sup_wait_args[N];
    self->sup_wait_roots[n] = self->sup_wait_roots[N];
    n = N;
  }


  /* Clear the slot that is no longer in use */
  reverses[n] = -1;
  memset(&self->sup_waits[n], 0, sizeof self->sup_waits[n]);
  self->sup_sources[n] = NULL;
  self->sup_sockets[n] = NULL;
  self->sup_wait_cbs[n] = NULL;
  self->sup_wait_args[n] = NULL;
  self->sup_wait_roots[n] = NULL;

  /* Push index i onto the free list headed by indices[0] */
  indices[i] = indices[0];
  indices[0] = -i;

  self->sup_registers++;

  return i;
}
738
739
740 /** Unregister a su_wait_t object.
741 *
742 * The function su_osx_port_unregister() unregisters a su_wait_t object. The
743 * wait object, a callback function and a argument are removed from the
744 * port object.
745 *
746 * @param self - pointer to port object
747 * @param root - pointer to root object
748 * @param wait - pointer to wait object
749 * @param callback - callback function pointer (may be NULL)
750 * @param arg - argument given to callback function when it is invoked
751 * (may be NULL)
752 *
753 * @return Nonzero index of the wait object, or -1 upon an error.
754 */
su_osx_port_unregister(su_port_t * self,su_root_t * root,su_wait_t * wait,su_wakeup_f callback,su_wakeup_arg_t * arg)755 int su_osx_port_unregister(su_port_t *self,
756 su_root_t *root,
757 su_wait_t *wait,
758 su_wakeup_f callback, /* XXX - ignored */
759 su_wakeup_arg_t *arg)
760 {
761 int n, N;
762
763 assert(self);
764 // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));
765
766 N = self->sup_n_waits;
767
768 for (n = 0; n < N; n++) {
769 if (SU_WAIT_CMP(wait[0], self->sup_waits[n]) == 0) {
770 return su_osx_port_deregister0(self, self->sup_reverses[n]);
771 }
772 }
773
774 su_seterrno(ENOENT);
775
776 return -1;
777 }
778
779 /** Deregister a su_wait_t object.
780 *
781 * The function su_osx_port_deregister() deregisters a su_wait_t registrattion.
782 * The wait object, a callback function and a argument are removed from the
783 * port object.
784 *
785 * @param self - pointer to port object
786 * @param i - registration index
787 *
788 * @return Index of the wait object, or -1 upon an error.
789 */
su_osx_port_deregister(su_port_t * self,int i)790 int su_osx_port_deregister(su_port_t *self, int i)
791 {
792 su_wait_t wait[1] = { SU_WAIT_INIT };
793 int retval;
794
795 assert(self);
796 // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));
797
798 if (i <= 0 || i > self->sup_size_waits)
799 return su_seterrno(EBADF);
800
801 if (self->sup_indices[i] < 0)
802 return su_seterrno(EBADF);
803
804 retval = su_osx_port_deregister0(self, i);
805
806 su_wait_destroy(wait);
807
808 return retval;
809 }
810
811
812 /** @internal
813 * Unregister all su_wait_t objects.
814 *
815 * The function su_osx_port_unregister_all() unregisters all su_wait_t objects
816 * and destroys all queued timers associated with given root object.
817 *
818 * @param self - pointer to port object
819 * @param root - pointer to root object
820 *
821 * @return Number of wait objects removed.
822 */
su_osx_port_unregister_all(su_port_t * self,su_root_t * root)823 int su_osx_port_unregister_all(su_port_t *self,
824 su_root_t *root)
825 {
826 int i, j, index, N;
827 int *indices, *reverses;
828 su_wait_t *waits;
829 su_wakeup_f *wait_cbs;
830 su_wakeup_arg_t **wait_args;
831 su_root_t **wait_roots;
832 CFRunLoopRef rl;
833 CFRunLoopSourceRef *sources;
834 CFSocketRef *sockets;
835
836 // XXX - assert(SU_OSX_PORT_OWN_THREAD(self));
837
838 N = self->sup_n_waits;
839 indices = self->sup_indices;
840 reverses = self->sup_reverses;
841 sources = self->sup_sources;
842 sockets = self->sup_sockets;
843 waits = self->sup_waits;
844 wait_cbs = self->sup_wait_cbs;
845 wait_args = self->sup_wait_args;
846 wait_roots = self->sup_wait_roots;
847
848 rl = CFRunLoopGetCurrent();
849
850 for (i = j = 0; i < N; i++) {
851 index = reverses[i]; assert(index > 0 && indices[index] == i);
852
853 if (wait_roots[i] == root) {
854 if (i < self->sup_pri_offset)
855 self->sup_pri_offset--;
856
857 indices[index] = indices[0];
858 indices[0] = -index;
859 continue;
860 }
861
862 if (i != j) {
863 indices[index] = j;
864
865 CFSocketInvalidate(self->sup_sockets[j]);
866 CFRelease(self->sup_sockets[j]);
867 CFRunLoopRemoveSource(rl, sources[j], kCFRunLoopDefaultMode);
868 CFRelease(sources[j]);
869
870 reverses[j] = reverses[i];
871 sources[j] = sources[i];
872 sockets[j] = sockets[i];
873 waits[j] = waits[i];
874 wait_cbs[j] = wait_cbs[i];
875 wait_args[j] = wait_args[i];
876 wait_roots[j] = wait_roots[i];
877 }
878
879 j++;
880 }
881
882 /* Prepare for removing CFSources */
883 for (i = j; i < N; i++) {
884 reverses[i] = -1;
885
886 CFSocketInvalidate(self->sup_sockets[i]);
887 CFRelease(self->sup_sockets[i]);
888 CFRunLoopRemoveSource(rl, sources[i], kCFRunLoopDefaultMode);
889 CFRunLoopSourceInvalidate(sources[i]);
890
891 sources[i] = NULL;
892 sockets[i] = NULL;
893 wait_cbs[i] = NULL;
894 wait_args[i] = NULL;
895 wait_roots[i] = NULL;
896 }
897 memset(&waits[j], 0, (char *)&waits[N] - (char *)&waits[j]);
898
899 /* Tell run loop things have changed */
900 CFRunLoopWakeUp(rl);
901
902 self->sup_n_waits = j;
903 self->sup_registers++;
904
905 return N - j;
906 }
907
908 /**Set mask for a registered event. @internal
909 *
910 * The function su_osx_port_eventmask() sets the mask describing events that can
911 * signal the registered callback.
912 *
913 * @param port pointer to port object
914 * @param index registration index
915 * @param socket socket
916 * @param events new event mask
917 *
918 * @retval 0 when successful,
919 * @retval -1 upon an error.
920 */
su_osx_port_eventmask(su_port_t * self,int index,int socket,int events)921 int su_osx_port_eventmask(su_port_t *self, int index, int socket, int events)
922 {
923 int n, ret;
924
925 assert(self);
926 // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));
927
928 if (index <= 0 || index > self->sup_size_waits)
929 return su_seterrno(EBADF);
930 n = self->sup_indices[index];
931 if (n < 0)
932 return su_seterrno(EBADF);
933
934 ret = su_wait_mask(&self->sup_waits[n], socket, events);
935
936 CFSocketSetSocketFlags(self->sup_sockets[n],
937 map_poll_event_to_cf_event(events));
938
939 return ret;
940 }
941
942 /** @internal
943 *
944 * Copies the su_wait_t objects from the port. The number of wait objects
945 * can be found out by calling su_osx_port_query() with @a n_waits as zero.
946 *
947 * @note This function is called only by friends.
948 *
949 * @param self - pointer to port object
950 * @param waits - pointer to array to which wait objects are copied
951 * @param n_waits - number of wait objects fitting in array waits
952 *
953 * @return Number of wait objects, or 0 upon an error.
954 */
su_osx_port_query(su_port_t * self,su_wait_t * waits,unsigned n_waits)955 unsigned su_osx_port_query(su_port_t *self, su_wait_t *waits, unsigned n_waits)
956 {
957 unsigned n;
958
959 // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));
960
961 n = self->sup_n_waits;
962
963 if (n_waits != 0) {
964 if (waits && n_waits >= n)
965 memcpy(waits, self->sup_waits, n * sizeof(*waits));
966 else
967 n = 0;
968 }
969
970 return n;
971 }
972
973 /** @internal Enable multishot mode.
974 *
975 * The function su_osx_port_multishot() enables, disables or queries the
976 * multishot mode for the port. The multishot mode determines how the events
977 * are scheduled by port. If multishot mode is enabled, port serves all the
978 * sockets that have received network events. If it is disables, only first
979 * socket event is served.
980 *
981 * @param self pointer to port object
982 * @param multishot multishot mode (0 => disables, 1 => enables, -1 => query)
983 *
984 * @retval 0 multishot mode is disabled
985 * @retval 1 multishot mode is enabled
986 * @retval -1 an error occurred
987 */
su_osx_port_multishot(su_port_t * self,int multishot)988 int su_osx_port_multishot(su_port_t *self, int multishot)
989 {
990 if (multishot < 0)
991 return self->sup_multishot;
992 else if (multishot == 0 || multishot == 1)
993 return self->sup_multishot = multishot;
994 else
995 return (errno = EINVAL), -1;
996 }
997
998 #if 0
999 /** @internal Enable threadsafe operation. */
1000 static
1001 int su_osx_port_threadsafe(su_port_t *port)
1002 {
1003 return su_home_threadsafe(port->sup_home);
1004 }
1005 #endif
1006
1007 /** Prepare root to be run on OSX Run Loop.
1008 *
1009 * Sets #su_root_t object to be callable by the application's run loop. This
1010 * function is to be used instead of su_root_run() for OSX applications
1011 * using Core Foundation's Run Loop.
1012 *
1013 * The function su_root_osx_prepare_run() returns immmediately.
1014 *
1015 * @param root pointer to root object
1016 *
1017 * @NEW_1_12_4.
1018 */
su_root_osx_prepare_run(su_root_t * root)1019 void su_root_osx_prepare_run(su_root_t *root)
1020 {
1021 su_port_t *self = root->sur_task->sut_port;
1022 CFRunLoopRef rl;
1023 su_duration_t tout = 0;
1024
1025 // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));
1026
1027 enter;
1028
1029 self->sup_base->sup_running = 1;
1030 rl = CFRunLoopGetCurrent();
1031
1032 if (self->sup_base->sup_prepoll)
1033 self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic, self->sup_base->sup_pp_root);
1034
1035 if (self->sup_base->sup_head)
1036 su_port_getmsgs(self);
1037
1038 if (self->sup_base->sup_timers)
1039 su_timer_expire(&self->sup_base->sup_timers, &tout, su_now());
1040
1041 if (!self->sup_base->sup_running)
1042 return;
1043
1044 CFRetain(rl);
1045 self->sup_main_loop = rl;
1046
1047 return;
1048 }
1049
1050 /** @internal Main loop.
1051 *
1052 * The function @c su_osx_port_run() waits for wait objects and the timers
1053 * associated with the port object. When any wait object is signaled or
1054 * timer is expired, it invokes the callbacks, and returns waiting.
1055 *
1056 * The function @c su_osx_port_run() runs until @c su_osx_port_break() is called
1057 * from a callback.
1058 *
1059 * @param self pointer to port object
1060 *
1061 */
su_osx_port_run(su_port_t * self)1062 void su_osx_port_run(su_port_t *self)
1063 {
1064 CFRunLoopRef rl;
1065 su_duration_t tout = 0;
1066
1067 // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));
1068
1069 enter;
1070
1071 self->sup_base->sup_running = 1;
1072 rl = CFRunLoopGetCurrent();
1073
1074 if (self->sup_base->sup_prepoll)
1075 self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic, self->sup_base->sup_pp_root);
1076
1077 if (self->sup_base->sup_head)
1078 su_port_getmsgs(self);
1079
1080 if (self->sup_base->sup_timers)
1081 su_timer_expire(&self->sup_base->sup_timers, &tout, su_now());
1082
1083 if (!self->sup_base->sup_running)
1084 return;
1085
1086 CFRetain(rl);
1087 self->sup_main_loop = rl;
1088
1089 /* if there are messages do a quick wait */
1090 if (self->sup_base->sup_head)
1091 tout = 0;
1092
1093 CFRunLoopRun();
1094
1095 self->sup_main_loop = NULL;
1096
1097 }
1098
#if tuning
/* This version can help tuning... */
/* Alternative main loop, compiled only when "tuning" is defined. It does
 * not use the Core Foundation run loop: each round runs the prepoll hook,
 * fetches messages, expires timers and waits for events with
 * su_osx_port_wait_events(), then logs how many of each were handled and
 * how long the wait took. */
void su_osx_port_run_tune(su_port_t *self)
{
  int i;
  int timers = 0, messages = 0, events = 0;
  su_duration_t tout = 0, tout0;
  su_time_t started = su_now(), woken = started, bedtime = woken;

  // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self));

  for (self->sup_base->sup_running = 1; self->sup_base->sup_running;) {
    /* tout0 remembers the timeout of the previous round for the log */
    tout0 = tout, tout = 2000;

    timers = 0, messages = 0;

    if (self->sup_base->sup_prepoll)
      self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic, self->sup_base->sup_pp_root);

    if (self->sup_base->sup_head)
      messages = su_port_getmsgs(self);

    if (self->sup_base->sup_timers)
      timers = su_timer_expire(&self->sup_base->sup_timers, &tout, su_now());

    if (!self->sup_base->sup_running)
      break;

    if (self->sup_base->sup_head)      /* if there are messages do a quick wait */
      tout = 0;

    bedtime = su_now();

    events = su_osx_port_wait_events(self, tout);

    woken = su_now();

    if (messages || timers || events)
      SU_DEBUG_1(("su_osx_port_run(%p): %.6f: %u messages %u timers %u "
                  "events slept %.6f/%.3f\n",
                  self, su_time_diff(woken, started), messages, timers, events,
                  su_time_diff(woken, bedtime), tout0 * 1e-3));

    if (!self->sup_base->sup_running)
      break;
  }
}
#endif
1147
1148 /** @internal
1149 * The function @c su_osx_port_break() is used to terminate execution of @c
1150 * su_osx_port_run(). It can be called from a callback function.
1151 *
1152 * @param self pointer to port
1153 *
1154 */
su_osx_port_break(su_port_t * self)1155 void su_osx_port_break(su_port_t *self)
1156 {
1157 if (self->sup_main_loop)
1158 CFRunLoopStop(self->sup_main_loop);
1159
1160 self->sup_base->sup_running = 0;
1161 }
1162
1163 /** @internal
1164 * The function @c su_osx_port_wait_events() is used to poll() for wait objects
1165 *
1166 * @param self pointer to port
1167 * @param tout timeout in milliseconds
1168 *
1169 * @return number of events handled
1170 */
1171 static
su_osx_port_wait_events(su_port_t * self,su_duration_t tout)1172 int su_osx_port_wait_events(su_port_t *self, su_duration_t tout)
1173 {
1174 int i, events = 0;
1175 su_wait_t *waits = self->sup_waits;
1176 unsigned n = self->sup_n_waits;
1177 #if HAVE_POLL
1178 unsigned version = self->sup_registers;
1179 #endif
1180 su_root_t *root;
1181
1182 i = su_wait(waits, n, tout);
1183
1184 if (i >= 0 && (unsigned)i < n) {
1185 #if HAVE_POLL
1186 /* poll() can return events for multiple wait objects */
1187 if (self->sup_multishot) {
1188 for (; i < n; i++) {
1189 if (waits[i].revents) {
1190 root = self->sup_wait_roots[i];
1191 self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
1192 &waits[i],
1193 self->sup_wait_args[i]);
1194 events++;
1195 /* Callback function used su_register()/su_deregister() */
1196 if (version != self->sup_registers)
1197 break;
1198 }
1199 }
1200 }
1201 #else /* !HAVE_POLL */
1202 if (0) {
1203 }
1204 #endif
1205 else {
1206 root = self->sup_wait_roots[i];
1207 self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL,
1208 &self->sup_waits[i],
1209 self->sup_wait_args[i]);
1210 events++;
1211 }
1212 }
1213
1214 return events;
1215 }
1216
1217 /** @internal Block until wait object is signaled or timeout.
1218 *
1219 * This function waits for wait objects and the timers associated with
1220 * the root object. When any wait object is signaled or timer is
1221 * expired, it invokes the callbacks.
1222 *
1223 * This function returns when a callback has been invoked or @c tout
1224 * milliseconds is elapsed.
1225 *
1226 * @param self pointer to port
1227 * @param tout timeout in milliseconds
1228 *
1229 * @return
1230 * Milliseconds to the next invocation of timer, or @c SU_WAIT_FOREVER if
1231 * there are no active timers.
1232 */
su_osx_port_step(su_port_t * self,su_duration_t tout)1233 su_duration_t su_osx_port_step(su_port_t *self, su_duration_t tout)
1234 {
1235 CFRunLoopRef rl;
1236 su_time_t now = su_now();
1237 CFAbsoluteTime start;
1238 int ret, timeout = tout > INT32_MAX ? INT32_MAX : tout;
1239
1240 rl = CFRunLoopGetCurrent();
1241
1242 if (!rl)
1243 return -1;
1244
1245 CFRunLoopWakeUp(rl);
1246
1247 if (tout < timeout)
1248 timeout = tout;
1249
1250 if (self->sup_base->sup_prepoll)
1251 self->sup_base->sup_prepoll(self->sup_base->sup_pp_magic, self->sup_base->sup_pp_root);
1252
1253 if (self->sup_base->sup_head)
1254 su_base_port_getmsgs(self);
1255
1256 if (self->sup_base->sup_timers)
1257 su_timer_expire(&self->sup_base->sup_timers, &tout, now);
1258
1259 /* if there are messages do a quick wait */
1260 if (self->sup_base->sup_head)
1261 tout = 0;
1262
1263 ret = CFRunLoopRunInMode(kCFRunLoopDefaultMode,
1264 tout/1000000.0,
1265 true);
1266
1267 CFRunLoopWakeUp(rl);
1268
1269 if (self->sup_base->sup_head)
1270 su_base_port_getmsgs(self);
1271
1272 if (self->sup_base->sup_timers)
1273 su_timer_expire(&self->sup_base->sup_timers, &tout, su_now());
1274
1275 if (self->sup_base->sup_head)
1276 tout = 0;
1277
1278 return tout;
1279 }
1280