1 /* $Id$ */
2 /*
3 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20
21 /*
22 * ioqueue_common_abs.c
23 *
24 * This contains common functionalities to emulate proactor pattern with
25 * various event dispatching mechanisms (e.g. select, epoll).
26 *
27 * This file will be included by the appropriate ioqueue implementation.
28 * This file is NOT supposed to be compiled as stand-alone source.
29 */
30
/* Number of times pj_ioqueue_send() and pj_ioqueue_sendto() yield with
 * pj_thread_sleep(0) and re-check an operation key that is still marked as
 * having a pending operation, before giving up and returning PJ_EBUSY.
 */
#define PENDING_RETRY 2
32
/*
 * ioqueue_init()
 *
 * Reset the common ioqueue fields: no lock is attached, the lock is not
 * auto-deleted, and the concurrency setting later copied into new keys is
 * PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY.
 */
static void ioqueue_init(pj_ioqueue_t *ioqueue)
{
    ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
    ioqueue->auto_delete_lock = 0;
    ioqueue->lock = NULL;
}
39
ioqueue_destroy(pj_ioqueue_t * ioqueue)40 static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
41 {
42 if (ioqueue->auto_delete_lock && ioqueue->lock ) {
43 pj_lock_release(ioqueue->lock);
44 return pj_lock_destroy(ioqueue->lock);
45 }
46
47 return PJ_SUCCESS;
48 }
49
50 /*
51 * pj_ioqueue_set_lock()
52 */
pj_ioqueue_set_lock(pj_ioqueue_t * ioqueue,pj_lock_t * lock,pj_bool_t auto_delete)53 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
54 pj_lock_t *lock,
55 pj_bool_t auto_delete )
56 {
57 PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
58
59 if (ioqueue->auto_delete_lock && ioqueue->lock) {
60 pj_lock_destroy(ioqueue->lock);
61 }
62
63 ioqueue->lock = lock;
64 ioqueue->auto_delete_lock = auto_delete;
65
66 return PJ_SUCCESS;
67 }
68
ioqueue_init_key(pj_pool_t * pool,pj_ioqueue_t * ioqueue,pj_ioqueue_key_t * key,pj_sock_t sock,pj_grp_lock_t * grp_lock,void * user_data,const pj_ioqueue_callback * cb)69 static pj_status_t ioqueue_init_key( pj_pool_t *pool,
70 pj_ioqueue_t *ioqueue,
71 pj_ioqueue_key_t *key,
72 pj_sock_t sock,
73 pj_grp_lock_t *grp_lock,
74 void *user_data,
75 const pj_ioqueue_callback *cb)
76 {
77 pj_status_t rc;
78 int optlen;
79
80 PJ_UNUSED_ARG(pool);
81
82 key->ioqueue = ioqueue;
83 key->fd = sock;
84 key->user_data = user_data;
85 pj_list_init(&key->read_list);
86 pj_list_init(&key->write_list);
87 #if PJ_HAS_TCP
88 pj_list_init(&key->accept_list);
89 key->connecting = 0;
90 #endif
91
92 /* Save callback. */
93 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
94
95 #if PJ_IOQUEUE_HAS_SAFE_UNREG
96 /* Set initial reference count to 1 */
97 pj_assert(key->ref_count == 0);
98 ++key->ref_count;
99
100 key->closing = 0;
101 #endif
102
103 rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
104 if (rc != PJ_SUCCESS)
105 return rc;
106
107 /* Get socket type. When socket type is datagram, some optimization
108 * will be performed during send to allow parallel send operations.
109 */
110 optlen = sizeof(key->fd_type);
111 rc = pj_sock_getsockopt(sock, pj_SOL_SOCKET(), pj_SO_TYPE(),
112 &key->fd_type, &optlen);
113 if (rc != PJ_SUCCESS)
114 key->fd_type = pj_SOCK_STREAM();
115
116 /* Create mutex for the key. */
117 #if !PJ_IOQUEUE_HAS_SAFE_UNREG
118 rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock);
119 if (rc != PJ_SUCCESS)
120 return rc;
121 #endif
122
123 /* Group lock */
124 key->grp_lock = grp_lock;
125 if (key->grp_lock) {
126 pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0);
127 }
128
129 return PJ_SUCCESS;
130 }
131
132 /*
133 * pj_ioqueue_get_user_data()
134 *
135 * Obtain value associated with a key.
136 */
pj_ioqueue_get_user_data(pj_ioqueue_key_t * key)137 PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
138 {
139 PJ_ASSERT_RETURN(key != NULL, NULL);
140 return key->user_data;
141 }
142
143 /*
144 * pj_ioqueue_set_user_data()
145 */
pj_ioqueue_set_user_data(pj_ioqueue_key_t * key,void * user_data,void ** old_data)146 PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
147 void *user_data,
148 void **old_data)
149 {
150 PJ_ASSERT_RETURN(key, PJ_EINVAL);
151
152 if (old_data)
153 *old_data = key->user_data;
154 key->user_data = user_data;
155
156 return PJ_SUCCESS;
157 }
158
key_has_pending_write(pj_ioqueue_key_t * key)159 PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
160 {
161 return !pj_list_empty(&key->write_list);
162 }
163
key_has_pending_read(pj_ioqueue_key_t * key)164 PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
165 {
166 return !pj_list_empty(&key->read_list);
167 }
168
key_has_pending_accept(pj_ioqueue_key_t * key)169 PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
170 {
171 #if PJ_HAS_TCP
172 return !pj_list_empty(&key->accept_list);
173 #else
174 PJ_UNUSED_ARG(key);
175 return 0;
176 #endif
177 }
178
key_has_pending_connect(pj_ioqueue_key_t * key)179 PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
180 {
181 return key->connecting;
182 }
183
184
/* IS_CLOSING(key): non-zero when the key has been marked as closing.
 * The closing flag is only maintained when PJ_IOQUEUE_HAS_SAFE_UNREG is
 * enabled; otherwise the macro is the constant 0.
 *
 * The argument is parenthesised so that any pointer expression, not only a
 * plain identifier, can be passed safely.
 */
