1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3  * License, v. 2.0. If a copy of the MPL was not distributed with this
4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 
6 #include "primpl.h"
7 #include "pprmwait.h"
8 
/*
 * Maximum number of slots probed for one key before the hash add/lookup
 * routines in this file give up.
 */
#define _MW_REHASH_MAX 11

/* Serializes installation of the default wait group (mw_state->group). */
static PRLock *mw_lock = NULL;
/* Multi-wait global state; allocated zero-filled by _PR_InitMW(). */
static _PRGlobalState *mw_state = NULL;

/*
 * Upper bound on the timeout of a single PR_Poll() call made by the
 * polling thread; computed from MAX_POLLING_INTERVAL in _PR_InitMW().
 */
static PRIntervalTime max_polling_interval;
15 
16 #ifdef WINNT
17 
/*
 * One pending timer on tm_vars.timer_queue.
 */
typedef struct TimerEvent {
    PRIntervalTime absolute;  /* expiry: PR_IntervalNow() + timeout at creation */
    void (*func)(void *);     /* callback run by the timer manager at expiry */
    void *arg;                /* opaque argument handed to func */
    LONG ref_count;           /* starts at 2; dropped once by the manager after
                               * func returns and once by CancelTimer() */
    PRCList links;            /* queue linkage; self-linked when off the queue */
} TimerEvent;
25 
/* Recover the enclosing TimerEvent from a pointer to its 'links' member. */
#define TIMER_EVENT_PTR(_qp) \
    ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
28 
/*
 * State shared between the timer manager thread and the threads that
 * create or cancel timers. Everything is protected by 'ml'.
 */
struct {
    PRLock *ml;                /* protects the queue and every TimerEvent */
    PRCondVar *new_timer;      /* notified by CreateTimer(); the manager waits on it */
    PRCondVar *cancel_timer;   /* notified by the manager when a fired timer's
                                * ref_count reaches 0; CancelTimer() waits on it */
    PRThread *manager_thread;  /* thread running TimerManager() */
    PRCList timer_queue;       /* pending timers, kept ordered by expiry */
} tm_vars;
36 
/* Forward declarations for the WINNT timer facility defined below. */
static PRStatus TimerInit(void);
static void TimerManager(void *arg);
static TimerEvent *CreateTimer(PRIntervalTime timeout,
                               void (*func)(void *), void *arg);
