1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3  * License, v. 2.0. If a copy of the MPL was not distributed with this
4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 
6 #include "nspr.h"
7 
8 /*
9  * Thread pools
10  *  Thread pools create and manage threads to provide support for
11  *  scheduling jobs onto one or more threads.
12  *
13  */
14 #ifdef OPT_WINNT
15 #include <windows.h>
16 #endif
17 
18 /*
19  * worker thread
20  */
21 typedef struct wthread {
22     PRCList     links;
23     PRThread    *thread;
24 } wthread;
25 
26 /*
27  * queue of timer jobs
28  */
29 typedef struct timer_jobq {
30     PRCList     list;
31     PRLock      *lock;
32     PRCondVar   *cv;
33     PRInt32     cnt;
34     PRCList     wthreads;
35 } timer_jobq;
36 
37 /*
38  * queue of jobs
39  */
40 typedef struct tp_jobq {
41     PRCList     list;
42     PRInt32     cnt;
43     PRLock      *lock;
44     PRCondVar   *cv;
45     PRCList     wthreads;
46 #ifdef OPT_WINNT
47     HANDLE      nt_completion_port;
48 #endif
49 } tp_jobq;
50 
51 /*
52  * queue of IO jobs
53  */
54 typedef struct io_jobq {
55     PRCList     list;
56     PRPollDesc  *pollfds;
57     PRInt32     npollfds;
58     PRJob       **polljobs;
59     PRLock      *lock;
60     PRInt32     cnt;
61     PRFileDesc  *notify_fd;
62     PRCList     wthreads;
63 } io_jobq;
64 
65 /*
66  * Threadpool
67  */
68 struct PRThreadPool {
69     PRInt32     init_threads;
70     PRInt32     max_threads;
71     PRInt32     current_threads;
72     PRInt32     idle_threads;
73     PRUint32    stacksize;
74     tp_jobq     jobq;
75     io_jobq     ioq;
76     timer_jobq  timerq;
77     PRLock      *join_lock;     /* used with jobp->join_cv */
78     PRCondVar   *shutdown_cv;
79     PRBool      shutdown;
80 };
81 
/* Socket operation an IO job is waiting for. */
enum io_op_type {
    JOB_IO_READ,
    JOB_IO_WRITE,
    JOB_IO_CONNECT,
    JOB_IO_ACCEPT
};
typedef enum io_op_type io_op_type;
84 
#ifdef OPT_WINNT
/*
 * Packet posted to the completion port for a job.  wstart() casts the
 * OVERLAPPED pointer it gets back to NT_notifier to find the job, which is
 * why overlapped has to stay the first member.
 */
struct NT_notifier {
    OVERLAPPED overlapped;  /* must be first */
    PRJob *jobp;            /* job this packet stands for */
};
typedef struct NT_notifier NT_notifier;
#endif
91 
92 struct PRJob {
93     PRCList         links;      /*  for linking jobs */
94     PRBool          on_ioq;     /* job on ioq */
95     PRBool          on_timerq;  /* job on timerq */
96     PRJobFn         job_func;
97     void            *job_arg;
98     PRCondVar       *join_cv;
99     PRBool          join_wait;  /* == PR_TRUE, when waiting to join */
100     PRCondVar       *cancel_cv; /* for cancelling IO jobs */
101     PRBool          cancel_io;  /* for cancelling IO jobs */
102     PRThreadPool    *tpool;     /* back pointer to thread pool */
103     PRJobIoDesc     *iod;
104     io_op_type      io_op;
105     PRInt16         io_poll_flags;
106     PRNetAddr       *netaddr;
107     PRIntervalTime  timeout;    /* relative value */
108     PRIntervalTime  absolute;
109 #ifdef OPT_WINNT
110     NT_notifier     nt_notifier;
111 #endif
112 };
113 
/* Recover the PRJob that embeds the list link _qp. */
#define JOB_LINKS_PTR(_qp) \
    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))

/* Recover the wthread record that embeds the list link _qp. */
#define WTHREAD_LINKS_PTR(_qp) \
    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))

/* A job is joinable exactly when it owns a join condition variable. */
#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)

/*
 * Clear the job's join_wait flag and signal its join_cv, all under the
 * pool's join_lock.  The argument is parenthesized at every use so that
 * any pointer expression may be passed; it is evaluated more than once.
 */
#define JOIN_NOTIFY(_jobp)                                  \
                PR_BEGIN_MACRO                              \
                PR_Lock((_jobp)->tpool->join_lock);         \
                (_jobp)->join_wait = PR_FALSE;              \
                PR_NotifyCondVar((_jobp)->join_cv);         \
                PR_Unlock((_jobp)->tpool->join_lock);       \
                PR_END_MACRO

/*
 * Take a cancelled job off the IO queue and signal its cancel_cv.
 *
 * This macro reads a variable named 'tp' (the PRThreadPool whose ioq holds
 * the job) from the scope where it is expanded.  The callers in this file
 * expand it with tp->ioq.lock held.  The argument is parenthesized at
 * every use; it is evaluated more than once.
 */