#if PJ_IOQUEUE_HAS_SAFE_UNREG
#   define IS_CLOSING(key)  ((key)->closing)
#else
#   define IS_CLOSING(key)  (0)
#endif
190
191
/*
 * ioqueue_dispatch_write_event()
 *
 * Report the occurrence of a writable event on the key so that it can be
 * processed by the framework.
 *
 * The key is locked with a try-lock. Then, in order of precedence:
 *  - if the key is closing, nothing is done;
 *  - if a connect is in progress (TCP builds only), the connect is
 *    completed and on_connect_complete() is invoked with its status;
 *  - if a write operation is queued, the first one is sent; when it is
 *    finished (error, whole buffer written, or datagram socket),
 *    on_write_complete() is invoked with the number of bytes written, or
 *    with the negated status on error.
 *
 * Callbacks run without the key lock when the key allows concurrency and
 * with the lock held otherwise.
 *
 * Returns PJ_FALSE when the key lock could not be acquired or when there
 * was nothing to process, PJ_TRUE otherwise (including the closing case).
 */
pj_bool_t ioqueue_dispatch_write_event( pj_ioqueue_t *ioqueue,
                                        pj_ioqueue_key_t *h)
{
    pj_status_t rc;

    /* Try lock the key. If another thread holds it, let that thread do
     * the work.
     */
    rc = pj_ioqueue_trylock_key(h);
    if (rc != PJ_SUCCESS) {
        return PJ_FALSE;
    }

    /* A closing key must not be processed any further. */
    if (IS_CLOSING(h)) {
        pj_ioqueue_unlock_key(h);
        return PJ_TRUE;
    }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
        /* Completion of connect() operation */
        pj_status_t status;
        pj_bool_t has_lock;

        /* Clear operation. */
        h->connecting = 0;

        /* Stop watching for the events that signal connect completion. */
        ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);


#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        /* from connect(2):
         *  On Linux, use getsockopt to read the SO_ERROR option at
         *  level SOL_SOCKET to determine whether connect() completed
         *  successfully (if SO_ERROR is zero).
         */
        {
            int value;
            int vallen = sizeof(value);
            int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                           &value, &vallen);
            if (gs_rc != 0) {
                /* Argh!! What to do now???
                 * Just indicate that the socket is connected. The
                 * application will get error as soon as it tries to use
                 * the socket to send/receive.
                 */
                status = PJ_SUCCESS;
            } else {
                /* Translate the socket's pending error into a status. */
                status = PJ_STATUS_FROM_OS(value);
            }
        }
#elif (defined(PJ_WIN32) && PJ_WIN32!=0) || (defined(PJ_WIN64) && PJ_WIN64!=0)
        status = PJ_SUCCESS; /* success */
#else
        /* Excellent information in D.J. Bernstein page:
         * http://cr.yp.to/docs/connect.html
         *
         * Seems like the most portable way of detecting connect()
         * failure is to call getpeername(). If socket is connected,
         * getpeername() will return 0. If the socket is not connected,
         * it will return ENOTCONN, and read(fd, &ch, 1) will produce
         * the right errno through error slippage. This is a combination
         * of suggestions from Douglas C. Schmidt and Ken Keys.
         */
        {
            struct sockaddr_in addr;
            int addrlen = sizeof(addr);

            status = pj_sock_getpeername(h->fd, (struct sockaddr*)&addr,
                                         &addrlen);
        }
