1 /* $Id$ */
2 /*
3 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20
21 /*
22 * sock_select.c
23 *
24 * This is the implementation of IOQueue using pj_sock_select().
25 * It runs anywhere where pj_sock_select() is available (currently
26 * Win32, Linux, Linux kernel, etc.).
27 */
28
29 #include <pj/ioqueue.h>
30 #include <pj/os.h>
31 #include <pj/lock.h>
32 #include <pj/log.h>
33 #include <pj/list.h>
34 #include <pj/pool.h>
35 #include <pj/string.h>
36 #include <pj/assert.h>
37 #include <pj/sock.h>
38 #include <pj/compat/socket.h>
39 #include <pj/sock_select.h>
40 #include <pj/sock_qos.h>
41 #include <pj/errno.h>
42 #include <pj/rand.h>
43
44 /* Now that we have access to OS'es <sys/select>, lets check again that
45 * PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE
46 */
47 #if PJ_IOQUEUE_MAX_HANDLES > FD_SETSIZE
48 # error "PJ_IOQUEUE_MAX_HANDLES cannot be greater than FD_SETSIZE"
49 #endif
50
51
52 /*
53 * Include declaration from common abstraction.
54 */
55 #include "ioqueue_common_abs.h"
56
57 /*
58 * ISSUES with ioqueue_select()
59 *
60 * EAGAIN/EWOULDBLOCK error in recv():
61 * - when multiple threads are working with the ioqueue, application
62 * may receive EAGAIN or EWOULDBLOCK in the receive callback.
63 * This error happens because more than one thread is watching for
64 * the same descriptor set, so when all of them call recv() or recvfrom()
65 * simultaneously, only one will succeed and the rest will get the error.
66 *
67 */
68 #define THIS_FILE "ioq_select"
69
70 /*
71 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
72 * the correct error code.
73 */
74 #if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
75 # error "Error reporting must be enabled for this function to work!"
76 #endif
77
78 /*
79 * During debugging build, VALIDATE_FD_SET is set.
80 * This will check the validity of the fd_sets.
81 */
82 /*
83 #if defined(PJ_DEBUG) && PJ_DEBUG != 0
84 # define VALIDATE_FD_SET 1
85 #else
86 # define VALIDATE_FD_SET 0
87 #endif
88 */
89 #define VALIDATE_FD_SET 0
90
91 #if 0
92 # define TRACE__(args) PJ_LOG(3,args)
93 #else
94 # define TRACE__(args)
95 #endif
96
97 /*
98 * This describes each key.
99 */
100 struct pj_ioqueue_key_t
101 {
102 DECLARE_COMMON_KEY
103 };
104
105 /*
106 * This describes the I/O queue itself.
107 */
108 struct pj_ioqueue_t
109 {
110 DECLARE_COMMON_IOQUEUE
111
112 unsigned max, count; /* Max and current key count */
113 int nfds; /* The largest fd value (for select)*/
114 pj_ioqueue_key_t active_list; /* List of active keys. */
115 pj_fd_set_t rfdset;
116 pj_fd_set_t wfdset;
117 #if PJ_HAS_TCP
118 pj_fd_set_t xfdset;
119 #endif
120
121 #if PJ_IOQUEUE_HAS_SAFE_UNREG
122 pj_mutex_t *ref_cnt_mutex;
123 pj_ioqueue_key_t closing_list;
124 pj_ioqueue_key_t free_list;
125 #endif
126 };
127
128 /* Proto */
129 #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
130 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
131 static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h);
132 #endif
133
134 #if defined(PJ_HAS_SSL_SOCK) && PJ_HAS_SSL_SOCK != 0 && \
135 (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_APPLE)
136 /* Call SSL Network framework poll */
137 pj_status_t ssl_network_event_poll();
138 #endif
139
140 /* Include implementation for common abstraction after we declare
141 * pj_ioqueue_key_t and pj_ioqueue_t.
142 */
143 #include "ioqueue_common_abs.c"
144
145 #if PJ_IOQUEUE_HAS_SAFE_UNREG
146 /* Scan closing keys to be put to free list again */
147 static void scan_closing_keys(pj_ioqueue_t *ioqueue);
148 #endif
149
150 /*
151 * pj_ioqueue_name()
152 */
pj_ioqueue_name(void)153 PJ_DEF(const char*) pj_ioqueue_name(void)
154 {
155 return "select";
156 }
157
158 /*
159 * Scan the socket descriptor sets for the largest descriptor.
160 * This value is needed by select().
161 */
162 #if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0
rescan_fdset(pj_ioqueue_t * ioqueue)163 static void rescan_fdset(pj_ioqueue_t *ioqueue)
164 {
165 pj_ioqueue_key_t *key = ioqueue->active_list.next;
166 int max = 0;
167
168 while (key != &ioqueue->active_list) {
169 if (key->fd > max)
170 max = key->fd;
171 key = key->next;
172 }
173
174 ioqueue->nfds = max;
175 }
176 #else
rescan_fdset(pj_ioqueue_t * ioqueue)177 static void rescan_fdset(pj_ioqueue_t *ioqueue)
178 {
179 ioqueue->nfds = FD_SETSIZE-1;
180 }
181 #endif
182
183
184 /*
185 * pj_ioqueue_create()
186 *
187 * Create select ioqueue.
188 */
pj_ioqueue_create(pj_pool_t * pool,pj_size_t max_fd,pj_ioqueue_t ** p_ioqueue)189 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
190 pj_size_t max_fd,
191 pj_ioqueue_t **p_ioqueue)
192 {
193 pj_ioqueue_t *ioqueue;
194 pj_lock_t *lock;
195 unsigned i;
196 pj_status_t rc;
197
198 /* Check that arguments are valid. */
199 PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
200 max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
201 PJ_EINVAL);
202
203 /* Check that size of pj_ioqueue_op_key_t is sufficient */
204 PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
205 sizeof(union operation_key), PJ_EBUG);
206
207 /* Create and init common ioqueue stuffs */
208 ioqueue = PJ_POOL_ALLOC_T(pool, pj_ioqueue_t);
209 ioqueue_init(ioqueue);
210
211 ioqueue->max = (unsigned)max_fd;
212 ioqueue->count = 0;
213 PJ_FD_ZERO(&ioqueue->rfdset);
214 PJ_FD_ZERO(&ioqueue->wfdset);
215 #if PJ_HAS_TCP
216 PJ_FD_ZERO(&ioqueue->xfdset);
217 #endif
218 pj_list_init(&ioqueue->active_list);
219
220 rescan_fdset(ioqueue);
221
222 #if PJ_IOQUEUE_HAS_SAFE_UNREG
223 /* When safe unregistration is used (the default), we pre-create
224 * all keys and put them in the free list.
225 */
226
227 /* Mutex to protect key's reference counter
228 * We don't want to use key's mutex or ioqueue's mutex because
229 * that would create deadlock situation in some cases.
230 */
231 rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
232 if (rc != PJ_SUCCESS)
233 return rc;
234
235
236 /* Init key list */
237 pj_list_init(&ioqueue->free_list);
238 pj_list_init(&ioqueue->closing_list);
239
240
241 /* Pre-create all keys according to max_fd */
242 for (i=0; i<max_fd; ++i) {
243 pj_ioqueue_key_t *key;
244
245 key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
246 key->ref_count = 0;
247 rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock);
248 if (rc != PJ_SUCCESS) {
249 key = ioqueue->free_list.next;
250 while (key != &ioqueue->free_list) {
251 pj_lock_destroy(key->lock);
252 key = key->next;
253 }
254 pj_mutex_destroy(ioqueue->ref_cnt_mutex);
255 return rc;
256 }
257
258 pj_list_push_back(&ioqueue->free_list, key);
259 }
260 #endif
261
262 /* Create and init ioqueue mutex */
263 rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
264 if (rc != PJ_SUCCESS)
265 return rc;
266
267 rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
268 if (rc != PJ_SUCCESS)
269 return rc;
270
271 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
272
273 *p_ioqueue = ioqueue;
274 return PJ_SUCCESS;
275 }
276
277 /*
278 * pj_ioqueue_destroy()
279 *
280 * Destroy ioqueue.
281 */
pj_ioqueue_destroy(pj_ioqueue_t * ioqueue)282 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
283 {
284 pj_ioqueue_key_t *key;
285
286 PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
287
288 pj_lock_acquire(ioqueue->lock);
289
290 #if PJ_IOQUEUE_HAS_SAFE_UNREG
291 /* Destroy reference counters */
292 key = ioqueue->active_list.next;
293 while (key != &ioqueue->active_list) {
294 pj_lock_destroy(key->lock);
295 key = key->next;
296 }
297
298 key = ioqueue->closing_list.next;
299 while (key != &ioqueue->closing_list) {
300 pj_lock_destroy(key->lock);
301 key = key->next;
302 }
303
304 key = ioqueue->free_list.next;
305 while (key != &ioqueue->free_list) {
306 pj_lock_destroy(key->lock);
307 key = key->next;
308 }
309
310 pj_mutex_destroy(ioqueue->ref_cnt_mutex);
311 #endif
312
313 return ioqueue_destroy(ioqueue);
314 }
315
316
317 /*
318 * pj_ioqueue_register_sock()
319 *
320 * Register socket handle to ioqueue.
321 */
pj_ioqueue_register_sock2(pj_pool_t * pool,pj_ioqueue_t * ioqueue,pj_sock_t sock,pj_grp_lock_t * grp_lock,void * user_data,const pj_ioqueue_callback * cb,pj_ioqueue_key_t ** p_key)322 PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
323 pj_ioqueue_t *ioqueue,
324 pj_sock_t sock,
325 pj_grp_lock_t *grp_lock,
326 void *user_data,
327 const pj_ioqueue_callback *cb,
328 pj_ioqueue_key_t **p_key)
329 {
330 pj_ioqueue_key_t *key = NULL;
331 #if defined(PJ_WIN32) && PJ_WIN32!=0 || \
332 defined(PJ_WIN64) && PJ_WIN64 != 0 || \
333 defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
334 u_long value;
335 #else
336 pj_uint32_t value;
337 #endif
338 pj_status_t rc = PJ_SUCCESS;
339
340 PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
341 cb && p_key, PJ_EINVAL);
342
343 /* On platforms with fd_set containing fd bitmap such as *nix family,
344 * avoid potential memory corruption caused by select() when given
345 * an fd that is higher than FD_SETSIZE.
346 */
347 if (sizeof(fd_set) < FD_SETSIZE && sock >= FD_SETSIZE) {
348 PJ_LOG(4, ("pjlib", "Failed to register socket to ioqueue because "
349 "socket fd is too big (fd=%d/FD_SETSIZE=%d)",
350 sock, FD_SETSIZE));
351 return PJ_ETOOBIG;
352 }
353
354 pj_lock_acquire(ioqueue->lock);
355
356 if (ioqueue->count >= ioqueue->max) {
357 rc = PJ_ETOOMANY;
358 goto on_return;
359 }
360
361 /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
362 * the key from the free list. Otherwise allocate a new one.
363 */
364 #if PJ_IOQUEUE_HAS_SAFE_UNREG
365
366 /* Scan closing_keys first to let them come back to free_list */
367 scan_closing_keys(ioqueue);
368
369 pj_assert(!pj_list_empty(&ioqueue->free_list));
370 if (pj_list_empty(&ioqueue->free_list)) {
371 rc = PJ_ETOOMANY;
372 goto on_return;
373 }
374
375 key = ioqueue->free_list.next;
376 pj_list_erase(key);
377 #else
378 key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
379 #endif
380
381 rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb);
382 if (rc != PJ_SUCCESS) {
383 key = NULL;
384 goto on_return;
385 }
386
387 /* Set socket to nonblocking. */
388 value = 1;
389 #if defined(PJ_WIN32) && PJ_WIN32!=0 || \
390 defined(PJ_WIN64) && PJ_WIN64 != 0 || \
391 defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
392 if (ioctlsocket(sock, FIONBIO, &value)) {
393 #else
394 if (ioctl(sock, FIONBIO, &value)) {
395 #endif
396 rc = pj_get_netos_error();
397 goto on_return;
398 }
399
400
401 /* Put in active list. */
402 pj_list_insert_before(&ioqueue->active_list, key);
403 ++ioqueue->count;
404
405 /* Rescan fdset to get max descriptor */
406 rescan_fdset(ioqueue);
407
408 on_return:
409 /* On error, socket may be left in non-blocking mode. */
410 if (rc != PJ_SUCCESS) {
411 if (key && key->grp_lock)
412 pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0);
413 }
414 *p_key = key;
415 pj_lock_release(ioqueue->lock);
416
417 return rc;
418 }
419
420 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
421 pj_ioqueue_t *ioqueue,
422 pj_sock_t sock,
423 void *user_data,
424 const pj_ioqueue_callback *cb,
425 pj_ioqueue_key_t **p_key)
426 {
427 return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data,
428 cb, p_key);
429 }
430
431 #if PJ_IOQUEUE_HAS_SAFE_UNREG
432 /* Increment key's reference counter */
433 static void increment_counter(pj_ioqueue_key_t *key)
434 {
435 pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
436 ++key->ref_count;
437 pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
438 }
439
440 /* Decrement the key's reference counter, and when the counter reach zero,
441 * destroy the key.
442 *
443 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
444 */
445 static void decrement_counter(pj_ioqueue_key_t *key)
446 {
447 pj_lock_acquire(key->ioqueue->lock);
448 pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
449 --key->ref_count;
450 if (key->ref_count == 0) {
451
452 pj_assert(key->closing == 1);
453 pj_gettickcount(&key->free_time);
454 key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
455 pj_time_val_normalize(&key->free_time);
456
457 pj_list_erase(key);
458 pj_list_push_back(&key->ioqueue->closing_list, key);
459 /* Rescan fdset to get max descriptor */
460 rescan_fdset(key->ioqueue);
461 }
462 pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
463 pj_lock_release(key->ioqueue->lock);
464 }
465 #endif
466
467
468 /*
469 * pj_ioqueue_unregister()
470 *
471 * Unregister handle from ioqueue.
472 */
473 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
474 {
475 pj_ioqueue_t *ioqueue;
476
477 PJ_ASSERT_RETURN(key, PJ_EINVAL);
478
479 ioqueue = key->ioqueue;
480
481 /* Lock the key to make sure no callback is simultaneously modifying
482 * the key. We need to lock the key before ioqueue here to prevent
483 * deadlock.
484 */
485 pj_ioqueue_lock_key(key);
486
487 /* Best effort to avoid double key-unregistration */
488 if (IS_CLOSING(key)) {
489 pj_ioqueue_unlock_key(key);
490 return PJ_SUCCESS;
491 }
492
493 /* Also lock ioqueue */
494 pj_lock_acquire(ioqueue->lock);
495
496 /* Avoid "negative" ioqueue count */
497 if (ioqueue->count > 0) {
498 --ioqueue->count;
499 } else {
500 /* If this happens, very likely there is double unregistration
501 * of a key.
502 */
503 pj_assert(!"Bad ioqueue count in key unregistration!");
504 PJ_LOG(1,(THIS_FILE, "Bad ioqueue count in key unregistration!"));
505 }
506
507 #if !PJ_IOQUEUE_HAS_SAFE_UNREG
508 /* Ticket #520, key will be erased more than once */
509 pj_list_erase(key);
510 #endif
511 PJ_FD_CLR(key->fd, &ioqueue->rfdset);
512 PJ_FD_CLR(key->fd, &ioqueue->wfdset);
513 #if PJ_HAS_TCP
514 PJ_FD_CLR(key->fd, &ioqueue->xfdset);
515 #endif
516
517 /* Close socket. */
518 if (key->fd != PJ_INVALID_SOCKET) {
519 pj_sock_close(key->fd);
520 key->fd = PJ_INVALID_SOCKET;
521 }
522
523 /* Clear callback */
524 key->cb.on_accept_complete = NULL;
525 key->cb.on_connect_complete = NULL;
526 key->cb.on_read_complete = NULL;
527 key->cb.on_write_complete = NULL;
528
529 /* Must release ioqueue lock first before decrementing counter, to
530 * prevent deadlock.
531 */
532 pj_lock_release(ioqueue->lock);
533
534 #if PJ_IOQUEUE_HAS_SAFE_UNREG
535 /* Mark key is closing. */
536 key->closing = 1;
537
538 /* Decrement counter. */
539 decrement_counter(key);
540
541 /* Done. */
542 if (key->grp_lock) {
543 /* just dec_ref and unlock. we will set grp_lock to NULL
544 * elsewhere */
545 pj_grp_lock_t *grp_lock = key->grp_lock;
546 // Don't set grp_lock to NULL otherwise the other thread
547 // will crash. Just leave it as dangling pointer, but this
548 // should be safe
549 //key->grp_lock = NULL;
550 pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
551 pj_grp_lock_release(grp_lock);
552 } else {
553 pj_ioqueue_unlock_key(key);
554 }
555 #else
556 if (key->grp_lock) {
557 /* set grp_lock to NULL and unlock */
558 pj_grp_lock_t *grp_lock = key->grp_lock;
559 // Don't set grp_lock to NULL otherwise the other thread
560 // will crash. Just leave it as dangling pointer, but this
561 // should be safe
562 //key->grp_lock = NULL;
563 pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
564 pj_grp_lock_release(grp_lock);
565 } else {
566 pj_ioqueue_unlock_key(key);
567 }
568
569 pj_lock_destroy(key->lock);
570 #endif
571
572 return PJ_SUCCESS;
573 }
574
575
576 /* This supposed to check whether the fd_set values are consistent
577 * with the operation currently set in each key.
578 */
579 #if VALIDATE_FD_SET
580 static void validate_sets(const pj_ioqueue_t *ioqueue,
581 const pj_fd_set_t *rfdset,
582 const pj_fd_set_t *wfdset,
583 const pj_fd_set_t *xfdset)
584 {
585 pj_ioqueue_key_t *key;
586
587 /*
588 * This basicly would not work anymore.
589 * We need to lock key before performing the check, but we can't do
590 * so because we're holding ioqueue mutex. If we acquire key's mutex
591 * now, the will cause deadlock.
592 */
593 pj_assert(0);
594
595 key = ioqueue->active_list.next;
596 while (key != &ioqueue->active_list) {
597 if (!pj_list_empty(&key->read_list)
598 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
599 || !pj_list_empty(&key->accept_list)
600 #endif
601 )
602 {
603 pj_assert(PJ_FD_ISSET(key->fd, rfdset));
604 }
605 else {
606 pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
607 }
608 if (!pj_list_empty(&key->write_list)
609 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
610 || key->connecting
611 #endif
612 )
613 {
614 pj_assert(PJ_FD_ISSET(key->fd, wfdset));
615 }
616 else {
617 pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
618 }
619 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
620 if (key->connecting)
621 {
622 pj_assert(PJ_FD_ISSET(key->fd, xfdset));
623 }
624 else {
625 pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
626 }
627 #endif /* PJ_HAS_TCP */
628
629 key = key->next;
630 }
631 }
632 #endif /* VALIDATE_FD_SET */
633
634
635 /* ioqueue_remove_from_set()
636 * This function is called from ioqueue_dispatch_event() to instruct
637 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
638 * set for the specified event.
639 */
640 static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
641 pj_ioqueue_key_t *key,
642 enum ioqueue_event_type event_type)
643 {
644 pj_lock_acquire(ioqueue->lock);
645
646 if (event_type == READABLE_EVENT)
647 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
648 else if (event_type == WRITEABLE_EVENT)
649 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
650 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
651 else if (event_type == EXCEPTION_EVENT)
652 PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
653 #endif
654 else
655 pj_assert(0);
656
657 pj_lock_release(ioqueue->lock);
658 }
659
660 /*
661 * ioqueue_add_to_set()
662 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
663 * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
664 * set for the specified event.
665 */
666 static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
667 pj_ioqueue_key_t *key,
668 enum ioqueue_event_type event_type )
669 {
670 pj_lock_acquire(ioqueue->lock);
671
672 if (event_type == READABLE_EVENT)
673 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
674 else if (event_type == WRITEABLE_EVENT)
675 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
676 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
677 else if (event_type == EXCEPTION_EVENT)
678 PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
679 #endif
680 else
681 pj_assert(0);
682
683 pj_lock_release(ioqueue->lock);
684 }
685
686 #if PJ_IOQUEUE_HAS_SAFE_UNREG
687 /* Scan closing keys to be put to free list again */
688 static void scan_closing_keys(pj_ioqueue_t *ioqueue)
689 {
690 pj_time_val now;
691 pj_ioqueue_key_t *h;
692
693 pj_gettickcount(&now);
694 h = ioqueue->closing_list.next;
695 while (h != &ioqueue->closing_list) {
696 pj_ioqueue_key_t *next = h->next;
697
698 pj_assert(h->closing != 0);
699
700 if (PJ_TIME_VAL_GTE(now, h->free_time)) {
701 pj_list_erase(h);
702 // Don't set grp_lock to NULL otherwise the other thread
703 // will crash. Just leave it as dangling pointer, but this
704 // should be safe
705 //h->grp_lock = NULL;
706 pj_list_push_back(&ioqueue->free_list, h);
707 }
708 h = next;
709 }
710 }
711 #endif
712
713 #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
714 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
715 static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h)
716 {
717 enum flags {
718 HAS_PEER_ADDR = 1,
719 HAS_QOS = 2
720 };
721 pj_sock_t old_sock, new_sock = PJ_INVALID_SOCKET;
722 pj_sockaddr local_addr, rem_addr;
723 int val, addr_len;
724 pj_fd_set_t *fds[3];
725 unsigned i, fds_cnt, flags=0;
726 pj_qos_params qos_params;
727 unsigned msec;
728 pj_status_t status;
729
730 pj_lock_acquire(h->ioqueue->lock);
731
732 old_sock = h->fd;
733
734 fds_cnt = 0;
735 fds[fds_cnt++] = &h->ioqueue->rfdset;
736 fds[fds_cnt++] = &h->ioqueue->wfdset;
737 #if PJ_HAS_TCP
738 fds[fds_cnt++] = &h->ioqueue->xfdset;
739 #endif
740
741 /* Can only replace UDP socket */
742 pj_assert(h->fd_type == pj_SOCK_DGRAM());
743
744 PJ_LOG(4,(THIS_FILE, "Attempting to replace UDP socket %d", old_sock));
745
746 for (msec=20; (msec<1000 && status != PJ_SUCCESS) ;
747 msec<1000? msec=msec*2 : 1000)
748 {
749 if (msec > 20) {
750 PJ_LOG(4,(THIS_FILE, "Retry to replace UDP socket %d", old_sock));
751 pj_thread_sleep(msec);
752 }
753
754 if (old_sock != PJ_INVALID_SOCKET) {
755 /* Investigate the old socket */
756 addr_len = sizeof(local_addr);
757 status = pj_sock_getsockname(old_sock, &local_addr, &addr_len);
758 if (status != PJ_SUCCESS) {
759 PJ_PERROR(5,(THIS_FILE, status, "Error get socket name"));
760 continue;
761 }
762
763 addr_len = sizeof(rem_addr);
764 status = pj_sock_getpeername(old_sock, &rem_addr, &addr_len);
765 if (status != PJ_SUCCESS) {
766 PJ_PERROR(5,(THIS_FILE, status, "Error get peer name"));
767 } else {
768 flags |= HAS_PEER_ADDR;
769 }
770
771 status = pj_sock_get_qos_params(old_sock, &qos_params);
772 if (status == PJ_STATUS_FROM_OS(EBADF) ||
773 status == PJ_STATUS_FROM_OS(EINVAL))
774 {
775 PJ_PERROR(5,(THIS_FILE, status, "Error get qos param"));
776 continue;
777 }
778
779 if (status != PJ_SUCCESS) {
780 PJ_PERROR(5,(THIS_FILE, status, "Error get qos param"));
781 } else {
782 flags |= HAS_QOS;
783 }
784
785 /* We're done with the old socket, close it otherwise we'll get
786 * error in bind()
787 */
788 status = pj_sock_close(old_sock);
789 if (status != PJ_SUCCESS) {
790 PJ_PERROR(5,(THIS_FILE, status, "Error closing socket"));
791 }
792
793 old_sock = PJ_INVALID_SOCKET;
794 }
795
796 /* Prepare the new socket */
797 status = pj_sock_socket(local_addr.addr.sa_family, PJ_SOCK_DGRAM, 0,
798 &new_sock);
799 if (status != PJ_SUCCESS) {
800 PJ_PERROR(5,(THIS_FILE, status, "Error create socket"));
801 continue;
802 }
803
804 /* Even after the socket is closed, we'll still get "Address in use"
805 * errors, so force it with SO_REUSEADDR
806 */
807 val = 1;
808 status = pj_sock_setsockopt(new_sock, SOL_SOCKET, SO_REUSEADDR,
809 &val, sizeof(val));
810 if (status == PJ_STATUS_FROM_OS(EBADF) ||
811 status == PJ_STATUS_FROM_OS(EINVAL))
812 {
813 PJ_PERROR(5,(THIS_FILE, status, "Error set socket option"));
814 continue;
815 }
816
817 /* The loop is silly, but what else can we do? */
818 addr_len = pj_sockaddr_get_len(&local_addr);
819 for (msec=20; msec<1000 ; msec<1000? msec=msec*2 : 1000) {
820 status = pj_sock_bind(new_sock, &local_addr, addr_len);
821 if (status != PJ_STATUS_FROM_OS(EADDRINUSE))
822 break;
823 PJ_LOG(4,(THIS_FILE, "Address is still in use, retrying.."));
824 pj_thread_sleep(msec);
825 }
826
827 if (status != PJ_SUCCESS)
828 continue;
829
830 if (flags & HAS_QOS) {
831 status = pj_sock_set_qos_params(new_sock, &qos_params);
832 if (status == PJ_STATUS_FROM_OS(EINVAL)) {
833 PJ_PERROR(5,(THIS_FILE, status, "Error set qos param"));
834 continue;
835 }
836 }
837
838 if (flags & HAS_PEER_ADDR) {
839 status = pj_sock_connect(new_sock, &rem_addr, addr_len);
840 if (status != PJ_SUCCESS) {
841 PJ_PERROR(5,(THIS_FILE, status, "Error connect socket"));
842 continue;
843 }
844 }
845 }
846
847 if (status != PJ_SUCCESS)
848 goto on_error;
849
850 /* Set socket to nonblocking. */
851 val = 1;
852 #if defined(PJ_WIN32) && PJ_WIN32!=0 || \
853 defined(PJ_WIN64) && PJ_WIN64 != 0 || \
854 defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
855 if (ioctlsocket(new_sock, FIONBIO, &val)) {
856 #else
857 if (ioctl(new_sock, FIONBIO, &val)) {
858 #endif
859 status = pj_get_netos_error();
860 goto on_error;
861 }
862
863 /* Replace the occurrence of old socket with new socket in the
864 * fd sets.
865 */
866 for (i=0; i<fds_cnt; ++i) {
867 if (PJ_FD_ISSET(h->fd, fds[i])) {
868 PJ_FD_CLR(h->fd, fds[i]);
869 PJ_FD_SET(new_sock, fds[i]);
870 }
871 }
872
873 /* And finally replace the fd in the key */
874 h->fd = new_sock;
875
876 PJ_LOG(4,(THIS_FILE, "UDP has been replaced successfully!"));
877
878 pj_lock_release(h->ioqueue->lock);
879
880 return PJ_SUCCESS;
881
882 on_error:
883 if (new_sock != PJ_INVALID_SOCKET)
884 pj_sock_close(new_sock);
885 if (old_sock != PJ_INVALID_SOCKET)
886 pj_sock_close(old_sock);
887
888 /* Clear the occurrence of old socket in the fd sets. */
889 for (i=0; i<fds_cnt; ++i) {
890 if (PJ_FD_ISSET(h->fd, fds[i])) {
891 PJ_FD_CLR(h->fd, fds[i]);
892 }
893 }
894
895 h->fd = PJ_INVALID_SOCKET;
896 PJ_PERROR(1,(THIS_FILE, status, "Error replacing socket %d", old_sock));
897 pj_lock_release(h->ioqueue->lock);
898 return PJ_ESOCKETSTOP;
899 }
900 #endif
901
902
903 /*
904 * pj_ioqueue_poll()
905 *
906 * Few things worth written:
907 *
908 * - we used to do only one callback called per poll, but it didn't go
909 * very well. The reason is because on some situation, the write
910 * callback gets called all the time, thus doesn't give the read
911 * callback to get called. This happens, for example, when user
912 * submit write operation inside the write callback.
913 * As the result, we changed the behaviour so that now multiple
914 * callbacks are called in a single poll. It should be fast too,
915 * just that we need to be carefull with the ioqueue data structs.
916 *
917 * - to guarantee preemptiveness etc, the poll function must strictly
918 * work on fd_set copy of the ioqueue (not the original one).
919 */
920 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
921 {
922 pj_fd_set_t rfdset, wfdset, xfdset;
923 int nfds;
924 int i, count, event_cnt, processed_cnt;
925 pj_ioqueue_key_t *h;
926 enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS };
927 struct event
928 {
929 pj_ioqueue_key_t *key;
930 enum ioqueue_event_type event_type;
931 } event[MAX_EVENTS];
932
933 PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
934
935 #if defined(PJ_HAS_SSL_SOCK) && PJ_HAS_SSL_SOCK != 0 && \
936 (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_APPLE)
937 /* Call SSL Network framework event poll */
938 ssl_network_event_poll();
939 #endif
940
941 /* Lock ioqueue before making fd_set copies */
942 pj_lock_acquire(ioqueue->lock);
943
944 /* We will only do select() when there are sockets to be polled.
945 * Otherwise select() will return error.
946 */
947 if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
948 PJ_FD_COUNT(&ioqueue->wfdset)==0
949 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
950 && PJ_FD_COUNT(&ioqueue->xfdset)==0
951 #endif
952 )
953 {
954 #if PJ_IOQUEUE_HAS_SAFE_UNREG
955 scan_closing_keys(ioqueue);
956 #endif
957 pj_lock_release(ioqueue->lock);
958 TRACE__((THIS_FILE, " poll: no fd is set"));
959 if (timeout)
960 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
961 return 0;
962 }
963
964 /* Copy ioqueue's pj_fd_set_t to local variables. */
965 pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
966 pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
967 #if PJ_HAS_TCP
968 pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
969 #else
970 PJ_FD_ZERO(&xfdset);
971 #endif
972
973 #if VALIDATE_FD_SET
974 validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
975 #endif
976
977 nfds = ioqueue->nfds;
978
979 /* Unlock ioqueue before select(). */
980 pj_lock_release(ioqueue->lock);
981
982 #if defined(PJ_WIN32_WINPHONE8) && PJ_WIN32_WINPHONE8
983 count = 0;
984 __try {
985 #endif
986
987 count = pj_sock_select(nfds+1, &rfdset, &wfdset, &xfdset,
988 timeout);
989
990 #if defined(PJ_WIN32_WINPHONE8) && PJ_WIN32_WINPHONE8
991 /* Ignore Invalid Handle Exception raised by select().*/
992 }
993 __except (GetExceptionCode() == STATUS_INVALID_HANDLE ?
994 EXCEPTION_CONTINUE_EXECUTION : EXCEPTION_CONTINUE_SEARCH) {
995 }
996 #endif
997
998 if (count == 0)
999 return 0;
1000 else if (count < 0)
1001 return -pj_get_netos_error();
1002
1003 /* Scan descriptor sets for event and add the events in the event
1004 * array to be processed later in this function. We do this so that
1005 * events can be processed in parallel without holding ioqueue lock.
1006 */
1007 pj_lock_acquire(ioqueue->lock);
1008
1009 event_cnt = 0;
1010
1011 /* Scan for writable sockets first to handle piggy-back data
1012 * coming with accept().
1013 */
1014 for (h = ioqueue->active_list.next;
1015 h != &ioqueue->active_list && event_cnt < MAX_EVENTS;
1016 h = h->next)
1017 {
1018 if (h->fd == PJ_INVALID_SOCKET)
1019 continue;
1020
1021 if ( (key_has_pending_write(h) || key_has_pending_connect(h))
1022 && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
1023 {
1024 #if PJ_IOQUEUE_HAS_SAFE_UNREG
1025 increment_counter(h);
1026 #endif
1027 event[event_cnt].key = h;
1028 event[event_cnt].event_type = WRITEABLE_EVENT;
1029 ++event_cnt;
1030 }
1031
1032 /* Scan for readable socket. */
1033 if ((key_has_pending_read(h) || key_has_pending_accept(h))
1034 && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) &&
1035 event_cnt < MAX_EVENTS)
1036 {
1037 #if PJ_IOQUEUE_HAS_SAFE_UNREG
1038 increment_counter(h);
1039 #endif
1040 event[event_cnt].key = h;
1041 event[event_cnt].event_type = READABLE_EVENT;
1042 ++event_cnt;
1043 }
1044
1045 #if PJ_HAS_TCP
1046 if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
1047 !IS_CLOSING(h) && event_cnt < MAX_EVENTS)
1048 {
1049 #if PJ_IOQUEUE_HAS_SAFE_UNREG
1050 increment_counter(h);
1051 #endif
1052 event[event_cnt].key = h;
1053 event[event_cnt].event_type = EXCEPTION_EVENT;
1054 ++event_cnt;
1055 }
1056 #endif
1057 }
1058
1059 for (i=0; i<event_cnt; ++i) {
1060 if (event[i].key->grp_lock)
1061 pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0);
1062 }
1063
1064 PJ_RACE_ME(5);
1065
1066 pj_lock_release(ioqueue->lock);
1067
1068 PJ_RACE_ME(5);
1069
1070 processed_cnt = 0;
1071
1072 /* Now process all events. The dispatch functions will take care
1073 * of locking in each of the key
1074 */
1075 for (i=0; i<event_cnt; ++i) {
1076
1077 /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */
1078 if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) {
1079 switch (event[i].event_type) {
1080 case READABLE_EVENT:
1081 if (ioqueue_dispatch_read_event(ioqueue, event[i].key))
1082 ++processed_cnt;
1083 break;
1084 case WRITEABLE_EVENT:
1085 if (ioqueue_dispatch_write_event(ioqueue, event[i].key))
1086 ++processed_cnt;
1087 break;
1088 case EXCEPTION_EVENT:
1089 if (ioqueue_dispatch_exception_event(ioqueue, event[i].key))
1090 ++processed_cnt;
1091 break;
1092 case NO_EVENT:
1093 pj_assert(!"Invalid event!");
1094 break;
1095 }
1096 }
1097
1098 #if PJ_IOQUEUE_HAS_SAFE_UNREG
1099 decrement_counter(event[i].key);
1100 #endif
1101
1102 if (event[i].key->grp_lock)
1103 pj_grp_lock_dec_ref_dbg(event[i].key->grp_lock,
1104 "ioqueue", 0);
1105 }
1106
1107 TRACE__((THIS_FILE, " poll: count=%d events=%d processed=%d",
1108 count, event_cnt, processed_cnt));
1109
1110 return processed_cnt;
1111 }
1112
1113