1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6 #include "primpl.h"
7 #include "pprmwait.h"
8
/* Maximum number of double-hash probes before an insert/lookup gives up. */
#define _MW_REHASH_MAX 11

/* Guards lazy creation of the default wait group; made by _PR_InitMW(). */
static PRLock *mw_lock = NULL;
/* Global multiwait state, including the default ("null") wait group. */
static _PRGlobalState *mw_state = NULL;

/* Upper bound on any single poll timeout (from MAX_POLLING_INTERVAL ms). */
static PRIntervalTime max_polling_interval;
15
16 #ifdef WINNT
17
/*
** One pending timeout on behalf of an NT async receive. Timers are
** kept on tm_vars.timer_queue, sorted by expiry, and fired by the
** TimerManager thread.
*/
typedef struct TimerEvent {
    PRIntervalTime absolute;   /* interval-clock time at which to fire */
    void (*func)(void *);      /* callback run by the timer manager thread */
    void *arg;                 /* opaque argument passed to func */
    LONG ref_count;            /* 2 while live: one ref for the manager,
                                * one for the creator (see CancelTimer) */
    PRCList links;             /* position on tm_vars.timer_queue */
} TimerEvent;

/* Recover the TimerEvent from a pointer to its embedded list linkage. */
#define TIMER_EVENT_PTR(_qp) \
    ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))

/* State shared with the timer manager thread; ml guards all fields. */
struct {
    PRLock *ml;                /* protects every field below */
    PRCondVar *new_timer;      /* signaled when a timer is queued */
    PRCondVar *cancel_timer;   /* signaled when a fired timer's ref_count
                                * reaches zero (releases CancelTimer) */
    PRThread *manager_thread;  /* the TimerManager system thread */
    PRCList timer_queue;       /* timers ordered by ascending expiry */
} tm_vars;

static PRStatus TimerInit(void);
static void TimerManager(void *arg);
static TimerEvent *CreateTimer(PRIntervalTime timeout,
                               void (*func)(void *), void *arg);
