1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3  * License, v. 2.0. If a copy of the MPL was not distributed with this
4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 
6 #include "nspr.h"
7 
8 /*
9  * Thread pools
10  *	Thread pools create and manage threads to provide support for
11  *	scheduling jobs onto one or more threads.
12  *
13  */
14 #ifdef OPT_WINNT
15 #include <windows.h>
16 #endif
17 
18 /*
19  * worker thread
20  */
21 typedef struct wthread {
22 	PRCList		links;
23 	PRThread	*thread;
24 } wthread;
25 
26 /*
27  * queue of timer jobs
28  */
29 typedef struct timer_jobq {
30 	PRCList		list;
31 	PRLock		*lock;
32 	PRCondVar	*cv;
33 	PRInt32		cnt;
34 	PRCList 	wthreads;
35 } timer_jobq;
36 
37 /*
38  * queue of jobs
39  */
40 typedef struct tp_jobq {
41 	PRCList		list;
42 	PRInt32		cnt;
43 	PRLock		*lock;
44 	PRCondVar	*cv;
45 	PRCList 	wthreads;
46 #ifdef OPT_WINNT
47 	HANDLE		nt_completion_port;
48 #endif
49 } tp_jobq;
50 
51 /*
52  * queue of IO jobs
53  */
54 typedef struct io_jobq {
55 	PRCList		list;
56 	PRPollDesc  *pollfds;
57 	PRInt32  	npollfds;
58 	PRJob		**polljobs;
59 	PRLock		*lock;
60 	PRInt32		cnt;
61 	PRFileDesc	*notify_fd;
62 	PRCList 	wthreads;
63 } io_jobq;
64 
65 /*
66  * Threadpool
67  */
68 struct PRThreadPool {
69 	PRInt32		init_threads;
70 	PRInt32		max_threads;
71 	PRInt32		current_threads;
72 	PRInt32		idle_threads;
73 	PRUint32	stacksize;
74 	tp_jobq		jobq;
75 	io_jobq		ioq;
76 	timer_jobq	timerq;
77 	PRLock		*join_lock;		/* used with jobp->join_cv */
78 	PRCondVar	*shutdown_cv;
79 	PRBool		shutdown;
80 };
81 
/* kind of I/O event an io job is waiting for */
typedef enum io_op_type {
	JOB_IO_READ,
	JOB_IO_WRITE,
	JOB_IO_CONNECT,
	JOB_IO_ACCEPT
} io_op_type;
84 
#ifdef OPT_WINNT
/*
 * Packet posted to the completion port for a job. A worker receives the
 * OVERLAPPED pointer back and casts it to NT_notifier to find the job,
 * which is why overlapped has to be the first member.
 */
struct NT_notifier {
	OVERLAPPED	overlapped;	/* must be first */
	PRJob		*jobp;		/* job this packet stands for */
};
typedef struct NT_notifier NT_notifier;
#endif
91 
92 struct PRJob {
93 	PRCList			links;		/* 	for linking jobs */
94 	PRBool			on_ioq;		/* job on ioq */
95 	PRBool			on_timerq;	/* job on timerq */
96 	PRJobFn			job_func;
97 	void 			*job_arg;
98 	PRCondVar		*join_cv;
99 	PRBool			join_wait;	/* == PR_TRUE, when waiting to join */
100 	PRCondVar		*cancel_cv;	/* for cancelling IO jobs */
101 	PRBool			cancel_io;	/* for cancelling IO jobs */
102 	PRThreadPool	*tpool;		/* back pointer to thread pool */
103 	PRJobIoDesc		*iod;
104 	io_op_type		io_op;
105 	PRInt16			io_poll_flags;
106 	PRNetAddr		*netaddr;
107 	PRIntervalTime	timeout;	/* relative value */
108 	PRIntervalTime	absolute;
109 #ifdef OPT_WINNT
110 	NT_notifier		nt_notifier;
111 #endif
112 };
113 
/* Recover the PRJob that contains the given 'links' member. */
#define JOB_LINKS_PTR(_qp) \
    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))

/* Recover the wthread that contains the given 'links' member. */
#define WTHREAD_LINKS_PTR(_qp) \
    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))

/* A job is joinable exactly when it was given a join condition variable. */
#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)

/*
 * Mark a joinable job as finished and wake the thread waiting in
 * PR_JoinJob. The argument is evaluated once.
 */
#define JOIN_NOTIFY(_jobp)								\
				PR_BEGIN_MACRO							\
				PRJob *joined_job_ = (_jobp);			\
				PR_Lock(joined_job_->tpool->join_lock);	\
				joined_job_->join_wait = PR_FALSE;		\
				PR_NotifyCondVar(joined_job_->join_cv);	\
				PR_Unlock(joined_job_->tpool->join_lock);	\
				PR_END_MACRO

/*
 * Complete a pending cancel request: unlink the job from the io queue of
 * its own pool and wake the canceller waiting on cancel_cv. The caller
 * must hold that pool's ioq.lock. The argument is evaluated once and the
 * pool is reached through the job's back pointer, so no particular
 * variable has to be in scope at the point of use.
 */
