/* $Id$ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

/*
 * sock_select.c
 *
 * This is the implementation of IOQueue using pj_sock_select().
 * It runs anywhere where pj_sock_select() is available (currently
 * Win32, Linux, Linux kernel, etc.).
 */

#include <pj/ioqueue.h>
#include <pj/os.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/list.h>
#include <pj/pool.h>
#include <pj/string.h>
#include <pj/assert.h>
#include <pj/sock.h>
#include <pj/compat/socket.h>
#include <pj/sock_select.h>
#include <pj/sock_qos.h>
#include <pj/errno.h>
#include <pj/rand.h>

/* Now that we have access to the OS's <sys/select.h>, let's check again
 * that PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE.
 */
#if PJ_IOQUEUE_MAX_HANDLES > FD_SETSIZE
#   error "PJ_IOQUEUE_MAX_HANDLES cannot be greater than FD_SETSIZE"
#endif


/*
 * Include declaration from common abstraction.
 */
#include "ioqueue_common_abs.h"

/*
 * ISSUES with ioqueue_select()
 *
 * EAGAIN/EWOULDBLOCK error in recv():
 *  - When multiple threads are working with the ioqueue, the application
 *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
 *    This error happens because more than one thread is watching the
 *    same descriptor set, so when all of them call recv() or recvfrom()
 *    simultaneously, only one will succeed and the rest will get the error.
 *
 */
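/* A hedged sketch of how an application's receive callback might tolerate
 * this benign error (hypothetical application code; the callback name and
 * early-return policy are illustrative only). On error, bytes_read carries
 * the negated pj_status_t; when the error is the OS's "would block" value,
 * another poll thread simply drained the socket first and the callback can
 * just wait for the next readable event:
 *
 *   static void on_read_complete(pj_ioqueue_key_t *key,
 *                                pj_ioqueue_op_key_t *op_key,
 *                                pj_ssize_t bytes_read)
 *   {
 *       if (bytes_read < 0) {
 *           pj_status_t status = (pj_status_t)-bytes_read;
 *
 *           if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
 *               return;
 *       }
 *       ...
 *   }
 */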
#define THIS_FILE   "ioq_select"

/*
 * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
 * the correct error code.
 */
#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
#   error "Error reporting must be enabled for this function to work!"
#endif

/*
 * In a debug build, VALIDATE_FD_SET could be set to check the validity
 * of the fd_sets (the #if block below is currently disabled and the
 * macro is hard-coded to 0).
 */
/*
#if defined(PJ_DEBUG) && PJ_DEBUG != 0
#  define VALIDATE_FD_SET		1
#else
#  define VALIDATE_FD_SET		0
#endif
*/
#define VALIDATE_FD_SET     0

#if 0
#  define TRACE__(args)	PJ_LOG(3,args)
#else
#  define TRACE__(args)
#endif

/*
 * This describes each key.
 */
struct pj_ioqueue_key_t
{
    DECLARE_COMMON_KEY
};

/*
 * This describes the I/O queue itself.
 */
struct pj_ioqueue_t
{
    DECLARE_COMMON_IOQUEUE

    unsigned		max, count;	/* Max and current key count	    */
    int			nfds;		/* The largest fd value (for select)*/
    pj_ioqueue_key_t	active_list;	/* List of active keys.		    */
    pj_fd_set_t		rfdset;
    pj_fd_set_t		wfdset;
#if PJ_HAS_TCP
    pj_fd_set_t		xfdset;
#endif

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    pj_mutex_t	       *ref_cnt_mutex;
    pj_ioqueue_key_t	closing_list;
    pj_ioqueue_key_t	free_list;
#endif
};

/* Proto */
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
	    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h);
#endif

#if defined(PJ_HAS_SSL_SOCK) && PJ_HAS_SSL_SOCK != 0 && \
    (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_APPLE)
    /* Call SSL Network framework poll */
pj_status_t ssl_network_event_poll(void);
#endif

/* Include implementation for common abstraction after we declare
 * pj_ioqueue_key_t and pj_ioqueue_t.
 */
#include "ioqueue_common_abs.c"

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys and return them to the free list */
static void scan_closing_keys(pj_ioqueue_t *ioqueue);
#endif

/*
 * pj_ioqueue_name()
 */
PJ_DEF(const char*) pj_ioqueue_name(void)
{
    return "select";
}

/*
 * Scan the socket descriptor sets for the largest descriptor.
 * This value is needed by select().
 */
#if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0
static void rescan_fdset(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key = ioqueue->active_list.next;
    int max = 0;

    while (key != &ioqueue->active_list) {
	if (key->fd > max)
	    max = key->fd;
	key = key->next;
    }

    ioqueue->nfds = max;
}
#else
static void rescan_fdset(pj_ioqueue_t *ioqueue)
{
    ioqueue->nfds = FD_SETSIZE-1;
}
#endif


/*
 * pj_ioqueue_create()
 *
 * Create select ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
                                       pj_size_t max_fd,
                                       pj_ioqueue_t **p_ioqueue)
{
    pj_ioqueue_t *ioqueue;
    pj_lock_t *lock;
    unsigned i;
    pj_status_t rc;

    /* Check that arguments are valid. */
    PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
                     max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
                     PJ_EINVAL);

    /* Check that size of pj_ioqueue_op_key_t is sufficient */
    PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
                     sizeof(union operation_key), PJ_EBUG);

    /* Create and init the common ioqueue structure */
    ioqueue = PJ_POOL_ALLOC_T(pool, pj_ioqueue_t);
    ioqueue_init(ioqueue);

    ioqueue->max = (unsigned)max_fd;
    ioqueue->count = 0;
    PJ_FD_ZERO(&ioqueue->rfdset);
    PJ_FD_ZERO(&ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_ZERO(&ioqueue->xfdset);
#endif
    pj_list_init(&ioqueue->active_list);

    rescan_fdset(ioqueue);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* When safe unregistration is used (the default), we pre-create
     * all keys and put them in the free list.
     */

    /* Mutex to protect the key's reference counter.
     * We don't want to use the key's mutex or the ioqueue's mutex because
     * that would create a deadlock situation in some cases.
     */
    rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
    if (rc != PJ_SUCCESS)
	return rc;


    /* Init key list */
    pj_list_init(&ioqueue->free_list);
    pj_list_init(&ioqueue->closing_list);


    /* Pre-create all keys according to max_fd */
    for (i=0; i<max_fd; ++i) {
	pj_ioqueue_key_t *key;

	key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
	key->ref_count = 0;
	rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock);
	if (rc != PJ_SUCCESS) {
	    key = ioqueue->free_list.next;
	    while (key != &ioqueue->free_list) {
		pj_lock_destroy(key->lock);
		key = key->next;
	    }
	    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
	    return rc;
	}

	pj_list_push_back(&ioqueue->free_list, key);
    }
#endif

    /* Create and init ioqueue mutex */
    rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
    if (rc != PJ_SUCCESS)
	return rc;

    rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
    if (rc != PJ_SUCCESS)
        return rc;

    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));

    *p_ioqueue = ioqueue;
    return PJ_SUCCESS;
}
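
/* A hedged usage sketch (application side; the pool is assumed to have
 * been created elsewhere from a pool factory):
 *
 *   pj_ioqueue_t *ioqueue;
 *   pj_status_t status;
 *
 *   status = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioqueue);
 *   if (status != PJ_SUCCESS) {
 *       ...
 *   }
 *   ...
 *   pj_ioqueue_destroy(ioqueue);
 */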

/*
 * pj_ioqueue_destroy()
 *
 * Destroy ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    pj_ioqueue_key_t *key;

    PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);

    pj_lock_acquire(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Destroy reference counters */
    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
	pj_lock_destroy(key->lock);
	key = key->next;
    }

    key = ioqueue->closing_list.next;
    while (key != &ioqueue->closing_list) {
	pj_lock_destroy(key->lock);
	key = key->next;
    }

    key = ioqueue->free_list.next;
    while (key != &ioqueue->free_list) {
	pj_lock_destroy(key->lock);
	key = key->next;
    }

    pj_mutex_destroy(ioqueue->ref_cnt_mutex);
#endif

    return ioqueue_destroy(ioqueue);
}


/*
 * pj_ioqueue_register_sock()
 *
 * Register socket handle to ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
					      pj_ioqueue_t *ioqueue,
					      pj_sock_t sock,
					      pj_grp_lock_t *grp_lock,
					      void *user_data,
					      const pj_ioqueue_callback *cb,
                                              pj_ioqueue_key_t **p_key)
{
    pj_ioqueue_key_t *key = NULL;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN64) && PJ_WIN64 != 0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    u_long value;
#else
    pj_uint32_t value;
#endif
    pj_status_t rc = PJ_SUCCESS;

    PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                     cb && p_key, PJ_EINVAL);

    /* On platforms where fd_set contains an fd bitmap, such as the *nix
     * family, avoid potential memory corruption caused by select() when
     * given an fd that is higher than FD_SETSIZE.
     */
    if (sizeof(fd_set) < FD_SETSIZE && sock >= FD_SETSIZE) {
	PJ_LOG(4, ("pjlib", "Failed to register socket to ioqueue because "
		   	    "socket fd is too big (fd=%d/FD_SETSIZE=%d)",
		   	    sock, FD_SETSIZE));
    	return PJ_ETOOBIG;
    }

    pj_lock_acquire(ioqueue->lock);

    if (ioqueue->count >= ioqueue->max) {
        rc = PJ_ETOOMANY;
	goto on_return;
    }

    /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
     * the key from the free list. Otherwise allocate a new one.
     */
#if PJ_IOQUEUE_HAS_SAFE_UNREG

    /* Scan closing_keys first to let them come back to the free_list */
    scan_closing_keys(ioqueue);

    pj_assert(!pj_list_empty(&ioqueue->free_list));
    if (pj_list_empty(&ioqueue->free_list)) {
	rc = PJ_ETOOMANY;
	goto on_return;
    }

    key = ioqueue->free_list.next;
    pj_list_erase(key);
#else
    key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif

    rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb);
    if (rc != PJ_SUCCESS) {
	key = NULL;
	goto on_return;
    }

    /* Set socket to nonblocking. */
    value = 1;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN64) && PJ_WIN64 != 0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    if (ioctlsocket(sock, FIONBIO, &value)) {
#else
    if (ioctl(sock, FIONBIO, &value)) {
#endif
        rc = pj_get_netos_error();
	goto on_return;
    }


    /* Put in active list. */
    pj_list_insert_before(&ioqueue->active_list, key);
    ++ioqueue->count;

    /* Rescan fdset to get the max descriptor */
    rescan_fdset(ioqueue);

on_return:
    /* On error, the socket may be left in non-blocking mode. */
    if (rc != PJ_SUCCESS) {
	if (key && key->grp_lock)
	    pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0);
    }
    *p_key = key;
    pj_lock_release(ioqueue->lock);

    return rc;
}

PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
					      pj_ioqueue_t *ioqueue,
					      pj_sock_t sock,
					      void *user_data,
					      const pj_ioqueue_callback *cb,
					      pj_ioqueue_key_t **p_key)
{
    return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data,
                                     cb, p_key);
}
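
/* A minimal registration sketch (hypothetical application code; the socket
 * is assumed to have been created with pj_sock_socket(), and
 * on_read_complete is an application-defined callback). Callbacks that are
 * not used may be left NULL:
 *
 *   pj_ioqueue_callback cb;
 *   pj_ioqueue_key_t *key;
 *   pj_status_t status;
 *
 *   pj_bzero(&cb, sizeof(cb));
 *   cb.on_read_complete = &on_read_complete;
 *   status = pj_ioqueue_register_sock(pool, ioqueue, sock, user_data,
 *                                     &cb, &key);
 */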

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment the key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
{
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    ++key->ref_count;
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
}

/* Decrement the key's reference counter, and when the counter reaches
 * zero, destroy the key.
 *
 * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
 */
static void decrement_counter(pj_ioqueue_key_t *key)
{
    pj_lock_acquire(key->ioqueue->lock);
    pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
    --key->ref_count;
    if (key->ref_count == 0) {

	pj_assert(key->closing == 1);
	pj_gettickcount(&key->free_time);
	key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
	pj_time_val_normalize(&key->free_time);

	pj_list_erase(key);
	pj_list_push_back(&key->ioqueue->closing_list, key);
	/* Rescan fdset to get the max descriptor */
	rescan_fdset(key->ioqueue);
    }
    pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
    pj_lock_release(key->ioqueue->lock);
}
#endif


/*
 * pj_ioqueue_unregister()
 *
 * Unregister handle from ioqueue.
 */
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
{
    pj_ioqueue_t *ioqueue;

    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    ioqueue = key->ioqueue;

    /* Lock the key to make sure no callback is simultaneously modifying
     * the key. We need to lock the key before the ioqueue here to prevent
     * deadlock.
     */
    pj_ioqueue_lock_key(key);

    /* Best effort to avoid double key-unregistration */
    if (IS_CLOSING(key)) {
	pj_ioqueue_unlock_key(key);
	return PJ_SUCCESS;
    }

    /* Also lock the ioqueue */
    pj_lock_acquire(ioqueue->lock);

    /* Avoid "negative" ioqueue count */
    if (ioqueue->count > 0) {
	--ioqueue->count;
    } else {
	/* If this happens, very likely there is double unregistration
	 * of a key.
	 */
	pj_assert(!"Bad ioqueue count in key unregistration!");
	PJ_LOG(1,(THIS_FILE, "Bad ioqueue count in key unregistration!"));
    }

#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Ticket #520, key will be erased more than once */
    pj_list_erase(key);
#endif
    PJ_FD_CLR(key->fd, &ioqueue->rfdset);
    PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
    PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif

    /* Close socket. */
    if (key->fd != PJ_INVALID_SOCKET) {
        pj_sock_close(key->fd);
        key->fd = PJ_INVALID_SOCKET;
    }

    /* Clear callbacks */
    key->cb.on_accept_complete = NULL;
    key->cb.on_connect_complete = NULL;
    key->cb.on_read_complete = NULL;
    key->cb.on_write_complete = NULL;

    /* Must release the ioqueue lock first before decrementing the counter,
     * to prevent deadlock.
     */
    pj_lock_release(ioqueue->lock);

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Mark the key as closing. */
    key->closing = 1;

    /* Decrement counter. */
    decrement_counter(key);

    /* Done. */
    if (key->grp_lock) {
	/* Just dec_ref and unlock. */
	pj_grp_lock_t *grp_lock = key->grp_lock;
	// Don't set grp_lock to NULL otherwise the other thread
	// will crash. Just leave it as a dangling pointer; this
	// should be safe.
	//key->grp_lock = NULL;
	pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
	pj_grp_lock_release(grp_lock);
    } else {
	pj_ioqueue_unlock_key(key);
    }
#else
    if (key->grp_lock) {
	/* Just dec_ref and unlock. */
	pj_grp_lock_t *grp_lock = key->grp_lock;
	// Don't set grp_lock to NULL otherwise the other thread
	// will crash. Just leave it as a dangling pointer; this
	// should be safe.
	//key->grp_lock = NULL;
	pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
	pj_grp_lock_release(grp_lock);
    } else {
	pj_ioqueue_unlock_key(key);
    }

    pj_lock_destroy(key->lock);
#endif

    return PJ_SUCCESS;
}


/* This is supposed to check whether the fd_set values are consistent
 * with the operation currently set in each key.
 */
#if VALIDATE_FD_SET
static void validate_sets(const pj_ioqueue_t *ioqueue,
			  const pj_fd_set_t *rfdset,
			  const pj_fd_set_t *wfdset,
			  const pj_fd_set_t *xfdset)
{
    pj_ioqueue_key_t *key;

    /*
     * This basically does not work anymore.
     * We need to lock the key before performing the check, but we can't
     * do so because we're holding the ioqueue mutex. If we acquired the
     * key's mutex now, that would cause a deadlock.
     */
    pj_assert(0);

    key = ioqueue->active_list.next;
    while (key != &ioqueue->active_list) {
	if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
	    || !pj_list_empty(&key->accept_list)
#endif
	    )
	{
	    pj_assert(PJ_FD_ISSET(key->fd, rfdset));
	}
	else {
	    pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
	}
	if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
	    || key->connecting
#endif
	   )
	{
	    pj_assert(PJ_FD_ISSET(key->fd, wfdset));
	}
	else {
	    pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
	}
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
	if (key->connecting)
	{
	    pj_assert(PJ_FD_ISSET(key->fd, xfdset));
	}
	else {
	    pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
	}
#endif /* PJ_HAS_TCP */

	key = key->next;
    }
}
#endif	/* VALIDATE_FD_SET */


/* ioqueue_remove_from_set()
 * This function is called from ioqueue_dispatch_event() to instruct
 * the ioqueue to remove the specified descriptor from ioqueue's descriptor
 * set for the specified event.
 */
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     enum ioqueue_event_type event_type)
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
#endif
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}

/*
 * ioqueue_add_to_set()
 * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc.
 * to instruct the ioqueue to add the specified handle to ioqueue's
 * descriptor set for the specified event.
 */
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
                                pj_ioqueue_key_t *key,
                                enum ioqueue_event_type event_type )
{
    pj_lock_acquire(ioqueue->lock);

    if (event_type == READABLE_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
    else if (event_type == WRITEABLE_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    else if (event_type == EXCEPTION_EVENT)
        PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
#endif
    else
        pj_assert(0);

    pj_lock_release(ioqueue->lock);
}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Scan closing keys and return them to the free list */
static void scan_closing_keys(pj_ioqueue_t *ioqueue)
{
    pj_time_val now;
    pj_ioqueue_key_t *h;

    pj_gettickcount(&now);
    h = ioqueue->closing_list.next;
    while (h != &ioqueue->closing_list) {
	pj_ioqueue_key_t *next = h->next;

	pj_assert(h->closing != 0);

	if (PJ_TIME_VAL_GTE(now, h->free_time)) {
	    pj_list_erase(h);
	    // Don't set grp_lock to NULL otherwise the other thread
	    // will crash. Just leave it as a dangling pointer; this
	    // should be safe.
	    //h->grp_lock = NULL;
	    pj_list_push_back(&ioqueue->free_list, h);
	}
	h = next;
    }
}
#endif

#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h)
{
    enum flags {
	HAS_PEER_ADDR = 1,
	HAS_QOS = 2
    };
    pj_sock_t old_sock, new_sock = PJ_INVALID_SOCKET;
    pj_sockaddr local_addr, rem_addr;
    int val, addr_len;
    pj_fd_set_t *fds[3];
    unsigned i, fds_cnt, flags=0;
    pj_qos_params qos_params;
    unsigned msec;
    pj_status_t status = PJ_EUNKNOWN;	/* Must be initialized, since it is
					 * read by the retry loop condition
					 * below. */

    pj_lock_acquire(h->ioqueue->lock);

    old_sock = h->fd;

    fds_cnt = 0;
    fds[fds_cnt++] = &h->ioqueue->rfdset;
    fds[fds_cnt++] = &h->ioqueue->wfdset;
#if PJ_HAS_TCP
    fds[fds_cnt++] = &h->ioqueue->xfdset;
#endif

    /* Can only replace UDP sockets */
    pj_assert(h->fd_type == pj_SOCK_DGRAM());

    PJ_LOG(4,(THIS_FILE, "Attempting to replace UDP socket %d", old_sock));

    /* Retry with exponential backoff (20 ms, 40 ms, ..) for up to ~1 s */
    for (msec=20; (msec<1000 && status != PJ_SUCCESS);
         msec = (msec<1000? msec*2 : 1000))
    {
        if (msec > 20) {
            PJ_LOG(4,(THIS_FILE, "Retry to replace UDP socket %d", old_sock));
            pj_thread_sleep(msec);
        }

        if (old_sock != PJ_INVALID_SOCKET) {
            /* Investigate the old socket */
            addr_len = sizeof(local_addr);
            status = pj_sock_getsockname(old_sock, &local_addr, &addr_len);
            if (status != PJ_SUCCESS) {
                PJ_PERROR(5,(THIS_FILE, status, "Error get socket name"));
            	continue;
            }

            addr_len = sizeof(rem_addr);
            status = pj_sock_getpeername(old_sock, &rem_addr, &addr_len);
            if (status != PJ_SUCCESS) {
                PJ_PERROR(5,(THIS_FILE, status, "Error get peer name"));
            } else {
            	flags |= HAS_PEER_ADDR;
            }

            status = pj_sock_get_qos_params(old_sock, &qos_params);
            if (status == PJ_STATUS_FROM_OS(EBADF) ||
                status == PJ_STATUS_FROM_OS(EINVAL))
            {
            	PJ_PERROR(5,(THIS_FILE, status, "Error get qos param"));
            	continue;
            }

            if (status != PJ_SUCCESS) {
            	PJ_PERROR(5,(THIS_FILE, status, "Error get qos param"));
            } else {
            	flags |= HAS_QOS;
            }

            /* We're done with the old socket; close it, otherwise we'll
             * get an error in bind().
             */
            status = pj_sock_close(old_sock);
       	    if (status != PJ_SUCCESS) {
                PJ_PERROR(5,(THIS_FILE, status, "Error closing socket"));
            }

            old_sock = PJ_INVALID_SOCKET;
        }

        /* Prepare the new socket */
        status = pj_sock_socket(local_addr.addr.sa_family, PJ_SOCK_DGRAM, 0,
                                &new_sock);
        if (status != PJ_SUCCESS) {
            PJ_PERROR(5,(THIS_FILE, status, "Error create socket"));
            continue;
        }

        /* Even after the socket is closed, we'll still get "Address in use"
         * errors, so force it with SO_REUSEADDR.
         */
        val = 1;
        status = pj_sock_setsockopt(new_sock, SOL_SOCKET, SO_REUSEADDR,
                                    &val, sizeof(val));
        if (status == PJ_STATUS_FROM_OS(EBADF) ||
            status == PJ_STATUS_FROM_OS(EINVAL))
        {
            PJ_PERROR(5,(THIS_FILE, status, "Error set socket option"));
            continue;
        }

        /* The loop is silly, but what else can we do? */
        addr_len = pj_sockaddr_get_len(&local_addr);
        for (msec=20; msec<1000; msec = (msec<1000? msec*2 : 1000)) {
            status = pj_sock_bind(new_sock, &local_addr, addr_len);
            if (status != PJ_STATUS_FROM_OS(EADDRINUSE))
                break;
            PJ_LOG(4,(THIS_FILE, "Address is still in use, retrying.."));
            pj_thread_sleep(msec);
        }

        if (status != PJ_SUCCESS)
            continue;

        if (flags & HAS_QOS) {
            status = pj_sock_set_qos_params(new_sock, &qos_params);
            if (status == PJ_STATUS_FROM_OS(EINVAL)) {
                PJ_PERROR(5,(THIS_FILE, status, "Error set qos param"));
                continue;
            }
        }

        if (flags & HAS_PEER_ADDR) {
            status = pj_sock_connect(new_sock, &rem_addr, addr_len);
            if (status != PJ_SUCCESS) {
                PJ_PERROR(5,(THIS_FILE, status, "Error connect socket"));
                continue;
            }
        }
    }

    if (status != PJ_SUCCESS)
        goto on_error;

    /* Set socket to nonblocking. */
    val = 1;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
    defined(PJ_WIN64) && PJ_WIN64 != 0 || \
    defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
    if (ioctlsocket(new_sock, FIONBIO, &val)) {
#else
    if (ioctl(new_sock, FIONBIO, &val)) {
#endif
        status = pj_get_netos_error();
	goto on_error;
    }

    /* Replace the occurrence of the old socket with the new socket in
     * the fd sets.
     */
    for (i=0; i<fds_cnt; ++i) {
	if (PJ_FD_ISSET(h->fd, fds[i])) {
	    PJ_FD_CLR(h->fd, fds[i]);
	    PJ_FD_SET(new_sock, fds[i]);
	}
    }

    /* And finally replace the fd in the key */
    h->fd = new_sock;

    PJ_LOG(4,(THIS_FILE, "UDP socket has been replaced successfully!"));

    pj_lock_release(h->ioqueue->lock);

    return PJ_SUCCESS;

on_error:
    if (new_sock != PJ_INVALID_SOCKET)
	pj_sock_close(new_sock);
    if (old_sock != PJ_INVALID_SOCKET)
    	pj_sock_close(old_sock);

    /* Clear the occurrence of the old socket in the fd sets. */
    for (i=0; i<fds_cnt; ++i) {
	if (PJ_FD_ISSET(h->fd, fds[i])) {
	    PJ_FD_CLR(h->fd, fds[i]);
	}
    }

    h->fd = PJ_INVALID_SOCKET;
    PJ_PERROR(1,(THIS_FILE, status, "Error replacing socket %d", old_sock));
    pj_lock_release(h->ioqueue->lock);
    return PJ_ESOCKETSTOP;
}
#endif


/*
 * pj_ioqueue_poll()
 *
 * A few things worth noting:
 *
 *  - We used to allow only one callback per poll, but it didn't work
 *    very well. The reason is that in some situations the write
 *    callback gets called all the time, never giving the read
 *    callback a chance to run. This happens, for example, when the user
 *    submits another write operation inside the write callback.
 *    As a result, we changed the behaviour so that multiple
 *    callbacks may now be called in a single poll. It should be fast
 *    too; we just need to be careful with the ioqueue data structures.
 *
 *  - To guarantee preemptiveness etc., the poll function must strictly
 *    work on a copy of the ioqueue's fd_sets (not the originals).
 */
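/* A typical event-loop sketch (application side; quit_flag is a
 * hypothetical application variable):
 *
 *   while (!quit_flag) {
 *       pj_time_val timeout = { 0, 10 };
 *       int n = pj_ioqueue_poll(ioqueue, &timeout);
 *       if (n < 0) {
 *           pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout));
 *       }
 *   }
 *
 * A negative return value is the negated error code, as returned below
 * when pj_sock_select() fails.
 */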
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
    pj_fd_set_t rfdset, wfdset, xfdset;
    int nfds;
    int i, count, event_cnt, processed_cnt;
    pj_ioqueue_key_t *h;
    enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS };
    struct event
    {
        pj_ioqueue_key_t	*key;
        enum ioqueue_event_type  event_type;
    } event[MAX_EVENTS];

    PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);

#if defined(PJ_HAS_SSL_SOCK) && PJ_HAS_SSL_SOCK != 0 && \
    (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_APPLE)
    /* Call SSL Network framework event poll */
    ssl_network_event_poll();
#endif

    /* Lock ioqueue before making fd_set copies */
    pj_lock_acquire(ioqueue->lock);

    /* We will only do select() when there are sockets to be polled.
     * Otherwise select() will return an error.
     */
    if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
        PJ_FD_COUNT(&ioqueue->wfdset)==0
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
        && PJ_FD_COUNT(&ioqueue->xfdset)==0
#endif
	)
    {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
	scan_closing_keys(ioqueue);
#endif
	pj_lock_release(ioqueue->lock);
	TRACE__((THIS_FILE, "     poll: no fd is set"));
        if (timeout)
            pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
        return 0;
    }

    /* Copy ioqueue's pj_fd_set_t to local variables. */
    pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
    pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
    pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
    PJ_FD_ZERO(&xfdset);
#endif

#if VALIDATE_FD_SET
    validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif

    nfds = ioqueue->nfds;

    /* Unlock ioqueue before select(). */
    pj_lock_release(ioqueue->lock);

#if defined(PJ_WIN32_WINPHONE8) && PJ_WIN32_WINPHONE8
    count = 0;
    __try {
#endif

    count = pj_sock_select(nfds+1, &rfdset, &wfdset, &xfdset,
			   timeout);

#if defined(PJ_WIN32_WINPHONE8) && PJ_WIN32_WINPHONE8
    /* Ignore Invalid Handle Exception raised by select(). */
    }
    __except (GetExceptionCode() == STATUS_INVALID_HANDLE ?
	      EXCEPTION_CONTINUE_EXECUTION : EXCEPTION_CONTINUE_SEARCH) {
    }
#endif

    if (count == 0)
	return 0;
    else if (count < 0)
	return -pj_get_netos_error();

    /* Scan the descriptor sets for events and add them to the event
     * array to be processed later in this function. We do this so that
     * events can be processed in parallel without holding the ioqueue
     * lock.
     */
    pj_lock_acquire(ioqueue->lock);

    event_cnt = 0;

    /* Scan for writable sockets first to handle piggy-back data
     * coming with accept().
     */
    for (h = ioqueue->active_list.next;
	 h != &ioqueue->active_list && event_cnt < MAX_EVENTS;
	 h = h->next)
    {
	if (h->fd == PJ_INVALID_SOCKET)
	    continue;

	if ( (key_has_pending_write(h) || key_has_pending_connect(h))
	     && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
	    increment_counter(h);
#endif
            event[event_cnt].key = h;
            event[event_cnt].event_type = WRITEABLE_EVENT;
            ++event_cnt;
        }

        /* Scan for readable sockets. */
	if ((key_has_pending_read(h) || key_has_pending_accept(h))
            && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) &&
	    event_cnt < MAX_EVENTS)
        {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
	    increment_counter(h);
#endif
            event[event_cnt].key = h;
            event[event_cnt].event_type = READABLE_EVENT;
            ++event_cnt;
	}

#if PJ_HAS_TCP
        if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
	    !IS_CLOSING(h) && event_cnt < MAX_EVENTS)
	{
#if PJ_IOQUEUE_HAS_SAFE_UNREG
	    increment_counter(h);
#endif
            event[event_cnt].key = h;
            event[event_cnt].event_type = EXCEPTION_EVENT;
            ++event_cnt;
        }
#endif
    }

    for (i=0; i<event_cnt; ++i) {
	if (event[i].key->grp_lock)
	    pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0);
    }

    PJ_RACE_ME(5);

    pj_lock_release(ioqueue->lock);

    PJ_RACE_ME(5);

    processed_cnt = 0;

    /* Now process all events. The dispatch functions will take care
     * of locking each of the keys.
     */
    for (i=0; i<event_cnt; ++i) {

	/* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */
	if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) {
	    switch (event[i].event_type) {
	    case READABLE_EVENT:
		if (ioqueue_dispatch_read_event(ioqueue, event[i].key))
		    ++processed_cnt;
		break;
	    case WRITEABLE_EVENT:
		if (ioqueue_dispatch_write_event(ioqueue, event[i].key))
		    ++processed_cnt;
		break;
	    case EXCEPTION_EVENT:
		if (ioqueue_dispatch_exception_event(ioqueue, event[i].key))
		    ++processed_cnt;
		break;
	    case NO_EVENT:
		pj_assert(!"Invalid event!");
		break;
	    }
	}

#if PJ_IOQUEUE_HAS_SAFE_UNREG
	decrement_counter(event[i].key);
#endif

	if (event[i].key->grp_lock)
	    pj_grp_lock_dec_ref_dbg(event[i].key->grp_lock,
	                            "ioqueue", 0);
    }

    TRACE__((THIS_FILE, "     poll: count=%d events=%d processed=%d",
	     count, event_cnt, processed_cnt));

    return processed_cnt;
}