#endif

        /* Unlock; from this point we don't need to hold key's mutex
         * (unless concurrency is disabled, which in this case we should
         * hold the mutex while calling the callback) */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_ioqueue_unlock_key(h);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback, unless the key started closing meanwhile. */
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
            (*h->cb.on_connect_complete)(h, status);

        /* Unlock if we still hold the lock */
        if (has_lock) {
            pj_ioqueue_unlock_key(h);
        }

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
        /* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc = PJ_SUCCESS;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, the write_op is removed from the list before
         * sending, and the writable event is dropped when the list
         * becomes empty.
         */
        if (h->fd_type == pj_SOCK_DGRAM()) {
            pj_list_erase(write_op);

            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

        }

        /* Send the data.
         * Unfortunately we must do this while holding key's mutex, thus
         * preventing parallel write on a single key.. :-((
         */
        /* Only the part of the buffer not yet written is sent. */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
            /* Can't do this. We only clear "op" after we're finished sending
             * the whole buffer.
             */
            //write_op->op = 0;
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            /* At most two attempts; the second one only happens on the
             * dead-socket path below.
             */
            int retry = 2;
            while (--retry >= 0) {
                send_rc = pj_sock_sendto(h->fd,
                                         write_op->buf+write_op->written,
                                         &sent, write_op->flags,
                                         &write_op->rmt_addr,
                                         write_op->rmt_addrlen);
#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
            PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
                /* Special treatment for dead UDP sockets here, see ticket #1107 */
                if (send_rc==PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(h) &&
                    h->fd_type==pj_SOCK_DGRAM())
                {
                    PJ_PERROR(4,(THIS_FILE, send_rc,
                                 "Send error for socket %d, retrying",
                                 h->fd));
                    send_rc = replace_udp_sock(h);
                    continue;
                }
#endif
                break;
            }

            /* Can't do this. We only clear "op" after we're finished sending
             * the whole buffer.
             */
            //write_op->op = 0;
        } else {
            /* Unknown operation type in the write list. */
            pj_assert(!"Invalid operation type!");
            write_op->op = PJ_IOQUEUE_OP_NONE;
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            /* On error the negated status is reported in place of the
             * byte count.
             */
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc!=PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == pj_SOCK_DGRAM())
        {
            pj_bool_t has_lock;

            /* Mark the operation key as free for reuse. */
            write_op->op = PJ_IOQUEUE_OP_NONE;

            if (h->fd_type != pj_SOCK_DGRAM()) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);

            }

            /* Unlock; from this point we don't need to hold key's mutex
             * (unless concurrency is disabled, which in this case we should
             * hold the mutex while calling the callback) */
            if (h->allow_concurrent) {
                /* concurrency may be changed while we're in the callback, so
                 * save it to a flag.
                 */
                has_lock = PJ_FALSE;
                pj_ioqueue_unlock_key(h);
                PJ_RACE_ME(5);
            } else {
                has_lock = PJ_TRUE;
            }

            /* Call callback, unless the key started closing meanwhile. */
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
                (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

            if (has_lock) {
                pj_ioqueue_unlock_key(h);
            }

        } else {
            /* Partial write: the operation stays queued for the next
             * writable event.
             */
            pj_ioqueue_unlock_key(h);
        }

        /* Done. */
    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread eventually
         * able to process the event.
         */
        pj_ioqueue_unlock_key(h);

        return PJ_FALSE;
    }

    return PJ_TRUE;
}
432
/*
 * ioqueue_dispatch_read_event()
 *
 * Report the occurrence of a readable event on the key so that it can be
 * processed by the framework.
 *
 * The key is locked with a try-lock. Then, in order of precedence:
 *  - if the key is closing, nothing is done;
 *  - if an accept operation is queued (TCP builds only), one is dequeued,
 *    the connection is accepted and on_accept_complete() is invoked;
 *  - if a read operation is queued, one is dequeued, the data is read and
 *    on_read_complete() is invoked with the number of bytes read, or with
 *    the negated status on error.
 *
 * Callbacks run without the key lock when the key allows concurrency and
 * with the lock held otherwise.
 *
 * Returns PJ_FALSE when the key lock could not be acquired or when there
 * was nothing to process, PJ_TRUE otherwise (including the closing case).
 */
pj_bool_t ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue,
                                       pj_ioqueue_key_t *h )
{
    pj_status_t rc;

    /* Try lock the key. If another thread holds it, let that thread do
     * the work.
     */
    rc = pj_ioqueue_trylock_key(h);
    if (rc != PJ_SUCCESS) {
        return PJ_FALSE;
    }

    /* A closing key must not be processed any further. */
    if (IS_CLOSING(h)) {
        pj_ioqueue_unlock_key(h);
        return PJ_TRUE;
    }

#   if PJ_HAS_TCP
    if (!pj_list_empty(&h->accept_list)) {

        struct accept_operation *accept_op;
        pj_bool_t has_lock;

        /* Get one accept operation from the list. */
        accept_op = h->accept_list.next;
        pj_list_erase(accept_op);
        accept_op->op = PJ_IOQUEUE_OP_NONE;

        /* Clear bit in fdset if there is no more pending accept */
        if (pj_list_empty(&h->accept_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        /* Accept the connection and, when requested, fetch the local
         * address of the new socket.
         */
        rc=pj_sock_accept(h->fd, accept_op->accept_fd,
                          accept_op->rmt_addr, accept_op->addrlen);
        if (rc==PJ_SUCCESS && accept_op->local_addr) {
            rc = pj_sock_getsockname(*accept_op->accept_fd,
                                     accept_op->local_addr,
                                     accept_op->addrlen);
        }

        /* Unlock; from this point we don't need to hold key's mutex
         * (unless concurrency is disabled, which in this case we should
         * hold the mutex while calling the callback) */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_ioqueue_unlock_key(h);
            PJ_RACE_ME(5);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback, unless the key started closing meanwhile. */
        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
            (*h->cb.on_accept_complete)(h,
                                        (pj_ioqueue_op_key_t*)accept_op,
                                        *accept_op->accept_fd, rc);
        }

        if (has_lock) {
            pj_ioqueue_unlock_key(h);
        }
    }
    else
#   endif
    if (key_has_pending_read(h)) {
        struct read_operation *read_op;
        pj_ssize_t bytes_read;
        pj_bool_t has_lock;

        /* Get one pending read operation from the list. */
        read_op = h->read_list.next;
        pj_list_erase(read_op);

        /* Clear fdset if there is no pending read. */
        if (pj_list_empty(&h->read_list))
            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);

        /* Ask for as many bytes as the caller's buffer can hold. */
        bytes_read = read_op->size;

        /* Each branch marks the operation key as free before reading. */
        if (read_op->op == PJ_IOQUEUE_OP_RECV_FROM) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
                                  read_op->flags,
                                  read_op->rmt_addr,
                                  read_op->rmt_addrlen);
        } else if (read_op->op == PJ_IOQUEUE_OP_RECV) {
            read_op->op = PJ_IOQUEUE_OP_NONE;
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                              read_op->flags);
        } else {
            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
            read_op->op = PJ_IOQUEUE_OP_NONE;
            /*
             * User has specified pj_ioqueue_read().
             * On Win32, we should do ReadFile(). But because we got
             * here because of select() anyway, user must have put a
             * socket descriptor on h->fd, which in this case we can
             * just call pj_sock_recv() instead of ReadFile().
             * On Unix, user may put a file in h->fd, so we'll have
             * to call read() here.
             * This may not compile on systems which doesn't have
             * read(). That's why we only specify PJ_LINUX here so
             * that error is easier to catch.
             */
#           if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
               defined(PJ_WIN64) && PJ_WIN64 != 0 || \
               defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
            rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
                              read_op->flags);
            //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
            //              &bytes_read, NULL);