static PRBool CancelTimer(TimerEvent *timer);
42 
TimerManager(void * arg)43 static void TimerManager(void *arg)
44 {
45     PRIntervalTime now;
46     PRIntervalTime timeout;
47     PRCList *head;
48     TimerEvent *timer;
49 
50     PR_Lock(tm_vars.ml);
51     while (1)
52     {
53         if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
54         {
55             PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
56         }
57         else
58         {
59             now = PR_IntervalNow();
60             head = PR_LIST_HEAD(&tm_vars.timer_queue);
61             timer = TIMER_EVENT_PTR(head);
62             if ((PRInt32) (now - timer->absolute) >= 0)
63             {
64                 PR_REMOVE_LINK(head);
65                 /*
66                  * make its prev and next point to itself so that
67                  * it's obvious that it's not on the timer_queue.
68                  */
69                 PR_INIT_CLIST(head);
70                 PR_ASSERT(2 == timer->ref_count);
71                 PR_Unlock(tm_vars.ml);
72                 timer->func(timer->arg);
73                 PR_Lock(tm_vars.ml);
74                 timer->ref_count -= 1;
75                 if (0 == timer->ref_count)
76                 {
77                     PR_NotifyAllCondVar(tm_vars.cancel_timer);
78                 }
79             }
80             else
81             {
82                 timeout = (PRIntervalTime)(timer->absolute - now);
83                 PR_WaitCondVar(tm_vars.new_timer, timeout);
84             }
85         }
86     }
87     PR_Unlock(tm_vars.ml);
88 }
89 
CreateTimer(PRIntervalTime timeout,void (* func)(void *),void * arg)90 static TimerEvent *CreateTimer(
91     PRIntervalTime timeout,
92     void (*func)(void *),
93     void *arg)
94 {
95     TimerEvent *timer;
96     PRCList *links, *tail;
97     TimerEvent *elem;
98 
99     timer = PR_NEW(TimerEvent);
100     if (NULL == timer)
101     {
102         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
103         return timer;
104     }
105     timer->absolute = PR_IntervalNow() + timeout;
106     timer->func = func;
107     timer->arg = arg;
108     timer->ref_count = 2;
109     PR_Lock(tm_vars.ml);
110     tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
111     while (links->prev != tail)
112     {
113         elem = TIMER_EVENT_PTR(links);
114         if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
115         {
116             break;
117         }
118         links = links->prev;
119     }
120     PR_INSERT_AFTER(&timer->links, links);
121     PR_NotifyCondVar(tm_vars.new_timer);
122     PR_Unlock(tm_vars.ml);
123     return timer;
124 }
125 
CancelTimer(TimerEvent * timer)126 static PRBool CancelTimer(TimerEvent *timer)
127 {
128     PRBool canceled = PR_FALSE;
129 
130     PR_Lock(tm_vars.ml);
131     timer->ref_count -= 1;
132     if (timer->links.prev == &timer->links)
133     {
134         while (timer->ref_count == 1)
135         {
136             PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
137         }
138     }
139     else
140     {
141         PR_REMOVE_LINK(&timer->links);
142         canceled = PR_TRUE;
143     }
144     PR_Unlock(tm_vars.ml);
145     PR_DELETE(timer);
146     return canceled;
147 }
148 
TimerInit(void)149 static PRStatus TimerInit(void)
150 {
151     tm_vars.ml = PR_NewLock();
152     if (NULL == tm_vars.ml)
153     {
154         goto failed;
155     }
156     tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
157     if (NULL == tm_vars.new_timer)
158     {
159         goto failed;
160     }
161     tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
162     if (NULL == tm_vars.cancel_timer)
163     {
164         goto failed;
165     }
166     PR_INIT_CLIST(&tm_vars.timer_queue);
167     tm_vars.manager_thread = PR_CreateThread(
168                                  PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
169                                  PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
170     if (NULL == tm_vars.manager_thread)
171     {
172         goto failed;
173     }
174     return PR_SUCCESS;
175 
176 failed:
177     if (NULL != tm_vars.cancel_timer)
178     {
179         PR_DestroyCondVar(tm_vars.cancel_timer);
180     }
181     if (NULL != tm_vars.new_timer)
182     {
183         PR_DestroyCondVar(tm_vars.new_timer);
184     }
185     if (NULL != tm_vars.ml)
186     {
187         PR_DestroyLock(tm_vars.ml);
188     }
189     return PR_FAILURE;
190 }
191 
192 #endif /* WINNT */
193 
194 /******************************************************************/
195 /******************************************************************/
196 /************************ The private portion *********************/
197 /******************************************************************/
198 /******************************************************************/
_PR_InitMW(void)199 void _PR_InitMW(void)
200 {
201 #ifdef WINNT
202     /*
203      * We use NT 4's InterlockedCompareExchange() to operate
204      * on PRMWStatus variables.
205      */
206     PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
207     TimerInit();
208 #endif
209     mw_lock = PR_NewLock();
210     PR_ASSERT(NULL != mw_lock);
211     mw_state = PR_NEWZAP(_PRGlobalState);
212     PR_ASSERT(NULL != mw_state);
213     PR_INIT_CLIST(&mw_state->group_list);
214     max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
215 }  /* _PR_InitMW */
216 
_PR_CleanupMW(void)217 void _PR_CleanupMW(void)
218 {
219     PR_DestroyLock(mw_lock);
220     mw_lock = NULL;
221     if (mw_state->group) {
222         PR_DestroyWaitGroup(mw_state->group);
223         /* mw_state->group is set to NULL as a side effect. */
224     }
225     PR_DELETE(mw_state);
226 }  /* _PR_CleanupMW */
227 
MW_Init2(void)228 static PRWaitGroup *MW_Init2(void)
229 {
230     PRWaitGroup *group = mw_state->group;  /* it's the null group */
231     if (NULL == group)  /* there is this special case */
232     {
233         group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
234         if (NULL == group) {
235             goto failed_alloc;
236         }
237         PR_Lock(mw_lock);
238         if (NULL == mw_state->group)
239         {
240             mw_state->group = group;
241             group = NULL;
242         }
243         PR_Unlock(mw_lock);
244         if (group != NULL) {
245             (void)PR_DestroyWaitGroup(group);
246         }
247         group = mw_state->group;  /* somebody beat us to it */
248     }
249 failed_alloc:
250     return group;  /* whatever */
251 }  /* MW_Init2 */
252 
MW_AddHashInternal(PRRecvWait * desc,_PRWaiterHash * hash)253 static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
254 {
255     /*
256     ** The entries are put in the table using the fd (PRFileDesc*) of
257     ** the receive descriptor as the key. This allows us to locate
258     ** the appropriate entry aqain when the poll operation finishes.
259     **
260     ** The pointer to the file descriptor object is first divided by
261     ** the natural alignment of a pointer in the belief that object
262     ** will have at least that many zeros in the low order bits.
263     ** This may not be a good assuption.
264     **
265     ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
266     ** that we declare defeat and force the table to be reconstructed.
267     ** Since some fds might be added more than once, won't that cause
268     ** collisions even in an empty table?
269     */
270     PRIntn rehash = _MW_REHASH_MAX;
271     PRRecvWait **waiter;
272     PRUintn hidx = _MW_HASH(desc->fd, hash->length);
273     PRUintn hoffset = 0;
274 
275     while (rehash-- > 0)
276     {
277         waiter = &hash->recv_wait;
278         if (NULL == waiter[hidx])
279         {
280             waiter[hidx] = desc;
281             hash->count += 1;
282 #if 0
283             printf("Adding 0x%x->0x%x ", desc, desc->fd);
284             printf(
285                 "table[%u:%u:*%u]: 0x%x->0x%x\n",
286                 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
287 #endif
288             return _prmw_success;
289         }
290         if (desc == waiter[hidx])
291         {
292             PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);  /* desc already in table */
293             return _prmw_error;
294         }
295 #if 0
296         printf("Failing 0x%x->0x%x ", desc, desc->fd);
297         printf(
298             "table[*%u:%u:%u]: 0x%x->0x%x\n",
299             hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
300 #endif
301         if (0 == hoffset)
302         {
303             hoffset = _MW_HASH2(desc->fd, hash->length);
304             PR_ASSERT(0 != hoffset);
305         }
306         hidx = (hidx + hoffset) % (hash->length);
307     }
308     return _prmw_rehash;
309 }  /* MW_AddHashInternal */
310 
MW_ExpandHashInternal(PRWaitGroup * group)311 static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
312 {
313     PRRecvWait **desc;
314     PRUint32 pidx, length;
315     _PRWaiterHash *newHash, *oldHash = group->waiter;
316     PRBool retry;
317     _PR_HashStory hrv;
318 
319     static const PRInt32 prime_number[] = {
320         _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
321         2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771
322     };
323     PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
324 
325     /* look up the next size we'd like to use for the hash table */
326     for (pidx = 0; pidx < primes; ++pidx)
327     {
328         if (prime_number[pidx] == oldHash->length)
329         {
330             break;
331         }
332     }
333     /* table size must be one of the prime numbers */
334     PR_ASSERT(pidx < primes);
335 
336     /* if pidx == primes - 1, we can't expand the table any more */
337     while (pidx < primes - 1)
338     {
339         /* next size */
340         ++pidx;
341         length = prime_number[pidx];
342 
343         /* allocate the new hash table and fill it in with the old */
344         newHash = (_PRWaiterHash*)PR_CALLOC(
345                       sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
346         if (NULL == newHash)
347         {
348             PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
349             return _prmw_error;
350         }
351 
352         newHash->length = length;
353         retry = PR_FALSE;
354         for (desc = &oldHash->recv_wait;
355              newHash->count < oldHash->count; ++desc)
356         {
357             PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
358             if (NULL != *desc)
359             {
360                 hrv = MW_AddHashInternal(*desc, newHash);
361                 PR_ASSERT(_prmw_error != hrv);
362                 if (_prmw_success != hrv)
363                 {
364                     PR_DELETE(newHash);
365                     retry = PR_TRUE;
366                     break;
367                 }
368             }
369         }
370         if (retry) {
371             continue;
372         }
373 
374         PR_DELETE(group->waiter);
375         group->waiter = newHash;
376         group->p_timestamp += 1;
377         return _prmw_success;
378     }
379 
380     PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
381     return _prmw_error;  /* we're hosed */
382 }  /* MW_ExpandHashInternal */
383 
384 #ifndef WINNT
/*
** Retire one receive-wait descriptor from the hash table.
**
** Records 'outcome' in the descriptor, appends it to the group's
** io_ready list, notifies one thread waiting on io_complete, decrements
** the table's live count and clears the table slot 'waiter' points at.
*/
static void _MW_DoneInternal(
    PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
{
    PRRecvWait *finished = *waiter;

    finished->outcome = outcome;
    PR_APPEND_LINK(&finished->internal, &group->io_ready);
    PR_NotifyCondVar(group->io_complete);

    PR_ASSERT(0 != group->waiter->count);
    group->waiter->count -= 1;
    *waiter = NULL;  /* the slot is free again */
}  /* _MW_DoneInternal */
405 #endif /* WINNT */
406 
_MW_LookupInternal(PRWaitGroup * group,PRFileDesc * fd)407 static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
408 {
409     /*
410     ** Find the receive wait object corresponding to the file descriptor.
411     ** Only search the wait group specified.
412     */
413     PRRecvWait **desc;
414     PRIntn rehash = _MW_REHASH_MAX;
415     _PRWaiterHash *hash = group->waiter;
416     PRUintn hidx = _MW_HASH(fd, hash->length);
417     PRUintn hoffset = 0;
418 
419     while (rehash-- > 0)
420     {
421         desc = (&hash->recv_wait) + hidx;
422         if ((*desc != NULL) && ((*desc)->fd == fd)) {
423             return desc;
424         }
425         if (0 == hoffset)
426         {
427             hoffset = _MW_HASH2(fd, hash->length);
428             PR_ASSERT(0 != hoffset);
429         }
430         hidx = (hidx + hoffset) % (hash->length);
431     }
432     return NULL;
433 }  /* _MW_LookupInternal */
434 
435 #ifndef WINNT
_MW_PollInternal(PRWaitGroup * group)436 static PRStatus _MW_PollInternal(PRWaitGroup *group)
437 {
438     PRRecvWait **waiter;
439     PRStatus rv = PR_FAILURE;
440     PRInt32 count, count_ready;
441     PRIntervalTime polling_interval;
442 
443     group->poller = PR_GetCurrentThread();
444 
445     while (PR_TRUE)
446     {
447         PRIntervalTime now, since_last_poll;
448         PRPollDesc *poll_list;
449 
450         while (0 == group->waiter->count)
451         {
452             PRStatus st;
453             st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
454             if (_prmw_running != group->state)
455             {
456                 PR_SetError(PR_INVALID_STATE_ERROR, 0);
457                 goto aborted;
458             }
459             if (_MW_ABORTED(st)) {
460                 goto aborted;
461             }
462         }
463 
464         /*
465         ** There's something to do. See if our existing polling list
466         ** is large enough for what we have to do?
467         */
468 
469         while (group->polling_count < group->waiter->count)
470         {
471             PRUint32 old_count = group->waiter->count;
472             PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
473             PRSize new_size = sizeof(PRPollDesc) * new_count;
474             PRPollDesc *old_polling_list = group->polling_list;
475 
476             PR_Unlock(group->ml);
477             poll_list = (PRPollDesc*)PR_CALLOC(new_size);
478             if (NULL == poll_list)
479             {
480                 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
481                 PR_Lock(group->ml);
482                 goto failed_alloc;
483             }
484             if (NULL != old_polling_list) {
485                 PR_DELETE(old_polling_list);
486             }
487             PR_Lock(group->ml);
488             if (_prmw_running != group->state)
489             {
490                 PR_DELETE(poll_list);
491                 PR_SetError(PR_INVALID_STATE_ERROR, 0);
492                 goto aborted;
493             }
494             group->polling_list = poll_list;
495             group->polling_count = new_count;
496         }
497 
498         now = PR_IntervalNow();
499         polling_interval = max_polling_interval;
500         since_last_poll = now - group->last_poll;
501 
502         waiter = &group->waiter->recv_wait;
503         poll_list = group->polling_list;
504         for (count = 0; count < group->waiter->count; ++waiter)
505         {
506             PR_ASSERT(waiter < &group->waiter->recv_wait
507                       + group->waiter->length);
508             if (NULL != *waiter)  /* a live one! */
509             {
510                 if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
511                     && (since_last_poll >= (*waiter)->timeout)) {
512                     _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
513                 }
514                 else
515                 {
516                     if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
517                     {
518                         (*waiter)->timeout -= since_last_poll;
519                         if ((*waiter)->timeout < polling_interval) {
520                             polling_interval = (*waiter)->timeout;
521                         }
522                     }
523                     PR_ASSERT(poll_list < group->polling_list
524                               + group->polling_count);
525                     poll_list->fd = (*waiter)->fd;
526                     poll_list->in_flags = PR_POLL_READ;
527                     poll_list->out_flags = 0;
528 #if 0
529                     printf(
530                         "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
531                         poll_list, count, poll_list->fd, (*waiter)->timeout);
532 #endif
533                     poll_list += 1;
534                     count += 1;
535                 }
536             }
537         }
538 
539         PR_ASSERT(count == group->waiter->count);
540 
541         /*
542         ** If there are no more threads waiting for completion,
543         ** we need to return.
544         */
545         if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
546             && (1 == group->waiting_threads)) {
547             break;
548         }
549 
550         if (0 == count) {
551             continue;    /* wait for new business */
552         }
553 
554         group->last_poll = now;
555 
556         PR_Unlock(group->ml);
557 
558         count_ready = PR_Poll(group->polling_list, count, polling_interval);
559 
560         PR_Lock(group->ml);
561 
562         if (_prmw_running != group->state)
563         {
564             PR_SetError(PR_INVALID_STATE_ERROR, 0);
565             goto aborted;
566         }
567         if (-1 == count_ready)
568         {
569             goto failed_poll;  /* that's a shame */
570         }
571         else if (0 < count_ready)
572         {
573             for (poll_list = group->polling_list; count > 0;
574                  poll_list++, count--)
575             {
576                 PR_ASSERT(
577                     poll_list < group->polling_list + group->polling_count);
578                 if (poll_list->out_flags != 0)
579                 {
580                     waiter = _MW_LookupInternal(group, poll_list->fd);
581                     /*
582                     ** If 'waiter' is NULL, that means the wait receive
583                     ** descriptor has been canceled.
584                     */
585                     if (NULL != waiter) {
586                         _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
587                     }
588                 }
589             }
590         }
591         /*
592         ** If there are no more threads waiting for completion,
593         ** we need to return.
594         ** This thread was "borrowed" to do the polling, but it really
595         ** belongs to the client.
596         */
597         if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
598             && (1 == group->waiting_threads)) {
599             break;
600         }
601     }
602 
603     rv = PR_SUCCESS;
604 
605 aborted:
606 failed_poll:
607 failed_alloc:
608     group->poller = NULL;  /* we were that, not we ain't */
609     if ((_prmw_running == group->state) && (group->waiting_threads > 1))
610     {
611         /* Wake up one thread to become the new poller. */
612         PR_NotifyCondVar(group->io_complete);
613     }
614     return rv;  /* we return with the lock held */
615 }  /* _MW_PollInternal */
616 #endif /* !WINNT */
617 
MW_TestForShutdownInternal(PRWaitGroup * group)618 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
619 {
620     PRMWGroupState rv = group->state;
621     /*
622     ** Looking at the group's fields is safe because
623     ** once the group's state is no longer running, it
624     ** cannot revert and there is a safe check on entry
625     ** to make sure no more threads are made to wait.
626     */
627     if ((_prmw_stopping == rv)
628         && (0 == group->waiting_threads))
629     {
630         rv = group->state = _prmw_stopped;
631         PR_NotifyCondVar(group->mw_manage);
632     }
633     return rv;
634 }  /* MW_TestForShutdownInternal */
635 
636 #ifndef WINNT
/*
** Perform the first receive on a descriptor taken off the ready list.
**
** 'io_ready' is cast directly to the PRRecvWait it belongs to. With no
** buffer (NULL start or zero length) bytesRecv is simply set to 0.
** Otherwise the fd's recv method is called with the descriptor's
** buffer and timeout; a negative result marks the outcome as
** PR_MW_FAILURE (the error is expected to have been set already).
*/
static void _MW_InitialRecv(PRCList *io_ready)
{
    PRRecvWait *desc = (PRRecvWait*)io_ready;
    PRBool have_buffer =
        ((NULL != desc->buffer.start) && (0 != desc->buffer.length))
        ? PR_TRUE : PR_FALSE;

    if (!have_buffer) {
        desc->bytesRecv = 0;
        return;
    }

    desc->bytesRecv = (desc->fd->methods->recv)(
                          desc->fd, desc->buffer.start,
                          desc->buffer.length, 0, desc->timeout);
    if (desc->bytesRecv < 0) { /* SetError should already be there */
        desc->outcome = PR_MW_FAILURE;
    }
}  /* _MW_InitialRecv */
654 #endif
655 
656 #ifdef WINNT
NT_TimeProc(void * arg)657 static void NT_TimeProc(void *arg)
658 {
659     _MDOverlapped *overlapped = (_MDOverlapped *)arg;
660     PRRecvWait *desc =  overlapped->data.mw.desc;
661     PRFileDesc *bottom;
662 
663     if (InterlockedCompareExchange((LONG *)&desc->outcome,
664                                    (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING)
665     {
666         /* This wait recv descriptor has already completed. */
667         return;
668     }
669 
670     /* close the osfd to abort the outstanding async io request */
671     /* $$$$
672     ** Little late to be checking if NSPR's on the bottom of stack,
673     ** but if we don't check, we can't assert that the private data
674     ** is what we think it is.
675     ** $$$$
676     */
677     bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
678     PR_ASSERT(NULL != bottom);
679     if (NULL != bottom)  /* now what!?!?! */
680     {
681         bottom->secret->state = _PR_FILEDESC_CLOSED;
682         if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
683         {
684             fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
685             PR_NOT_REACHED("What shall I do?");
686         }
687     }
688     return;
689 }  /* NT_TimeProc */
690 
NT_HashRemove(PRWaitGroup * group,PRFileDesc * fd)691 static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
692 {
693     PRRecvWait **waiter;
694 
695     _PR_MD_LOCK(&group->mdlock);
696     waiter = _MW_LookupInternal(group, fd);
697     if (NULL != waiter)
698     {
699         group->waiter->count -= 1;
700         *waiter = NULL;
701     }
702     _PR_MD_UNLOCK(&group->mdlock);
703     return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
704 }
705 
NT_HashRemoveInternal(PRWaitGroup * group,PRFileDesc * fd)706 PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
707 {
708     PRRecvWait **waiter;
709 
710     waiter = _MW_LookupInternal(group, fd);
711     if (NULL != waiter)
712     {
713         group->waiter->count -= 1;
714         *waiter = NULL;
715     }
716     return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
717 }
718 #endif /* WINNT */
719 
720 /******************************************************************/
721 /******************************************************************/
722 /********************** The public API portion ********************/
723 /******************************************************************/
724 /******************************************************************/
PR_AddWaitFileDesc(PRWaitGroup * group,PRRecvWait * desc)725 PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
726     PRWaitGroup *group, PRRecvWait *desc)
727 {
728     _PR_HashStory hrv;
729     PRStatus rv = PR_FAILURE;
730 #ifdef WINNT
731     _MDOverlapped *overlapped;
732     HANDLE hFile;
733     BOOL bResult;
734     DWORD dwError;
735     PRFileDesc *bottom;
736 #endif
737 
738     if (!_pr_initialized) {
739         _PR_ImplicitInitialization();
740     }
741     if ((NULL == group) && (NULL == (group = MW_Init2())))
742     {
743         return rv;
744     }
745 
746     PR_ASSERT(NULL != desc->fd);
747 
748     desc->outcome = PR_MW_PENDING;  /* nice, well known value */
749     desc->bytesRecv = 0;  /* likewise, though this value is ambiguious */
750 
751     PR_Lock(group->ml);
752 
753     if (_prmw_running != group->state)
754     {
755         /* Not allowed to add after cancelling the group */
756         desc->outcome = PR_MW_INTERRUPT;
757         PR_SetError(PR_INVALID_STATE_ERROR, 0);
758         PR_Unlock(group->ml);
759         return rv;
760     }
761 
762 #ifdef WINNT
763     _PR_MD_LOCK(&group->mdlock);
764 #endif
765 
766     /*
767     ** If the waiter count is zero at this point, there's no telling
768     ** how long we've been idle. Therefore, initialize the beginning
769     ** of the timing interval. As long as the list doesn't go empty,
770     ** it will maintain itself.
771     */
772     if (0 == group->waiter->count) {
773         group->last_poll = PR_IntervalNow();
774     }
775 
776     do
777     {
778         hrv = MW_AddHashInternal(desc, group->waiter);
779         if (_prmw_rehash != hrv) {
780             break;
781         }
782         hrv = MW_ExpandHashInternal(group);  /* gruesome */
783         if (_prmw_success != hrv) {
784             break;
785         }
786     } while (PR_TRUE);
787 
788 #ifdef WINNT
789     _PR_MD_UNLOCK(&group->mdlock);
790 #endif
791 
792     PR_NotifyCondVar(group->new_business);  /* tell the world */
793     rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
794     PR_Unlock(group->ml);
795 
796 #ifdef WINNT
797     overlapped = PR_NEWZAP(_MDOverlapped);
798     if (NULL == overlapped)
799     {
800         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
801         NT_HashRemove(group, desc->fd);
802         return rv;
803     }
804     overlapped->ioModel = _MD_MultiWaitIO;
805     overlapped->data.mw.desc = desc;
806     overlapped->data.mw.group = group;
807     if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
808     {
809         overlapped->data.mw.timer = CreateTimer(
810                                         desc->timeout,
811                                         NT_TimeProc,
812                                         overlapped);
813         if (0 == overlapped->data.mw.timer)
814         {
815             NT_HashRemove(group, desc->fd);
816             PR_DELETE(overlapped);
817             /*
818              * XXX It appears that a maximum of 16 timer events can
819              * be outstanding. GetLastError() returns 0 when I try it.
820              */
821             PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
822             return PR_FAILURE;
823         }
824     }
825 
826     /* Reach to the bottom layer to get the OS fd */
827     bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
828     PR_ASSERT(NULL != bottom);
829     if (NULL == bottom)
830     {
831         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
832         return PR_FAILURE;
833     }
834     hFile = (HANDLE)bottom->secret->md.osfd;
835     if (!bottom->secret->md.io_model_committed)
836     {
837         PRInt32 st;
838         st = _md_Associate(hFile);
839         PR_ASSERT(0 != st);
840         bottom->secret->md.io_model_committed = PR_TRUE;
841     }
842     bResult = ReadFile(hFile,
843                        desc->buffer.start,
844                        (DWORD)desc->buffer.length,
845                        NULL,
846                        &overlapped->overlapped);
847     if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
848     {
849         if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
850         {
851             if (InterlockedCompareExchange((LONG *)&desc->outcome,
852                                            (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING)
853                 == (LONG)PR_MW_PENDING)
854             {
855                 CancelTimer(overlapped->data.mw.timer);
856             }
857             NT_HashRemove(group, desc->fd);
858             PR_DELETE(overlapped);
859         }
860         _PR_MD_MAP_READ_ERROR(dwError);
861         rv = PR_FAILURE;
862     }
863 #endif
864 
865     return rv;
866 }  /* PR_AddWaitFileDesc */
867 
PR_WaitRecvReady(PRWaitGroup * group)868 PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
869 {
870     PRCList *io_ready = NULL;
871 #ifdef WINNT
872     PRThread *me = _PR_MD_CURRENT_THREAD();
873     _MDOverlapped *overlapped;
874 #endif
875 
876     if (!_pr_initialized) {
877         _PR_ImplicitInitialization();
878     }
879     if ((NULL == group) && (NULL == (group = MW_Init2()))) {
880         goto failed_init;
881     }
882 
883     PR_Lock(group->ml);
884 
885     if (_prmw_running != group->state)
886     {
887         PR_SetError(PR_INVALID_STATE_ERROR, 0);
888         goto invalid_state;
889     }
890 
891     group->waiting_threads += 1;  /* the polling thread is counted */
892 
893 #ifdef WINNT
894     _PR_MD_LOCK(&group->mdlock);
895     while (PR_CLIST_IS_EMPTY(&group->io_ready))
896     {
897         _PR_THREAD_LOCK(me);
898         me->state = _PR_IO_WAIT;
899         PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
900         if (!_PR_IS_NATIVE_THREAD(me))
901         {
902             _PR_SLEEPQ_LOCK(me->cpu);
903             _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
904             _PR_SLEEPQ_UNLOCK(me->cpu);
905         }
906         _PR_THREAD_UNLOCK(me);
907         _PR_MD_UNLOCK(&group->mdlock);
908         PR_Unlock(group->ml);
909         _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
910         me->state = _PR_RUNNING;
911         PR_Lock(group->ml);
912         _PR_MD_LOCK(&group->mdlock);
913         if (_PR_PENDING_INTERRUPT(me)) {
914             PR_REMOVE_LINK(&me->waitQLinks);
915             _PR_MD_UNLOCK(&group->mdlock);
916             me->flags &= ~_PR_INTERRUPT;
917             me->io_suspended = PR_FALSE;
918             PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
919             goto aborted;
920         }
921     }
922     io_ready = PR_LIST_HEAD(&group->io_ready);
923     PR_ASSERT(io_ready != NULL);
924     PR_REMOVE_LINK(io_ready);
925     _PR_MD_UNLOCK(&group->mdlock);
926     overlapped = (_MDOverlapped *)
927                  ((char *)io_ready - offsetof(_MDOverlapped, data));
928     io_ready = &overlapped->data.mw.desc->internal;
929 #else
930     do
931     {
932         /*
933         ** If the I/O ready list isn't empty, have this thread
934         ** return with the first receive wait object that's available.
935         */
936         if (PR_CLIST_IS_EMPTY(&group->io_ready))
937         {
938             /*
939             ** Is there a polling thread yet? If not, grab this thread
940             ** and use it.
941             */
942             if (NULL == group->poller)
943             {
944                 /*
945                 ** This thread will stay do polling until it becomes the only one
946                 ** left to service a completion. Then it will return and there will
947                 ** be none left to actually poll or to run completions.
948                 **
949                 ** The polling function should only return w/ failure or
950                 ** with some I/O ready.
951                 */
952                 if (PR_FAILURE == _MW_PollInternal(group)) {
953                     goto failed_poll;
954                 }
955             }
956             else
957             {
958                 /*
959                 ** There are four reasons a thread can be awakened from
960                 ** a wait on the io_complete condition variable.
961                 ** 1. Some I/O has completed, i.e., the io_ready list
962                 **    is nonempty.
963                 ** 2. The wait group is canceled.
964                 ** 3. The thread is interrupted.
965                 ** 4. The current polling thread has to leave and needs
966                 **    a replacement.
967                 ** The logic to find a new polling thread is made more
968                 ** complicated by all the other possible events.
969                 ** I tried my best to write the logic clearly, but
970                 ** it is still full of if's with continue and goto.
971                 */
972                 PRStatus st;
973                 do
974                 {
975                     st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
976                     if (_prmw_running != group->state)
977                     {
978                         PR_SetError(PR_INVALID_STATE_ERROR, 0);
979                         goto aborted;
980                     }
981                     if (_MW_ABORTED(st) || (NULL == group->poller)) {
982                         break;
983                     }
984                 } while (PR_CLIST_IS_EMPTY(&group->io_ready));
985 
986                 /*
987                 ** The thread is interrupted and has to leave.  It might
988                 ** have also been awakened to process ready i/o or be the
989                 ** new poller.  To be safe, if either condition is true,
990                 ** we awaken another thread to take its place.
991                 */
992                 if (_MW_ABORTED(st))
993                 {
994                     if ((NULL == group->poller
995                          || !PR_CLIST_IS_EMPTY(&group->io_ready))
996                         && group->waiting_threads > 1) {
997                         PR_NotifyCondVar(group->io_complete);
998                     }
999                     goto aborted;
1000                 }
1001 
1002                 /*
1003                 ** A new poller is needed, but can I be the new poller?
1004                 ** If there is no i/o ready, sure.  But if there is any
1005                 ** i/o ready, it has a higher priority.  I want to
1006                 ** process the ready i/o first and wake up another
1007                 ** thread to be the new poller.
1008                 */
1009                 if (NULL == group->poller)
1010                 {
1011                     if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
1012                         continue;
1013                     }
1014                     if (group->waiting_threads > 1) {
1015                         PR_NotifyCondVar(group->io_complete);
1016                     }
1017                 }
1018             }
1019             PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
1020         }
1021         io_ready = PR_LIST_HEAD(&group->io_ready);
1022         PR_NotifyCondVar(group->io_taken);
1023         PR_ASSERT(io_ready != NULL);
1024         PR_REMOVE_LINK(io_ready);
1025     } while (NULL == io_ready);
1026 
1027 failed_poll:
1028 
1029 #endif
1030 
1031 aborted:
1032 
1033     group->waiting_threads -= 1;
1034 invalid_state:
1035     (void)MW_TestForShutdownInternal(group);
1036     PR_Unlock(group->ml);
1037 
1038 failed_init:
1039     if (NULL != io_ready)
1040     {
1041         /* If the operation failed, record the reason why */
1042         switch (((PRRecvWait*)io_ready)->outcome)
1043         {
1044             case PR_MW_PENDING:
1045                 PR_ASSERT(0);
1046                 break;
1047             case PR_MW_SUCCESS:
1048 #ifndef WINNT
1049                 _MW_InitialRecv(io_ready);
1050 #endif
1051                 break;
1052 #ifdef WINNT
1053             case PR_MW_FAILURE:
1054                 _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
1055                 break;
1056 #endif
1057             case PR_MW_TIMEOUT:
1058                 PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
1059                 break;
1060             case PR_MW_INTERRUPT:
1061                 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
1062                 break;
1063             default: break;
1064         }
1065 #ifdef WINNT
1066         if (NULL != overlapped->data.mw.timer)
1067         {
1068             PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1069                       != overlapped->data.mw.desc->timeout);
1070             CancelTimer(overlapped->data.mw.timer);
1071         }
1072         else
1073         {
1074             PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1075                       == overlapped->data.mw.desc->timeout);
1076         }
1077         PR_DELETE(overlapped);
1078 #endif
1079     }
1080     return (PRRecvWait*)io_ready;
1081 }  /* PR_WaitRecvReady */
1082 
PR_CancelWaitFileDesc(PRWaitGroup * group,PRRecvWait * desc)1083 PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
1084 {
1085 #if !defined(WINNT)
1086     PRRecvWait **recv_wait;
1087 #endif
1088     PRStatus rv = PR_SUCCESS;
1089     if (NULL == group) {
1090         group = mw_state->group;
1091     }
1092     PR_ASSERT(NULL != group);
1093     if (NULL == group)
1094     {
1095         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1096         return PR_FAILURE;
1097     }
1098 
1099     PR_Lock(group->ml);
1100 
1101     if (_prmw_running != group->state)
1102     {
1103         PR_SetError(PR_INVALID_STATE_ERROR, 0);
1104         rv = PR_FAILURE;
1105         goto unlock;
1106     }
1107 
1108 #ifdef WINNT
1109     if (InterlockedCompareExchange((LONG *)&desc->outcome,
1110                                    (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING)
1111     {
1112         PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
1113         PR_ASSERT(NULL != bottom);
1114         if (NULL == bottom)
1115         {
1116             PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1117             goto unlock;
1118         }
1119         bottom->secret->state = _PR_FILEDESC_CLOSED;
1120 #if 0
1121         fprintf(stderr, "cancel wait recv: closing socket\n");
1122 #endif
1123         if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
1124         {
1125             fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
1126             exit(1);
1127         }
1128     }
1129 #else
1130     if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
1131     {
1132         /* it was in the wait table */
1133         _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
1134         goto unlock;
1135     }
1136     if (!PR_CLIST_IS_EMPTY(&group->io_ready))
1137     {
1138         /* is it already complete? */
1139         PRCList *head = PR_LIST_HEAD(&group->io_ready);
1140         do
1141         {
1142             PRRecvWait *done = (PRRecvWait*)head;
1143             if (done == desc) {
1144                 goto unlock;
1145             }
1146             head = PR_NEXT_LINK(head);
1147         } while (head != &group->io_ready);
1148     }
1149     PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1150     rv = PR_FAILURE;
1151 
1152 #endif
1153 unlock:
1154     PR_Unlock(group->ml);
1155     return rv;
1156 }  /* PR_CancelWaitFileDesc */
1157 
/*
** PR_CancelWaitGroup
**
** Shut a wait group down and hand back one completed/interrupted
** receive-wait object per call.
**
**  group   The wait group to cancel. NULL selects the default group
**          recorded in mw_state->group.
**
** Returns the first object found on the group's io_ready list, or NULL.
** NULL is returned with PR_INVALID_ARGUMENT_ERROR when no group is
** available (and, on WINNT, when a pending descriptor has no NSPR I/O
** layer), or with PR_GROUP_EMPTY_ERROR when the io_ready list is empty.
**
** The group's lock (group->ml) is held throughout, except while this
** thread is blocked waiting; on WINNT group->mdlock is additionally held
** while the wait table and io_ready list are examined.
*/
PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
{
    PRRecvWait **desc;
    PRRecvWait *recv_wait = NULL;
#ifdef WINNT
    _MDOverlapped *overlapped;
    PRRecvWait **end;
    PRThread *me = _PR_MD_CURRENT_THREAD();
#endif

    if (NULL == group) {
        group = mw_state->group;
    }
    PR_ASSERT(NULL != group);
    if (NULL == group)
    {
        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
        return NULL;
    }

    PR_Lock(group->ml);
    /*
    ** Drive the group's state to "stopped". A running group first moves
    ** to "stopping"; if no threads are waiting it can go straight to
    ** "stopped", otherwise the waiters are woken and this thread blocks
    ** on mw_manage until the state reads "stopped".
    ** NOTE(review): the code that performs that final transition and
    ** notifies mw_manage is not in this function - confirm where it lives.
    */
    if (_prmw_stopped != group->state)
    {
        if (_prmw_running == group->state) {
            group->state = _prmw_stopping;    /* so nothing new comes in */
        }
        if (0 == group->waiting_threads) { /* is there anybody else? */
            group->state = _prmw_stopped;    /* we can stop right now */
        }
        else
        {
            PR_NotifyAllCondVar(group->new_business);
            PR_NotifyAllCondVar(group->io_complete);
        }
        while (_prmw_stopped != group->state) {
            (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
        }
    }

#ifdef WINNT
    _PR_MD_LOCK(&group->mdlock);
#endif
    /* make all the existing descriptors look done/interrupted */
#ifdef WINNT
    /* Walk every slot of the wait table, starting at waiter->recv_wait. */
    end = &group->waiter->recv_wait + group->waiter->length;
    for (desc = &group->waiter->recv_wait; desc < end; ++desc)
    {
        if (NULL != *desc)
        {
            /*
            ** Only a descriptor whose outcome is still PENDING is
            ** interrupted here; the atomic exchange decides the winner.
            */
            if (InterlockedCompareExchange((LONG *)&(*desc)->outcome,
                                           (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
                == (LONG)PR_MW_PENDING)
            {
                PRFileDesc *bottom = PR_GetIdentitiesLayer(
                                         (*desc)->fd, PR_NSPR_IO_LAYER);
                PR_ASSERT(NULL != bottom);
                if (NULL == bottom)
                {
                    /* Bail out; recv_wait is still NULL at this point. */
                    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
                    goto invalid_arg;
                }
                /* Mark the NSPR layer closed, then close the OS socket. */
                bottom->secret->state = _PR_FILEDESC_CLOSED;
#if 0
                fprintf(stderr, "cancel wait group: closing socket\n");
#endif
                if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
                {
                    /* A failed closesocket() terminates the process. */
                    fprintf(stderr, "closesocket failed: %d\n",
                            WSAGetLastError());
                    exit(1);
                }
            }
        }
    }
    /*
    ** Block until the wait table's count reaches zero. Each iteration
    ** queues this thread on the group's wait_list, drops both locks,
    ** waits, and then retakes the locks in the same order (ml, mdlock).
    ** NOTE(review): whatever decrements waiter->count and wakes this
    ** thread is outside this function - presumably the I/O completion
    ** path; confirm.
    */
    while (group->waiter->count > 0)
    {
        _PR_THREAD_LOCK(me);
        me->state = _PR_IO_WAIT;
        PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
        if (!_PR_IS_NATIVE_THREAD(me))
        {
            _PR_SLEEPQ_LOCK(me->cpu);
            _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
            _PR_SLEEPQ_UNLOCK(me->cpu);
        }
        _PR_THREAD_UNLOCK(me);
        _PR_MD_UNLOCK(&group->mdlock);
        PR_Unlock(group->ml);
        _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
        me->state = _PR_RUNNING;
        PR_Lock(group->ml);
        _PR_MD_LOCK(&group->mdlock);
    }
#else
    /*
    ** Scan the wait table until its count reaches zero, finishing every
    ** occupied slot with PR_MW_INTERRUPT. The loop is bounded only by the
    ** count; the table length is checked by assertion alone.
    ** NOTE(review): this relies on _MW_DoneInternal decrementing
    ** waiter->count - confirm.
    */
    for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
    {
        PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
        if (NULL != *desc) {
            _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
        }
    }
#endif

    /* take first element of finished list and return it or NULL */
    if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
        PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
    }
    else
    {
        PRCList *head = PR_LIST_HEAD(&group->io_ready);
        PR_REMOVE_AND_INIT_LINK(head);
#ifdef WINNT
        /*
        ** On WINNT the list link is the 'data' member of an _MDOverlapped.
        ** Recover the enclosing structure, then step to the receive-wait
        ** object it refers to.
        */
        overlapped = (_MDOverlapped *)
                     ((char *)head - offsetof(_MDOverlapped, data));
        head = &overlapped->data.mw.desc->internal;
        /* A timer exists exactly when the descriptor had a finite timeout. */
        if (NULL != overlapped->data.mw.timer)
        {
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
                      != overlapped->data.mw.desc->timeout);
            CancelTimer(overlapped->data.mw.timer);
        }
        else
        {
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
                      == overlapped->data.mw.desc->timeout);
        }
        PR_DELETE(overlapped);
#endif
        /* The list link is cast directly to the receive-wait object. */
        recv_wait = (PRRecvWait*)head;
    }
#ifdef WINNT
invalid_arg:
    _PR_MD_UNLOCK(&group->mdlock);
#endif
    PR_Unlock(group->ml);

    return recv_wait;
}  /* PR_CancelWaitGroup */
1296 
PR_CreateWaitGroup(PRInt32 size)1297 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
1298 {
1299     PRWaitGroup *wg;
1300 
1301     if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
1302     {
1303         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1304         goto failed;
1305     }
1306     /* the wait group itself */
1307     wg->ml = PR_NewLock();
1308     if (NULL == wg->ml) {
1309         goto failed_lock;
1310     }
1311     wg->io_taken = PR_NewCondVar(wg->ml);
1312     if (NULL == wg->io_taken) {
1313         goto failed_cvar0;
1314     }
1315     wg->io_complete = PR_NewCondVar(wg->ml);
1316     if (NULL == wg->io_complete) {
1317         goto failed_cvar1;
1318     }
1319     wg->new_business = PR_NewCondVar(wg->ml);
1320     if (NULL == wg->new_business) {
1321         goto failed_cvar2;
1322     }
1323     wg->mw_manage = PR_NewCondVar(wg->ml);
1324     if (NULL == wg->mw_manage) {
1325         goto failed_cvar3;
1326     }
1327 
1328     PR_INIT_CLIST(&wg->group_link);
1329     PR_INIT_CLIST(&wg->io_ready);
1330 
1331     /* the waiters sequence */
1332     wg->waiter = (_PRWaiterHash*)PR_CALLOC(
1333                      sizeof(_PRWaiterHash) +
1334                      (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
1335     if (NULL == wg->waiter)
1336     {
1337         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1338         goto failed_waiter;
1339     }
1340     wg->waiter->count = 0;
1341     wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
1342 
1343 #ifdef WINNT
1344     _PR_MD_NEW_LOCK(&wg->mdlock);
1345     PR_INIT_CLIST(&wg->wait_list);
1346 #endif /* WINNT */
1347 
1348     PR_Lock(mw_lock);
1349     PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
1350     PR_Unlock(mw_lock);
1351     return wg;
1352 
1353 failed_waiter:
1354     PR_DestroyCondVar(wg->mw_manage);
1355 failed_cvar3:
1356     PR_DestroyCondVar(wg->new_business);
1357 failed_cvar2:
1358     PR_DestroyCondVar(wg->io_complete);
1359 failed_cvar1:
1360     PR_DestroyCondVar(wg->io_taken);
1361 failed_cvar0:
1362     PR_DestroyLock(wg->ml);
1363 failed_lock:
1364     PR_DELETE(wg);
1365     wg = NULL;
1366 
1367 failed:
1368     return wg;
1369 }  /* MW_CreateWaitGroup */
1370 
PR_DestroyWaitGroup(PRWaitGroup * group)1371 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
1372 {
1373     PRStatus rv = PR_SUCCESS;
1374     if (NULL == group) {
1375         group = mw_state->group;
1376     }
1377     PR_ASSERT(NULL != group);
1378     if (NULL != group)
1379     {
1380         PR_Lock(group->ml);
1381         if ((group->waiting_threads == 0)
1382             && (group->waiter->count == 0)
1383             && PR_CLIST_IS_EMPTY(&group->io_ready))
1384         {
1385             group->state = _prmw_stopped;
1386         }
1387         else
1388         {
1389             PR_SetError(PR_INVALID_STATE_ERROR, 0);
1390             rv = PR_FAILURE;
1391         }
1392         PR_Unlock(group->ml);
1393         if (PR_FAILURE == rv) {
1394             return rv;
1395         }
1396 
1397         PR_Lock(mw_lock);
1398         PR_REMOVE_LINK(&group->group_link);
1399         PR_Unlock(mw_lock);
1400 
1401 #ifdef WINNT
1402         /*
1403          * XXX make sure wait_list is empty and waiter is empty.
1404          * These must be checked while holding mdlock.
1405          */
1406         _PR_MD_FREE_LOCK(&group->mdlock);
1407 #endif
1408 
1409         PR_DELETE(group->waiter);
1410         PR_DELETE(group->polling_list);
1411         PR_DestroyCondVar(group->mw_manage);
1412         PR_DestroyCondVar(group->new_business);
1413         PR_DestroyCondVar(group->io_complete);
1414         PR_DestroyCondVar(group->io_taken);
1415         PR_DestroyLock(group->ml);
1416         if (group == mw_state->group) {
1417             mw_state->group = NULL;
1418         }
1419         PR_DELETE(group);
1420     }
1421     else
1422     {
1423         /* The default wait group is not created yet. */
1424         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1425         rv = PR_FAILURE;
1426     }
1427     return rv;
1428 }  /* PR_DestroyWaitGroup */
1429 
1430 /**********************************************************************
1431 ***********************************************************************
1432 ******************** Wait group enumerations **************************
1433 ***********************************************************************
1434 **********************************************************************/
1435 
PR_CreateMWaitEnumerator(PRWaitGroup * group)1436 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
1437 {
1438     PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
1439     if (NULL == enumerator) {
1440         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1441     }
1442     else
1443     {
1444         enumerator->group = group;
1445         enumerator->seal = _PR_ENUM_SEALED;
1446     }
1447     return enumerator;
1448 }  /* PR_CreateMWaitEnumerator */
1449 
PR_DestroyMWaitEnumerator(PRMWaitEnumerator * enumerator)1450 PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
1451 {
1452     PR_ASSERT(NULL != enumerator);
1453     PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1454     if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
1455     {
1456         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1457         return PR_FAILURE;
1458     }
1459     enumerator->seal = _PR_ENUM_UNSEALED;
1460     PR_Free(enumerator);
1461     return PR_SUCCESS;
1462 }  /* PR_DestroyMWaitEnumerator */
1463 
/*
** PR_EnumerateWaitGroup
**
** Step through the receive-wait objects registered in a wait group.
**
**  enumerator  A sealed enumerator. If it is not yet bound to a group,
**              the default group (mw_state->group) is adopted when an
**              enumeration starts.
**  previous    NULL to start (or restart) an enumeration; non-NULL to
**              continue one. Only its NULL-ness is examined.
**
** Returns the next non-NULL entry of the group's wait table, or NULL
** when the table is exhausted (no error is set in that case). NULL is
** also returned with PR_INVALID_ARGUMENT_ERROR for a bad enumerator or
** a continuation from a different thread, and with PR_GROUP_EMPTY_ERROR
** when no group is available.
**
** A continuation whose recorded timestamp no longer matches the group's
** restarts the enumeration from the beginning.
*/
PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
    PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
{
    PRRecvWait *found = NULL;
    PRWaitGroup *wg;

    /* validate the enumerator before touching anything else */
    PR_ASSERT(NULL != enumerator);
    PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
    if ((enumerator == NULL) || (enumerator->seal != _PR_ENUM_SEALED))
    {
        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
        return NULL;
    }

    if (previous != NULL)
    {
        /* resuming: only the thread that started may continue */
        PRThread *caller = PR_GetCurrentThread();
        PR_ASSERT(caller == enumerator->thread);
        if (caller != enumerator->thread)
        {
            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
            return NULL;
        }

        /* the group changed underneath us; begin again */
        if (enumerator->group->p_timestamp != enumerator->p_timestamp) {
            return PR_EnumerateWaitGroup(enumerator, NULL);
        }
    }
    else
    {
        /* starting: bind to the default group if none was supplied */
        if (enumerator->group == NULL)
        {
            enumerator->group = mw_state->group;
            if (enumerator->group == NULL)
            {
                PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
                return NULL;
            }
        }
        enumerator->waiter = &enumerator->group->waiter->recv_wait;
        enumerator->p_timestamp = enumerator->group->p_timestamp;
        enumerator->thread = PR_GetCurrentThread();
        enumerator->index = 0;
    }

    /* advance to the next occupied slot while holding the table's lock */
    wg = enumerator->group;
#if defined(WINNT)
    _PR_MD_LOCK(&wg->mdlock);
#else
    PR_Lock(wg->ml);
#endif
    while (enumerator->index++ < wg->waiter->length)
    {
        found = *(enumerator->waiter)++;
        if (found != NULL) {
            break;
        }
    }
#if defined(WINNT)
    _PR_MD_UNLOCK(&wg->mdlock);
#else
    PR_Unlock(wg->ml);
#endif

    return found;
}  /* PR_EnumerateWaitGroup */
1533 
1534 /* prmwait.c */
1535