1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6 #include "nspr.h"
7
8 /*
9 * Thread pools
10 * Thread pools create and manage threads to provide support for
11 * scheduling jobs onto one or more threads.
12 *
13 */
14 #ifdef OPT_WINNT
15 #include <windows.h>
16 #endif
17
18 /*
19 * worker thread
20 */
21 typedef struct wthread {
22 PRCList links;
23 PRThread *thread;
24 } wthread;
25
26 /*
27 * queue of timer jobs
28 */
29 typedef struct timer_jobq {
30 PRCList list;
31 PRLock *lock;
32 PRCondVar *cv;
33 PRInt32 cnt;
34 PRCList wthreads;
35 } timer_jobq;
36
37 /*
38 * queue of jobs
39 */
40 typedef struct tp_jobq {
41 PRCList list;
42 PRInt32 cnt;
43 PRLock *lock;
44 PRCondVar *cv;
45 PRCList wthreads;
46 #ifdef OPT_WINNT
47 HANDLE nt_completion_port;
48 #endif
49 } tp_jobq;
50
51 /*
52 * queue of IO jobs
53 */
54 typedef struct io_jobq {
55 PRCList list;
56 PRPollDesc *pollfds;
57 PRInt32 npollfds;
58 PRJob **polljobs;
59 PRLock *lock;
60 PRInt32 cnt;
61 PRFileDesc *notify_fd;
62 PRCList wthreads;
63 } io_jobq;
64
65 /*
66 * Threadpool
67 */
68 struct PRThreadPool {
69 PRInt32 init_threads;
70 PRInt32 max_threads;
71 PRInt32 current_threads;
72 PRInt32 idle_threads;
73 PRUint32 stacksize;
74 tp_jobq jobq;
75 io_jobq ioq;
76 timer_jobq timerq;
77 PRLock *join_lock; /* used with jobp->join_cv */
78 PRCondVar *shutdown_cv;
79 PRBool shutdown;
80 };
81
/* the socket operation an IO job is waiting to perform */
typedef enum io_op_type {
    JOB_IO_READ,
    JOB_IO_WRITE,
    JOB_IO_CONNECT,
    JOB_IO_ACCEPT
} io_op_type;
84
#ifdef OPT_WINNT
/*
 * NT_notifier: the packet posted to the completion port for a job.
 * The worker casts the OVERLAPPED pointer it gets back to an NT_notifier,
 * which is why 'overlapped' has to be the first member.
 */
typedef struct NT_notifier NT_notifier;
struct NT_notifier {
    OVERLAPPED overlapped;      /* must be first */
    PRJob *jobp;                /* the job this packet stands for */
};
#endif
91
92 struct PRJob {
93 PRCList links; /* for linking jobs */
94 PRBool on_ioq; /* job on ioq */
95 PRBool on_timerq; /* job on timerq */
96 PRJobFn job_func;
97 void *job_arg;
98 PRCondVar *join_cv;
99 PRBool join_wait; /* == PR_TRUE, when waiting to join */
100 PRCondVar *cancel_cv; /* for cancelling IO jobs */
101 PRBool cancel_io; /* for cancelling IO jobs */
102 PRThreadPool *tpool; /* back pointer to thread pool */
103 PRJobIoDesc *iod;
104 io_op_type io_op;
105 PRInt16 io_poll_flags;
106 PRNetAddr *netaddr;
107 PRIntervalTime timeout; /* relative value */
108 PRIntervalTime absolute;
109 #ifdef OPT_WINNT
110 NT_notifier nt_notifier;
111 #endif
112 };
113
/* map a list node back to the PRJob whose 'links' member it is */
#define JOB_LINKS_PTR(_qp) \
    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))

/* map a list node back to the wthread whose 'links' member it is */
#define WTHREAD_LINKS_PTR(_qp) \
    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))

/* a job is joinable exactly when it was given a join condition variable */
#define JOINABLE_JOB(_jobp) ((_jobp)->join_cv != NULL)
121
/*
 * JOIN_NOTIFY: mark a joinable job as finished and wake the thread that
 * is blocked in PR_JoinJob.  join_wait is cleared under the pool's
 * join_lock, the lock join_cv was created with.
 *
 * The argument is parenthesized so that any pointer expression may be
 * passed; note that it is still evaluated more than once.
 */
#define JOIN_NOTIFY(_jobp)                          \
    PR_BEGIN_MACRO                                  \
    PR_Lock((_jobp)->tpool->join_lock);             \
    (_jobp)->join_wait = PR_FALSE;                  \
    PR_NotifyCondVar((_jobp)->join_cv);             \
    PR_Unlock((_jobp)->tpool->join_lock);           \
    PR_END_MACRO
129
/*
 * CANCEL_IO_JOB: complete a pending cancel request for an IO job.  The job
 * is unlinked from the ioq, the queue count is adjusted and the thread
 * waiting in PR_CancelJob is woken.  The caller must hold the ioq lock of
 * the job's pool.
 *
 * The pool is reached through the job's own back pointer, so the macro no
 * longer relies on a variable named 'tp' being in scope at the call site.
 * The argument is evaluated more than once.
 */