#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
            bytes_read = read(h->fd, read_op->buf, bytes_read);
            rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
#           else
#               error "Implement read() for this platform!"
#           endif
        }

        if (rc != PJ_SUCCESS) {
#           if (defined(PJ_WIN32) && PJ_WIN32 != 0) || \
               (defined(PJ_WIN64) && PJ_WIN64 != 0)
            /* On Win32, for UDP, WSAECONNRESET on the receive side
             * indicates that previous sending has triggered ICMP Port
             * Unreachable message.
             * But we wouldn't know at this point which one of previous
             * key that has triggered the error, since UDP socket can
             * be shared!
             * So we'll just ignore it!
             */

            if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
                //PJ_LOG(4,(THIS_FILE,
                //          "Ignored ICMP port unreach. on key=%p", h));
            }
#           endif

            /* In any case we would report this to caller, as the negated
             * status in place of the byte count.
             */
            bytes_read = -rc;

#if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
    PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
            /* Special treatment for dead UDP sockets here, see ticket #1107 */
            if (rc == PJ_STATUS_FROM_OS(ENOTCONN) && !IS_CLOSING(h) &&
                h->fd_type==pj_SOCK_DGRAM())
            {
                rc = replace_udp_sock(h);
                if (rc != PJ_SUCCESS) {
                    bytes_read = -rc;
                }
            }
#endif
        }

        /* Unlock; from this point we don't need to hold key's mutex
         * (unless concurrency is disabled, which in this case we should
         * hold the mutex while calling the callback) */
        if (h->allow_concurrent) {
            /* concurrency may be changed while we're in the callback, so
             * save it to a flag.
             */
            has_lock = PJ_FALSE;
            pj_ioqueue_unlock_key(h);
            PJ_RACE_ME(5);
        } else {
            has_lock = PJ_TRUE;
        }

        /* Call callback, unless the key started closing meanwhile. */
        if (h->cb.on_read_complete && !IS_CLOSING(h)) {
            (*h->cb.on_read_complete)(h,
                                      (pj_ioqueue_op_key_t*)read_op,
                                      bytes_read);
        }

        if (has_lock) {
            pj_ioqueue_unlock_key(h);
        }

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread eventually
         * able to process the event.
         */
        pj_ioqueue_unlock_key(h);

        return PJ_FALSE;
    }

    return PJ_TRUE;
}
627
628
/*
 * ioqueue_dispatch_exception_event()
 *
 * Report the occurrence of an exception event on the key. An exception is
 * only meaningful while a connect is in progress: the pending connect is
 * cleared and on_connect_complete() is invoked with an error status.
 *
 * The status passed to the callback is -1 unless PJ_HAS_SO_ERROR is
 * enabled and the SO_ERROR query succeeds, in which case it is
 * PJ_RETURN_OS_ERROR() of the socket's error value.
 *
 * The callback runs without the key lock when the key allows concurrency
 * and with the lock held otherwise.
 *
 * Returns PJ_FALSE when the key lock could not be acquired, PJ_TRUE
 * otherwise (including when there is no pending connect or the key is
 * closing).
 */
pj_bool_t ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
                                            pj_ioqueue_key_t *h )
{
    pj_bool_t has_lock;
    pj_status_t rc;

    /* Try lock the key. If another thread holds it, let that thread do
     * the work.
     */
    rc = pj_ioqueue_trylock_key(h);
    if (rc != PJ_SUCCESS) {
        return PJ_FALSE;
    }

    if (!h->connecting) {
        /* It is possible that more than one thread was woken up, thus
         * the remaining thread will see h->connecting as zero because
         * it has been processed by other thread.
         */
        pj_ioqueue_unlock_key(h);
        return PJ_TRUE;
    }

    /* A closing key must not be processed any further. */
    if (IS_CLOSING(h)) {
        pj_ioqueue_unlock_key(h);
        return PJ_TRUE;
    }

    /* Clear operation. */
    h->connecting = 0;

    /* Stop watching for the events that signal connect completion. */
    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

    /* Unlock; from this point we don't need to hold key's mutex
     * (unless concurrency is disabled, which in this case we should
     * hold the mutex while calling the callback) */
    if (h->allow_concurrent) {
        /* concurrency may be changed while we're in the callback, so
         * save it to a flag.
         */
        has_lock = PJ_FALSE;
        pj_ioqueue_unlock_key(h);
        PJ_RACE_ME(5);
    } else {
        has_lock = PJ_TRUE;
    }

    /* Call callback, unless the key started closing meanwhile. */
    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
        /* Generic failure unless the socket can tell us more. */
        pj_status_t status = -1;
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        int value;
        int vallen = sizeof(value);
        int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                       &value, &vallen);
        if (gs_rc == 0) {
            status = PJ_RETURN_OS_ERROR(value);
        }