#define CANCEL_IO_JOB(jobp)								\
				PR_BEGIN_MACRO							\
				PRJob *cancelled_job_ = (jobp);			\
				cancelled_job_->cancel_io = PR_FALSE;	\
				cancelled_job_->on_ioq = PR_FALSE;		\
				PR_REMOVE_AND_INIT_LINK(&cancelled_job_->links);	\
				cancelled_job_->tpool->ioq.cnt--;		\
				PR_NotifyCondVar(cancelled_job_->cancel_cv);	\
				PR_END_MACRO
138 
139 static void delete_job(PRJob *jobp);
140 static PRThreadPool * alloc_threadpool(void);
141 static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
142 static void notify_ioq(PRThreadPool *tp);
143 static void notify_timerq(PRThreadPool *tp);
144 
/*
 * locks are acquired in the following order
 *
 *	tp->ioq.lock,tp->timerq.lock
 *			|
 *			V
 *		tp->jobq.lock
 */
153 
154 /*
155  * worker thread function
156  */
wstart(void * arg)157 static void wstart(void *arg)
158 {
159 PRThreadPool *tp = (PRThreadPool *) arg;
160 PRCList *head;
161 
162 	/*
163 	 * execute jobs until shutdown
164 	 */
165 	while (!tp->shutdown) {
166 		PRJob *jobp;
167 #ifdef OPT_WINNT
168 		BOOL rv;
169 		DWORD unused, shutdown;
170 		LPOVERLAPPED olp;
171 
172 		PR_Lock(tp->jobq.lock);
173 		tp->idle_threads++;
174 		PR_Unlock(tp->jobq.lock);
175 		rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
176 					&unused, &shutdown, &olp, INFINITE);
177 
178 		PR_ASSERT(rv);
179 		if (shutdown)
180 			break;
181 		jobp = ((NT_notifier *) olp)->jobp;
182 		PR_Lock(tp->jobq.lock);
183 		tp->idle_threads--;
184 		tp->jobq.cnt--;
185 		PR_Unlock(tp->jobq.lock);
186 #else
187 
188 		PR_Lock(tp->jobq.lock);
189 		while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
190 			tp->idle_threads++;
191 			PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
192 			tp->idle_threads--;
193 		}
194 		if (tp->shutdown) {
195 			PR_Unlock(tp->jobq.lock);
196 			break;
197 		}
198 		head = PR_LIST_HEAD(&tp->jobq.list);
199 		/*
200 		 * remove job from queue
201 		 */
202 		PR_REMOVE_AND_INIT_LINK(head);
203 		tp->jobq.cnt--;
204 		jobp = JOB_LINKS_PTR(head);
205 		PR_Unlock(tp->jobq.lock);
206 #endif
207 
208 		jobp->job_func(jobp->job_arg);
209 		if (!JOINABLE_JOB(jobp)) {
210 			delete_job(jobp);
211 		} else {
212 			JOIN_NOTIFY(jobp);
213 		}
214 	}
215 	PR_Lock(tp->jobq.lock);
216 	tp->current_threads--;
217 	PR_Unlock(tp->jobq.lock);
218 }
219 
220 /*
221  * add a job to the work queue
222  */
223 static void
add_to_jobq(PRThreadPool * tp,PRJob * jobp)224 add_to_jobq(PRThreadPool *tp, PRJob *jobp)
225 {
226 	/*
227 	 * add to jobq
228 	 */
229 #ifdef OPT_WINNT
230 	PR_Lock(tp->jobq.lock);
231 	tp->jobq.cnt++;
232 	PR_Unlock(tp->jobq.lock);
233 	/*
234 	 * notify worker thread(s)
235 	 */
236 	PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
237             FALSE, &jobp->nt_notifier.overlapped);
238 #else
239 	PR_Lock(tp->jobq.lock);
240 	PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
241 	tp->jobq.cnt++;
242 	if ((tp->idle_threads < tp->jobq.cnt) &&
243 					(tp->current_threads < tp->max_threads)) {
244 		wthread *wthrp;
245 		/*
246 		 * increment thread count and unlock the jobq lock
247 		 */
248 		tp->current_threads++;
249 		PR_Unlock(tp->jobq.lock);
250 		/* create new worker thread */
251 		wthrp = PR_NEWZAP(wthread);
252 		if (wthrp) {
253 			wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
254 						tp, PR_PRIORITY_NORMAL,
255 						PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
256 			if (NULL == wthrp->thread) {
257 				PR_DELETE(wthrp);  /* this sets wthrp to NULL */
258 			}
259 		}
260 		PR_Lock(tp->jobq.lock);
261 		if (NULL == wthrp) {
262 			tp->current_threads--;
263 		} else {
264 			PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
265 		}
266 	}
267 	/*
268 	 * wakeup a worker thread
269 	 */
270 	PR_NotifyCondVar(tp->jobq.cv);
271 	PR_Unlock(tp->jobq.lock);
272 #endif
273 }
274 
275 /*
276  * io worker thread function
277  */
io_wstart(void * arg)278 static void io_wstart(void *arg)
279 {
280 PRThreadPool *tp = (PRThreadPool *) arg;
281 int pollfd_cnt, pollfds_used;
282 int rv;
283 PRCList *qp, *nextqp;
284 PRPollDesc *pollfds = NULL;
285 PRJob **polljobs = NULL;
286 int poll_timeout;
287 PRIntervalTime now;
288 
289 	/*
290 	 * scan io_jobq
291 	 * construct poll list
292 	 * call PR_Poll
293 	 * for all fds, for which poll returns true, move the job to
294 	 * jobq and wakeup worker thread.
295 	 */
296 	while (!tp->shutdown) {
297 		PRJob *jobp;
298 
299 		pollfd_cnt = tp->ioq.cnt + 10;
300 		if (pollfd_cnt > tp->ioq.npollfds) {
301 
302 			/*
303 			 * re-allocate pollfd array if the current one is not large
304 			 * enough
305 			 */
306 			if (NULL != tp->ioq.pollfds)
307 				PR_Free(tp->ioq.pollfds);
308 			tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
309 						(sizeof(PRPollDesc) + sizeof(PRJob *)));
310 			PR_ASSERT(NULL != tp->ioq.pollfds);
311 			/*
312 			 * array of pollfds
313 			 */
314 			pollfds = tp->ioq.pollfds;
315 			tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
316 			/*
317 			 * parallel array of jobs
318 			 */
319 			polljobs = tp->ioq.polljobs;
320 			tp->ioq.npollfds = pollfd_cnt;
321 		}
322 
323 		pollfds_used = 0;
324 		/*
325 		 * add the notify fd; used for unblocking io thread(s)
326 		 */
327 		pollfds[pollfds_used].fd = tp->ioq.notify_fd;
328 		pollfds[pollfds_used].in_flags = PR_POLL_READ;
329 		pollfds[pollfds_used].out_flags = 0;
330 		polljobs[pollfds_used] = NULL;
331 		pollfds_used++;
332 		/*
333 		 * fill in the pollfd array
334 		 */
335 		PR_Lock(tp->ioq.lock);
336 		for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
337 			nextqp = qp->next;
338 			jobp = JOB_LINKS_PTR(qp);
339 			if (jobp->cancel_io) {
340 				CANCEL_IO_JOB(jobp);
341 				continue;
342 			}
343 			if (pollfds_used == (pollfd_cnt))
344 				break;
345 			pollfds[pollfds_used].fd = jobp->iod->socket;
346 			pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
347 			pollfds[pollfds_used].out_flags = 0;
348 			polljobs[pollfds_used] = jobp;
349 
350 			pollfds_used++;
351 		}
352 		if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
353 			qp = tp->ioq.list.next;
354 			jobp = JOB_LINKS_PTR(qp);
355 			if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
356 				poll_timeout = PR_INTERVAL_NO_TIMEOUT;
357 			else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
358 				poll_timeout = PR_INTERVAL_NO_WAIT;
359 			else {
360 				poll_timeout = jobp->absolute - PR_IntervalNow();
361 				if (poll_timeout <= 0) /* already timed out */
362 					poll_timeout = PR_INTERVAL_NO_WAIT;
363 			}
364 		} else {
365 			poll_timeout = PR_INTERVAL_NO_TIMEOUT;
366 		}
367 		PR_Unlock(tp->ioq.lock);
368 
369 		/*
370 		 * XXXX
371 		 * should retry if more jobs have been added to the queue?
372 		 *
373 		 */
374 		PR_ASSERT(pollfds_used <= pollfd_cnt);
375 		rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
376 
377 		if (tp->shutdown) {
378 			break;
379 		}
380 
381 		if (rv > 0) {
382 			/*
383 			 * at least one io event is set
384 			 */
385 			PRStatus rval_status;
386 			PRInt32 index;
387 
388 			PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
389 			/*
390 			 * reset the pollable event, if notified
391 			 */
392 			if (pollfds[0].out_flags & PR_POLL_READ) {
393 				rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
394 				PR_ASSERT(PR_SUCCESS == rval_status);
395 			}
396 
397 			for(index = 1; index < (pollfds_used); index++) {
398                 PRInt16 events = pollfds[index].in_flags;
399                 PRInt16 revents = pollfds[index].out_flags;
400 				jobp = polljobs[index];
401 
402                 if ((revents & PR_POLL_NVAL) ||  /* busted in all cases */
403                 	(revents & PR_POLL_ERR) ||
404                 			((events & PR_POLL_WRITE) &&
405 							(revents & PR_POLL_HUP))) { /* write op & hup */
406 					PR_Lock(tp->ioq.lock);
407 					if (jobp->cancel_io) {
408 						CANCEL_IO_JOB(jobp);
409 						PR_Unlock(tp->ioq.lock);
410 						continue;
411 					}
412 					PR_REMOVE_AND_INIT_LINK(&jobp->links);
413 					tp->ioq.cnt--;
414 					jobp->on_ioq = PR_FALSE;
415 					PR_Unlock(tp->ioq.lock);
416 
417 					/* set error */
418                     if (PR_POLL_NVAL & revents)
419 						jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
420                     else if (PR_POLL_HUP & revents)
421 						jobp->iod->error = PR_CONNECT_RESET_ERROR;
422                     else
423 						jobp->iod->error = PR_IO_ERROR;
424 
425 					/*
426 					 * add to jobq
427 					 */
428 					add_to_jobq(tp, jobp);
429 				} else if (revents) {
430 					/*
431 					 * add to jobq
432 					 */
433 					PR_Lock(tp->ioq.lock);
434 					if (jobp->cancel_io) {
435 						CANCEL_IO_JOB(jobp);
436 						PR_Unlock(tp->ioq.lock);
437 						continue;
438 					}
439 					PR_REMOVE_AND_INIT_LINK(&jobp->links);
440 					tp->ioq.cnt--;
441 					jobp->on_ioq = PR_FALSE;
442 					PR_Unlock(tp->ioq.lock);
443 
444 					if (jobp->io_op == JOB_IO_CONNECT) {
445 						if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
446 							jobp->iod->error = 0;
447 						else
448 							jobp->iod->error = PR_GetError();
449 					} else
450 						jobp->iod->error = 0;
451 
452 					add_to_jobq(tp, jobp);
453 				}
454 			}
455 		}
456 		/*
457 		 * timeout processing
458 		 */
459 		now = PR_IntervalNow();
460 		PR_Lock(tp->ioq.lock);
461 		for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
462 			nextqp = qp->next;
463 			jobp = JOB_LINKS_PTR(qp);
464 			if (jobp->cancel_io) {
465 				CANCEL_IO_JOB(jobp);
466 				continue;
467 			}
468 			if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
469 				break;
470 			if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
471 								((PRInt32)(jobp->absolute - now) > 0))
472 				break;
473 			PR_REMOVE_AND_INIT_LINK(&jobp->links);
474 			tp->ioq.cnt--;
475 			jobp->on_ioq = PR_FALSE;
476 			jobp->iod->error = PR_IO_TIMEOUT_ERROR;
477 			add_to_jobq(tp, jobp);
478 		}
479 		PR_Unlock(tp->ioq.lock);
480 	}
481 }
482 
483 /*
484  * timer worker thread function
485  */
timer_wstart(void * arg)486 static void timer_wstart(void *arg)
487 {
488 PRThreadPool *tp = (PRThreadPool *) arg;
489 PRCList *qp;
490 PRIntervalTime timeout;
491 PRIntervalTime now;
492 
493 	/*
494 	 * call PR_WaitCondVar with minimum value of all timeouts
495 	 */
496 	while (!tp->shutdown) {
497 		PRJob *jobp;
498 
499 		PR_Lock(tp->timerq.lock);
500 		if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
501 			timeout = PR_INTERVAL_NO_TIMEOUT;
502 		} else {
503 			PRCList *qp;
504 
505 			qp = tp->timerq.list.next;
506 			jobp = JOB_LINKS_PTR(qp);
507 
508 			timeout = jobp->absolute - PR_IntervalNow();
509             if (timeout <= 0)
510 				timeout = PR_INTERVAL_NO_WAIT;  /* already timed out */
511 		}
512 		if (PR_INTERVAL_NO_WAIT != timeout)
513 			PR_WaitCondVar(tp->timerq.cv, timeout);
514 		if (tp->shutdown) {
515 			PR_Unlock(tp->timerq.lock);
516 			break;
517 		}
518 		/*
519 		 * move expired-timer jobs to jobq
520 		 */
521 		now = PR_IntervalNow();
522 		while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
523 			qp = tp->timerq.list.next;
524 			jobp = JOB_LINKS_PTR(qp);
525 
526 			if ((PRInt32)(jobp->absolute - now) > 0) {
527 				break;
528 			}
529 			/*
530 			 * job timed out
531 			 */
532 			PR_REMOVE_AND_INIT_LINK(&jobp->links);
533 			tp->timerq.cnt--;
534 			jobp->on_timerq = PR_FALSE;
535 			add_to_jobq(tp, jobp);
536 		}
537 		PR_Unlock(tp->timerq.lock);
538 	}
539 }
540 
541 static void
delete_threadpool(PRThreadPool * tp)542 delete_threadpool(PRThreadPool *tp)
543 {
544 	if (NULL != tp) {
545 		if (NULL != tp->shutdown_cv)
546 			PR_DestroyCondVar(tp->shutdown_cv);
547 		if (NULL != tp->jobq.cv)
548 			PR_DestroyCondVar(tp->jobq.cv);
549 		if (NULL != tp->jobq.lock)
550 			PR_DestroyLock(tp->jobq.lock);
551 		if (NULL != tp->join_lock)
552 			PR_DestroyLock(tp->join_lock);
553 #ifdef OPT_WINNT
554 		if (NULL != tp->jobq.nt_completion_port)
555 			CloseHandle(tp->jobq.nt_completion_port);
556 #endif
557 		/* Timer queue */
558 		if (NULL != tp->timerq.cv)
559 			PR_DestroyCondVar(tp->timerq.cv);
560 		if (NULL != tp->timerq.lock)
561 			PR_DestroyLock(tp->timerq.lock);
562 
563 		if (NULL != tp->ioq.lock)
564 			PR_DestroyLock(tp->ioq.lock);
565 		if (NULL != tp->ioq.pollfds)
566 			PR_Free(tp->ioq.pollfds);
567 		if (NULL != tp->ioq.notify_fd)
568 			PR_DestroyPollableEvent(tp->ioq.notify_fd);
569 		PR_Free(tp);
570 	}
571 	return;
572 }
573 
574 static PRThreadPool *
alloc_threadpool(void)575 alloc_threadpool(void)
576 {
577 PRThreadPool *tp;
578 
579 	tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
580 	if (NULL == tp)
581 		goto failed;
582 	tp->jobq.lock = PR_NewLock();
583 	if (NULL == tp->jobq.lock)
584 		goto failed;
585 	tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
586 	if (NULL == tp->jobq.cv)
587 		goto failed;
588 	tp->join_lock = PR_NewLock();
589 	if (NULL == tp->join_lock)
590 		goto failed;
591 #ifdef OPT_WINNT
592 	tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
593 									NULL, 0, 0);
594 	if (NULL == tp->jobq.nt_completion_port)
595 		goto failed;
596 #endif
597 
598 	tp->ioq.lock = PR_NewLock();
599 	if (NULL == tp->ioq.lock)
600 		goto failed;
601 
602 	/* Timer queue */
603 
604 	tp->timerq.lock = PR_NewLock();
605 	if (NULL == tp->timerq.lock)
606 		goto failed;
607 	tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
608 	if (NULL == tp->timerq.cv)
609 		goto failed;
610 
611 	tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
612 	if (NULL == tp->shutdown_cv)
613 		goto failed;
614 	tp->ioq.notify_fd = PR_NewPollableEvent();
615 	if (NULL == tp->ioq.notify_fd)
616 		goto failed;
617 	return tp;
618 failed:
619 	delete_threadpool(tp);
620 	PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
621 	return NULL;
622 }
623 
624 /* Create thread pool */
625 PR_IMPLEMENT(PRThreadPool *)
PR_CreateThreadPool(PRInt32 initial_threads,PRInt32 max_threads,PRUint32 stacksize)626 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
627                                 PRUint32 stacksize)
628 {
629 PRThreadPool *tp;
630 PRThread *thr;
631 int i;
632 wthread *wthrp;
633 
634 	tp = alloc_threadpool();
635 	if (NULL == tp)
636 		return NULL;
637 
638 	tp->init_threads = initial_threads;
639 	tp->max_threads = max_threads;
640 	tp->stacksize = stacksize;
641 	PR_INIT_CLIST(&tp->jobq.list);
642 	PR_INIT_CLIST(&tp->ioq.list);
643 	PR_INIT_CLIST(&tp->timerq.list);
644 	PR_INIT_CLIST(&tp->jobq.wthreads);
645 	PR_INIT_CLIST(&tp->ioq.wthreads);
646 	PR_INIT_CLIST(&tp->timerq.wthreads);
647 	tp->shutdown = PR_FALSE;
648 
649 	PR_Lock(tp->jobq.lock);
650 	for(i=0; i < initial_threads; ++i) {
651 
652 		thr = PR_CreateThread(PR_USER_THREAD, wstart,
653 						tp, PR_PRIORITY_NORMAL,
654 						PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
655 		PR_ASSERT(thr);
656 		wthrp = PR_NEWZAP(wthread);
657 		PR_ASSERT(wthrp);
658 		wthrp->thread = thr;
659 		PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
660 	}
661 	tp->current_threads = initial_threads;
662 
663 	thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
664 					tp, PR_PRIORITY_NORMAL,
665 					PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
666 	PR_ASSERT(thr);
667 	wthrp = PR_NEWZAP(wthread);
668 	PR_ASSERT(wthrp);
669 	wthrp->thread = thr;
670 	PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
671 
672 	thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
673 					tp, PR_PRIORITY_NORMAL,
674 					PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
675 	PR_ASSERT(thr);
676 	wthrp = PR_NEWZAP(wthread);
677 	PR_ASSERT(wthrp);
678 	wthrp->thread = thr;
679 	PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
680 
681 	PR_Unlock(tp->jobq.lock);
682 	return tp;
683 }
684 
685 static void
delete_job(PRJob * jobp)686 delete_job(PRJob *jobp)
687 {
688 	if (NULL != jobp) {
689 		if (NULL != jobp->join_cv) {
690 			PR_DestroyCondVar(jobp->join_cv);
691 			jobp->join_cv = NULL;
692 		}
693 		if (NULL != jobp->cancel_cv) {
694 			PR_DestroyCondVar(jobp->cancel_cv);
695 			jobp->cancel_cv = NULL;
696 		}
697 		PR_DELETE(jobp);
698 	}
699 }
700 
701 static PRJob *
alloc_job(PRBool joinable,PRThreadPool * tp)702 alloc_job(PRBool joinable, PRThreadPool *tp)
703 {
704 	PRJob *jobp;
705 
706 	jobp = PR_NEWZAP(PRJob);
707 	if (NULL == jobp)
708 		goto failed;
709 	if (joinable) {
710 		jobp->join_cv = PR_NewCondVar(tp->join_lock);
711 		jobp->join_wait = PR_TRUE;
712 		if (NULL == jobp->join_cv)
713 			goto failed;
714 	} else {
715 		jobp->join_cv = NULL;
716 	}
717 #ifdef OPT_WINNT
718 	jobp->nt_notifier.jobp = jobp;
719 #endif
720 	return jobp;
721 failed:
722 	delete_job(jobp);
723 	PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
724 	return NULL;
725 }
726 
727 /* queue a job */
728 PR_IMPLEMENT(PRJob *)
PR_QueueJob(PRThreadPool * tpool,PRJobFn fn,void * arg,PRBool joinable)729 PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
730 {
731 	PRJob *jobp;
732 
733 	jobp = alloc_job(joinable, tpool);
734 	if (NULL == jobp)
735 		return NULL;
736 
737 	jobp->job_func = fn;
738 	jobp->job_arg = arg;
739 	jobp->tpool = tpool;
740 
741 	add_to_jobq(tpool, jobp);
742 	return jobp;
743 }
744 
745 /* queue a job, when a socket is readable or writeable */
746 static PRJob *
queue_io_job(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable,io_op_type op)747 queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
748 				PRBool joinable, io_op_type op)
749 {
750 	PRJob *jobp;
751 	PRIntervalTime now;
752 
753 	jobp = alloc_job(joinable, tpool);
754 	if (NULL == jobp) {
755 		return NULL;
756 	}
757 
758 	/*
759 	 * Add a new job to io_jobq
760 	 * wakeup io worker thread
761 	 */
762 
763 	jobp->job_func = fn;
764 	jobp->job_arg = arg;
765 	jobp->tpool = tpool;
766 	jobp->iod = iod;
767 	if (JOB_IO_READ == op) {
768 		jobp->io_op = JOB_IO_READ;
769 		jobp->io_poll_flags = PR_POLL_READ;
770 	} else if (JOB_IO_WRITE == op) {
771 		jobp->io_op = JOB_IO_WRITE;
772 		jobp->io_poll_flags = PR_POLL_WRITE;
773 	} else if (JOB_IO_ACCEPT == op) {
774 		jobp->io_op = JOB_IO_ACCEPT;
775 		jobp->io_poll_flags = PR_POLL_READ;
776 	} else if (JOB_IO_CONNECT == op) {
777 		jobp->io_op = JOB_IO_CONNECT;
778 		jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
779 	} else {
780 		delete_job(jobp);
781 		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
782 		return NULL;
783 	}
784 
785 	jobp->timeout = iod->timeout;
786 	if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
787 			(PR_INTERVAL_NO_WAIT == iod->timeout)) {
788 		jobp->absolute = iod->timeout;
789 	} else {
790 		now = PR_IntervalNow();
791 		jobp->absolute = now + iod->timeout;
792 	}
793 
794 
795 	PR_Lock(tpool->ioq.lock);
796 
797 	if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
798 			(PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
799 		PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
800 	} else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
801 		PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
802 	} else {
803 		PRCList *qp;
804 		PRJob *tmp_jobp;
805 		/*
806 		 * insert into the timeout-sorted ioq
807 		 */
808 		for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
809 							qp = qp->prev) {
810 			tmp_jobp = JOB_LINKS_PTR(qp);
811 			if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
812 				break;
813 			}
814 		}
815 		PR_INSERT_AFTER(&jobp->links,qp);
816 	}
817 
818 	jobp->on_ioq = PR_TRUE;
819 	tpool->ioq.cnt++;
820 	/*
821 	 * notify io worker thread(s)
822 	 */
823 	PR_Unlock(tpool->ioq.lock);
824 	notify_ioq(tpool);
825 	return jobp;
826 }
827 
828 /* queue a job, when a socket is readable */
829 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Read(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable)830 PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
831 											PRBool joinable)
832 {
833 	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
834 }
835 
836 /* queue a job, when a socket is writeable */
837 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Write(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable)838 PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
839 										PRBool joinable)
840 {
841 	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
842 }
843 
844 
845 /* queue a job, when a socket has a pending connection */
846 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Accept(PRThreadPool * tpool,PRJobIoDesc * iod,PRJobFn fn,void * arg,PRBool joinable)847 PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
848 								void * arg, PRBool joinable)
849 {
850 	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
851 }
852 
853 /* queue a job, when a socket can be connected */
854 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Connect(PRThreadPool * tpool,PRJobIoDesc * iod,const PRNetAddr * addr,PRJobFn fn,void * arg,PRBool joinable)855 PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
856 			const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
857 {
858 	PRStatus rv;
859 	PRErrorCode err;
860 
861 	rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
862 	if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
863 		/* connection pending */
864 		return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
865 	}
866     /*
867      * connection succeeded or failed; add to jobq right away
868      */
869     if (rv == PR_FAILURE)
870       iod->error = err;
871     else
872       iod->error = 0;
873     return(PR_QueueJob(tpool, fn, arg, joinable));
874 
875 }
876 
877 /* queue a job, when a timer expires */
878 PR_IMPLEMENT(PRJob *)
PR_QueueJob_Timer(PRThreadPool * tpool,PRIntervalTime timeout,PRJobFn fn,void * arg,PRBool joinable)879 PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
880 							PRJobFn fn, void * arg, PRBool joinable)
881 {
882 	PRIntervalTime now;
883 	PRJob *jobp;
884 
885 	if (PR_INTERVAL_NO_TIMEOUT == timeout) {
886 		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
887 		return NULL;
888 	}
889 	if (PR_INTERVAL_NO_WAIT == timeout) {
890 		/*
891 		 * no waiting; add to jobq right away
892 		 */
893 		return(PR_QueueJob(tpool, fn, arg, joinable));
894 	}
895 	jobp = alloc_job(joinable, tpool);
896 	if (NULL == jobp) {
897 		return NULL;
898 	}
899 
900 	/*
901 	 * Add a new job to timer_jobq
902 	 * wakeup timer worker thread
903 	 */
904 
905 	jobp->job_func = fn;
906 	jobp->job_arg = arg;
907 	jobp->tpool = tpool;
908 	jobp->timeout = timeout;
909 
910 	now = PR_IntervalNow();
911 	jobp->absolute = now + timeout;
912 
913 
914 	PR_Lock(tpool->timerq.lock);
915 	jobp->on_timerq = PR_TRUE;
916 	if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
917 		PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
918 	else {
919 		PRCList *qp;
920 		PRJob *tmp_jobp;
921 		/*
922 		 * insert into the sorted timer jobq
923 		 */
924 		for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
925 							qp = qp->prev) {
926 			tmp_jobp = JOB_LINKS_PTR(qp);
927 			if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
928 				break;
929 			}
930 		}
931 		PR_INSERT_AFTER(&jobp->links,qp);
932 	}
933 	tpool->timerq.cnt++;
934 	/*
935 	 * notify timer worker thread(s)
936 	 */
937 	notify_timerq(tpool);
938 	PR_Unlock(tpool->timerq.lock);
939 	return jobp;
940 }
941 
942 static void
notify_timerq(PRThreadPool * tp)943 notify_timerq(PRThreadPool *tp)
944 {
945 	/*
946 	 * wakeup the timer thread(s)
947 	 */
948 	PR_NotifyCondVar(tp->timerq.cv);
949 }
950 
951 static void
notify_ioq(PRThreadPool * tp)952 notify_ioq(PRThreadPool *tp)
953 {
954 PRStatus rval_status;
955 
956 	/*
957 	 * wakeup the io thread(s)
958 	 */
959 	rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
960 	PR_ASSERT(PR_SUCCESS == rval_status);
961 }
962 
963 /*
964  * cancel a job
965  *
966  *	XXXX: is this needed? likely to be removed
967  */
968 PR_IMPLEMENT(PRStatus)
PR_CancelJob(PRJob * jobp)969 PR_CancelJob(PRJob *jobp) {
970 
971 	PRStatus rval = PR_FAILURE;
972 	PRThreadPool *tp;
973 
974 	if (jobp->on_timerq) {
975 		/*
976 		 * now, check again while holding the timerq lock
977 		 */
978 		tp = jobp->tpool;
979 		PR_Lock(tp->timerq.lock);
980 		if (jobp->on_timerq) {
981 			jobp->on_timerq = PR_FALSE;
982 			PR_REMOVE_AND_INIT_LINK(&jobp->links);
983 			tp->timerq.cnt--;
984 			PR_Unlock(tp->timerq.lock);
985 			if (!JOINABLE_JOB(jobp)) {
986 				delete_job(jobp);
987 			} else {
988 				JOIN_NOTIFY(jobp);
989 			}
990 			rval = PR_SUCCESS;
991 		} else
992 			PR_Unlock(tp->timerq.lock);
993 	} else if (jobp->on_ioq) {
994 		/*
995 		 * now, check again while holding the ioq lock
996 		 */
997 		tp = jobp->tpool;
998 		PR_Lock(tp->ioq.lock);
999 		if (jobp->on_ioq) {
1000 			jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
1001 			if (NULL == jobp->cancel_cv) {
1002 				PR_Unlock(tp->ioq.lock);
1003 				PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
1004 				return PR_FAILURE;
1005 			}
1006 			/*
1007 			 * mark job 'cancelled' and notify io thread(s)
1008 			 * XXXX:
1009 			 *		this assumes there is only one io thread; when there
1010 			 * 		are multiple threads, the io thread processing this job
1011 			 * 		must be notified.
1012 			 */
1013 			jobp->cancel_io = PR_TRUE;
1014 			PR_Unlock(tp->ioq.lock);	/* release, reacquire ioq lock */
1015 			notify_ioq(tp);
1016 			PR_Lock(tp->ioq.lock);
1017 			while (jobp->cancel_io)
1018 				PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
1019 			PR_Unlock(tp->ioq.lock);
1020 			PR_ASSERT(!jobp->on_ioq);
1021 			if (!JOINABLE_JOB(jobp)) {
1022 				delete_job(jobp);
1023 			} else {
1024 				JOIN_NOTIFY(jobp);
1025 			}
1026 			rval = PR_SUCCESS;
1027 		} else
1028 			PR_Unlock(tp->ioq.lock);
1029 	}
1030 	if (PR_FAILURE == rval)
1031 		PR_SetError(PR_INVALID_STATE_ERROR, 0);
1032 	return rval;
1033 }
1034 
1035 /* join a job, wait until completion */
1036 PR_IMPLEMENT(PRStatus)
PR_JoinJob(PRJob * jobp)1037 PR_JoinJob(PRJob *jobp)
1038 {
1039 	if (!JOINABLE_JOB(jobp)) {
1040 		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1041 		return PR_FAILURE;
1042 	}
1043 	PR_Lock(jobp->tpool->join_lock);
1044 	while(jobp->join_wait)
1045 		PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
1046 	PR_Unlock(jobp->tpool->join_lock);
1047 	delete_job(jobp);
1048 	return PR_SUCCESS;
1049 }
1050 
1051 /* shutdown threadpool */
1052 PR_IMPLEMENT(PRStatus)
PR_ShutdownThreadPool(PRThreadPool * tpool)1053 PR_ShutdownThreadPool(PRThreadPool *tpool)
1054 {
1055 PRStatus rval = PR_SUCCESS;
1056 
1057 	PR_Lock(tpool->jobq.lock);
1058 	tpool->shutdown = PR_TRUE;
1059 	PR_NotifyAllCondVar(tpool->shutdown_cv);
1060 	PR_Unlock(tpool->jobq.lock);
1061 
1062 	return rval;
1063 }
1064 
1065 /*
1066  * join thread pool
1067  * 	wait for termination of worker threads
1068  *	reclaim threadpool resources
1069  */
1070 PR_IMPLEMENT(PRStatus)
PR_JoinThreadPool(PRThreadPool * tpool)1071 PR_JoinThreadPool(PRThreadPool *tpool)
1072 {
1073 PRStatus rval = PR_SUCCESS;
1074 PRCList *head;
1075 PRStatus rval_status;
1076 
1077 	PR_Lock(tpool->jobq.lock);
1078 	while (!tpool->shutdown)
1079 		PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
1080 
1081 	/*
1082 	 * wakeup worker threads
1083 	 */
1084 #ifdef OPT_WINNT
1085 	/*
1086 	 * post shutdown notification for all threads
1087 	 */
1088 	{
1089 		int i;
1090 		for(i=0; i < tpool->current_threads; i++) {
1091 			PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
1092 												TRUE, NULL);
1093 		}
1094 	}
1095 #else
1096 	PR_NotifyAllCondVar(tpool->jobq.cv);
1097 #endif
1098 
1099 	/*
1100 	 * wakeup io thread(s)
1101 	 */
1102 	notify_ioq(tpool);
1103 
1104 	/*
1105 	 * wakeup timer thread(s)
1106 	 */
1107 	PR_Lock(tpool->timerq.lock);
1108 	notify_timerq(tpool);
1109 	PR_Unlock(tpool->timerq.lock);
1110 
1111 	while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
1112 		wthread *wthrp;
1113 
1114 		head = PR_LIST_HEAD(&tpool->jobq.wthreads);
1115 		PR_REMOVE_AND_INIT_LINK(head);
1116 		PR_Unlock(tpool->jobq.lock);
1117 		wthrp = WTHREAD_LINKS_PTR(head);
1118 		rval_status = PR_JoinThread(wthrp->thread);
1119 		PR_ASSERT(PR_SUCCESS == rval_status);
1120 		PR_DELETE(wthrp);
1121 		PR_Lock(tpool->jobq.lock);
1122 	}
1123 	PR_Unlock(tpool->jobq.lock);
1124 	while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
1125 		wthread *wthrp;
1126 
1127 		head = PR_LIST_HEAD(&tpool->ioq.wthreads);
1128 		PR_REMOVE_AND_INIT_LINK(head);
1129 		wthrp = WTHREAD_LINKS_PTR(head);
1130 		rval_status = PR_JoinThread(wthrp->thread);
1131 		PR_ASSERT(PR_SUCCESS == rval_status);
1132 		PR_DELETE(wthrp);
1133 	}
1134 
1135 	while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
1136 		wthread *wthrp;
1137 
1138 		head = PR_LIST_HEAD(&tpool->timerq.wthreads);
1139 		PR_REMOVE_AND_INIT_LINK(head);
1140 		wthrp = WTHREAD_LINKS_PTR(head);
1141 		rval_status = PR_JoinThread(wthrp->thread);
1142 		PR_ASSERT(PR_SUCCESS == rval_status);
1143 		PR_DELETE(wthrp);
1144 	}
1145 
1146 	/*
1147 	 * Delete queued jobs
1148 	 */
1149 	while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
1150 		PRJob *jobp;
1151 
1152 		head = PR_LIST_HEAD(&tpool->jobq.list);
1153 		PR_REMOVE_AND_INIT_LINK(head);
1154 		jobp = JOB_LINKS_PTR(head);
1155 		tpool->jobq.cnt--;
1156 		delete_job(jobp);
1157 	}
1158 
1159 	/* delete io jobs */
1160 	while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
1161 		PRJob *jobp;
1162 
1163 		head = PR_LIST_HEAD(&tpool->ioq.list);
1164 		PR_REMOVE_AND_INIT_LINK(head);
1165 		tpool->ioq.cnt--;
1166 		jobp = JOB_LINKS_PTR(head);
1167 		delete_job(jobp);
1168 	}
1169 
1170 	/* delete timer jobs */
1171 	while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
1172 		PRJob *jobp;
1173 
1174 		head = PR_LIST_HEAD(&tpool->timerq.list);
1175 		PR_REMOVE_AND_INIT_LINK(head);
1176 		tpool->timerq.cnt--;
1177 		jobp = JOB_LINKS_PTR(head);
1178 		delete_job(jobp);
1179 	}
1180 
1181 	PR_ASSERT(0 == tpool->jobq.cnt);
1182 	PR_ASSERT(0 == tpool->ioq.cnt);
1183 	PR_ASSERT(0 == tpool->timerq.cnt);
1184 
1185 	delete_threadpool(tpool);
1186 	return rval;
1187 }
1188