#define CANCEL_IO_JOB(jobp)                         \
    PR_BEGIN_MACRO                                  \
    (jobp)->cancel_io = PR_FALSE;                   \
    (jobp)->on_ioq = PR_FALSE;                      \
    PR_REMOVE_AND_INIT_LINK(&(jobp)->links);        \
    (jobp)->tpool->ioq.cnt--;                       \
    PR_NotifyCondVar((jobp)->cancel_cv);            \
    PR_END_MACRO
138
139 static void delete_job(PRJob *jobp);
140 static PRThreadPool * alloc_threadpool(void);
141 static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
142 static void notify_ioq(PRThreadPool *tp);
143 static void notify_timerq(PRThreadPool *tp);
144
/*
 * Locks are acquired in the following order: a thread that holds
 * tp->ioq.lock or tp->timerq.lock may go on to take tp->jobq.lock,
 * never the other way round.
 *
 *      tp->ioq.lock, tp->timerq.lock
 *                  |
 *                  V
 *            tp->jobq.lock
 */
153
154 /*
155 * worker thread function
156 */
wstart(void * arg)157 static void wstart(void *arg)
158 {
159 PRThreadPool *tp = (PRThreadPool *) arg;
160 PRCList *head;
161
162 /*
163 * execute jobs until shutdown
164 */
165 while (!tp->shutdown) {
166 PRJob *jobp;
167 #ifdef OPT_WINNT
168 BOOL rv;
169 DWORD unused, shutdown;
170 LPOVERLAPPED olp;
171
172 PR_Lock(tp->jobq.lock);
173 tp->idle_threads++;
174 PR_Unlock(tp->jobq.lock);
175 rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
176 &unused, &shutdown, &olp, INFINITE);
177
178 PR_ASSERT(rv);
179 if (shutdown) {
180 break;
181 }
182 jobp = ((NT_notifier *) olp)->jobp;
183 PR_Lock(tp->jobq.lock);
184 tp->idle_threads--;
185 tp->jobq.cnt--;
186 PR_Unlock(tp->jobq.lock);
187 #else
188
189 PR_Lock(tp->jobq.lock);
190 while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
191 tp->idle_threads++;
192 PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
193 tp->idle_threads--;
194 }
195 if (tp->shutdown) {
196 PR_Unlock(tp->jobq.lock);
197 break;
198 }
199 head = PR_LIST_HEAD(&tp->jobq.list);
200 /*
201 * remove job from queue
202 */
203 PR_REMOVE_AND_INIT_LINK(head);
204 tp->jobq.cnt--;
205 jobp = JOB_LINKS_PTR(head);
206 PR_Unlock(tp->jobq.lock);
207 #endif
208
209 jobp->job_func(jobp->job_arg);
210 if (!JOINABLE_JOB(jobp)) {
211 delete_job(jobp);
212 } else {
213 JOIN_NOTIFY(jobp);
214 }
215 }
216 PR_Lock(tp->jobq.lock);
217 tp->current_threads--;
218 PR_Unlock(tp->jobq.lock);
219 }
220
221 /*
222 * add a job to the work queue
223 */
224 static void
add_to_jobq(PRThreadPool * tp,PRJob * jobp)225 add_to_jobq(PRThreadPool *tp, PRJob *jobp)
226 {
227 /*
228 * add to jobq
229 */
230 #ifdef OPT_WINNT
231 PR_Lock(tp->jobq.lock);
232 tp->jobq.cnt++;
233 PR_Unlock(tp->jobq.lock);
234 /*
235 * notify worker thread(s)
236 */
237 PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
238 FALSE, &jobp->nt_notifier.overlapped);
239 #else
240 PR_Lock(tp->jobq.lock);
241 PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
242 tp->jobq.cnt++;
243 if ((tp->idle_threads < tp->jobq.cnt) &&
244 (tp->current_threads < tp->max_threads)) {
245 wthread *wthrp;
246 /*
247 * increment thread count and unlock the jobq lock
248 */
249 tp->current_threads++;
250 PR_Unlock(tp->jobq.lock);
251 /* create new worker thread */
252 wthrp = PR_NEWZAP(wthread);
253 if (wthrp) {
254 wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
255 tp, PR_PRIORITY_NORMAL,
256 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
257 if (NULL == wthrp->thread) {
258 PR_DELETE(wthrp); /* this sets wthrp to NULL */
259 }
260 }
261 PR_Lock(tp->jobq.lock);
262 if (NULL == wthrp) {
263 tp->current_threads--;
264 } else {
265 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
266 }
267 }
268 /*
269 * wakeup a worker thread
270 */
271 PR_NotifyCondVar(tp->jobq.cv);
272 PR_Unlock(tp->jobq.lock);
273 #endif
274 }
275
276 /*
277 * io worker thread function
278 */
io_wstart(void * arg)279 static void io_wstart(void *arg)
280 {
281 PRThreadPool *tp = (PRThreadPool *) arg;
282 int pollfd_cnt, pollfds_used;
283 int rv;
284 PRCList *qp, *nextqp;
285 PRPollDesc *pollfds = NULL;
286 PRJob **polljobs = NULL;
287 int poll_timeout;
288 PRIntervalTime now;
289
290 /*
291 * scan io_jobq
292 * construct poll list
293 * call PR_Poll
294 * for all fds, for which poll returns true, move the job to
295 * jobq and wakeup worker thread.
296 */
297 while (!tp->shutdown) {
298 PRJob *jobp;
299
300 pollfd_cnt = tp->ioq.cnt + 10;
301 if (pollfd_cnt > tp->ioq.npollfds) {
302
303 /*
304 * re-allocate pollfd array if the current one is not large
305 * enough
306 */
307 if (NULL != tp->ioq.pollfds) {
308 PR_Free(tp->ioq.pollfds);
309 }
310 tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
311 (sizeof(PRPollDesc) + sizeof(PRJob *)));
312 PR_ASSERT(NULL != tp->ioq.pollfds);
313 /*
314 * array of pollfds
315 */
316 pollfds = tp->ioq.pollfds;
317 tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
318 /*
319 * parallel array of jobs
320 */
321 polljobs = tp->ioq.polljobs;
322 tp->ioq.npollfds = pollfd_cnt;
323 }
324
325 pollfds_used = 0;
326 /*
327 * add the notify fd; used for unblocking io thread(s)
328 */
329 pollfds[pollfds_used].fd = tp->ioq.notify_fd;
330 pollfds[pollfds_used].in_flags = PR_POLL_READ;
331 pollfds[pollfds_used].out_flags = 0;
332 polljobs[pollfds_used] = NULL;
333 pollfds_used++;
334 /*
335 * fill in the pollfd array
336 */
337 PR_Lock(tp->ioq.lock);
338 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
339 nextqp = qp->next;
340 jobp = JOB_LINKS_PTR(qp);
341 if (jobp->cancel_io) {
342 CANCEL_IO_JOB(jobp);
343 continue;
344 }
345 if (pollfds_used == (pollfd_cnt)) {
346 break;
347 }
348 pollfds[pollfds_used].fd = jobp->iod->socket;
349 pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
350 pollfds[pollfds_used].out_flags = 0;
351 polljobs[pollfds_used] = jobp;
352
353 pollfds_used++;
354 }
355 if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
356 qp = tp->ioq.list.next;
357 jobp = JOB_LINKS_PTR(qp);
358 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
359 poll_timeout = PR_INTERVAL_NO_TIMEOUT;
360 }
361 else if (PR_INTERVAL_NO_WAIT == jobp->timeout) {
362 poll_timeout = PR_INTERVAL_NO_WAIT;
363 }
364 else {
365 poll_timeout = jobp->absolute - PR_IntervalNow();
366 if (poll_timeout <= 0) { /* already timed out */
367 poll_timeout = PR_INTERVAL_NO_WAIT;
368 }
369 }
370 } else {
371 poll_timeout = PR_INTERVAL_NO_TIMEOUT;
372 }
373 PR_Unlock(tp->ioq.lock);
374
375 /*
376 * XXXX
377 * should retry if more jobs have been added to the queue?
378 *
379 */
380 PR_ASSERT(pollfds_used <= pollfd_cnt);
381 rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
382
383 if (tp->shutdown) {
384 break;
385 }
386
387 if (rv > 0) {
388 /*
389 * at least one io event is set
390 */
391 PRStatus rval_status;
392 PRInt32 index;
393
394 PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
395 /*
396 * reset the pollable event, if notified
397 */
398 if (pollfds[0].out_flags & PR_POLL_READ) {
399 rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
400 PR_ASSERT(PR_SUCCESS == rval_status);
401 }
402
403 for(index = 1; index < (pollfds_used); index++) {
404 PRInt16 events = pollfds[index].in_flags;
405 PRInt16 revents = pollfds[index].out_flags;
406 jobp = polljobs[index];
407
408 if ((revents & PR_POLL_NVAL) || /* busted in all cases */
409 (revents & PR_POLL_ERR) ||
410 ((events & PR_POLL_WRITE) &&
411 (revents & PR_POLL_HUP))) { /* write op & hup */
412 PR_Lock(tp->ioq.lock);
413 if (jobp->cancel_io) {
414 CANCEL_IO_JOB(jobp);
415 PR_Unlock(tp->ioq.lock);
416 continue;
417 }
418 PR_REMOVE_AND_INIT_LINK(&jobp->links);
419 tp->ioq.cnt--;
420 jobp->on_ioq = PR_FALSE;
421 PR_Unlock(tp->ioq.lock);
422
423 /* set error */
424 if (PR_POLL_NVAL & revents) {
425 jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
426 }
427 else if (PR_POLL_HUP & revents) {
428 jobp->iod->error = PR_CONNECT_RESET_ERROR;
429 }
430 else {
431 jobp->iod->error = PR_IO_ERROR;
432 }
433
434 /*
435 * add to jobq
436 */
437 add_to_jobq(tp, jobp);
438 } else if (revents) {
439 /*
440 * add to jobq
441 */
442 PR_Lock(tp->ioq.lock);
443 if (jobp->cancel_io) {
444 CANCEL_IO_JOB(jobp);
445 PR_Unlock(tp->ioq.lock);
446 continue;
447 }
448 PR_REMOVE_AND_INIT_LINK(&jobp->links);
449 tp->ioq.cnt--;
450 jobp->on_ioq = PR_FALSE;
451 PR_Unlock(tp->ioq.lock);
452
453 if (jobp->io_op == JOB_IO_CONNECT) {
454 if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) {
455 jobp->iod->error = 0;
456 }
457 else {
458 jobp->iod->error = PR_GetError();
459 }
460 } else {
461 jobp->iod->error = 0;
462 }
463
464 add_to_jobq(tp, jobp);
465 }
466 }
467 }
468 /*
469 * timeout processing
470 */
471 now = PR_IntervalNow();
472 PR_Lock(tp->ioq.lock);
473 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
474 nextqp = qp->next;
475 jobp = JOB_LINKS_PTR(qp);
476 if (jobp->cancel_io) {
477 CANCEL_IO_JOB(jobp);
478 continue;
479 }
480 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
481 break;
482 }
483 if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
484 ((PRInt32)(jobp->absolute - now) > 0)) {
485 break;
486 }
487 PR_REMOVE_AND_INIT_LINK(&jobp->links);
488 tp->ioq.cnt--;
489 jobp->on_ioq = PR_FALSE;
490 jobp->iod->error = PR_IO_TIMEOUT_ERROR;
491 add_to_jobq(tp, jobp);
492 }
493 PR_Unlock(tp->ioq.lock);
494 }
495 }
496
497 /*
498 * timer worker thread function
499 */
timer_wstart(void * arg)500 static void timer_wstart(void *arg)
501 {
502 PRThreadPool *tp = (PRThreadPool *) arg;
503 PRCList *qp;
504 PRIntervalTime timeout;
505 PRIntervalTime now;
506
507 /*
508 * call PR_WaitCondVar with minimum value of all timeouts
509 */
510 while (!tp->shutdown) {
511 PRJob *jobp;
512
513 PR_Lock(tp->timerq.lock);
514 if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
515 timeout = PR_INTERVAL_NO_TIMEOUT;
516 } else {
517 PRCList *qp;
518
519 qp = tp->timerq.list.next;
520 jobp = JOB_LINKS_PTR(qp);
521
522 timeout = jobp->absolute - PR_IntervalNow();
523 if (timeout <= 0) {
524 timeout = PR_INTERVAL_NO_WAIT; /* already timed out */
525 }
526 }
527 if (PR_INTERVAL_NO_WAIT != timeout) {
528 PR_WaitCondVar(tp->timerq.cv, timeout);
529 }
530 if (tp->shutdown) {
531 PR_Unlock(tp->timerq.lock);
532 break;
533 }
534 /*
535 * move expired-timer jobs to jobq
536 */
537 now = PR_IntervalNow();
538 while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
539 qp = tp->timerq.list.next;
540 jobp = JOB_LINKS_PTR(qp);
541
542 if ((PRInt32)(jobp->absolute - now) > 0) {
543 break;
544 }
545 /*
546 * job timed out
547 */
548 PR_REMOVE_AND_INIT_LINK(&jobp->links);
549 tp->timerq.cnt--;
550 jobp->on_timerq = PR_FALSE;
551 add_to_jobq(tp, jobp);
552 }
553 PR_Unlock(tp->timerq.lock);
554 }
555 }
556
557 static void
delete_threadpool(PRThreadPool * tp)558 delete_threadpool(PRThreadPool *tp)
559 {
560 if (NULL != tp) {
561 if (NULL != tp->shutdown_cv) {
562 PR_DestroyCondVar(tp->shutdown_cv);
563 }
564 if (NULL != tp->jobq.cv) {
565 PR_DestroyCondVar(tp->jobq.cv);
566 }
567 if (NULL != tp->jobq.lock) {
568 PR_DestroyLock(tp->jobq.lock);
569 }
570 if (NULL != tp->join_lock) {
571 PR_DestroyLock(tp->join_lock);
572 }
573 #ifdef OPT_WINNT
574 if (NULL != tp->jobq.nt_completion_port) {
575 CloseHandle(tp->jobq.nt_completion_port);
576 }
577 #endif
578 /* Timer queue */
579 if (NULL != tp->timerq.cv) {
580 PR_DestroyCondVar(tp->timerq.cv);
581 }
582 if (NULL != tp->timerq.lock) {
583 PR_DestroyLock(tp->timerq.lock);
584 }
585
586 if (NULL != tp->ioq.lock) {
587 PR_DestroyLock(tp->ioq.lock);
588 }
589 if (NULL != tp->ioq.pollfds) {
590 PR_Free(tp->ioq.pollfds);
591 }
592 if (NULL != tp->ioq.notify_fd) {
593 PR_DestroyPollableEvent(tp->ioq.notify_fd);
594 }
595 PR_Free(tp);
596 }
597 return;
598 }
599
600 static PRThreadPool *
alloc_threadpool(void)601 alloc_threadpool(void)
602 {
603 PRThreadPool *tp;
604
605 tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
606 if (NULL == tp) {
607 goto failed;
608 }
609 tp->jobq.lock = PR_NewLock();
610 if (NULL == tp->jobq.lock) {
611 goto failed;
612 }
613 tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
614 if (NULL == tp->jobq.cv) {
615 goto failed;
616 }
617 tp->join_lock = PR_NewLock();
618 if (NULL == tp->join_lock) {
619 goto failed;
620 }
621 #ifdef OPT_WINNT
622 tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
623 NULL, 0, 0);
624 if (NULL == tp->jobq.nt_completion_port) {
625 goto failed;
626 }
627 #endif
628
629 tp->ioq.lock = PR_NewLock();
630 if (NULL == tp->ioq.lock) {
631 goto failed;
632 }
633
634 /* Timer queue */
635
636 tp->timerq.lock = PR_NewLock();
637 if (NULL == tp->timerq.lock) {
638 goto failed;
639 }
640 tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
641 if (NULL == tp->timerq.cv) {
642 goto failed;
643 }
644
645 tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
646 if (NULL == tp->shutdown_cv) {
647 goto failed;
648 }
649 tp->ioq.notify_fd = PR_NewPollableEvent();
650 if (NULL == tp->ioq.notify_fd) {
651 goto failed;
652 }
653 return tp;
654 failed:
655 delete_threadpool(tp);
656 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
657 return NULL;
658 }
659
660 /* Create thread pool */
661 PR_IMPLEMENT(PRThreadPool *)
PR_CreateThreadPool(PRInt32 initial_threads,PRInt32 max_threads,PRUint32 stacksize)662 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
663 PRUint32 stacksize)
664 {
665 PRThreadPool *tp;
666 PRThread *thr;
667 int i;
668 wthread *wthrp;
669
670 tp = alloc_threadpool();
671 if (NULL == tp) {
672 return NULL;
673 }
674
675 tp->init_threads = initial_threads;
676 tp->max_threads = max_threads;
677 tp->stacksize = stacksize;
678 PR_INIT_CLIST(&tp->jobq.list);
679 PR_INIT_CLIST(&tp->ioq.list);
680 PR_INIT_CLIST(&tp->timerq.list);
681 PR_INIT_CLIST(&tp->jobq.wthreads);
682 PR_INIT_CLIST(&tp->ioq.wthreads);
683 PR_INIT_CLIST(&tp->timerq.wthreads);
684 tp->shutdown = PR_FALSE;
685
686 PR_Lock(tp->jobq.lock);
687 for(i=0; i < initial_threads; ++i) {
688
689 thr = PR_CreateThread(PR_USER_THREAD, wstart,
690 tp, PR_PRIORITY_NORMAL,
691 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
692 PR_ASSERT(thr);
693 wthrp = PR_NEWZAP(wthread);
694 PR_ASSERT(wthrp);
695 wthrp->thread = thr;
696 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
697 }
698 tp->current_threads = initial_threads;
699
700 thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
701 tp, PR_PRIORITY_NORMAL,
702 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
703 PR_ASSERT(thr);
704 wthrp = PR_NEWZAP(wthread);
705 PR_ASSERT(wthrp);
706 wthrp->thread = thr;
707 PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
708
709 thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
710 tp, PR_PRIORITY_NORMAL,
711 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
712 PR_ASSERT(thr);
713 wthrp = PR_NEWZAP(wthread);
714 PR_ASSERT(wthrp);
715 wthrp->thread = thr;
716 PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
717
718 PR_Unlock(tp->jobq.lock);
719 return tp;
720 }
721
722 static void
delete_job(PRJob * jobp)723 delete_job(PRJob *jobp)
724 {
725 if (NULL != jobp) {
726 if (NULL != jobp->join_cv) {
727 PR_DestroyCondVar(jobp->join_cv);
728 jobp->join_cv = NULL;
729 }
730 if (NULL != jobp->cancel_cv) {
731 PR_DestroyCondVar(jobp->cancel_cv);
732 jobp->cancel_cv = NULL;
733 }
734 PR_DELETE(jobp);
735 }
736 }
737
738 static PRJob *
alloc_job(PRBool joinable,PRThreadPool * tp)739 alloc_job(PRBool joinable, PRThreadPool *tp)
740 {
741 PRJob *jobp;
742
743 jobp = PR_NEWZAP(PRJob);
744 if (NULL == jobp) {
745 goto failed;
746 }
747 if (joinable) {
748 jobp->join_cv = PR_NewCondVar(tp->join_lock);
749 jobp->join_wait = PR_TRUE;
750 if (NULL == jobp->join_cv) {
751 goto failed;
752 }
753 } else {
754 jobp->join_cv = NULL;
755 }
756 #ifdef OPT_WINNT
757 jobp->nt_notifier.jobp = jobp;
758 #endif
759 return jobp;
760 failed:
761 delete_job(jobp);
762 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
763 return NULL;
764 }
765
766 /* queue a job */
767 PR_IMPLEMENT(PRJob *)
PR_QueueJob(PRThreadPool * tpool,PRJobFn fn,void * arg,PRBool joinable)768 PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
769 {
770 PRJob *jobp;
771
772 jobp = alloc_job(joinable, tpool);
773 if (NULL == jobp) {
774 return NULL;
775 }
776
777 jobp->job_func = fn;
778 jobp->job_arg = arg;
779 jobp->tpool = tpool;
780
781 add_to_jobq(tpool, jobp);
782 return jobp;
783 }
784
785 /* queue a job, when a socket is readable or writeable */
786 static PRJob *
queue_io_job(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable,io_op_type op)787 queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
788 PRBool joinable, io_op_type op)
789 {
790 PRJob *jobp;
791 PRIntervalTime now;
792
793 jobp = alloc_job(joinable, tpool);
794 if (NULL == jobp) {
795 return NULL;
796 }
797
798 /*
799 * Add a new job to io_jobq
800 * wakeup io worker thread
801 */
802
803 jobp->job_func = fn;
804 jobp->job_arg = arg;
805 jobp->tpool = tpool;
806 jobp->iod = iod;
807 if (JOB_IO_READ == op) {
808 jobp->io_op = JOB_IO_READ;
809 jobp->io_poll_flags = PR_POLL_READ;
810 } else if (JOB_IO_WRITE == op) {
811 jobp->io_op = JOB_IO_WRITE;
812 jobp->io_poll_flags = PR_POLL_WRITE;
813 } else if (JOB_IO_ACCEPT == op) {
814 jobp->io_op = JOB_IO_ACCEPT;
815 jobp->io_poll_flags = PR_POLL_READ;
816 } else if (JOB_IO_CONNECT == op) {
817 jobp->io_op = JOB_IO_CONNECT;
818 jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
819 } else {
820 delete_job(jobp);
821 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
822 return NULL;
823 }
824
825 jobp->timeout = iod->timeout;
826 if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
827 (PR_INTERVAL_NO_WAIT == iod->timeout)) {
828 jobp->absolute = iod->timeout;
829 } else {
830 now = PR_IntervalNow();
831 jobp->absolute = now + iod->timeout;
832 }
833
834
835 PR_Lock(tpool->ioq.lock);
836
837 if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
838 (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
839 PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
840 } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
841 PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
842 } else {
843 PRCList *qp;
844 PRJob *tmp_jobp;
845 /*
846 * insert into the timeout-sorted ioq
847 */
848 for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
849 qp = qp->prev) {
850 tmp_jobp = JOB_LINKS_PTR(qp);
851 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
852 break;
853 }
854 }
855 PR_INSERT_AFTER(&jobp->links,qp);
856 }
857
858 jobp->on_ioq = PR_TRUE;
859 tpool->ioq.cnt++;
860 /*
861 * notify io worker thread(s)
862 */
863 PR_Unlock(tpool->ioq.lock);
864 notify_ioq(tpool);
865 return jobp;
866 }
867
868 /* queue a job, when a socket is readable */
869 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Read(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable)870 PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
871 PRBool joinable)
872 {
873 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
874 }
875
876 /* queue a job, when a socket is writeable */
877 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Write(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable)878 PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
879 PRBool joinable)
880 {
881 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
882 }
883
884
885 /* queue a job, when a socket has a pending connection */
886 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Accept(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable)887 PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
888 void * arg, PRBool joinable)
889 {
890 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
891 }
892
893 /* queue a job, when a socket can be connected */
894 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Connect(PRThreadPool * tpool,PRJobIoDesc * iod,const PRNetAddr * addr,PRJobFn fn,void * arg,PRBool joinable)895 PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
896 const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
897 {
898 PRStatus rv;
899 PRErrorCode err;
900
901 rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
902 if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)) {
903 /* connection pending */
904 return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
905 }
906 /*
907 * connection succeeded or failed; add to jobq right away
908 */
909 if (rv == PR_FAILURE) {
910 iod->error = err;
911 }
912 else {
913 iod->error = 0;
914 }
915 return(PR_QueueJob(tpool, fn, arg, joinable));
916
917 }
918
919 /* queue a job, when a timer expires */
920 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Timer(PRThreadPool * tpool,PRIntervalTime timeout,PRJobFn fn,void * arg,PRBool joinable)921 PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
922 PRJobFn fn, void * arg, PRBool joinable)
923 {
924 PRIntervalTime now;
925 PRJob *jobp;
926
927 if (PR_INTERVAL_NO_TIMEOUT == timeout) {
928 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
929 return NULL;
930 }
931 if (PR_INTERVAL_NO_WAIT == timeout) {
932 /*
933 * no waiting; add to jobq right away
934 */
935 return(PR_QueueJob(tpool, fn, arg, joinable));
936 }
937 jobp = alloc_job(joinable, tpool);
938 if (NULL == jobp) {
939 return NULL;
940 }
941
942 /*
943 * Add a new job to timer_jobq
944 * wakeup timer worker thread
945 */
946
947 jobp->job_func = fn;
948 jobp->job_arg = arg;
949 jobp->tpool = tpool;
950 jobp->timeout = timeout;
951
952 now = PR_IntervalNow();
953 jobp->absolute = now + timeout;
954
955
956 PR_Lock(tpool->timerq.lock);
957 jobp->on_timerq = PR_TRUE;
958 if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
959 PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
960 }
961 else {
962 PRCList *qp;
963 PRJob *tmp_jobp;
964 /*
965 * insert into the sorted timer jobq
966 */
967 for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
968 qp = qp->prev) {
969 tmp_jobp = JOB_LINKS_PTR(qp);
970 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
971 break;
972 }
973 }
974 PR_INSERT_AFTER(&jobp->links,qp);
975 }
976 tpool->timerq.cnt++;
977 /*
978 * notify timer worker thread(s)
979 */
980 notify_timerq(tpool);
981 PR_Unlock(tpool->timerq.lock);
982 return jobp;
983 }
984
985 static void
notify_timerq(PRThreadPool * tp)986 notify_timerq(PRThreadPool *tp)
987 {
988 /*
989 * wakeup the timer thread(s)
990 */
991 PR_NotifyCondVar(tp->timerq.cv);
992 }
993
994 static void
notify_ioq(PRThreadPool * tp)995 notify_ioq(PRThreadPool *tp)
996 {
997 PRStatus rval_status;
998
999 /*
1000 * wakeup the io thread(s)
1001 */
1002 rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
1003 PR_ASSERT(PR_SUCCESS == rval_status);
1004 }
1005
1006 /*
1007 * cancel a job
1008 *
1009 * XXXX: is this needed? likely to be removed
1010 */
1011 PR_IMPLEMENT(PRStatus)
PR_CancelJob(PRJob * jobp)1012 PR_CancelJob(PRJob *jobp) {
1013
1014 PRStatus rval = PR_FAILURE;
1015 PRThreadPool *tp;
1016
1017 if (jobp->on_timerq) {
1018 /*
1019 * now, check again while holding the timerq lock
1020 */
1021 tp = jobp->tpool;
1022 PR_Lock(tp->timerq.lock);
1023 if (jobp->on_timerq) {
1024 jobp->on_timerq = PR_FALSE;
1025 PR_REMOVE_AND_INIT_LINK(&jobp->links);
1026 tp->timerq.cnt--;
1027 PR_Unlock(tp->timerq.lock);
1028 if (!JOINABLE_JOB(jobp)) {
1029 delete_job(jobp);
1030 } else {
1031 JOIN_NOTIFY(jobp);
1032 }
1033 rval = PR_SUCCESS;
1034 } else {
1035 PR_Unlock(tp->timerq.lock);
1036 }
1037 } else if (jobp->on_ioq) {
1038 /*
1039 * now, check again while holding the ioq lock
1040 */
1041 tp = jobp->tpool;
1042 PR_Lock(tp->ioq.lock);
1043 if (jobp->on_ioq) {
1044 jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
1045 if (NULL == jobp->cancel_cv) {
1046 PR_Unlock(tp->ioq.lock);
1047 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
1048 return PR_FAILURE;
1049 }
1050 /*
1051 * mark job 'cancelled' and notify io thread(s)
1052 * XXXX:
1053 * this assumes there is only one io thread; when there
1054 * are multiple threads, the io thread processing this job
1055 * must be notified.
1056 */
1057 jobp->cancel_io = PR_TRUE;
1058 PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
1059 notify_ioq(tp);
1060 PR_Lock(tp->ioq.lock);
1061 while (jobp->cancel_io) {
1062 PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
1063 }
1064 PR_Unlock(tp->ioq.lock);
1065 PR_ASSERT(!jobp->on_ioq);
1066 if (!JOINABLE_JOB(jobp)) {
1067 delete_job(jobp);
1068 } else {
1069 JOIN_NOTIFY(jobp);
1070 }
1071 rval = PR_SUCCESS;
1072 } else {
1073 PR_Unlock(tp->ioq.lock);
1074 }
1075 }
1076 if (PR_FAILURE == rval) {
1077 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1078 }
1079 return rval;
1080 }
1081
1082 /* join a job, wait until completion */
1083 PR_IMPLEMENT(PRStatus)
PR_JoinJob(PRJob * jobp)1084 PR_JoinJob(PRJob *jobp)
1085 {
1086 if (!JOINABLE_JOB(jobp)) {
1087 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1088 return PR_FAILURE;
1089 }
1090 PR_Lock(jobp->tpool->join_lock);
1091 while(jobp->join_wait) {
1092 PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
1093 }
1094 PR_Unlock(jobp->tpool->join_lock);
1095 delete_job(jobp);
1096 return PR_SUCCESS;
1097 }
1098
1099 /* shutdown threadpool */
1100 PR_IMPLEMENT(PRStatus)
PR_ShutdownThreadPool(PRThreadPool * tpool)1101 PR_ShutdownThreadPool(PRThreadPool *tpool)
1102 {
1103 PRStatus rval = PR_SUCCESS;
1104
1105 PR_Lock(tpool->jobq.lock);
1106 tpool->shutdown = PR_TRUE;
1107 PR_NotifyAllCondVar(tpool->shutdown_cv);
1108 PR_Unlock(tpool->jobq.lock);
1109
1110 return rval;
1111 }
1112
1113 /*
1114 * join thread pool
1115 * wait for termination of worker threads
1116 * reclaim threadpool resources
1117 */
1118 PR_IMPLEMENT(PRStatus)
PR_JoinThreadPool(PRThreadPool * tpool)1119 PR_JoinThreadPool(PRThreadPool *tpool)
1120 {
1121 PRStatus rval = PR_SUCCESS;
1122 PRCList *head;
1123 PRStatus rval_status;
1124
1125 PR_Lock(tpool->jobq.lock);
1126 while (!tpool->shutdown) {
1127 PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
1128 }
1129
1130 /*
1131 * wakeup worker threads
1132 */
1133 #ifdef OPT_WINNT
1134 /*
1135 * post shutdown notification for all threads
1136 */
1137 {
1138 int i;
1139 for(i=0; i < tpool->current_threads; i++) {
1140 PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
1141 TRUE, NULL);
1142 }
1143 }
1144 #else
1145 PR_NotifyAllCondVar(tpool->jobq.cv);
1146 #endif
1147
1148 /*
1149 * wakeup io thread(s)
1150 */
1151 notify_ioq(tpool);
1152
1153 /*
1154 * wakeup timer thread(s)
1155 */
1156 PR_Lock(tpool->timerq.lock);
1157 notify_timerq(tpool);
1158 PR_Unlock(tpool->timerq.lock);
1159
1160 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
1161 wthread *wthrp;
1162
1163 head = PR_LIST_HEAD(&tpool->jobq.wthreads);
1164 PR_REMOVE_AND_INIT_LINK(head);
1165 PR_Unlock(tpool->jobq.lock);
1166 wthrp = WTHREAD_LINKS_PTR(head);
1167 rval_status = PR_JoinThread(wthrp->thread);
1168 PR_ASSERT(PR_SUCCESS == rval_status);
1169 PR_DELETE(wthrp);
1170 PR_Lock(tpool->jobq.lock);
1171 }
1172 PR_Unlock(tpool->jobq.lock);
1173 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
1174 wthread *wthrp;
1175
1176 head = PR_LIST_HEAD(&tpool->ioq.wthreads);
1177 PR_REMOVE_AND_INIT_LINK(head);
1178 wthrp = WTHREAD_LINKS_PTR(head);
1179 rval_status = PR_JoinThread(wthrp->thread);
1180 PR_ASSERT(PR_SUCCESS == rval_status);
1181 PR_DELETE(wthrp);
1182 }
1183
1184 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
1185 wthread *wthrp;
1186
1187 head = PR_LIST_HEAD(&tpool->timerq.wthreads);
1188 PR_REMOVE_AND_INIT_LINK(head);
1189 wthrp = WTHREAD_LINKS_PTR(head);
1190 rval_status = PR_JoinThread(wthrp->thread);
1191 PR_ASSERT(PR_SUCCESS == rval_status);
1192 PR_DELETE(wthrp);
1193 }
1194
1195 /*
1196 * Delete queued jobs
1197 */
1198 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
1199 PRJob *jobp;
1200
1201 head = PR_LIST_HEAD(&tpool->jobq.list);
1202 PR_REMOVE_AND_INIT_LINK(head);
1203 jobp = JOB_LINKS_PTR(head);
1204 tpool->jobq.cnt--;
1205 delete_job(jobp);
1206 }
1207
1208 /* delete io jobs */
1209 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
1210 PRJob *jobp;
1211
1212 head = PR_LIST_HEAD(&tpool->ioq.list);
1213 PR_REMOVE_AND_INIT_LINK(head);
1214 tpool->ioq.cnt--;
1215 jobp = JOB_LINKS_PTR(head);
1216 delete_job(jobp);
1217 }
1218
1219 /* delete timer jobs */
1220 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
1221 PRJob *jobp;
1222
1223 head = PR_LIST_HEAD(&tpool->timerq.list);
1224 PR_REMOVE_AND_INIT_LINK(head);
1225 tpool->timerq.cnt--;
1226 jobp = JOB_LINKS_PTR(head);
1227 delete_job(jobp);
1228 }
1229
1230 PR_ASSERT(0 == tpool->jobq.cnt);
1231 PR_ASSERT(0 == tpool->ioq.cnt);
1232 PR_ASSERT(0 == tpool->timerq.cnt);
1233
1234 delete_threadpool(tpool);
1235 return rval;
1236 }
1237