#endif

        (*h->cb.on_connect_complete)(h, status);
    }

    if (has_lock) {
        pj_ioqueue_unlock_key(h);
    }

    return PJ_TRUE;
}
697
698 /*
699 * pj_ioqueue_recv()
700 *
701 * Start asynchronous recv() from the socket.
702 */
pj_ioqueue_recv(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,void * buffer,pj_ssize_t * length,unsigned flags)703 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
704 pj_ioqueue_op_key_t *op_key,
705 void *buffer,
706 pj_ssize_t *length,
707 unsigned flags )
708 {
709 struct read_operation *read_op;
710
711 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
712 PJ_CHECK_STACK();
713
714 /* Check if key is closing (need to do this first before accessing
715 * other variables, since they might have been destroyed. See ticket
716 * #469).
717 */
718 if (IS_CLOSING(key))
719 return PJ_ECANCELLED;
720
721 read_op = (struct read_operation*)op_key;
722 PJ_ASSERT_RETURN(read_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING);
723 read_op->op = PJ_IOQUEUE_OP_NONE;
724
725 /* Try to see if there's data immediately available.
726 */
727 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
728 pj_status_t status;
729 pj_ssize_t size;
730
731 size = *length;
732 status = pj_sock_recv(key->fd, buffer, &size, flags);
733 if (status == PJ_SUCCESS) {
734 /* Yes! Data is available! */
735 *length = size;
736 return PJ_SUCCESS;
737 } else {
738 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
739 * the error to caller.
740 */
741 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
742 return status;
743 }
744 }
745
746 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
747
748 /*
749 * No data is immediately available.
750 * Must schedule asynchronous operation to the ioqueue.
751 */
752 read_op->op = PJ_IOQUEUE_OP_RECV;
753 read_op->buf = buffer;
754 read_op->size = *length;
755 read_op->flags = flags;
756
757 pj_ioqueue_lock_key(key);
758 /* Check again. Handle may have been closed after the previous check
759 * in multithreaded app. If we add bad handle to the set it will
760 * corrupt the ioqueue set. See #913
761 */
762 if (IS_CLOSING(key)) {
763 pj_ioqueue_unlock_key(key);
764 return PJ_ECANCELLED;
765 }
766 pj_list_insert_before(&key->read_list, read_op);
767 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
768 pj_ioqueue_unlock_key(key);
769
770 return PJ_EPENDING;
771 }
772
773 /*
774 * pj_ioqueue_recvfrom()
775 *
776 * Start asynchronous recvfrom() from the socket.
777 */
pj_ioqueue_recvfrom(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,void * buffer,pj_ssize_t * length,unsigned flags,pj_sockaddr_t * addr,int * addrlen)778 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
779 pj_ioqueue_op_key_t *op_key,
780 void *buffer,
781 pj_ssize_t *length,
782 unsigned flags,
783 pj_sockaddr_t *addr,
784 int *addrlen)
785 {
786 struct read_operation *read_op;
787
788 PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
789 PJ_CHECK_STACK();
790
791 /* Check if key is closing. */
792 if (IS_CLOSING(key))
793 return PJ_ECANCELLED;
794
795 read_op = (struct read_operation*)op_key;
796 PJ_ASSERT_RETURN(read_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING);
797 read_op->op = PJ_IOQUEUE_OP_NONE;
798
799 /* Try to see if there's data immediately available.
800 */
801 if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
802 pj_status_t status;
803 pj_ssize_t size;
804
805 size = *length;
806 status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
807 addr, addrlen);
808 if (status == PJ_SUCCESS) {
809 /* Yes! Data is available! */
810 *length = size;
811 return PJ_SUCCESS;
812 } else {
813 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
814 * the error to caller.
815 */
816 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
817 return status;
818 }
819 }
820
821 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
822
823 /*
824 * No data is immediately available.
825 * Must schedule asynchronous operation to the ioqueue.
826 */
827 read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
828 read_op->buf = buffer;
829 read_op->size = *length;
830 read_op->flags = flags;
831 read_op->rmt_addr = addr;
832 read_op->rmt_addrlen = addrlen;
833
834 pj_ioqueue_lock_key(key);
835 /* Check again. Handle may have been closed after the previous check
836 * in multithreaded app. If we add bad handle to the set it will
837 * corrupt the ioqueue set. See #913
838 */
839 if (IS_CLOSING(key)) {
840 pj_ioqueue_unlock_key(key);
841 return PJ_ECANCELLED;
842 }
843 pj_list_insert_before(&key->read_list, read_op);
844 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
845 pj_ioqueue_unlock_key(key);
846
847 return PJ_EPENDING;
848 }
849
850 /*
851 * pj_ioqueue_send()
852 *
853 * Start asynchronous send() to the descriptor.
854 */
pj_ioqueue_send(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,const void * data,pj_ssize_t * length,unsigned flags)855 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
856 pj_ioqueue_op_key_t *op_key,
857 const void *data,
858 pj_ssize_t *length,
859 unsigned flags)
860 {
861 struct write_operation *write_op;
862 pj_status_t status;
863 unsigned retry;
864 pj_ssize_t sent;
865
866 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
867 PJ_CHECK_STACK();
868
869 /* Check if key is closing. */
870 if (IS_CLOSING(key))
871 return PJ_ECANCELLED;
872
873 /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
874 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
875
876 /* Fast track:
877 * Try to send data immediately, only if there's no pending write!
878 * Note:
879 * We are speculating that the list is empty here without properly
880 * acquiring ioqueue's mutex first. This is intentional, to maximize
881 * performance via parallelism.
882 *
883 * This should be safe, because:
884 * - by convention, we require caller to make sure that the
885 * key is not unregistered while other threads are invoking
886 * an operation on the same key.
887 * - pj_list_empty() is safe to be invoked by multiple threads,
888 * even when other threads are modifying the list.
889 */
890 if (pj_list_empty(&key->write_list)) {
891 /*
892 * See if data can be sent immediately.
893 */
894 sent = *length;
895 status = pj_sock_send(key->fd, data, &sent, flags);
896 if (status == PJ_SUCCESS) {
897 /* Success! */
898 *length = sent;
899 return PJ_SUCCESS;
900 } else {
901 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
902 * the error to caller.
903 */
904 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
905 return status;
906 }
907 }
908 }
909
910 /*
911 * Schedule asynchronous send.
912 */
913 write_op = (struct write_operation*)op_key;
914
915 /* Spin if write_op has pending operation */
916 for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
917 pj_thread_sleep(0);
918
919 /* Last chance */
920 if (write_op->op) {
921 /* Unable to send packet because there is already pending write in the
922 * write_op. We could not put the operation into the write_op
923 * because write_op already contains a pending operation! And
924 * we could not send the packet directly with send() either,
925 * because that will break the order of the packet. So we can
926 * only return error here.
927 *
928 * This could happen for example in multithreads program,
929 * where polling is done by one thread, while other threads are doing
930 * the sending only. If the polling thread runs on lower priority
931 * than the sending thread, then it's possible that the pending
932 * write flag is not cleared in-time because clearing is only done
933 * during polling.
934 *
935 * Aplication should specify multiple write operation keys on
936 * situation like this.
937 */
938 //pj_assert(!"ioqueue: there is pending operation on this key!");
939 return PJ_EBUSY;
940 }
941
942 write_op->op = PJ_IOQUEUE_OP_SEND;
943 write_op->buf = (char*)data;
944 write_op->size = *length;
945 write_op->written = 0;
946 write_op->flags = flags;
947
948 pj_ioqueue_lock_key(key);
949 /* Check again. Handle may have been closed after the previous check
950 * in multithreaded app. If we add bad handle to the set it will
951 * corrupt the ioqueue set. See #913
952 */
953 if (IS_CLOSING(key)) {
954 pj_ioqueue_unlock_key(key);
955 return PJ_ECANCELLED;
956 }
957 pj_list_insert_before(&key->write_list, write_op);
958 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
959 pj_ioqueue_unlock_key(key);
960
961 return PJ_EPENDING;
962 }
963
964
965 /*
966 * pj_ioqueue_sendto()
967 *
968 * Start asynchronous write() to the descriptor.
969 */
pj_ioqueue_sendto(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,const void * data,pj_ssize_t * length,pj_uint32_t flags,const pj_sockaddr_t * addr,int addrlen)970 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
971 pj_ioqueue_op_key_t *op_key,
972 const void *data,
973 pj_ssize_t *length,
974 pj_uint32_t flags,
975 const pj_sockaddr_t *addr,
976 int addrlen)
977 {
978 struct write_operation *write_op;
979 unsigned retry;
980 pj_bool_t restart_retry = PJ_FALSE;
981 pj_status_t status;
982 pj_ssize_t sent;
983
984 PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
985 PJ_CHECK_STACK();
986
987 #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
988 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
989 retry_on_restart:
990 #else
991 PJ_UNUSED_ARG(restart_retry);
992 #endif
993 /* Check if key is closing. */
994 if (IS_CLOSING(key))
995 return PJ_ECANCELLED;
996
997 /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
998 flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
999
1000 /* Fast track:
1001 * Try to send data immediately, only if there's no pending write!
1002 * Note:
1003 * We are speculating that the list is empty here without properly
1004 * acquiring ioqueue's mutex first. This is intentional, to maximize
1005 * performance via parallelism.
1006 *
1007 * This should be safe, because:
1008 * - by convention, we require caller to make sure that the
1009 * key is not unregistered while other threads are invoking
1010 * an operation on the same key.
1011 * - pj_list_empty() is safe to be invoked by multiple threads,
1012 * even when other threads are modifying the list.
1013 */
1014 if (pj_list_empty(&key->write_list)) {
1015 /*
1016 * See if data can be sent immediately.
1017 */
1018 sent = *length;
1019 status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
1020 if (status == PJ_SUCCESS) {
1021 /* Success! */
1022 *length = sent;
1023 return PJ_SUCCESS;
1024 } else {
1025 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
1026 * the error to caller.
1027 */
1028 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
1029 #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
1030 PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
1031 /* Special treatment for dead UDP sockets here, see ticket #1107 */
1032 if (status == PJ_STATUS_FROM_OS(EPIPE) && !IS_CLOSING(key) &&
1033 key->fd_type == pj_SOCK_DGRAM())
1034 {
1035 if (!restart_retry) {
1036 PJ_PERROR(4, (THIS_FILE, status,
1037 "Send error for socket %d, retrying",
1038 key->fd));
1039 status = replace_udp_sock(key);
1040 if (status == PJ_SUCCESS) {
1041 restart_retry = PJ_TRUE;
1042 goto retry_on_restart;
1043 }
1044 }
1045 status = PJ_ESOCKETSTOP;
1046 }
1047 #endif
1048 return status;
1049 }
1050 }
1051 }
1052
1053 /*
1054 * Check that address storage can hold the address parameter.
1055 */
1056 PJ_ASSERT_RETURN(addrlen <= (int)sizeof(pj_sockaddr_in), PJ_EBUG);
1057
1058 /*
1059 * Schedule asynchronous send.
1060 */
1061 write_op = (struct write_operation*)op_key;
1062
1063 /* Spin if write_op has pending operation */
1064 for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
1065 pj_thread_sleep(0);
1066
1067 /* Last chance */
1068 if (write_op->op) {
1069 /* Unable to send packet because there is already pending write on the
1070 * write_op. We could not put the operation into the write_op
1071 * because write_op already contains a pending operation! And
1072 * we could not send the packet directly with sendto() either,
1073 * because that will break the order of the packet. So we can
1074 * only return error here.
1075 *
1076 * This could happen, for example, in a multithreaded program,
1077 * where polling is done by one thread, while other threads are doing
1078 * the sending only. If the polling thread runs on lower priority
1079 * than the sending thread, then it's possible that the pending
1080 * write flag is not cleared in-time because clearing is only done
1081 * during polling.
1082 *
1083 * Application should specify multiple write operation keys in
1084 * situations like this.
1085 */
1086 //pj_assert(!"ioqueue: there is pending operation on this key!");
1087 return PJ_EBUSY;
1088 }
1089
1090 write_op->op = PJ_IOQUEUE_OP_SEND_TO;
1091 write_op->buf = (char*)data;
1092 write_op->size = *length;
1093 write_op->written = 0;
1094 write_op->flags = flags;
1095 pj_memcpy(&write_op->rmt_addr, addr, addrlen);
1096 write_op->rmt_addrlen = addrlen;
1097
1098 pj_ioqueue_lock_key(key);
1099 /* Check again. Handle may have been closed after the previous check
1100 * in multithreaded app. If we add bad handle to the set it will
1101 * corrupt the ioqueue set. See #913
1102 */
1103 if (IS_CLOSING(key)) {
1104 pj_ioqueue_unlock_key(key);
1105 return PJ_ECANCELLED;
1106 }
1107 pj_list_insert_before(&key->write_list, write_op);
1108 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1109 pj_ioqueue_unlock_key(key);
1110
1111 return PJ_EPENDING;
1112 }
1113
1114 #if PJ_HAS_TCP
1115 /*
1116 * Initiate overlapped accept() operation.
1117 */
pj_ioqueue_accept(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,pj_sock_t * new_sock,pj_sockaddr_t * local,pj_sockaddr_t * remote,int * addrlen)1118 PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
1119 pj_ioqueue_op_key_t *op_key,
1120 pj_sock_t *new_sock,
1121 pj_sockaddr_t *local,
1122 pj_sockaddr_t *remote,
1123 int *addrlen)
1124 {
1125 struct accept_operation *accept_op;
1126 pj_status_t status;
1127
1128 /* check parameters. All must be specified! */
1129 PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
1130
1131 /* Check if key is closing. */
1132 if (IS_CLOSING(key))
1133 return PJ_ECANCELLED;
1134
1135 accept_op = (struct accept_operation*)op_key;
1136 PJ_ASSERT_RETURN(accept_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING);
1137 accept_op->op = PJ_IOQUEUE_OP_NONE;
1138
1139 /* Fast track:
1140 * See if there's new connection available immediately.
1141 */
1142 if (pj_list_empty(&key->accept_list)) {
1143 status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
1144 if (status == PJ_SUCCESS) {
1145 /* Yes! New connection is available! */
1146 if (local && addrlen) {
1147 status = pj_sock_getsockname(*new_sock, local, addrlen);
1148 if (status != PJ_SUCCESS) {
1149 pj_sock_close(*new_sock);
1150 *new_sock = PJ_INVALID_SOCKET;
1151 return status;
1152 }
1153 }
1154 return PJ_SUCCESS;
1155 } else {
1156 /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
1157 * the error to caller.
1158 */
1159 if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
1160 return status;
1161 }
1162 }
1163 }
1164
1165 /*
1166 * No connection is available immediately.
1167 * Schedule accept() operation to be completed when there is incoming
1168 * connection available.
1169 */
1170 accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
1171 accept_op->accept_fd = new_sock;
1172 accept_op->rmt_addr = remote;
1173 accept_op->addrlen= addrlen;
1174 accept_op->local_addr = local;
1175
1176 pj_ioqueue_lock_key(key);
1177 /* Check again. Handle may have been closed after the previous check
1178 * in multithreaded app. If we add bad handle to the set it will
1179 * corrupt the ioqueue set. See #913
1180 */
1181 if (IS_CLOSING(key)) {
1182 pj_ioqueue_unlock_key(key);
1183 return PJ_ECANCELLED;
1184 }
1185 pj_list_insert_before(&key->accept_list, accept_op);
1186 ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
1187 pj_ioqueue_unlock_key(key);
1188
1189 return PJ_EPENDING;
1190 }
1191
1192 /*
1193 * Initiate overlapped connect() operation (well, it's non-blocking actually,
1194 * since there's no overlapped version of connect()).
1195 */
pj_ioqueue_connect(pj_ioqueue_key_t * key,const pj_sockaddr_t * addr,int addrlen)1196 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
1197 const pj_sockaddr_t *addr,
1198 int addrlen )
1199 {
1200 pj_status_t status;
1201
1202 /* check parameters. All must be specified! */
1203 PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
1204
1205 /* Check if key is closing. */
1206 if (IS_CLOSING(key))
1207 return PJ_ECANCELLED;
1208
1209 /* Check if socket has not been marked for connecting */
1210 if (key->connecting != 0)
1211 return PJ_EPENDING;
1212
1213 status = pj_sock_connect(key->fd, addr, addrlen);
1214 if (status == PJ_SUCCESS) {
1215 /* Connected! */
1216 return PJ_SUCCESS;
1217 } else {
1218 if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
1219 /* Pending! */
1220 pj_ioqueue_lock_key(key);
1221 /* Check again. Handle may have been closed after the previous
1222 * check in multithreaded app. See #913
1223 */
1224 if (IS_CLOSING(key)) {
1225 pj_ioqueue_unlock_key(key);
1226 return PJ_ECANCELLED;
1227 }
1228 key->connecting = PJ_TRUE;
1229 ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
1230 ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
1231 pj_ioqueue_unlock_key(key);
1232 return PJ_EPENDING;
1233 } else {
1234 /* Error! */
1235 return status;
1236 }
1237 }
1238 }
1239 #endif /* PJ_HAS_TCP */
1240
1241
pj_ioqueue_op_key_init(pj_ioqueue_op_key_t * op_key,pj_size_t size)1242 PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
1243 pj_size_t size )
1244 {
1245 pj_bzero(op_key, size);
1246 }
1247
1248
1249 /*
1250 * pj_ioqueue_is_pending()
1251 */
pj_ioqueue_is_pending(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key)1252 PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
1253 pj_ioqueue_op_key_t *op_key )
1254 {
1255 struct generic_operation *op_rec;
1256
1257 PJ_UNUSED_ARG(key);
1258
1259 op_rec = (struct generic_operation*)op_key;
1260 return op_rec->op != 0;
1261 }
1262
1263
1264 /*
1265 * pj_ioqueue_post_completion()
1266 */
pj_ioqueue_post_completion(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,pj_ssize_t bytes_status)1267 PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
1268 pj_ioqueue_op_key_t *op_key,
1269 pj_ssize_t bytes_status )
1270 {
1271 struct generic_operation *op_rec;
1272
1273 /*
1274 * Find the operation key in all pending operation list to
1275 * really make sure that it's still there; then call the callback.
1276 */
1277 pj_ioqueue_lock_key(key);
1278
1279 /* Find the operation in the pending read list. */
1280 op_rec = (struct generic_operation*)key->read_list.next;
1281 while (op_rec != (void*)&key->read_list) {
1282 if (op_rec == (void*)op_key) {
1283 pj_list_erase(op_rec);
1284 op_rec->op = PJ_IOQUEUE_OP_NONE;
1285 ioqueue_remove_from_set(key->ioqueue, key, READABLE_EVENT);
1286 pj_ioqueue_unlock_key(key);
1287
1288 if (key->cb.on_read_complete)
1289 (*key->cb.on_read_complete)(key, op_key, bytes_status);
1290 return PJ_SUCCESS;
1291 }
1292 op_rec = op_rec->next;
1293 }
1294
1295 /* Find the operation in the pending write list. */
1296 op_rec = (struct generic_operation*)key->write_list.next;
1297 while (op_rec != (void*)&key->write_list) {
1298 if (op_rec == (void*)op_key) {
1299 pj_list_erase(op_rec);
1300 op_rec->op = PJ_IOQUEUE_OP_NONE;
1301 ioqueue_remove_from_set(key->ioqueue, key, WRITEABLE_EVENT);
1302 pj_ioqueue_unlock_key(key);
1303
1304 if (key->cb.on_write_complete)
1305 (*key->cb.on_write_complete)(key, op_key, bytes_status);
1306 return PJ_SUCCESS;
1307 }
1308 op_rec = op_rec->next;
1309 }
1310
1311 /* Find the operation in the pending accept list. */
1312 op_rec = (struct generic_operation*)key->accept_list.next;
1313 while (op_rec != (void*)&key->accept_list) {
1314 if (op_rec == (void*)op_key) {
1315 pj_list_erase(op_rec);
1316 op_rec->op = PJ_IOQUEUE_OP_NONE;
1317 pj_ioqueue_unlock_key(key);
1318
1319 if (key->cb.on_accept_complete) {
1320 (*key->cb.on_accept_complete)(key, op_key,
1321 PJ_INVALID_SOCKET,
1322 (pj_status_t)bytes_status);
1323 }
1324 return PJ_SUCCESS;
1325 }
1326 op_rec = op_rec->next;
1327 }
1328
1329 /* Clear connecting operation. */
1330 if (key->connecting) {
1331 key->connecting = 0;
1332 ioqueue_remove_from_set(key->ioqueue, key, WRITEABLE_EVENT);
1333 ioqueue_remove_from_set(key->ioqueue, key, EXCEPTION_EVENT);
1334 }
1335
1336 pj_ioqueue_unlock_key(key);
1337
1338 return PJ_EINVALIDOP;
1339 }
1340
pj_ioqueue_set_default_concurrency(pj_ioqueue_t * ioqueue,pj_bool_t allow)1341 PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
1342 pj_bool_t allow)
1343 {
1344 PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
1345 ioqueue->default_concurrency = allow;
1346 return PJ_SUCCESS;
1347 }
1348
1349
pj_ioqueue_set_concurrency(pj_ioqueue_key_t * key,pj_bool_t allow)1350 PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
1351 pj_bool_t allow)
1352 {
1353 PJ_ASSERT_RETURN(key, PJ_EINVAL);
1354
1355 /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
1356 * disabled.
1357 */
1358 PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
1359
1360 key->allow_concurrent = allow;
1361 return PJ_SUCCESS;
1362 }
1363
pj_ioqueue_lock_key(pj_ioqueue_key_t * key)1364 PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
1365 {
1366 if (key->grp_lock)
1367 return pj_grp_lock_acquire(key->grp_lock);
1368 else
1369 return pj_lock_acquire(key->lock);
1370 }
1371
pj_ioqueue_trylock_key(pj_ioqueue_key_t * key)1372 PJ_DEF(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key)
1373 {
1374 if (key->grp_lock)
1375 return pj_grp_lock_tryacquire(key->grp_lock);
1376 else
1377 return pj_lock_tryacquire(key->lock);
1378 }
1379
pj_ioqueue_unlock_key(pj_ioqueue_key_t * key)1380 PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
1381 {
1382 if (key->grp_lock)
1383 return pj_grp_lock_release(key->grp_lock);
1384 else
1385 return pj_lock_release(key->lock);
1386 }
1387
1388
1389