#define CANCEL_IO_JOB(jobp)                                 \
                PR_BEGIN_MACRO                              \
                (jobp)->cancel_io = PR_FALSE;               \
                (jobp)->on_ioq = PR_FALSE;                  \
                PR_REMOVE_AND_INIT_LINK(&(jobp)->links);    \
                tp->ioq.cnt--;                              \
                PR_NotifyCondVar((jobp)->cancel_cv);        \
                PR_END_MACRO
138 
139 static void delete_job(PRJob *jobp);
140 static PRThreadPool * alloc_threadpool(void);
141 static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
142 static void notify_ioq(PRThreadPool *tp);
143 static void notify_timerq(PRThreadPool *tp);
144 
/*
 * locks are acquired in the following order
 *
 *  tp->ioq.lock, tp->timerq.lock
 *          |
 *          V
 *      tp->jobq.lock
 */
153 
154 /*
155  * worker thread function
156  */
wstart(void * arg)157 static void wstart(void *arg)
158 {
159     PRThreadPool *tp = (PRThreadPool *) arg;
160     PRCList *head;
161 
162     /*
163      * execute jobs until shutdown
164      */
165     while (!tp->shutdown) {
166         PRJob *jobp;
167 #ifdef OPT_WINNT
168         BOOL rv;
169         DWORD unused, shutdown;
170         LPOVERLAPPED olp;
171 
172         PR_Lock(tp->jobq.lock);
173         tp->idle_threads++;
174         PR_Unlock(tp->jobq.lock);
175         rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
176                                        &unused, &shutdown, &olp, INFINITE);
177 
178         PR_ASSERT(rv);
179         if (shutdown) {
180             break;
181         }
182         jobp = ((NT_notifier *) olp)->jobp;
183         PR_Lock(tp->jobq.lock);
184         tp->idle_threads--;
185         tp->jobq.cnt--;
186         PR_Unlock(tp->jobq.lock);
187 #else
188 
189         PR_Lock(tp->jobq.lock);
190         while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
191             tp->idle_threads++;
192             PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
193             tp->idle_threads--;
194         }
195         if (tp->shutdown) {
196             PR_Unlock(tp->jobq.lock);
197             break;
198         }
199         head = PR_LIST_HEAD(&tp->jobq.list);
200         /*
201          * remove job from queue
202          */
203         PR_REMOVE_AND_INIT_LINK(head);
204         tp->jobq.cnt--;
205         jobp = JOB_LINKS_PTR(head);
206         PR_Unlock(tp->jobq.lock);
207 #endif
208 
209         jobp->job_func(jobp->job_arg);
210         if (!JOINABLE_JOB(jobp)) {
211             delete_job(jobp);
212         } else {
213             JOIN_NOTIFY(jobp);
214         }
215     }
216     PR_Lock(tp->jobq.lock);
217     tp->current_threads--;
218     PR_Unlock(tp->jobq.lock);
219 }
220 
221 /*
222  * add a job to the work queue
223  */
224 static void
add_to_jobq(PRThreadPool * tp,PRJob * jobp)225 add_to_jobq(PRThreadPool *tp, PRJob *jobp)
226 {
227     /*
228      * add to jobq
229      */
230 #ifdef OPT_WINNT
231     PR_Lock(tp->jobq.lock);
232     tp->jobq.cnt++;
233     PR_Unlock(tp->jobq.lock);
234     /*
235      * notify worker thread(s)
236      */
237     PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
238                                FALSE, &jobp->nt_notifier.overlapped);
239 #else
240     PR_Lock(tp->jobq.lock);
241     PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
242     tp->jobq.cnt++;
243     if ((tp->idle_threads < tp->jobq.cnt) &&
244         (tp->current_threads < tp->max_threads)) {
245         wthread *wthrp;
246         /*
247          * increment thread count and unlock the jobq lock
248          */
249         tp->current_threads++;
250         PR_Unlock(tp->jobq.lock);
251         /* create new worker thread */
252         wthrp = PR_NEWZAP(wthread);
253         if (wthrp) {
254             wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
255                                             tp, PR_PRIORITY_NORMAL,
256                                             PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
257             if (NULL == wthrp->thread) {
258                 PR_DELETE(wthrp);  /* this sets wthrp to NULL */
259             }
260         }
261         PR_Lock(tp->jobq.lock);
262         if (NULL == wthrp) {
263             tp->current_threads--;
264         } else {
265             PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
266         }
267     }
268     /*
269      * wakeup a worker thread
270      */
271     PR_NotifyCondVar(tp->jobq.cv);
272     PR_Unlock(tp->jobq.lock);
273 #endif
274 }
275 
276 /*
277  * io worker thread function
278  */
io_wstart(void * arg)279 static void io_wstart(void *arg)
280 {
281     PRThreadPool *tp = (PRThreadPool *) arg;
282     int pollfd_cnt, pollfds_used;
283     int rv;
284     PRCList *qp, *nextqp;
285     PRPollDesc *pollfds = NULL;
286     PRJob **polljobs = NULL;
287     int poll_timeout;
288     PRIntervalTime now;
289 
290     /*
291      * scan io_jobq
292      * construct poll list
293      * call PR_Poll
294      * for all fds, for which poll returns true, move the job to
295      * jobq and wakeup worker thread.
296      */
297     while (!tp->shutdown) {
298         PRJob *jobp;
299 
300         pollfd_cnt = tp->ioq.cnt + 10;
301         if (pollfd_cnt > tp->ioq.npollfds) {
302 
303             /*
304              * re-allocate pollfd array if the current one is not large
305              * enough
306              */
307             if (NULL != tp->ioq.pollfds) {
308                 PR_Free(tp->ioq.pollfds);
309             }
310             tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
311                               (sizeof(PRPollDesc) + sizeof(PRJob *)));
312             PR_ASSERT(NULL != tp->ioq.pollfds);
313             /*
314              * array of pollfds
315              */
316             pollfds = tp->ioq.pollfds;
317             tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
318             /*
319              * parallel array of jobs
320              */
321             polljobs = tp->ioq.polljobs;
322             tp->ioq.npollfds = pollfd_cnt;
323         }
324 
325         pollfds_used = 0;
326         /*
327          * add the notify fd; used for unblocking io thread(s)
328          */
329         pollfds[pollfds_used].fd = tp->ioq.notify_fd;
330         pollfds[pollfds_used].in_flags = PR_POLL_READ;
331         pollfds[pollfds_used].out_flags = 0;
332         polljobs[pollfds_used] = NULL;
333         pollfds_used++;
334         /*
335          * fill in the pollfd array
336          */
337         PR_Lock(tp->ioq.lock);
338         for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
339             nextqp = qp->next;
340             jobp = JOB_LINKS_PTR(qp);
341             if (jobp->cancel_io) {
342                 CANCEL_IO_JOB(jobp);
343                 continue;
344             }
345             if (pollfds_used == (pollfd_cnt)) {
346                 break;
347             }
348             pollfds[pollfds_used].fd = jobp->iod->socket;
349             pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
350             pollfds[pollfds_used].out_flags = 0;
351             polljobs[pollfds_used] = jobp;
352 
353             pollfds_used++;
354         }
355         if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
356             qp = tp->ioq.list.next;
357             jobp = JOB_LINKS_PTR(qp);
358             if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
359                 poll_timeout = PR_INTERVAL_NO_TIMEOUT;
360             }
361             else if (PR_INTERVAL_NO_WAIT == jobp->timeout) {
362                 poll_timeout = PR_INTERVAL_NO_WAIT;
363             }
364             else {
365                 poll_timeout = jobp->absolute - PR_IntervalNow();
366                 if (poll_timeout <= 0) { /* already timed out */
367                     poll_timeout = PR_INTERVAL_NO_WAIT;
368                 }
369             }
370         } else {
371             poll_timeout = PR_INTERVAL_NO_TIMEOUT;
372         }
373         PR_Unlock(tp->ioq.lock);
374 
375         /*
376          * XXXX
377          * should retry if more jobs have been added to the queue?
378          *
379          */
380         PR_ASSERT(pollfds_used <= pollfd_cnt);
381         rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
382 
383         if (tp->shutdown) {
384             break;
385         }
386 
387         if (rv > 0) {
388             /*
389              * at least one io event is set
390              */
391             PRStatus rval_status;
392             PRInt32 index;
393 
394             PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
395             /*
396              * reset the pollable event, if notified
397              */
398             if (pollfds[0].out_flags & PR_POLL_READ) {
399                 rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
400                 PR_ASSERT(PR_SUCCESS == rval_status);
401             }
402 
403             for(index = 1; index < (pollfds_used); index++) {
404                 PRInt16 events = pollfds[index].in_flags;
405                 PRInt16 revents = pollfds[index].out_flags;
406                 jobp = polljobs[index];
407 
408                 if ((revents & PR_POLL_NVAL) ||  /* busted in all cases */
409                     (revents & PR_POLL_ERR) ||
410                     ((events & PR_POLL_WRITE) &&
411                      (revents & PR_POLL_HUP))) { /* write op & hup */
412                     PR_Lock(tp->ioq.lock);
413                     if (jobp->cancel_io) {
414                         CANCEL_IO_JOB(jobp);
415                         PR_Unlock(tp->ioq.lock);
416                         continue;
417                     }
418                     PR_REMOVE_AND_INIT_LINK(&jobp->links);
419                     tp->ioq.cnt--;
420                     jobp->on_ioq = PR_FALSE;
421                     PR_Unlock(tp->ioq.lock);
422 
423                     /* set error */
424                     if (PR_POLL_NVAL & revents) {
425                         jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
426                     }
427                     else if (PR_POLL_HUP & revents) {
428                         jobp->iod->error = PR_CONNECT_RESET_ERROR;
429                     }
430                     else {
431                         jobp->iod->error = PR_IO_ERROR;
432                     }
433 
434                     /*
435                      * add to jobq
436                      */
437                     add_to_jobq(tp, jobp);
438                 } else if (revents) {
439                     /*
440                      * add to jobq
441                      */
442                     PR_Lock(tp->ioq.lock);
443                     if (jobp->cancel_io) {
444                         CANCEL_IO_JOB(jobp);
445                         PR_Unlock(tp->ioq.lock);
446                         continue;
447                     }
448                     PR_REMOVE_AND_INIT_LINK(&jobp->links);
449                     tp->ioq.cnt--;
450                     jobp->on_ioq = PR_FALSE;
451                     PR_Unlock(tp->ioq.lock);
452 
453                     if (jobp->io_op == JOB_IO_CONNECT) {
454                         if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) {
455                             jobp->iod->error = 0;
456                         }
457                         else {
458                             jobp->iod->error = PR_GetError();
459                         }
460                     } else {
461                         jobp->iod->error = 0;
462                     }
463 
464                     add_to_jobq(tp, jobp);
465                 }
466             }
467         }
468         /*
469          * timeout processing
470          */
471         now = PR_IntervalNow();
472         PR_Lock(tp->ioq.lock);
473         for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
474             nextqp = qp->next;
475             jobp = JOB_LINKS_PTR(qp);
476             if (jobp->cancel_io) {
477                 CANCEL_IO_JOB(jobp);
478                 continue;
479             }
480             if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
481                 break;
482             }
483             if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
484                 ((PRInt32)(jobp->absolute - now) > 0)) {
485                 break;
486             }
487             PR_REMOVE_AND_INIT_LINK(&jobp->links);
488             tp->ioq.cnt--;
489             jobp->on_ioq = PR_FALSE;
490             jobp->iod->error = PR_IO_TIMEOUT_ERROR;
491             add_to_jobq(tp, jobp);
492         }
493         PR_Unlock(tp->ioq.lock);
494     }
495 }
496 
497 /*
498  * timer worker thread function
499  */
timer_wstart(void * arg)500 static void timer_wstart(void *arg)
501 {
502     PRThreadPool *tp = (PRThreadPool *) arg;
503     PRCList *qp;
504     PRIntervalTime timeout;
505     PRIntervalTime now;
506 
507     /*
508      * call PR_WaitCondVar with minimum value of all timeouts
509      */
510     while (!tp->shutdown) {
511         PRJob *jobp;
512 
513         PR_Lock(tp->timerq.lock);
514         if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
515             timeout = PR_INTERVAL_NO_TIMEOUT;
516         } else {
517             PRCList *qp;
518 
519             qp = tp->timerq.list.next;
520             jobp = JOB_LINKS_PTR(qp);
521 
522             timeout = jobp->absolute - PR_IntervalNow();
523             if (timeout <= 0) {
524                 timeout = PR_INTERVAL_NO_WAIT;    /* already timed out */
525             }
526         }
527         if (PR_INTERVAL_NO_WAIT != timeout) {
528             PR_WaitCondVar(tp->timerq.cv, timeout);
529         }
530         if (tp->shutdown) {
531             PR_Unlock(tp->timerq.lock);
532             break;
533         }
534         /*
535          * move expired-timer jobs to jobq
536          */
537         now = PR_IntervalNow();
538         while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
539             qp = tp->timerq.list.next;
540             jobp = JOB_LINKS_PTR(qp);
541 
542             if ((PRInt32)(jobp->absolute - now) > 0) {
543                 break;
544             }
545             /*
546              * job timed out
547              */
548             PR_REMOVE_AND_INIT_LINK(&jobp->links);
549             tp->timerq.cnt--;
550             jobp->on_timerq = PR_FALSE;
551             add_to_jobq(tp, jobp);
552         }
553         PR_Unlock(tp->timerq.lock);
554     }
555 }
556 
557 static void
delete_threadpool(PRThreadPool * tp)558 delete_threadpool(PRThreadPool *tp)
559 {
560     if (NULL != tp) {
561         if (NULL != tp->shutdown_cv) {
562             PR_DestroyCondVar(tp->shutdown_cv);
563         }
564         if (NULL != tp->jobq.cv) {
565             PR_DestroyCondVar(tp->jobq.cv);
566         }
567         if (NULL != tp->jobq.lock) {
568             PR_DestroyLock(tp->jobq.lock);
569         }
570         if (NULL != tp->join_lock) {
571             PR_DestroyLock(tp->join_lock);
572         }
573 #ifdef OPT_WINNT
574         if (NULL != tp->jobq.nt_completion_port) {
575             CloseHandle(tp->jobq.nt_completion_port);
576         }
577 #endif
578         /* Timer queue */
579         if (NULL != tp->timerq.cv) {
580             PR_DestroyCondVar(tp->timerq.cv);
581         }
582         if (NULL != tp->timerq.lock) {
583             PR_DestroyLock(tp->timerq.lock);
584         }
585 
586         if (NULL != tp->ioq.lock) {
587             PR_DestroyLock(tp->ioq.lock);
588         }
589         if (NULL != tp->ioq.pollfds) {
590             PR_Free(tp->ioq.pollfds);
591         }
592         if (NULL != tp->ioq.notify_fd) {
593             PR_DestroyPollableEvent(tp->ioq.notify_fd);
594         }
595         PR_Free(tp);
596     }
597     return;
598 }
599 
600 static PRThreadPool *
alloc_threadpool(void)601 alloc_threadpool(void)
602 {
603     PRThreadPool *tp;
604 
605     tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
606     if (NULL == tp) {
607         goto failed;
608     }
609     tp->jobq.lock = PR_NewLock();
610     if (NULL == tp->jobq.lock) {
611         goto failed;
612     }
613     tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
614     if (NULL == tp->jobq.cv) {
615         goto failed;
616     }
617     tp->join_lock = PR_NewLock();
618     if (NULL == tp->join_lock) {
619         goto failed;
620     }
621 #ifdef OPT_WINNT
622     tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
623                                   NULL, 0, 0);
624     if (NULL == tp->jobq.nt_completion_port) {
625         goto failed;
626     }
627 #endif
628 
629     tp->ioq.lock = PR_NewLock();
630     if (NULL == tp->ioq.lock) {
631         goto failed;
632     }
633 
634     /* Timer queue */
635 
636     tp->timerq.lock = PR_NewLock();
637     if (NULL == tp->timerq.lock) {
638         goto failed;
639     }
640     tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
641     if (NULL == tp->timerq.cv) {
642         goto failed;
643     }
644 
645     tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
646     if (NULL == tp->shutdown_cv) {
647         goto failed;
648     }
649     tp->ioq.notify_fd = PR_NewPollableEvent();
650     if (NULL == tp->ioq.notify_fd) {
651         goto failed;
652     }
653     return tp;
654 failed:
655     delete_threadpool(tp);
656     PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
657     return NULL;
658 }
659 
660 /* Create thread pool */
661 PR_IMPLEMENT(PRThreadPool *)
PR_CreateThreadPool(PRInt32 initial_threads,PRInt32 max_threads,PRUint32 stacksize)662 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
663                     PRUint32 stacksize)
664 {
665     PRThreadPool *tp;
666     PRThread *thr;
667     int i;
668     wthread *wthrp;
669 
670     tp = alloc_threadpool();
671     if (NULL == tp) {
672         return NULL;
673     }
674 
675     tp->init_threads = initial_threads;
676     tp->max_threads = max_threads;
677     tp->stacksize = stacksize;
678     PR_INIT_CLIST(&tp->jobq.list);
679     PR_INIT_CLIST(&tp->ioq.list);
680     PR_INIT_CLIST(&tp->timerq.list);
681     PR_INIT_CLIST(&tp->jobq.wthreads);
682     PR_INIT_CLIST(&tp->ioq.wthreads);
683     PR_INIT_CLIST(&tp->timerq.wthreads);
684     tp->shutdown = PR_FALSE;
685 
686     PR_Lock(tp->jobq.lock);
687     for(i=0; i < initial_threads; ++i) {
688 
689         thr = PR_CreateThread(PR_USER_THREAD, wstart,
690                               tp, PR_PRIORITY_NORMAL,
691                               PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
692         PR_ASSERT(thr);
693         wthrp = PR_NEWZAP(wthread);
694         PR_ASSERT(wthrp);
695         wthrp->thread = thr;
696         PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
697     }
698     tp->current_threads = initial_threads;
699 
700     thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
701                           tp, PR_PRIORITY_NORMAL,
702                           PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
703     PR_ASSERT(thr);
704     wthrp = PR_NEWZAP(wthread);
705     PR_ASSERT(wthrp);
706     wthrp->thread = thr;
707     PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
708 
709     thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
710                           tp, PR_PRIORITY_NORMAL,
711                           PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
712     PR_ASSERT(thr);
713     wthrp = PR_NEWZAP(wthread);
714     PR_ASSERT(wthrp);
715     wthrp->thread = thr;
716     PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
717 
718     PR_Unlock(tp->jobq.lock);
719     return tp;
720 }
721 
722 static void
delete_job(PRJob * jobp)723 delete_job(PRJob *jobp)
724 {
725     if (NULL != jobp) {
726         if (NULL != jobp->join_cv) {
727             PR_DestroyCondVar(jobp->join_cv);
728             jobp->join_cv = NULL;
729         }
730         if (NULL != jobp->cancel_cv) {
731             PR_DestroyCondVar(jobp->cancel_cv);
732             jobp->cancel_cv = NULL;
733         }
734         PR_DELETE(jobp);
735     }
736 }
737 
738 static PRJob *
alloc_job(PRBool joinable,PRThreadPool * tp)739 alloc_job(PRBool joinable, PRThreadPool *tp)
740 {
741     PRJob *jobp;
742 
743     jobp = PR_NEWZAP(PRJob);
744     if (NULL == jobp) {
745         goto failed;
746     }
747     if (joinable) {
748         jobp->join_cv = PR_NewCondVar(tp->join_lock);
749         jobp->join_wait = PR_TRUE;
750         if (NULL == jobp->join_cv) {
751             goto failed;
752         }
753     } else {
754         jobp->join_cv = NULL;
755     }
756 #ifdef OPT_WINNT
757     jobp->nt_notifier.jobp = jobp;
758 #endif
759     return jobp;
760 failed:
761     delete_job(jobp);
762     PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
763     return NULL;
764 }
765 
766 /* queue a job */
767 PR_IMPLEMENT(PRJob *)
PR_QueueJob(PRThreadPool * tpool,PRJobFn fn,void * arg,PRBool joinable)768 PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
769 {
770     PRJob *jobp;
771 
772     jobp = alloc_job(joinable, tpool);
773     if (NULL == jobp) {
774         return NULL;
775     }
776 
777     jobp->job_func = fn;
778     jobp->job_arg = arg;
779     jobp->tpool = tpool;
780 
781     add_to_jobq(tpool, jobp);
782     return jobp;
783 }
784 
785 /* queue a job, when a socket is readable or writeable */
786 static PRJob *
queue_io_job(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable,io_op_type op)787 queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
788              PRBool joinable, io_op_type op)
789 {
790     PRJob *jobp;
791     PRIntervalTime now;
792 
793     jobp = alloc_job(joinable, tpool);
794     if (NULL == jobp) {
795         return NULL;
796     }
797 
798     /*
799      * Add a new job to io_jobq
800      * wakeup io worker thread
801      */
802 
803     jobp->job_func = fn;
804     jobp->job_arg = arg;
805     jobp->tpool = tpool;
806     jobp->iod = iod;
807     if (JOB_IO_READ == op) {
808         jobp->io_op = JOB_IO_READ;
809         jobp->io_poll_flags = PR_POLL_READ;
810     } else if (JOB_IO_WRITE == op) {
811         jobp->io_op = JOB_IO_WRITE;
812         jobp->io_poll_flags = PR_POLL_WRITE;
813     } else if (JOB_IO_ACCEPT == op) {
814         jobp->io_op = JOB_IO_ACCEPT;
815         jobp->io_poll_flags = PR_POLL_READ;
816     } else if (JOB_IO_CONNECT == op) {
817         jobp->io_op = JOB_IO_CONNECT;
818         jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
819     } else {
820         delete_job(jobp);
821         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
822         return NULL;
823     }
824 
825     jobp->timeout = iod->timeout;
826     if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
827         (PR_INTERVAL_NO_WAIT == iod->timeout)) {
828         jobp->absolute = iod->timeout;
829     } else {
830         now = PR_IntervalNow();
831         jobp->absolute = now + iod->timeout;
832     }
833 
834 
835     PR_Lock(tpool->ioq.lock);
836 
837     if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
838         (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
839         PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
840     } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
841         PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
842     } else {
843         PRCList *qp;
844         PRJob *tmp_jobp;
845         /*
846          * insert into the timeout-sorted ioq
847          */
848         for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
849              qp = qp->prev) {
850             tmp_jobp = JOB_LINKS_PTR(qp);
851             if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
852                 break;
853             }
854         }
855         PR_INSERT_AFTER(&jobp->links,qp);
856     }
857 
858     jobp->on_ioq = PR_TRUE;
859     tpool->ioq.cnt++;
860     /*
861      * notify io worker thread(s)
862      */
863     PR_Unlock(tpool->ioq.lock);
864     notify_ioq(tpool);
865     return jobp;
866 }
867 
868 /* queue a job, when a socket is readable */
869 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Read(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable)870 PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
871                  PRBool joinable)
872 {
873     return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
874 }
875 
876 /* queue a job, when a socket is writeable */
877 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Write(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable)878 PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
879                   PRBool joinable)
880 {
881     return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
882 }
883 
884 
885 /* queue a job, when a socket has a pending connection */
886 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Accept(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable)887 PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
888                    void * arg, PRBool joinable)
889 {
890     return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
891 }
892 
893 /* queue a job, when a socket can be connected */
894 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Connect(PRThreadPool * tpool,PRJobIoDesc * iod,const PRNetAddr * addr,PRJobFn fn,void * arg,PRBool joinable)895 PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
896                     const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
897 {
898     PRStatus rv;
899     PRErrorCode err;
900 
901     rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
902     if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)) {
903         /* connection pending */
904         return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
905     }
906     /*
907      * connection succeeded or failed; add to jobq right away
908      */
909     if (rv == PR_FAILURE) {
910         iod->error = err;
911     }
912     else {
913         iod->error = 0;
914     }
915     return(PR_QueueJob(tpool, fn, arg, joinable));
916 
917 }
918 
919 /* queue a job, when a timer expires */
920 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Timer(PRThreadPool * tpool,PRIntervalTime timeout,PRJobFn fn,void * arg,PRBool joinable)921 PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
922                   PRJobFn fn, void * arg, PRBool joinable)
923 {
924     PRIntervalTime now;
925     PRJob *jobp;
926 
927     if (PR_INTERVAL_NO_TIMEOUT == timeout) {
928         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
929         return NULL;
930     }
931     if (PR_INTERVAL_NO_WAIT == timeout) {
932         /*
933          * no waiting; add to jobq right away
934          */
935         return(PR_QueueJob(tpool, fn, arg, joinable));
936     }
937     jobp = alloc_job(joinable, tpool);
938     if (NULL == jobp) {
939         return NULL;
940     }
941 
942     /*
943      * Add a new job to timer_jobq
944      * wakeup timer worker thread
945      */
946 
947     jobp->job_func = fn;
948     jobp->job_arg = arg;
949     jobp->tpool = tpool;
950     jobp->timeout = timeout;
951 
952     now = PR_IntervalNow();
953     jobp->absolute = now + timeout;
954 
955 
956     PR_Lock(tpool->timerq.lock);
957     jobp->on_timerq = PR_TRUE;
958     if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
959         PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
960     }
961     else {
962         PRCList *qp;
963         PRJob *tmp_jobp;
964         /*
965          * insert into the sorted timer jobq
966          */
967         for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
968              qp = qp->prev) {
969             tmp_jobp = JOB_LINKS_PTR(qp);
970             if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
971                 break;
972             }
973         }
974         PR_INSERT_AFTER(&jobp->links,qp);
975     }
976     tpool->timerq.cnt++;
977     /*
978      * notify timer worker thread(s)
979      */
980     notify_timerq(tpool);
981     PR_Unlock(tpool->timerq.lock);
982     return jobp;
983 }
984 
985 static void
notify_timerq(PRThreadPool * tp)986 notify_timerq(PRThreadPool *tp)
987 {
988     /*
989      * wakeup the timer thread(s)
990      */
991     PR_NotifyCondVar(tp->timerq.cv);
992 }
993 
994 static void
notify_ioq(PRThreadPool * tp)995 notify_ioq(PRThreadPool *tp)
996 {
997     PRStatus rval_status;
998 
999     /*
1000      * wakeup the io thread(s)
1001      */
1002     rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
1003     PR_ASSERT(PR_SUCCESS == rval_status);
1004 }
1005 
1006 /*
1007  * cancel a job
1008  *
1009  *  XXXX: is this needed? likely to be removed
1010  */
1011 PR_IMPLEMENT(PRStatus)
PR_CancelJob(PRJob * jobp)1012 PR_CancelJob(PRJob *jobp) {
1013 
1014     PRStatus rval = PR_FAILURE;
1015     PRThreadPool *tp;
1016 
1017     if (jobp->on_timerq) {
1018         /*
1019          * now, check again while holding the timerq lock
1020          */
1021         tp = jobp->tpool;
1022         PR_Lock(tp->timerq.lock);
1023         if (jobp->on_timerq) {
1024             jobp->on_timerq = PR_FALSE;
1025             PR_REMOVE_AND_INIT_LINK(&jobp->links);
1026             tp->timerq.cnt--;
1027             PR_Unlock(tp->timerq.lock);
1028             if (!JOINABLE_JOB(jobp)) {
1029                 delete_job(jobp);
1030             } else {
1031                 JOIN_NOTIFY(jobp);
1032             }
1033             rval = PR_SUCCESS;
1034         } else {
1035             PR_Unlock(tp->timerq.lock);
1036         }
1037     } else if (jobp->on_ioq) {
1038         /*
1039          * now, check again while holding the ioq lock
1040          */
1041         tp = jobp->tpool;
1042         PR_Lock(tp->ioq.lock);
1043         if (jobp->on_ioq) {
1044             jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
1045             if (NULL == jobp->cancel_cv) {
1046                 PR_Unlock(tp->ioq.lock);
1047                 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
1048                 return PR_FAILURE;
1049             }
1050             /*
1051              * mark job 'cancelled' and notify io thread(s)
1052              * XXXX:
1053              *      this assumes there is only one io thread; when there
1054              *      are multiple threads, the io thread processing this job
1055              *      must be notified.
1056              */
1057             jobp->cancel_io = PR_TRUE;
1058             PR_Unlock(tp->ioq.lock);    /* release, reacquire ioq lock */
1059             notify_ioq(tp);
1060             PR_Lock(tp->ioq.lock);
1061             while (jobp->cancel_io) {
1062                 PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
1063             }
1064             PR_Unlock(tp->ioq.lock);
1065             PR_ASSERT(!jobp->on_ioq);
1066             if (!JOINABLE_JOB(jobp)) {
1067                 delete_job(jobp);
1068             } else {
1069                 JOIN_NOTIFY(jobp);
1070             }
1071             rval = PR_SUCCESS;
1072         } else {
1073             PR_Unlock(tp->ioq.lock);
1074         }
1075     }
1076     if (PR_FAILURE == rval) {
1077         PR_SetError(PR_INVALID_STATE_ERROR, 0);
1078     }
1079     return rval;
1080 }
1081 
1082 /* join a job, wait until completion */
1083 PR_IMPLEMENT(PRStatus)
PR_JoinJob(PRJob * jobp)1084 PR_JoinJob(PRJob *jobp)
1085 {
1086     if (!JOINABLE_JOB(jobp)) {
1087         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1088         return PR_FAILURE;
1089     }
1090     PR_Lock(jobp->tpool->join_lock);
1091     while(jobp->join_wait) {
1092         PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
1093     }
1094     PR_Unlock(jobp->tpool->join_lock);
1095     delete_job(jobp);
1096     return PR_SUCCESS;
1097 }
1098 
1099 /* shutdown threadpool */
1100 PR_IMPLEMENT(PRStatus)
PR_ShutdownThreadPool(PRThreadPool * tpool)1101 PR_ShutdownThreadPool(PRThreadPool *tpool)
1102 {
1103     PRStatus rval = PR_SUCCESS;
1104 
1105     PR_Lock(tpool->jobq.lock);
1106     tpool->shutdown = PR_TRUE;
1107     PR_NotifyAllCondVar(tpool->shutdown_cv);
1108     PR_Unlock(tpool->jobq.lock);
1109 
1110     return rval;
1111 }
1112 
1113 /*
1114  * join thread pool
1115  *  wait for termination of worker threads
1116  *  reclaim threadpool resources
1117  */
1118 PR_IMPLEMENT(PRStatus)
PR_JoinThreadPool(PRThreadPool * tpool)1119 PR_JoinThreadPool(PRThreadPool *tpool)
1120 {
1121     PRStatus rval = PR_SUCCESS;
1122     PRCList *head;
1123     PRStatus rval_status;
1124 
1125     PR_Lock(tpool->jobq.lock);
1126     while (!tpool->shutdown) {
1127         PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
1128     }
1129 
1130     /*
1131      * wakeup worker threads
1132      */
1133 #ifdef OPT_WINNT
1134     /*
1135      * post shutdown notification for all threads
1136      */
1137     {
1138         int i;
1139         for(i=0; i < tpool->current_threads; i++) {
1140             PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
1141                                        TRUE, NULL);
1142         }
1143     }
1144 #else
1145     PR_NotifyAllCondVar(tpool->jobq.cv);
1146 #endif
1147 
1148     /*
1149      * wakeup io thread(s)
1150      */
1151     notify_ioq(tpool);
1152 
1153     /*
1154      * wakeup timer thread(s)
1155      */
1156     PR_Lock(tpool->timerq.lock);
1157     notify_timerq(tpool);
1158     PR_Unlock(tpool->timerq.lock);
1159 
1160     while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
1161         wthread *wthrp;
1162 
1163         head = PR_LIST_HEAD(&tpool->jobq.wthreads);
1164         PR_REMOVE_AND_INIT_LINK(head);
1165         PR_Unlock(tpool->jobq.lock);
1166         wthrp = WTHREAD_LINKS_PTR(head);
1167         rval_status = PR_JoinThread(wthrp->thread);
1168         PR_ASSERT(PR_SUCCESS == rval_status);
1169         PR_DELETE(wthrp);
1170         PR_Lock(tpool->jobq.lock);
1171     }
1172     PR_Unlock(tpool->jobq.lock);
1173     while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
1174         wthread *wthrp;
1175 
1176         head = PR_LIST_HEAD(&tpool->ioq.wthreads);
1177         PR_REMOVE_AND_INIT_LINK(head);
1178         wthrp = WTHREAD_LINKS_PTR(head);
1179         rval_status = PR_JoinThread(wthrp->thread);
1180         PR_ASSERT(PR_SUCCESS == rval_status);
1181         PR_DELETE(wthrp);
1182     }
1183 
1184     while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
1185         wthread *wthrp;
1186 
1187         head = PR_LIST_HEAD(&tpool->timerq.wthreads);
1188         PR_REMOVE_AND_INIT_LINK(head);
1189         wthrp = WTHREAD_LINKS_PTR(head);
1190         rval_status = PR_JoinThread(wthrp->thread);
1191         PR_ASSERT(PR_SUCCESS == rval_status);
1192         PR_DELETE(wthrp);
1193     }
1194 
1195     /*
1196      * Delete queued jobs
1197      */
1198     while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
1199         PRJob *jobp;
1200 
1201         head = PR_LIST_HEAD(&tpool->jobq.list);
1202         PR_REMOVE_AND_INIT_LINK(head);
1203         jobp = JOB_LINKS_PTR(head);
1204         tpool->jobq.cnt--;
1205         delete_job(jobp);
1206     }
1207 
1208     /* delete io jobs */
1209     while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
1210         PRJob *jobp;
1211 
1212         head = PR_LIST_HEAD(&tpool->ioq.list);
1213         PR_REMOVE_AND_INIT_LINK(head);
1214         tpool->ioq.cnt--;
1215         jobp = JOB_LINKS_PTR(head);
1216         delete_job(jobp);
1217     }
1218 
1219     /* delete timer jobs */
1220     while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
1221         PRJob *jobp;
1222 
1223         head = PR_LIST_HEAD(&tpool->timerq.list);
1224         PR_REMOVE_AND_INIT_LINK(head);
1225         tpool->timerq.cnt--;
1226         jobp = JOB_LINKS_PTR(head);
1227         delete_job(jobp);
1228     }
1229 
1230     PR_ASSERT(0 == tpool->jobq.cnt);
1231     PR_ASSERT(0 == tpool->ioq.cnt);
1232     PR_ASSERT(0 == tpool->timerq.cnt);
1233 
1234     delete_threadpool(tpool);
1235     return rval;
1236 }
1237