/* $Id$ */
/*
 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

/*
 * ioqueue_common_abs.c
 *
 * This contains the common functionality used to emulate the proactor
 * pattern with various event dispatching mechanisms (e.g. select, epoll).
 *
 * This file is included by the appropriate ioqueue implementation.
 * It is NOT supposed to be compiled as a stand-alone source.
 */
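
/*
 * Illustrative sketch (not part of this file): an ioqueue backend such as
 * ioqueue_select.c is expected to define the abstract primitives used
 * below (ioqueue_add_to_set(), ioqueue_remove_from_set(), the event type
 * constants, etc.) and then textually include this file. Assuming that
 * convention, a minimal hypothetical backend skeleton would look like:
 *
 *   // ioqueue_mybackend.c (hypothetical backend)
 *   #include <pj/ioqueue.h>
 *   #include <pj/list.h>
 *   #include <pj/lock.h>
 *
 *   #include "ioqueue_common_abs.h"   // common key/operation structures
 *
 *   static void ioqueue_add_to_set(pj_ioqueue_t *ioqueue,
 *                                  pj_ioqueue_key_t *key,
 *                                  enum ioqueue_event_type event_type)
 *   {
 *       // mark the fd in this backend's readable/writable/exception set
 *   }
 *
 *   static void ioqueue_remove_from_set(pj_ioqueue_t *ioqueue,
 *                                       pj_ioqueue_key_t *key,
 *                                       enum ioqueue_event_type event_type)
 *   {
 *       // clear the fd from this backend's set
 *   }
 *
 *   #include "ioqueue_common_abs.c"   // pulls in the common implementation
 */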

#define PENDING_RETRY	2

static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->lock = NULL;
    ioqueue->auto_delete_lock = 0;
    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
}

static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    if (ioqueue->auto_delete_lock && ioqueue->lock) {
	pj_lock_release(ioqueue->lock);
        return pj_lock_destroy(ioqueue->lock);
    }

    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_set_lock()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
					 pj_lock_t *lock,
					 pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

    if (ioqueue->auto_delete_lock && ioqueue->lock) {
        pj_lock_destroy(ioqueue->lock);
    }

    ioqueue->lock = lock;
    ioqueue->auto_delete_lock = auto_delete;

    return PJ_SUCCESS;
}
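
/*
 * Usage sketch (illustrative, assuming the usual pjlib lock API): replace
 * the ioqueue's internal lock with a recursive mutex that the ioqueue will
 * destroy on shutdown. "pool" and "ioqueue" are assumed to already exist.
 *
 *   pj_lock_t *lock;
 *   pj_status_t status;
 *
 *   status = pj_lock_create_recursive_mutex(pool, "ioq_lock", &lock);
 *   if (status == PJ_SUCCESS) {
 *       // auto_delete=PJ_TRUE: the ioqueue takes ownership of the lock
 *       status = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
 *   }
 */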

static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     pj_grp_lock_t *grp_lock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
{
    pj_status_t rc;
    int optlen;

    PJ_UNUSED_ARG(pool);

    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
    key->connecting = 0;
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Set initial reference count to 1 */
    pj_assert(key->ref_count == 0);
    ++key->ref_count;

    key->closing = 0;
#endif

    rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
    if (rc != PJ_SUCCESS)
	return rc;

    /* Get socket type. When socket type is datagram, some optimization
     * will be performed during send to allow parallel send operations.
     */
    optlen = sizeof(key->fd_type);
    rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
                            &key->fd_type, &optlen);
    if (rc != PJ_SUCCESS)
        key->fd_type = pj_SOCK_STREAM();

    /* Create mutex for the key. */
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock);
    if (rc != PJ_SUCCESS)
	return rc;
#endif

    /* Group lock */
    key->grp_lock = grp_lock;
    if (key->grp_lock) {
	pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0);
    }

    return PJ_SUCCESS;
}

/*
 * pj_ioqueue_get_user_data()
 *
 * Obtain value associated with a key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    PJ_ASSERT_RETURN(key != NULL, NULL);
    return key->user_data;
}

/*
 * pj_ioqueue_set_user_data()
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    if (old_data)
        *old_data = key->user_data;
    key->user_data = user_data;

    return PJ_SUCCESS;
}
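
/*
 * Usage sketch (illustrative): associate an application context with a key
 * and retrieve it later from a callback. "my_context_t", "ctx" and "key"
 * are hypothetical application names.
 *
 *   my_context_t *ctx = get_context();      // hypothetical
 *   void *old = NULL;
 *
 *   pj_ioqueue_set_user_data(key, ctx, &old);
 *
 *   // Later, typically inside on_read_complete() etc.:
 *   my_context_t *ctx2 = (my_context_t*) pj_ioqueue_get_user_data(key);
 */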

PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->write_list);
}

PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
{
    return !pj_list_empty(&key->read_list);
}

PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
{
#if PJ_HAS_TCP
    return !pj_list_empty(&key->accept_list);
#else
    PJ_UNUSED_ARG(key);
    return 0;
#endif
}

PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    return key->connecting;
}


#if PJ_IOQUEUE_HAS_SAFE_UNREG
#   define IS_CLOSING(key)  (key->closing)
#else
#   define IS_CLOSING(key)  (0)
#endif

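/*
 * Context sketch (illustrative): the dispatch functions below are driven
 * by the backend's pj_ioqueue_poll(). Assuming the usual pjlib API, an
 * application event loop that ultimately triggers them looks like:
 *
 *   pj_time_val timeout = { 0, 10 };   // 10 ms
 *
 *   while (!quit_flag) {               // quit_flag is hypothetical
 *       // Dispatches pending events; returns the number of events
 *       // processed, 0 on timeout, or a negative value on error.
 *       pj_ioqueue_poll(ioqueue, &timeout);
 *   }
 */
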

/*
 * ioqueue_dispatch_event()
 *
 * Report occurrence of an event in the key to be processed by the
 * framework.
 */
pj_bool_t ioqueue_dispatch_write_event( pj_ioqueue_t *ioqueue,
				        pj_ioqueue_key_t *h)
{
    pj_status_t rc;

    /* Try to lock the key. */
    rc = pj_ioqueue_trylock_key(h);
    if (rc != PJ_SUCCESS) {
	return PJ_FALSE;
    }

    if (IS_CLOSING(h)) {
	pj_ioqueue_unlock_key(h);
	return PJ_TRUE;
    }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
	/* Completion of connect() operation */
	pj_status_t status;
	pj_bool_t has_lock;

	/* Clear operation. */
	h->connecting = 0;

        ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);


#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
	/* from connect(2):
	 * On Linux, use getsockopt to read the SO_ERROR option at
	 * level SOL_SOCKET to determine whether connect() completed
	 * successfully (if SO_ERROR is zero).
	 */
	{
	  int value;
	  int vallen = sizeof(value);
	  int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
					 &value, &vallen);
	  if (gs_rc != 0) {
	    /* Argh!! What to do now???
	     * Just indicate that the socket is connected. The
	     * application will get an error as soon as it tries to use
	     * the socket to send/receive.
	     */
	      status = PJ_SUCCESS;
	  } else {
	      status = PJ_STATUS_FROM_OS(value);
	  }
 	}
#elif (defined(PJ_WIN32) && PJ_WIN32!=0) || (defined(PJ_WIN64) && PJ_WIN64!=0)
	status = PJ_SUCCESS; /* success */
#else
	/* Excellent information on D.J. Bernstein's page:
	 * http://cr.yp.to/docs/connect.html
	 *
	 * It seems the most portable way of detecting connect()
	 * failure is to call getpeername(). If the socket is connected,
	 * getpeername() will return 0. If the socket is not connected,
	 * it will return ENOTCONN, and read(fd, &ch, 1) will produce
	 * the right errno through error slippage. This is a combination
	 * of suggestions from Douglas C. Schmidt and Ken Keys.
	 */
	{
	    struct sockaddr_in addr;
	    int addrlen = sizeof(addr);

	    status = pj_sock_getpeername(h->fd, (struct sockaddr*)&addr,
				         &addrlen);
	}
#endif

        /* Unlock; from this point we don't need to hold the key's mutex
	 * (unless concurrency is disabled, in which case we should
	 * hold the mutex while calling the callback) */
	if (h->allow_concurrent) {
	    /* concurrency may be changed while we're in the callback, so
	     * save it to a flag.
	     */
	    has_lock = PJ_FALSE;
	    pj_ioqueue_unlock_key(h);
	} else {
	    has_lock = PJ_TRUE;
	}

	/* Call callback. */
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
	    (*h->cb.on_connect_complete)(h, status);

	/* Unlock if we still hold the lock */
	if (has_lock) {
	    pj_ioqueue_unlock_key(h);
	}

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
	/* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc = PJ_SUCCESS;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == pj_SOCK_DGRAM()) {
            pj_list_erase(write_op);

            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

        }

        /* Send the data.
         * Unfortunately we must do this while holding the key's mutex,
         * thus preventing parallel writes on a single key.. :-((
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
	    /* Can't do this. We only clear "op" after we're finished sending
	     * the whole buffer.
	     */
	    //write_op->op = 0;
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
	    int retry = 2;
	    while (--retry >= 0) {
		send_rc = pj_sock_sendto(h->fd,
					 write_op->buf+write_op->written,
					 &sent, write_op->flags,
					 &write_op->rmt_addr,
					 write_op->rmt_addrlen);
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
	    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
		/* Special treatment for dead UDP sockets here, see ticket #1107 */
		if (send_rc==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(h) &&
		    h->fd_type==pj_SOCK_DGRAM())
		{
		    PJ_PERROR(4,(THIS_FILE, send_rc,
				 "Send error for socket %d, retrying",
				 h->fd));
		    send_rc = replace_udp_sock(h);
		    continue;
		}
#endif
		break;
	    }

	    /* Can't do this. We only clear "op" after we're finished sending
	     * the whole buffer.
	     */
	    //write_op->op = 0;
        } else {
            pj_assert(!"Invalid operation type!");
	    write_op->op = PJ_IOQUEUE_OP_NONE;
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc!=PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == pj_SOCK_DGRAM())
        {
	    pj_bool_t has_lock;

	    write_op->op = PJ_IOQUEUE_OP_NONE;

            if (h->fd_type != pj_SOCK_DGRAM()) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

            }

	    /* Unlock; from this point we don't need to hold the key's mutex
	     * (unless concurrency is disabled, in which case we should
	     * hold the mutex while calling the callback) */
	    if (h->allow_concurrent) {
		/* concurrency may be changed while we're in the callback, so
		 * save it to a flag.
		 */
		has_lock = PJ_FALSE;
		pj_ioqueue_unlock_key(h);
		PJ_RACE_ME(5);
	    } else {
		has_lock = PJ_TRUE;
	    }

	    /* Call callback. */
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
	        (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

	    if (has_lock) {
		pj_ioqueue_unlock_key(h);
	    }

        } else {
            pj_ioqueue_unlock_key(h);
        }

        /* Done. */
    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
	pj_ioqueue_unlock_key(h);

	return PJ_FALSE;
    }

    return PJ_TRUE;
}

pj_bool_t ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue,
				       pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Try to lock the key. */
    rc = pj_ioqueue_trylock_key(h);
    if (rc != PJ_SUCCESS) {
	return PJ_FALSE;
    }

    if (IS_CLOSING(h)) {
	pj_ioqueue_unlock_key(h);
	return PJ_TRUE;
    }

#   if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;
	pj_bool_t has_lock;

        /* Get one accept operation from the list. */
	accept_op = h->accept_list.next;
        pj_list_erase(accept_op);
        accept_op->op = PJ_IOQUEUE_OP_NONE;

	/* Clear bit in fdset if there are no more pending accepts */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

	rc=pj_sock_accept(h->fd, accept_op->accept_fd,
                          accept_op->rmt_addr, accept_op->addrlen);
	if (rc==PJ_SUCCESS && accept_op->local_addr) {
	    rc = pj_sock_getsockname(*accept_op->accept_fd,
                                     accept_op->local_addr,
				     accept_op->addrlen);
	}

	/* Unlock; from this point we don't need to hold the key's mutex
	 * (unless concurrency is disabled, in which case we should
	 * hold the mutex while calling the callback) */
	if (h->allow_concurrent) {
	    /* concurrency may be changed while we're in the callback, so
	     * save it to a flag.
	     */
	    has_lock = PJ_FALSE;
	    pj_ioqueue_unlock_key(h);
	    PJ_RACE_ME(5);
	} else {
	    has_lock = PJ_TRUE;
	}

	/* Call callback. */
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
	    (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
	}

	if (has_lock) {
	    pj_ioqueue_unlock_key(h);
	}
    }
    else
#   endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;
	pj_bool_t has_lock;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        bytes_read = read_op->size;

	if (read_op->op == PJ_IOQUEUE_OP_RECV_FROM) {
	    read_op->op = PJ_IOQUEUE_OP_NONE;
	    rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
				  read_op->flags,
				  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
	} else if (read_op->op == PJ_IOQUEUE_OP_RECV) {
	    read_op->op = PJ_IOQUEUE_OP_NONE;
	    rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
			      read_op->flags);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
	    read_op->op = PJ_IOQUEUE_OP_NONE;
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, the user must have put a
             * socket descriptor in h->fd, so in this case we can
             * just call pj_sock_recv() instead of ReadFile().
             * On Unix, the user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems which don't have
             * read(); that's why we only allow it on platforms with
             * unistd.h, so that the error is easier to catch.
             */
#	    if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
	       defined(PJ_WIN64) && PJ_WIN64 != 0 || \
	       defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
				  read_op->flags);
                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
                //              &bytes_read, NULL);
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
                bytes_read = read(h->fd, read_op->buf, bytes_read);
                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#           else
#               error "Implement read() for this platform!"
#           endif
        }

	if (rc != PJ_SUCCESS) {
#	    if (defined(PJ_WIN32) && PJ_WIN32 != 0) || \
	       (defined(PJ_WIN64) && PJ_WIN64 != 0)
	    /* On Win32, for UDP, WSAECONNRESET on the receive side
	     * indicates that a previous send has triggered an ICMP Port
	     * Unreachable message.
	     * But we wouldn't know at this point which previous send
	     * triggered the error, since the UDP socket can be shared!
	     * So we'll just ignore it!
	     */

	    if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
		//PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
	    }
#	    endif

            /* In any case we would report this to the caller. */
            bytes_read = -rc;

#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
	    /* Special treatment for dead UDP sockets here, see ticket #1107 */
	    if (rc == PJ_STATUS_FROM_OS(ENOTCONN) && !IS_CLOSING(h) &&
		h->fd_type==pj_SOCK_DGRAM())
	    {
		rc = replace_udp_sock(h);
		if (rc != PJ_SUCCESS) {
		    bytes_read = -rc;
		}
	    }
#endif
	}

	/* Unlock; from this point we don't need to hold the key's mutex
	 * (unless concurrency is disabled, in which case we should
	 * hold the mutex while calling the callback) */
	if (h->allow_concurrent) {
	    /* concurrency may be changed while we're in the callback, so
	     * save it to a flag.
	     */
	    has_lock = PJ_FALSE;
	    pj_ioqueue_unlock_key(h);
	    PJ_RACE_ME(5);
	} else {
	    has_lock = PJ_TRUE;
	}

	/* Call callback. */
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
	    (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

	if (has_lock) {
	    pj_ioqueue_unlock_key(h);
	}

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread is
         * eventually able to process the event.
         */
	pj_ioqueue_unlock_key(h);

	return PJ_FALSE;
    }

    return PJ_TRUE;
}


pj_bool_t ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
					    pj_ioqueue_key_t *h )
{
    pj_bool_t has_lock;
    pj_status_t rc;

    /* Try to lock the key. */
    rc = pj_ioqueue_trylock_key(h);
    if (rc != PJ_SUCCESS) {
	return PJ_FALSE;
    }

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, in which
         * case the remaining thread will see h->connecting as zero because
         * it has already been processed by another thread.
         */
	pj_ioqueue_unlock_key(h);
	return PJ_TRUE;
    }

    if (IS_CLOSING(h)) {
	pj_ioqueue_unlock_key(h);
	return PJ_TRUE;
    }

    /* Clear operation. */
    h->connecting = 0;

    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

    /* Unlock; from this point we don't need to hold the key's mutex
     * (unless concurrency is disabled, in which case we should
     * hold the mutex while calling the callback) */
    if (h->allow_concurrent) {
	/* concurrency may be changed while we're in the callback, so
	 * save it to a flag.
	 */
	has_lock = PJ_FALSE;
	pj_ioqueue_unlock_key(h);
	PJ_RACE_ME(5);
    } else {
	has_lock = PJ_TRUE;
    }

    /* Call callback. */
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
	pj_status_t status = -1;
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
	int value;
	int vallen = sizeof(value);
	int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
				       &value, &vallen);
	if (gs_rc == 0) {
	    status = PJ_RETURN_OS_ERROR(value);
	}
#endif

	(*h->cb.on_connect_complete)(h, status);
    }

    if (has_lock) {
	pj_ioqueue_unlock_key(h);
    }

    return PJ_TRUE;
}

/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,
                                      pj_ioqueue_op_key_t *op_key,
				      void *buffer,
				      pj_ssize_t *length,
				      unsigned flags )
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing (need to do this first before accessing
     * other variables, since they might have been destroyed. See ticket
     * #469).
     */
    if (IS_CLOSING(key))
	return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    PJ_ASSERT_RETURN(read_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING);
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
	pj_status_t status;
	pj_ssize_t size;

	size = *length;
	status = pj_sock_recv(key->fd, buffer, &size, flags);
	if (status == PJ_SUCCESS) {
	    /* Yes! Data is available! */
	    *length = size;
	    return PJ_SUCCESS;
	} else {
	    /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
	     * the error to the caller.
	     */
	    if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
		return status;
	}
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule an asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_ioqueue_lock_key(key);
    /* Check again. The handle may have been closed after the previous
     * check in a multithreaded app. If we add a bad handle to the set it
     * will corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
	pj_ioqueue_unlock_key(key);
	return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_ioqueue_unlock_key(key);

    return PJ_EPENDING;
}
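
/*
 * Usage sketch (illustrative): an asynchronous read with a persistent
 * op_key and buffer; both must remain valid until completion. All names
 * below are hypothetical application variables.
 *
 *   static pj_ioqueue_op_key_t read_key;
 *   static char rx_buf[1500];
 *
 *   pj_ssize_t len = sizeof(rx_buf);
 *   pj_status_t status;
 *
 *   pj_ioqueue_op_key_init(&read_key, sizeof(read_key));
 *   status = pj_ioqueue_recv(key, &read_key, rx_buf, &len, 0);
 *   if (status == PJ_SUCCESS) {
 *       // data was available immediately; "len" bytes are in rx_buf
 *   } else if (status == PJ_EPENDING) {
 *       // completion will be reported via cb.on_read_complete()
 *   } else {
 *       // immediate error
 *   }
 */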

/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
				         void *buffer,
				         pj_ssize_t *length,
                                         unsigned flags,
				         pj_sockaddr_t *addr,
				         int *addrlen)
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
	return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    PJ_ASSERT_RETURN(read_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING);
    read_op->op = PJ_IOQUEUE_OP_NONE;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
	pj_status_t status;
	pj_ssize_t size;

	size = *length;
	status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
				  addr, addrlen);
	if (status == PJ_SUCCESS) {
	    /* Yes! Data is available! */
	    *length = size;
	    return PJ_SUCCESS;
	} else {
	    /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
	     * the error to the caller.
	     */
	    if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
		return status;
	}
    }

    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule an asynchronous operation to the ioqueue.
     */
    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_ioqueue_lock_key(key);
    /* Check again. The handle may have been closed after the previous
     * check in a multithreaded app. If we add a bad handle to the set it
     * will corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
	pj_ioqueue_unlock_key(key);
	return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_ioqueue_unlock_key(key);

    return PJ_EPENDING;
}

/*
 * pj_ioqueue_send()
 *
 * Start asynchronous send() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
			             const void *data,
			             pj_ssize_t *length,
                                     unsigned flags)
{
    struct write_operation *write_op;
    pj_status_t status;
    unsigned retry;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
	return PJ_ECANCELLED;

    /* We cannot use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *  We are speculating that the list is empty here without properly
     *  acquiring the ioqueue's mutex first. This is intentional, to
     *  maximize performance via parallelism.
     *
     *  This should be safe, because:
     *      - by convention, we require the caller to make sure that the
     *        key is not unregistered while other threads are invoking
     *        an operation on the same key.
     *      - pj_list_empty() is safe to be invoked by multiple threads,
     *        even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_send(key->fd, data, &sent, flags);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has a pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
	pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
	/* Unable to send the packet because there is already a pending
	 * write in the write_op. We cannot put the operation into the
	 * write_op because it already contains a pending operation! And
	 * we cannot send the packet directly with send() either, because
	 * that would break the ordering of the packets. So we can only
	 * return an error here.
	 *
	 * This could happen for example in a multithreaded program,
	 * where polling is done by one thread while other threads do
	 * the sending only. If the polling thread runs at a lower
	 * priority than the sending threads, then it's possible that
	 * the pending write flag is not cleared in time, because
	 * clearing is only done during polling.
	 *
	 * The application should use multiple write operation keys in
	 * situations like this.
	 */
	//pj_assert(!"ioqueue: there is pending operation on this key!");
	return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;

    pj_ioqueue_lock_key(key);
    /* Check again. The handle may have been closed after the previous
     * check in a multithreaded app. If we add a bad handle to the set it
     * will corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
	pj_ioqueue_unlock_key(key);
	return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_ioqueue_unlock_key(key);

    return PJ_EPENDING;
}
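
/*
 * Usage sketch (illustrative): because an op_key can hold only one pending
 * operation (see the PJ_EBUSY case above), a sender that may queue several
 * writes concurrently should keep a pool of op_keys. All names below are
 * hypothetical; each write_keys[i] is assumed to have been initialized
 * once with pj_ioqueue_op_key_init().
 *
 *   #define MAX_PENDING 4
 *   static pj_ioqueue_op_key_t write_keys[MAX_PENDING];
 *   static unsigned next_key = 0;
 *
 *   pj_ioqueue_op_key_t *op = &write_keys[next_key];
 *   if (!pj_ioqueue_is_pending(key, op)) {
 *       pj_ssize_t len = data_len;
 *       pj_status_t status = pj_ioqueue_send(key, op, data, &len, 0);
 *       if (status == PJ_EPENDING)
 *           next_key = (next_key + 1) % MAX_PENDING;
 *       // PJ_SUCCESS: sent immediately; other values: error
 *   }
 */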


/*
 * pj_ioqueue_sendto()
 *
 * Start asynchronous write() to the descriptor.
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
			               const void *data,
			               pj_ssize_t *length,
                                       pj_uint32_t flags,
			               const pj_sockaddr_t *addr,
			               int addrlen)
{
    struct write_operation *write_op;
    unsigned retry;
    pj_bool_t restart_retry = PJ_FALSE;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
	    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
retry_on_restart:
#else
    PJ_UNUSED_ARG(restart_retry);
#endif
    /* Check if key is closing. */
    if (IS_CLOSING(key))
	return PJ_ECANCELLED;

    /* We cannot use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *  We are speculating that the list is empty here without properly
     *  acquiring the ioqueue's mutex first. This is intentional, to
     *  maximize performance via parallelism.
     *
     *  This should be safe, because:
     *      - by convention, we require the caller to make sure that the
     *        key is not unregistered while other threads are invoking
     *        an operation on the same key.
     *      - pj_list_empty() is safe to be invoked by multiple threads,
     *        even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
	    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
		/* Special treatment for dead UDP sockets here, see ticket #1107 */
		if (status == PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(key) &&
		    key->fd_type == pj_SOCK_DGRAM())
		{
		    if (!restart_retry) {
			PJ_PERROR(4, (THIS_FILE, status,
				      "Send error for socket %d, retrying",
				      key->fd));
			status = replace_udp_sock(key);
			if (status == PJ_SUCCESS) {
			    restart_retry = PJ_TRUE;
			    goto retry_on_restart;
			}
		    }
		    status = PJ_ESOCKETSTOP;
		}
#endif
                return status;
            }
        }
    }

    /*
     * Check that the address storage can hold the address parameter.
     */
    PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has a pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
	pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
	/* Unable to send the packet because there is already a pending
	 * write in the write_op. We cannot put the operation into the
	 * write_op because it already contains a pending operation! And
	 * we cannot send the packet directly with sendto() either,
	 * because that would break the ordering of the packets. So we
	 * can only return an error here.
	 *
	 * This could happen for example in a multithreaded program,
	 * where polling is done by one thread while other threads do
	 * the sending only. If the polling thread runs at a lower
	 * priority than the sending threads, then it's possible that
	 * the pending write flag is not cleared in time, because
	 * clearing is only done during polling.
	 *
	 * The application should use multiple write operation keys in
	 * situations like this.
	 */
	//pj_assert(!"ioqueue: there is pending operation on this key!");
	return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (char*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    pj_ioqueue_lock_key(key);
    /* Check again. The handle may have been closed after the previous
     * check in a multithreaded app. If we add a bad handle to the set it
     * will corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
	pj_ioqueue_unlock_key(key);
	return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_ioqueue_unlock_key(key);

    return PJ_EPENDING;
}
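
/*
 * Usage sketch (illustrative): sending a UDP datagram to an explicit
 * destination. "dst", "pkt", "pkt_len" and "write_key" are hypothetical;
 * write_key is a pj_ioqueue_op_key_t initialized elsewhere.
 *
 *   pj_sockaddr_in dst;        // filled in elsewhere, e.g. pj_sockaddr_in_init()
 *   pj_ssize_t len = pkt_len;
 *   pj_status_t status;
 *
 *   status = pj_ioqueue_sendto(key, &write_key, pkt, &len, 0,
 *                              &dst, sizeof(dst));
 *   // PJ_SUCCESS:  datagram was sent immediately, "len" bytes written
 *   // PJ_EPENDING: queued; completion via cb.on_write_complete()
 */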

#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
			               pj_sock_t *new_sock,
			               pj_sockaddr_t *local,
			               pj_sockaddr_t *remote,
			               int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
	return PJ_ECANCELLED;

    accept_op = (struct accept_operation*)op_key;
    PJ_ASSERT_RETURN(accept_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING);
    accept_op->op = PJ_IOQUEUE_OP_NONE;

    /* Fast track:
     *  See if there's a new connection available immediately.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! New connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        } else {
            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
             * the error to the caller.
             */
            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
                return status;
            }
        }
    }

    /*
     * No connection is available immediately.
     * Schedule the accept() operation to be completed when an incoming
     * connection is available.
     */
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen= addrlen;
    accept_op->local_addr = local;

    pj_ioqueue_lock_key(key);
    /* Check again. The handle may have been closed after the previous
     * check in a multithreaded app. If we add a bad handle to the set it
     * will corrupt the ioqueue set. See #913
     */
    if (IS_CLOSING(key)) {
	pj_ioqueue_unlock_key(key);
	return PJ_ECANCELLED;
    }
    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_ioqueue_unlock_key(key);

    return PJ_EPENDING;
}
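
/*
 * Usage sketch (illustrative): asynchronously accepting a connection on a
 * listening key. The output variables must stay valid until completion;
 * all names below are hypothetical.
 *
 *   static pj_ioqueue_op_key_t accept_key;   // initialized elsewhere
 *   static pj_sock_t new_sock;
 *   static pj_sockaddr_in local, remote;
 *   static int addr_len = sizeof(remote);
 *
 *   pj_status_t status;
 *   status = pj_ioqueue_accept(listen_key, &accept_key, &new_sock,
 *                              &local, &remote, &addr_len);
 *   if (status == PJ_EPENDING) {
 *       // cb.on_accept_complete() will fire with the new socket
 *   }
 */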

/*
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
					const pj_sockaddr_t *addr,
					int addrlen )
{
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
	return PJ_ECANCELLED;

    /* Check if the socket has already been marked as connecting */
    if (key->connecting != 0)
        return PJ_EPENDING;

    status = pj_sock_connect(key->fd, addr, addrlen);
    if (status == PJ_SUCCESS) {
	/* Connected! */
	return PJ_SUCCESS;
    } else {
	if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
	    /* Pending! */
	    pj_ioqueue_lock_key(key);
	    /* Check again. The handle may have been closed after the
	     * previous check in a multithreaded app. See #913
	     */
	    if (IS_CLOSING(key)) {
		pj_ioqueue_unlock_key(key);
		return PJ_ECANCELLED;
	    }
	    key->connecting = PJ_TRUE;
            ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
            ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
            pj_ioqueue_unlock_key(key);
	    return PJ_EPENDING;
	} else {
	    /* Error! */
	    return status;
	}
    }
}
#endif	/* PJ_HAS_TCP */
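
/*
 * Usage sketch (illustrative): a non-blocking connect. "server_addr" is a
 * hypothetical, already-resolved pj_sockaddr_in.
 *
 *   pj_status_t status;
 *   status = pj_ioqueue_connect(key, &server_addr, sizeof(server_addr));
 *   if (status == PJ_SUCCESS) {
 *       // connected immediately (e.g. loopback)
 *   } else if (status == PJ_EPENDING) {
 *       // cb.on_connect_complete() will report the final status
 *   }
 */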


PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
				     pj_size_t size )
{
    pj_bzero(op_key, size);
}


/*
 * pj_ioqueue_is_pending()
 */
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key )
{
    struct generic_operation *op_rec;

    PJ_UNUSED_ARG(key);

    op_rec = (struct generic_operation*)op_key;
    return op_rec->op != 0;
}


/*
 * pj_ioqueue_post_completion()
 */
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
                                                pj_ioqueue_op_key_t *op_key,
                                                pj_ssize_t bytes_status )
{
    struct generic_operation *op_rec;

    /*
     * Find the operation key in all the pending operation lists to
     * make sure that it's still there; then call the callback.
     */
    pj_ioqueue_lock_key(key);

    /* Find the operation in the pending read list. */
    op_rec = (struct generic_operation*)key->read_list.next;
    while (op_rec != (void*)&key->read_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            ioqueue_remove_from_set(key->ioqueue, key, READABLE_EVENT);
            pj_ioqueue_unlock_key(key);

            if (key->cb.on_read_complete)
            	(*key->cb.on_read_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending write list. */
    op_rec = (struct generic_operation*)key->write_list.next;
    while (op_rec != (void*)&key->write_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            ioqueue_remove_from_set(key->ioqueue, key, WRITEABLE_EVENT);
            pj_ioqueue_unlock_key(key);

            if (key->cb.on_write_complete)
            	(*key->cb.on_write_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending accept list. */
    op_rec = (struct generic_operation*)key->accept_list.next;
    while (op_rec != (void*)&key->accept_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = PJ_IOQUEUE_OP_NONE;
            pj_ioqueue_unlock_key(key);

            if (key->cb.on_accept_complete) {
            	(*key->cb.on_accept_complete)(key, op_key,
                                              PJ_INVALID_SOCKET,
                                              (pj_status_t)bytes_status);
            }
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Clear connecting operation. */
    if (key->connecting) {
        key->connecting = 0;
        ioqueue_remove_from_set(key->ioqueue, key, WRITEABLE_EVENT);
        ioqueue_remove_from_set(key->ioqueue, key, EXCEPTION_EVENT);
    }

    pj_ioqueue_unlock_key(key);

    return PJ_EINVALIDOP;
}
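
/*
 * Usage sketch (illustrative): cancelling a pending read by forcing its
 * completion with a negative status, so that the normal
 * on_read_complete() path performs the cleanup. This mirrors the
 * dispatcher's own convention of reporting errors as "bytes_read = -rc".
 *
 *   if (pj_ioqueue_is_pending(key, &read_key)) {
 *       pj_ioqueue_post_completion(key, &read_key, -PJ_ECANCELLED);
 *       // on_read_complete() is invoked with bytes_read == -PJ_ECANCELLED
 *   }
 */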

PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
							pj_bool_t allow)
{
    PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
    ioqueue->default_concurrency = allow;
    return PJ_SUCCESS;
}


PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
					       pj_bool_t allow)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
     * disabled.
     */
    PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);

    key->allow_concurrent = allow;
    return PJ_SUCCESS;
}
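
/*
 * Usage sketch (illustrative): disabling concurrency serializes the
 * callbacks for this key (the dispatchers above keep the key's mutex held
 * across the callback), at the cost of parallelism.
 *
 *   // Requires PJ_IOQUEUE_HAS_SAFE_UNREG to be enabled.
 *   pj_status_t status = pj_ioqueue_set_concurrency(key, PJ_FALSE);
 */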

PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
    if (key->grp_lock)
	return pj_grp_lock_acquire(key->grp_lock);
    else
	return pj_lock_acquire(key->lock);
}

PJ_DEF(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key)
{
    if (key->grp_lock)
	return pj_grp_lock_tryacquire(key->grp_lock);
    else
	return pj_lock_tryacquire(key->lock);
}

PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
    if (key->grp_lock)
	return pj_grp_lock_release(key->grp_lock);
    else
	return pj_lock_release(key->lock);
}