static PRBool CancelTimer(TimerEvent *timer);
42
/*
** Body of the timer manager thread: repeatedly fires the earliest
** expired timer, or sleeps until the earliest timer is due (or a new
** one is queued). Holds tm_vars.ml except while running a callback.
** The loop never exits; the trailing unlock is unreachable in practice.
*/
static void TimerManager(void *arg)
{
    PRIntervalTime now;
    PRIntervalTime timeout;
    PRCList *head;
    TimerEvent *timer;

    PR_Lock(tm_vars.ml);
    while (1)
    {
        if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
        {
            /* Nothing scheduled: sleep until CreateTimer signals. */
            PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
        }
        else
        {
            now = PR_IntervalNow();
            head = PR_LIST_HEAD(&tm_vars.timer_queue);
            timer = TIMER_EVENT_PTR(head);
            /* Wrap-safe "now >= absolute" comparison on interval time. */
            if ((PRInt32) (now - timer->absolute) >= 0)
            {
                PR_REMOVE_LINK(head);
                /*
                 * make its prev and next point to itself so that
                 * it's obvious that it's not on the timer_queue.
                 */
                PR_INIT_CLIST(head);
                PR_ASSERT(2 == timer->ref_count);
                /* Drop the lock while running the callback so it may
                 * take other locks (e.g. close a socket). */
                PR_Unlock(tm_vars.ml);
                timer->func(timer->arg);
                PR_Lock(tm_vars.ml);
                /* Release the manager's reference; if CancelTimer has
                 * already dropped its own, let it reclaim the timer. */
                timer->ref_count -= 1;
                if (0 == timer->ref_count)
                {
                    PR_NotifyAllCondVar(tm_vars.cancel_timer);
                }
            }
            else
            {
                /* Earliest timer not yet due: sleep until it is, or
                 * until a new (possibly earlier) timer arrives. */
                timeout = (PRIntervalTime)(timer->absolute - now);
                PR_WaitCondVar(tm_vars.new_timer, timeout);
            }
        }
    }
    PR_Unlock(tm_vars.ml);
}
89
/*
** Allocate a timer that fires `func(arg)` after `timeout` (relative to
** now) and insert it into the expiry-sorted timer queue. Returns NULL
** with PR_OUT_OF_MEMORY_ERROR on allocation failure. The returned
** timer carries two references: one owned by the timer manager, one by
** the caller, who must eventually call CancelTimer() to release it.
*/
static TimerEvent *CreateTimer(
    PRIntervalTime timeout,
    void (*func)(void *),
    void *arg)
{
    TimerEvent *timer;
    PRCList *links, *tail;
    TimerEvent *elem;

    timer = PR_NEW(TimerEvent);
    if (NULL == timer)
    {
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
        return timer;
    }
    timer->absolute = PR_IntervalNow() + timeout;
    timer->func = func;
    timer->arg = arg;
    /* One reference for the manager thread, one for the creator. */
    timer->ref_count = 2;
    PR_Lock(tm_vars.ml);
    /*
    ** Walk backward from the tail of the circular queue until we find
    ** an entry expiring no later than us (wrap-safe comparison), or we
    ** reach the sentinel (links->prev == tail). Inserting after that
    ** point keeps the queue sorted by ascending expiry.
    */
    tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
    while (links->prev != tail)
    {
        elem = TIMER_EVENT_PTR(links);
        if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
        {
            break;
        }
        links = links->prev;
    }
    PR_INSERT_AFTER(&timer->links, links);
    /* Wake the manager: the new timer may now be the earliest. */
    PR_NotifyCondVar(tm_vars.new_timer);
    PR_Unlock(tm_vars.ml);
    return timer;
}
125
/*
** Release the caller's reference on `timer` and free it. Returns
** PR_TRUE if the timer was removed before it fired; PR_FALSE if it
** already fired (in which case this call blocks until the manager
** thread has finished running the callback, so freeing is safe).
*/
static PRBool CancelTimer(TimerEvent *timer)
{
    PRBool canceled = PR_FALSE;

    PR_Lock(tm_vars.ml);
    /* Drop the caller's reference (from the initial count of 2). */
    timer->ref_count -= 1;
    if (timer->links.prev == &timer->links)
    {
        /* Self-linked: the manager already dequeued it (it fired or is
         * firing). Wait until the manager drops its reference too. */
        while (timer->ref_count == 1)
        {
            PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
        }
    }
    else
    {
        /* Still queued: unhook it before it can fire. */
        PR_REMOVE_LINK(&timer->links);
        canceled = PR_TRUE;
    }
    PR_Unlock(tm_vars.ml);
    PR_DELETE(timer);
    return canceled;
}
148
TimerInit(void)149 static PRStatus TimerInit(void)
150 {
151 tm_vars.ml = PR_NewLock();
152 if (NULL == tm_vars.ml)
153 {
154 goto failed;
155 }
156 tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
157 if (NULL == tm_vars.new_timer)
158 {
159 goto failed;
160 }
161 tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
162 if (NULL == tm_vars.cancel_timer)
163 {
164 goto failed;
165 }
166 PR_INIT_CLIST(&tm_vars.timer_queue);
167 tm_vars.manager_thread = PR_CreateThread(
168 PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
169 PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
170 if (NULL == tm_vars.manager_thread)
171 {
172 goto failed;
173 }
174 return PR_SUCCESS;
175
176 failed:
177 if (NULL != tm_vars.cancel_timer)
178 {
179 PR_DestroyCondVar(tm_vars.cancel_timer);
180 }
181 if (NULL != tm_vars.new_timer)
182 {
183 PR_DestroyCondVar(tm_vars.new_timer);
184 }
185 if (NULL != tm_vars.ml)
186 {
187 PR_DestroyLock(tm_vars.ml);
188 }
189 return PR_FAILURE;
190 }
191
192 #endif /* WINNT */
193
194 /******************************************************************/
195 /******************************************************************/
196 /************************ The private portion *********************/
197 /******************************************************************/
198 /******************************************************************/
/*
** Runtime initialization of the multiwait subsystem: creates the
** global lock and state, and (on NT) starts the timer manager.
** Failure handling is assert-only; in release builds a failed
** allocation leaves mw_lock/mw_state NULL.
*/
void _PR_InitMW(void)
{
#ifdef WINNT
    /*
     * We use NT 4's InterlockedCompareExchange() to operate
     * on PRMWStatus variables.
     */
    PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
    /* NOTE(review): TimerInit's status is ignored; a failure here
     * leaves the timer machinery unusable — confirm that is tolerable
     * for callers of PR_AddWaitFileDesc with timeouts. */
    TimerInit();
#endif
    mw_lock = PR_NewLock();
    PR_ASSERT(NULL != mw_lock);
    mw_state = PR_NEWZAP(_PRGlobalState);
    PR_ASSERT(NULL != mw_state);
    PR_INIT_CLIST(&mw_state->group_list);
    max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
} /* _PR_InitMW */
216
_PR_CleanupMW(void)217 void _PR_CleanupMW(void)
218 {
219 PR_DestroyLock(mw_lock);
220 mw_lock = NULL;
221 if (mw_state->group) {
222 PR_DestroyWaitGroup(mw_state->group);
223 /* mw_state->group is set to NULL as a side effect. */
224 }
225 PR_DELETE(mw_state);
226 } /* _PR_CleanupMW */
227
/*
** Lazily create the default ("null") wait group. Several threads may
** race to create it: each builds a candidate group outside the lock,
** then only the first to publish wins; losers destroy their candidate
** and adopt the published one. Returns the default group, or NULL if
** allocation failed.
*/
static PRWaitGroup *MW_Init2(void)
{
    PRWaitGroup *group = mw_state->group; /* it's the null group */
    if (NULL == group) /* there is this special case */
    {
        group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
        if (NULL == group) {
            goto failed_alloc;
        }
        PR_Lock(mw_lock);
        if (NULL == mw_state->group)
        {
            /* We won the race: publish ours. */
            mw_state->group = group;
            group = NULL;
        }
        PR_Unlock(mw_lock);
        if (group != NULL) {
            /* We lost: discard our candidate. */
            (void)PR_DestroyWaitGroup(group);
        }
        group = mw_state->group; /* somebody beat us to it */
    }
failed_alloc:
    return group; /* whatever */
} /* MW_Init2 */
252
/*
** Insert `desc` into the open-addressed waiter table using double
** hashing on desc->fd. Returns _prmw_success on insertion,
** _prmw_error (PR_INVALID_ARGUMENT_ERROR) if the descriptor is
** already present, or _prmw_rehash after _MW_REHASH_MAX failed probes
** (caller should grow the table and retry).
*/
static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
{
    /*
    ** The entries are put in the table using the fd (PRFileDesc*) of
    ** the receive descriptor as the key. This allows us to locate
    ** the appropriate entry again when the poll operation finishes.
    **
    ** The pointer to the file descriptor object is first divided by
    ** the natural alignment of a pointer in the belief that object
    ** will have at least that many zeros in the low order bits.
    ** This may not be a good assumption.
    **
    ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
    ** that we declare defeat and force the table to be reconstructed.
    ** Since some fds might be added more than once, won't that cause
    ** collisions even in an empty table?
    */
    PRIntn rehash = _MW_REHASH_MAX;
    PRRecvWait **waiter;
    PRUintn hidx = _MW_HASH(desc->fd, hash->length);
    PRUintn hoffset = 0;

    while (rehash-- > 0)
    {
        waiter = &hash->recv_wait;
        if (NULL == waiter[hidx])
        {
            /* Free slot: claim it. */
            waiter[hidx] = desc;
            hash->count += 1;
#if 0
            printf("Adding 0x%x->0x%x ", desc, desc->fd);
            printf(
                "table[%u:%u:*%u]: 0x%x->0x%x\n",
                hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
#endif
            return _prmw_success;
        }
        if (desc == waiter[hidx])
        {
            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
            return _prmw_error;
        }
#if 0
        printf("Failing 0x%x->0x%x ", desc, desc->fd);
        printf(
            "table[*%u:%u:%u]: 0x%x->0x%x\n",
            hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
#endif
        /* Compute the secondary stride lazily, on first collision. */
        if (0 == hoffset)
        {
            hoffset = _MW_HASH2(desc->fd, hash->length);
            PR_ASSERT(0 != hoffset);
        }
        hidx = (hidx + hoffset) % (hash->length);
    }
    return _prmw_rehash;
} /* MW_AddHashInternal */
310
/*
** Grow the group's waiter hash table to the next prime size and
** re-insert every live descriptor. If re-insertion still collides out
** (possible with double hashing), keep trying successively larger
** primes. Returns _prmw_success, or _prmw_error with
** PR_OUT_OF_MEMORY_ERROR when allocation fails or the largest prime
** is exhausted. Caller holds the appropriate group lock.
*/
static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
{
    PRRecvWait **desc;
    PRUint32 pidx, length;
    _PRWaiterHash *newHash, *oldHash = group->waiter;
    PRBool retry;
    _PR_HashStory hrv;

    /* Allowed table sizes; the first entry is the initial size. */
    static const PRInt32 prime_number[] = {
        _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
        2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771
    };
    PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));

    /* look up the next size we'd like to use for the hash table */
    for (pidx = 0; pidx < primes; ++pidx)
    {
        if (prime_number[pidx] == oldHash->length)
        {
            break;
        }
    }
    /* table size must be one of the prime numbers */
    PR_ASSERT(pidx < primes);

    /* if pidx == primes - 1, we can't expand the table any more */
    while (pidx < primes - 1)
    {
        /* next size */
        ++pidx;
        length = prime_number[pidx];

        /* allocate the new hash table and fill it in with the old */
        newHash = (_PRWaiterHash*)PR_CALLOC(
            sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
        if (NULL == newHash)
        {
            PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
            return _prmw_error;
        }

        newHash->length = length;
        retry = PR_FALSE;
        /* Copy entries until the new table holds as many as the old;
         * the old table is scanned slot by slot, skipping holes. */
        for (desc = &oldHash->recv_wait;
             newHash->count < oldHash->count; ++desc)
        {
            PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
            if (NULL != *desc)
            {
                hrv = MW_AddHashInternal(*desc, newHash);
                PR_ASSERT(_prmw_error != hrv);
                if (_prmw_success != hrv)
                {
                    /* Collided out even in the bigger table: discard
                     * it and try the next prime. */
                    PR_DELETE(newHash);
                    retry = PR_TRUE;
                    break;
                }
            }
        }
        if (retry) {
            continue;
        }

        /* Swap in the new table; bump the timestamp so stale cached
         * hash positions elsewhere are invalidated. */
        PR_DELETE(group->waiter);
        group->waiter = newHash;
        group->p_timestamp += 1;
        return _prmw_success;
    }

    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    return _prmw_error; /* we're hosed */
} /* MW_ExpandHashInternal */
383
384 #ifndef WINNT
/*
** Mark the receive-wait slot `waiter` as finished with `outcome`:
** record the outcome, move the descriptor onto the group's io_ready
** list, clear its hash slot, and wake one waiting thread. Caller
** holds group->ml.
*/
static void _MW_DoneInternal(
    PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
{
    /*
    ** Add this receive wait object to the list of finished I/O
    ** operations for this particular group. If there are other
    ** threads waiting on the group, notify one. If not, arrange
    ** for this thread to return.
    */

#if 0
    printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
#endif
    (*waiter)->outcome = outcome;
    PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
    PR_NotifyCondVar(group->io_complete);
    PR_ASSERT(0 != group->waiter->count);
    group->waiter->count -= 1;
    *waiter = NULL;  /* free the hash slot */
} /* _MW_DoneInternal */
405 #endif /* WINNT */
406
_MW_LookupInternal(PRWaitGroup * group,PRFileDesc * fd)407 static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
408 {
409 /*
410 ** Find the receive wait object corresponding to the file descriptor.
411 ** Only search the wait group specified.
412 */
413 PRRecvWait **desc;
414 PRIntn rehash = _MW_REHASH_MAX;
415 _PRWaiterHash *hash = group->waiter;
416 PRUintn hidx = _MW_HASH(fd, hash->length);
417 PRUintn hoffset = 0;
418
419 while (rehash-- > 0)
420 {
421 desc = (&hash->recv_wait) + hidx;
422 if ((*desc != NULL) && ((*desc)->fd == fd)) {
423 return desc;
424 }
425 if (0 == hoffset)
426 {
427 hoffset = _MW_HASH2(fd, hash->length);
428 PR_ASSERT(0 != hoffset);
429 }
430 hidx = (hidx + hoffset) % (hash->length);
431 }
432 return NULL;
433 } /* _MW_LookupInternal */
434
435 #ifndef WINNT
/*
** The borrowed-poller loop (non-NT path). The calling thread becomes
** group->poller and repeatedly: waits for business, (re)sizes the
** polling list, expires timed-out waiters, polls all live fds, and
** completes any ready ones. It leaves when it is the last waiting
** thread and there is ready I/O for it to consume.
**
** Entered and exited with group->ml held; the lock is dropped only
** around allocation and PR_Poll().
*/
static PRStatus _MW_PollInternal(PRWaitGroup *group)
{
    PRRecvWait **waiter;
    PRStatus rv = PR_FAILURE;
    PRInt32 count, count_ready;
    PRIntervalTime polling_interval;

    group->poller = PR_GetCurrentThread();

    while (PR_TRUE)
    {
        PRIntervalTime now, since_last_poll;
        PRPollDesc *poll_list;

        /* Sleep until at least one descriptor is registered. */
        while (0 == group->waiter->count)
        {
            PRStatus st;
            st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
            if (_prmw_running != group->state)
            {
                PR_SetError(PR_INVALID_STATE_ERROR, 0);
                goto aborted;
            }
            if (_MW_ABORTED(st)) {
                goto aborted;
            }
        }

        /*
        ** There's something to do. See if our existing polling list
        ** is large enough for what we have to do?
        */

        while (group->polling_count < group->waiter->count)
        {
            PRUint32 old_count = group->waiter->count;
            PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
            PRSize new_size = sizeof(PRPollDesc) * new_count;
            PRPollDesc *old_polling_list = group->polling_list;

            /* Allocate outside the lock; recheck state after relock. */
            PR_Unlock(group->ml);
            poll_list = (PRPollDesc*)PR_CALLOC(new_size);
            if (NULL == poll_list)
            {
                PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
                PR_Lock(group->ml);
                goto failed_alloc;
            }
            if (NULL != old_polling_list) {
                PR_DELETE(old_polling_list);
            }
            PR_Lock(group->ml);
            if (_prmw_running != group->state)
            {
                PR_DELETE(poll_list);
                PR_SetError(PR_INVALID_STATE_ERROR, 0);
                goto aborted;
            }
            group->polling_list = poll_list;
            group->polling_count = new_count;
        }

        now = PR_IntervalNow();
        polling_interval = max_polling_interval;
        since_last_poll = now - group->last_poll;

        /*
        ** Build the poll list: expire waiters whose timeout elapsed
        ** while we were away, decrement the others' budgets, and clamp
        ** the poll interval to the nearest remaining timeout.
        */
        waiter = &group->waiter->recv_wait;
        poll_list = group->polling_list;
        for (count = 0; count < group->waiter->count; ++waiter)
        {
            PR_ASSERT(waiter < &group->waiter->recv_wait
                      + group->waiter->length);
            if (NULL != *waiter)  /* a live one! */
            {
                if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
                    && (since_last_poll >= (*waiter)->timeout)) {
                    _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
                }
                else
                {
                    if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
                    {
                        (*waiter)->timeout -= since_last_poll;
                        if ((*waiter)->timeout < polling_interval) {
                            polling_interval = (*waiter)->timeout;
                        }
                    }
                    PR_ASSERT(poll_list < group->polling_list
                              + group->polling_count);
                    poll_list->fd = (*waiter)->fd;
                    poll_list->in_flags = PR_POLL_READ;
                    poll_list->out_flags = 0;
#if 0
                    printf(
                        "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
                        poll_list, count, poll_list->fd, (*waiter)->timeout);
#endif
                    poll_list += 1;
                    count += 1;
                }
            }
        }

        PR_ASSERT(count == group->waiter->count);

        /*
        ** If there are no more threads waiting for completion,
        ** we need to return.
        */
        if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
            && (1 == group->waiting_threads)) {
            break;
        }

        if (0 == count) {
            continue; /* wait for new business */
        }

        group->last_poll = now;

        PR_Unlock(group->ml);

        count_ready = PR_Poll(group->polling_list, count, polling_interval);

        PR_Lock(group->ml);

        if (_prmw_running != group->state)
        {
            PR_SetError(PR_INVALID_STATE_ERROR, 0);
            goto aborted;
        }
        if (-1 == count_ready)
        {
            goto failed_poll; /* that's a shame */
        }
        else if (0 < count_ready)
        {
            /* Complete every descriptor that reported activity. */
            for (poll_list = group->polling_list; count > 0;
                 poll_list++, count--)
            {
                PR_ASSERT(
                    poll_list < group->polling_list + group->polling_count);
                if (poll_list->out_flags != 0)
                {
                    waiter = _MW_LookupInternal(group, poll_list->fd);
                    /*
                    ** If 'waiter' is NULL, that means the wait receive
                    ** descriptor has been canceled.
                    */
                    if (NULL != waiter) {
                        _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
                    }
                }
            }
        }
        /*
        ** If there are no more threads waiting for completion,
        ** we need to return.
        ** This thread was "borrowed" to do the polling, but it really
        ** belongs to the client.
        */
        if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
            && (1 == group->waiting_threads)) {
            break;
        }
    }

    rv = PR_SUCCESS;

aborted:
failed_poll:
failed_alloc:
    group->poller = NULL; /* we were that, now we ain't */
    if ((_prmw_running == group->state) && (group->waiting_threads > 1))
    {
        /* Wake up one thread to become the new poller. */
        PR_NotifyCondVar(group->io_complete);
    }
    return rv; /* we return with the lock held */
} /* _MW_PollInternal */
616 #endif /* !WINNT */
617
MW_TestForShutdownInternal(PRWaitGroup * group)618 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
619 {
620 PRMWGroupState rv = group->state;
621 /*
622 ** Looking at the group's fields is safe because
623 ** once the group's state is no longer running, it
624 ** cannot revert and there is a safe check on entry
625 ** to make sure no more threads are made to wait.
626 */
627 if ((_prmw_stopping == rv)
628 && (0 == group->waiting_threads))
629 {
630 rv = group->state = _prmw_stopped;
631 PR_NotifyCondVar(group->mw_manage);
632 }
633 return rv;
634 } /* MW_TestForShutdownInternal */
635
636 #ifndef WINNT
/*
** Perform the first (non-blocking in spirit) receive on a descriptor
** that polled ready. With no buffer supplied, just record zero bytes;
** otherwise issue the recv and, on error, flag the outcome as
** PR_MW_FAILURE (the recv method has already set the error).
*/
static void _MW_InitialRecv(PRCList *io_ready)
{
    PRRecvWait *desc = (PRRecvWait*)io_ready;

    if ((NULL == desc->buffer.start)
        || (0 == desc->buffer.length))
    {
        /* Nothing to read into: report an empty transfer. */
        desc->bytesRecv = 0;
        return;
    }

    desc->bytesRecv = (desc->fd->methods->recv)(
        desc->fd, desc->buffer.start,
        desc->buffer.length, 0, desc->timeout);
    if (desc->bytesRecv < 0) { /* SetError should already be there */
        desc->outcome = PR_MW_FAILURE;
    }
} /* _MW_InitialRecv */
654 #endif
655
656 #ifdef WINNT
/*
** Timer callback (runs on the TimerManager thread) for an NT multiwait
** receive whose timeout expired. Atomically claims the descriptor's
** outcome as PR_MW_TIMEOUT; if some other path already completed it,
** does nothing. Otherwise aborts the outstanding overlapped read by
** closing the underlying socket.
*/
static void NT_TimeProc(void *arg)
{
    _MDOverlapped *overlapped = (_MDOverlapped *)arg;
    PRRecvWait *desc = overlapped->data.mw.desc;
    PRFileDesc *bottom;

    /* Only one of {completion, cancel, timeout} may claim the outcome. */
    if (InterlockedCompareExchange((LONG *)&desc->outcome,
        (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING)
    {
        /* This wait recv descriptor has already completed. */
        return;
    }

    /* close the osfd to abort the outstanding async io request */
    /* $$$$
    ** Little late to be checking if NSPR's on the bottom of stack,
    ** but if we don't check, we can't assert that the private data
    ** is what we think it is.
    ** $$$$
    */
    bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
    PR_ASSERT(NULL != bottom);
    if (NULL != bottom) /* now what!?!?! */
    {
        bottom->secret->state = _PR_FILEDESC_CLOSED;
        if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
        {
            fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
            PR_NOT_REACHED("What shall I do?");
        }
    }
    return;
} /* NT_TimeProc */
690
NT_HashRemove(PRWaitGroup * group,PRFileDesc * fd)691 static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
692 {
693 PRRecvWait **waiter;
694
695 _PR_MD_LOCK(&group->mdlock);
696 waiter = _MW_LookupInternal(group, fd);
697 if (NULL != waiter)
698 {
699 group->waiter->count -= 1;
700 *waiter = NULL;
701 }
702 _PR_MD_UNLOCK(&group->mdlock);
703 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
704 }
705
NT_HashRemoveInternal(PRWaitGroup * group,PRFileDesc * fd)706 PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
707 {
708 PRRecvWait **waiter;
709
710 waiter = _MW_LookupInternal(group, fd);
711 if (NULL != waiter)
712 {
713 group->waiter->count -= 1;
714 *waiter = NULL;
715 }
716 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
717 }
718 #endif /* WINNT */
719
720 /******************************************************************/
721 /******************************************************************/
722 /********************** The public API portion ********************/
723 /******************************************************************/
724 /******************************************************************/
PR_AddWaitFileDesc(PRWaitGroup * group,PRRecvWait * desc)725 PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
726 PRWaitGroup *group, PRRecvWait *desc)
727 {
728 _PR_HashStory hrv;
729 PRStatus rv = PR_FAILURE;
730 #ifdef WINNT
731 _MDOverlapped *overlapped;
732 HANDLE hFile;
733 BOOL bResult;
734 DWORD dwError;
735 PRFileDesc *bottom;
736 #endif
737
738 if (!_pr_initialized) {
739 _PR_ImplicitInitialization();
740 }
741 if ((NULL == group) && (NULL == (group = MW_Init2())))
742 {
743 return rv;
744 }
745
746 PR_ASSERT(NULL != desc->fd);
747
748 desc->outcome = PR_MW_PENDING; /* nice, well known value */
749 desc->bytesRecv = 0; /* likewise, though this value is ambiguious */
750
751 PR_Lock(group->ml);
752
753 if (_prmw_running != group->state)
754 {
755 /* Not allowed to add after cancelling the group */
756 desc->outcome = PR_MW_INTERRUPT;
757 PR_SetError(PR_INVALID_STATE_ERROR, 0);
758 PR_Unlock(group->ml);
759 return rv;
760 }
761
762 #ifdef WINNT
763 _PR_MD_LOCK(&group->mdlock);
764 #endif
765
766 /*
767 ** If the waiter count is zero at this point, there's no telling
768 ** how long we've been idle. Therefore, initialize the beginning
769 ** of the timing interval. As long as the list doesn't go empty,
770 ** it will maintain itself.
771 */
772 if (0 == group->waiter->count) {
773 group->last_poll = PR_IntervalNow();
774 }
775
776 do
777 {
778 hrv = MW_AddHashInternal(desc, group->waiter);
779 if (_prmw_rehash != hrv) {
780 break;
781 }
782 hrv = MW_ExpandHashInternal(group); /* gruesome */
783 if (_prmw_success != hrv) {
784 break;
785 }
786 } while (PR_TRUE);
787
788 #ifdef WINNT
789 _PR_MD_UNLOCK(&group->mdlock);
790 #endif
791
792 PR_NotifyCondVar(group->new_business); /* tell the world */
793 rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
794 PR_Unlock(group->ml);
795
796 #ifdef WINNT
797 overlapped = PR_NEWZAP(_MDOverlapped);
798 if (NULL == overlapped)
799 {
800 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
801 NT_HashRemove(group, desc->fd);
802 return rv;
803 }
804 overlapped->ioModel = _MD_MultiWaitIO;
805 overlapped->data.mw.desc = desc;
806 overlapped->data.mw.group = group;
807 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
808 {
809 overlapped->data.mw.timer = CreateTimer(
810 desc->timeout,
811 NT_TimeProc,
812 overlapped);
813 if (0 == overlapped->data.mw.timer)
814 {
815 NT_HashRemove(group, desc->fd);
816 PR_DELETE(overlapped);
817 /*
818 * XXX It appears that a maximum of 16 timer events can
819 * be outstanding. GetLastError() returns 0 when I try it.
820 */
821 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
822 return PR_FAILURE;
823 }
824 }
825
826 /* Reach to the bottom layer to get the OS fd */
827 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
828 PR_ASSERT(NULL != bottom);
829 if (NULL == bottom)
830 {
831 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
832 return PR_FAILURE;
833 }
834 hFile = (HANDLE)bottom->secret->md.osfd;
835 if (!bottom->secret->md.io_model_committed)
836 {
837 PRInt32 st;
838 st = _md_Associate(hFile);
839 PR_ASSERT(0 != st);
840 bottom->secret->md.io_model_committed = PR_TRUE;
841 }
842 bResult = ReadFile(hFile,
843 desc->buffer.start,
844 (DWORD)desc->buffer.length,
845 NULL,
846 &overlapped->overlapped);
847 if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
848 {
849 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
850 {
851 if (InterlockedCompareExchange((LONG *)&desc->outcome,
852 (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING)
853 == (LONG)PR_MW_PENDING)
854 {
855 CancelTimer(overlapped->data.mw.timer);
856 }
857 NT_HashRemove(group, desc->fd);
858 PR_DELETE(overlapped);
859 }
860 _PR_MD_MAP_READ_ERROR(dwError);
861 rv = PR_FAILURE;
862 }
863 #endif
864
865 return rv;
866 } /* PR_AddWaitFileDesc */
867
/*
** Block until some receive registered with `group` (or the default
** group) completes, then return its PRRecvWait object with `outcome`
** set; returns NULL on interrupt, group cancellation, or init failure.
** On NT the completion port machinery delivers results; elsewhere one
** of the waiting threads is "borrowed" to poll on everyone's behalf
** (see _MW_PollInternal), and the initial recv is issued here for
** successfully-polled descriptors.
*/
PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
{
    PRCList *io_ready = NULL;
#ifdef WINNT
    PRThread *me = _PR_MD_CURRENT_THREAD();
    _MDOverlapped *overlapped;
#endif

    if (!_pr_initialized) {
        _PR_ImplicitInitialization();
    }
    if ((NULL == group) && (NULL == (group = MW_Init2()))) {
        goto failed_init;
    }

    PR_Lock(group->ml);

    if (_prmw_running != group->state)
    {
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
        goto invalid_state;
    }

    group->waiting_threads += 1; /* the polling thread is counted */

#ifdef WINNT
    _PR_MD_LOCK(&group->mdlock);
    /* Sleep (via the MD wait queue) until the io_ready list is
     * populated by the I/O completion machinery. */
    while (PR_CLIST_IS_EMPTY(&group->io_ready))
    {
        _PR_THREAD_LOCK(me);
        me->state = _PR_IO_WAIT;
        PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
        if (!_PR_IS_NATIVE_THREAD(me))
        {
            _PR_SLEEPQ_LOCK(me->cpu);
            _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
            _PR_SLEEPQ_UNLOCK(me->cpu);
        }
        _PR_THREAD_UNLOCK(me);
        _PR_MD_UNLOCK(&group->mdlock);
        PR_Unlock(group->ml);
        _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
        me->state = _PR_RUNNING;
        PR_Lock(group->ml);
        _PR_MD_LOCK(&group->mdlock);
        if (_PR_PENDING_INTERRUPT(me)) {
            PR_REMOVE_LINK(&me->waitQLinks);
            _PR_MD_UNLOCK(&group->mdlock);
            me->flags &= ~_PR_INTERRUPT;
            me->io_suspended = PR_FALSE;
            PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
            goto aborted;
        }
    }
    io_ready = PR_LIST_HEAD(&group->io_ready);
    PR_ASSERT(io_ready != NULL);
    PR_REMOVE_LINK(io_ready);
    _PR_MD_UNLOCK(&group->mdlock);
    /* The list links are embedded in the overlapped block; convert to
     * the descriptor's own internal linkage for the common epilogue. */
    overlapped = (_MDOverlapped *)
        ((char *)io_ready - offsetof(_MDOverlapped, data));
    io_ready = &overlapped->data.mw.desc->internal;
#else
    do
    {
        /*
        ** If the I/O ready list isn't empty, have this thread
        ** return with the first receive wait object that's available.
        */
        if (PR_CLIST_IS_EMPTY(&group->io_ready))
        {
            /*
            ** Is there a polling thread yet? If not, grab this thread
            ** and use it.
            */
            if (NULL == group->poller)
            {
                /*
                ** This thread will stay do polling until it becomes the only one
                ** left to service a completion. Then it will return and there will
                ** be none left to actually poll or to run completions.
                **
                ** The polling function should only return w/ failure or
                ** with some I/O ready.
                */
                if (PR_FAILURE == _MW_PollInternal(group)) {
                    goto failed_poll;
                }
            }
            else
            {
                /*
                ** There are four reasons a thread can be awakened from
                ** a wait on the io_complete condition variable.
                ** 1. Some I/O has completed, i.e., the io_ready list
                ** is nonempty.
                ** 2. The wait group is canceled.
                ** 3. The thread is interrupted.
                ** 4. The current polling thread has to leave and needs
                ** a replacement.
                ** The logic to find a new polling thread is made more
                ** complicated by all the other possible events.
                ** I tried my best to write the logic clearly, but
                ** it is still full of if's with continue and goto.
                */
                PRStatus st;
                do
                {
                    st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
                    if (_prmw_running != group->state)
                    {
                        PR_SetError(PR_INVALID_STATE_ERROR, 0);
                        goto aborted;
                    }
                    if (_MW_ABORTED(st) || (NULL == group->poller)) {
                        break;
                    }
                } while (PR_CLIST_IS_EMPTY(&group->io_ready));

                /*
                ** The thread is interrupted and has to leave. It might
                ** have also been awakened to process ready i/o or be the
                ** new poller. To be safe, if either condition is true,
                ** we awaken another thread to take its place.
                */
                if (_MW_ABORTED(st))
                {
                    if ((NULL == group->poller
                        || !PR_CLIST_IS_EMPTY(&group->io_ready))
                        && group->waiting_threads > 1) {
                        PR_NotifyCondVar(group->io_complete);
                    }
                    goto aborted;
                }

                /*
                ** A new poller is needed, but can I be the new poller?
                ** If there is no i/o ready, sure. But if there is any
                ** i/o ready, it has a higher priority. I want to
                ** process the ready i/o first and wake up another
                ** thread to be the new poller.
                */
                if (NULL == group->poller)
                {
                    if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
                        continue;
                    }
                    if (group->waiting_threads > 1) {
                        PR_NotifyCondVar(group->io_complete);
                    }
                }
            }
            PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
        }
        io_ready = PR_LIST_HEAD(&group->io_ready);
        PR_NotifyCondVar(group->io_taken);
        PR_ASSERT(io_ready != NULL);
        PR_REMOVE_LINK(io_ready);
    } while (NULL == io_ready);

failed_poll:

#endif

aborted:

    group->waiting_threads -= 1;
invalid_state:
    /* May complete a pending PR_CancelWaitGroup shutdown. */
    (void)MW_TestForShutdownInternal(group);
    PR_Unlock(group->ml);

failed_init:
    if (NULL != io_ready)
    {
        /* If the operation failed, record the reason why */
        switch (((PRRecvWait*)io_ready)->outcome)
        {
            case PR_MW_PENDING:
                PR_ASSERT(0);
                break;
            case PR_MW_SUCCESS:
#ifndef WINNT
                /* Data is readable: do the actual receive now. */
                _MW_InitialRecv(io_ready);
#endif
                break;
#ifdef WINNT
            case PR_MW_FAILURE:
                _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
                break;
#endif
            case PR_MW_TIMEOUT:
                PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
                break;
            case PR_MW_INTERRUPT:
                PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
                break;
            default: break;
        }
#ifdef WINNT
        /* Release our reference on the timeout timer, if one was
         * armed, then reclaim the overlapped block. */
        if (NULL != overlapped->data.mw.timer)
        {
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
                      != overlapped->data.mw.desc->timeout);
            CancelTimer(overlapped->data.mw.timer);
        }
        else
        {
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
                      == overlapped->data.mw.desc->timeout);
        }
        PR_DELETE(overlapped);
#endif
    }
    return (PRRecvWait*)io_ready;
} /* PR_WaitRecvReady */
1082
PR_CancelWaitFileDesc(PRWaitGroup * group,PRRecvWait * desc)1083 PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
1084 {
1085 #if !defined(WINNT)
1086 PRRecvWait **recv_wait;
1087 #endif
1088 PRStatus rv = PR_SUCCESS;
1089 if (NULL == group) {
1090 group = mw_state->group;
1091 }
1092 PR_ASSERT(NULL != group);
1093 if (NULL == group)
1094 {
1095 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1096 return PR_FAILURE;
1097 }
1098
1099 PR_Lock(group->ml);
1100
1101 if (_prmw_running != group->state)
1102 {
1103 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1104 rv = PR_FAILURE;
1105 goto unlock;
1106 }
1107
1108 #ifdef WINNT
1109 if (InterlockedCompareExchange((LONG *)&desc->outcome,
1110 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING)
1111 {
1112 PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
1113 PR_ASSERT(NULL != bottom);
1114 if (NULL == bottom)
1115 {
1116 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1117 goto unlock;
1118 }
1119 bottom->secret->state = _PR_FILEDESC_CLOSED;
1120 #if 0
1121 fprintf(stderr, "cancel wait recv: closing socket\n");
1122 #endif
1123 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
1124 {
1125 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
1126 exit(1);
1127 }
1128 }
1129 #else
1130 if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
1131 {
1132 /* it was in the wait table */
1133 _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
1134 goto unlock;
1135 }
1136 if (!PR_CLIST_IS_EMPTY(&group->io_ready))
1137 {
1138 /* is it already complete? */
1139 PRCList *head = PR_LIST_HEAD(&group->io_ready);
1140 do
1141 {
1142 PRRecvWait *done = (PRRecvWait*)head;
1143 if (done == desc) {
1144 goto unlock;
1145 }
1146 head = PR_NEXT_LINK(head);
1147 } while (head != &group->io_ready);
1148 }
1149 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1150 rv = PR_FAILURE;
1151
1152 #endif
1153 unlock:
1154 PR_Unlock(group->ml);
1155 return rv;
1156 } /* PR_CancelWaitFileDesc */
1157
/*
 * PR_CancelWaitGroup() - shut a wait group down and drain it.
 *
 * Drives the group to the _prmw_stopped state (waking any threads still
 * blocked on the group so they can exit), forces every still-pending
 * receive wait into the PR_MW_INTERRUPT outcome, then pops and returns
 * one entry from the group's io_ready list.  Returns NULL with
 * PR_GROUP_EMPTY_ERROR once nothing is left to hand back, so callers
 * can invoke this repeatedly to collect all outstanding descriptors.
 *
 * A NULL 'group' selects the default (global) wait group.
 */
PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
{
    PRRecvWait **desc;
    PRRecvWait *recv_wait = NULL;
#ifdef WINNT
    _MDOverlapped *overlapped;
    PRRecvWait **end;
    PRThread *me = _PR_MD_CURRENT_THREAD();
#endif

    if (NULL == group) {
        group = mw_state->group;
    }
    PR_ASSERT(NULL != group);
    if (NULL == group)
    {
        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
        return NULL;
    }

    PR_Lock(group->ml);
    if (_prmw_stopped != group->state)
    {
        if (_prmw_running == group->state) {
            group->state = _prmw_stopping;  /* so nothing new comes in */
        }
        if (0 == group->waiting_threads) { /* is there anybody else? */
            group->state = _prmw_stopped;  /* we can stop right now */
        }
        else
        {
            /*
             * Other threads are still inside the group; wake them all
             * and wait for the state to flip to stopped (presumably the
             * last waiter out signals mw_manage - confirm in the wait
             * paths, which are outside this view).
             */
            PR_NotifyAllCondVar(group->new_business);
            PR_NotifyAllCondVar(group->io_complete);
        }
        while (_prmw_stopped != group->state) {
            (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
        }
    }

#ifdef WINNT
    _PR_MD_LOCK(&group->mdlock);
#endif
    /* make all the existing descriptors look done/interrupted */
#ifdef WINNT
    /*
     * NT model: each registered wait has overlapped I/O outstanding.
     * Scan every slot of the waiter hash; for each wait we can
     * atomically move from PENDING to INTERRUPT, close the underlying
     * socket so the pending overlapped operation completes and the
     * waiter gets released.
     */
    end = &group->waiter->recv_wait + group->waiter->length;
    for (desc = &group->waiter->recv_wait; desc < end; ++desc)
    {
        if (NULL != *desc)
        {
            /* Only the thread that wins this exchange does the close. */
            if (InterlockedCompareExchange((LONG *)&(*desc)->outcome,
                                           (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
                == (LONG)PR_MW_PENDING)
            {
                PRFileDesc *bottom = PR_GetIdentitiesLayer(
                                         (*desc)->fd, PR_NSPR_IO_LAYER);
                PR_ASSERT(NULL != bottom);
                if (NULL == bottom)
                {
                    /*
                     * Bail out with NULL/recv_wait unset; note this skips
                     * the io_ready harvest below.
                     */
                    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
                    goto invalid_arg;
                }
                bottom->secret->state = _PR_FILEDESC_CLOSED;
#if 0
                fprintf(stderr, "cancel wait group: closing socket\n");
#endif
                if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
                {
                    fprintf(stderr, "closesocket failed: %d\n",
                            WSAGetLastError());
                    exit(1);
                }
            }
        }
    }
    /*
     * Park this thread until every interrupted wait has completed.
     * We queue ourselves on the group's wait_list and go into I/O
     * wait; both locks are dropped around the MD wait and reacquired
     * in the same order (group->ml, then mdlock) afterwards.
     */
    while (group->waiter->count > 0)
    {
        _PR_THREAD_LOCK(me);
        me->state = _PR_IO_WAIT;
        PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
        if (!_PR_IS_NATIVE_THREAD(me))
        {
            _PR_SLEEPQ_LOCK(me->cpu);
            _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
            _PR_SLEEPQ_UNLOCK(me->cpu);
        }
        _PR_THREAD_UNLOCK(me);
        _PR_MD_UNLOCK(&group->mdlock);
        PR_Unlock(group->ml);
        _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
        me->state = _PR_RUNNING;
        PR_Lock(group->ml);
        _PR_MD_LOCK(&group->mdlock);
    }
#else
    /*
     * Non-NT model: retire each occupied hash slot with an interrupted
     * outcome until no registered waits remain.  'desc' only advances;
     * the assert guards against count disagreeing with the table.
     */
    for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
    {
        PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
        if (NULL != *desc) {
            _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
        }
    }
#endif

    /* take first element of finished list and return it or NULL */
    if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
        PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
    }
    else
    {
        PRCList *head = PR_LIST_HEAD(&group->io_ready);
        PR_REMOVE_AND_INIT_LINK(head);
#ifdef WINNT
        /*
         * On NT the io_ready links are embedded in an _MDOverlapped;
         * recover it, cancel any timer armed for the wait's timeout,
         * and free the overlapped wrapper.  The returned pointer is
         * the PRRecvWait via its 'internal' link (assumes 'internal'
         * leads the PRRecvWait layout - see pprmwait.h).
         */
        overlapped = (_MDOverlapped *)
                     ((char *)head - offsetof(_MDOverlapped, data));
        head = &overlapped->data.mw.desc->internal;
        if (NULL != overlapped->data.mw.timer)
        {
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
                      != overlapped->data.mw.desc->timeout);
            CancelTimer(overlapped->data.mw.timer);
        }
        else
        {
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
                      == overlapped->data.mw.desc->timeout);
        }
        PR_DELETE(overlapped);
#endif
        recv_wait = (PRRecvWait*)head;
    }
#ifdef WINNT
invalid_arg:
    _PR_MD_UNLOCK(&group->mdlock);
#endif
    PR_Unlock(group->ml);

    return recv_wait;
}  /* PR_CancelWaitGroup */
1296
PR_CreateWaitGroup(PRInt32 size)1297 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
1298 {
1299 PRWaitGroup *wg;
1300
1301 if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
1302 {
1303 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1304 goto failed;
1305 }
1306 /* the wait group itself */
1307 wg->ml = PR_NewLock();
1308 if (NULL == wg->ml) {
1309 goto failed_lock;
1310 }
1311 wg->io_taken = PR_NewCondVar(wg->ml);
1312 if (NULL == wg->io_taken) {
1313 goto failed_cvar0;
1314 }
1315 wg->io_complete = PR_NewCondVar(wg->ml);
1316 if (NULL == wg->io_complete) {
1317 goto failed_cvar1;
1318 }
1319 wg->new_business = PR_NewCondVar(wg->ml);
1320 if (NULL == wg->new_business) {
1321 goto failed_cvar2;
1322 }
1323 wg->mw_manage = PR_NewCondVar(wg->ml);
1324 if (NULL == wg->mw_manage) {
1325 goto failed_cvar3;
1326 }
1327
1328 PR_INIT_CLIST(&wg->group_link);
1329 PR_INIT_CLIST(&wg->io_ready);
1330
1331 /* the waiters sequence */
1332 wg->waiter = (_PRWaiterHash*)PR_CALLOC(
1333 sizeof(_PRWaiterHash) +
1334 (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
1335 if (NULL == wg->waiter)
1336 {
1337 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1338 goto failed_waiter;
1339 }
1340 wg->waiter->count = 0;
1341 wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
1342
1343 #ifdef WINNT
1344 _PR_MD_NEW_LOCK(&wg->mdlock);
1345 PR_INIT_CLIST(&wg->wait_list);
1346 #endif /* WINNT */
1347
1348 PR_Lock(mw_lock);
1349 PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
1350 PR_Unlock(mw_lock);
1351 return wg;
1352
1353 failed_waiter:
1354 PR_DestroyCondVar(wg->mw_manage);
1355 failed_cvar3:
1356 PR_DestroyCondVar(wg->new_business);
1357 failed_cvar2:
1358 PR_DestroyCondVar(wg->io_complete);
1359 failed_cvar1:
1360 PR_DestroyCondVar(wg->io_taken);
1361 failed_cvar0:
1362 PR_DestroyLock(wg->ml);
1363 failed_lock:
1364 PR_DELETE(wg);
1365 wg = NULL;
1366
1367 failed:
1368 return wg;
1369 } /* MW_CreateWaitGroup */
1370
PR_DestroyWaitGroup(PRWaitGroup * group)1371 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
1372 {
1373 PRStatus rv = PR_SUCCESS;
1374 if (NULL == group) {
1375 group = mw_state->group;
1376 }
1377 PR_ASSERT(NULL != group);
1378 if (NULL != group)
1379 {
1380 PR_Lock(group->ml);
1381 if ((group->waiting_threads == 0)
1382 && (group->waiter->count == 0)
1383 && PR_CLIST_IS_EMPTY(&group->io_ready))
1384 {
1385 group->state = _prmw_stopped;
1386 }
1387 else
1388 {
1389 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1390 rv = PR_FAILURE;
1391 }
1392 PR_Unlock(group->ml);
1393 if (PR_FAILURE == rv) {
1394 return rv;
1395 }
1396
1397 PR_Lock(mw_lock);
1398 PR_REMOVE_LINK(&group->group_link);
1399 PR_Unlock(mw_lock);
1400
1401 #ifdef WINNT
1402 /*
1403 * XXX make sure wait_list is empty and waiter is empty.
1404 * These must be checked while holding mdlock.
1405 */
1406 _PR_MD_FREE_LOCK(&group->mdlock);
1407 #endif
1408
1409 PR_DELETE(group->waiter);
1410 PR_DELETE(group->polling_list);
1411 PR_DestroyCondVar(group->mw_manage);
1412 PR_DestroyCondVar(group->new_business);
1413 PR_DestroyCondVar(group->io_complete);
1414 PR_DestroyCondVar(group->io_taken);
1415 PR_DestroyLock(group->ml);
1416 if (group == mw_state->group) {
1417 mw_state->group = NULL;
1418 }
1419 PR_DELETE(group);
1420 }
1421 else
1422 {
1423 /* The default wait group is not created yet. */
1424 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1425 rv = PR_FAILURE;
1426 }
1427 return rv;
1428 } /* PR_DestroyWaitGroup */
1429
1430 /**********************************************************************
1431 ***********************************************************************
1432 ******************** Wait group enumerations **************************
1433 ***********************************************************************
1434 **********************************************************************/
1435
PR_CreateMWaitEnumerator(PRWaitGroup * group)1436 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
1437 {
1438 PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
1439 if (NULL == enumerator) {
1440 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1441 }
1442 else
1443 {
1444 enumerator->group = group;
1445 enumerator->seal = _PR_ENUM_SEALED;
1446 }
1447 return enumerator;
1448 } /* PR_CreateMWaitEnumerator */
1449
PR_DestroyMWaitEnumerator(PRMWaitEnumerator * enumerator)1450 PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
1451 {
1452 PR_ASSERT(NULL != enumerator);
1453 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1454 if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
1455 {
1456 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1457 return PR_FAILURE;
1458 }
1459 enumerator->seal = _PR_ENUM_UNSEALED;
1460 PR_Free(enumerator);
1461 return PR_SUCCESS;
1462 } /* PR_DestroyMWaitEnumerator */
1463
/*
 * PR_EnumerateWaitGroup() - walk the receive-wait objects registered
 * in the enumerator's group.
 *
 * Pass 'previous' == NULL to (re)start the walk; pass the last value
 * returned to continue it (note: 'previous' is only tested against
 * NULL - the real position lives in the enumerator's cached cursor).
 * Returns the next occupied slot of the waiter hash table, or NULL
 * when the table is exhausted or an argument is bad.
 *
 * Because the cursor ('waiter' pointer plus 'index') persists between
 * calls, the enumeration must stay on the thread that started it.  If
 * the group's p_timestamp no longer matches the one captured at the
 * start, the table changed underneath us and the walk restarts itself.
 */
PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
    PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
{
    PRRecvWait *result = NULL;

    /* entry point sanity checking */
    PR_ASSERT(NULL != enumerator);
    PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
    if ((NULL == enumerator)
        || (_PR_ENUM_SEALED != enumerator->seal)) {
        goto bad_argument;
    }

    /* beginning of enumeration */
    if (NULL == previous)
    {
        /* An enumerator created with a NULL group binds to the default
         * group now, if one exists. */
        if (NULL == enumerator->group)
        {
            enumerator->group = mw_state->group;
            if (NULL == enumerator->group)
            {
                PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
                return NULL;
            }
        }
        /* Capture the cursor, the table's change stamp, and the owning
         * thread for the duration of this walk. */
        enumerator->waiter = &enumerator->group->waiter->recv_wait;
        enumerator->p_timestamp = enumerator->group->p_timestamp;
        enumerator->thread = PR_GetCurrentThread();
        enumerator->index = 0;
    }
    /* continuing an enumeration */
    else
    {
        PRThread *me = PR_GetCurrentThread();
        PR_ASSERT(me == enumerator->thread);
        if (me != enumerator->thread) {
            goto bad_argument;
        }

        /* need to restart the enumeration */
        if (enumerator->p_timestamp != enumerator->group->p_timestamp) {
            return PR_EnumerateWaitGroup(enumerator, NULL);
        }
    }

    /* actually progress the enumeration */
#if defined(WINNT)
    _PR_MD_LOCK(&enumerator->group->mdlock);
#else
    PR_Lock(enumerator->group->ml);
#endif
    /* Scan forward through the hash slots, stopping at the first
     * non-NULL entry; both cursor fields advance as we go. */
    while (enumerator->index++ < enumerator->group->waiter->length)
    {
        if (NULL != (result = *(enumerator->waiter)++)) {
            break;
        }
    }
#if defined(WINNT)
    _PR_MD_UNLOCK(&enumerator->group->mdlock);
#else
    PR_Unlock(enumerator->group->ml);
#endif

    return result;  /* what we live for */

bad_argument:
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    return NULL;  /* probably ambiguous */
}  /* PR_EnumerateWaitGroup */
1533
1534 /* prmwait.c */
1535