/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "nspr.h" /* * Thread pools * Thread pools create and manage threads to provide support for * scheduling jobs onto one or more threads. * */ #ifdef OPT_WINNT #include #endif /* * worker thread */ typedef struct wthread { PRCList links; PRThread *thread; } wthread; /* * queue of timer jobs */ typedef struct timer_jobq { PRCList list; PRLock *lock; PRCondVar *cv; PRInt32 cnt; PRCList wthreads; } timer_jobq; /* * queue of jobs */ typedef struct tp_jobq { PRCList list; PRInt32 cnt; PRLock *lock; PRCondVar *cv; PRCList wthreads; #ifdef OPT_WINNT HANDLE nt_completion_port; #endif } tp_jobq; /* * queue of IO jobs */ typedef struct io_jobq { PRCList list; PRPollDesc *pollfds; PRInt32 npollfds; PRJob **polljobs; PRLock *lock; PRInt32 cnt; PRFileDesc *notify_fd; PRCList wthreads; } io_jobq; /* * Threadpool */ struct PRThreadPool { PRInt32 init_threads; PRInt32 max_threads; PRInt32 current_threads; PRInt32 idle_threads; PRUint32 stacksize; tp_jobq jobq; io_jobq ioq; timer_jobq timerq; PRLock *join_lock; /* used with jobp->join_cv */ PRCondVar *shutdown_cv; PRBool shutdown; }; typedef enum io_op_type { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; #ifdef OPT_WINNT typedef struct NT_notifier { OVERLAPPED overlapped; /* must be first */ PRJob *jobp; } NT_notifier; #endif struct PRJob { PRCList links; /* for linking jobs */ PRBool on_ioq; /* job on ioq */ PRBool on_timerq; /* job on timerq */ PRJobFn job_func; void *job_arg; PRCondVar *join_cv; PRBool join_wait; /* == PR_TRUE, when waiting to join */ PRCondVar *cancel_cv; /* for cancelling IO jobs */ PRBool cancel_io; /* for cancelling IO jobs */ PRThreadPool *tpool; /* back pointer to thread pool */ PRJobIoDesc *iod; io_op_type io_op; PRInt16 io_poll_flags; PRNetAddr *netaddr; PRIntervalTime timeout; /* relative value */ PRIntervalTime absolute; #ifdef OPT_WINNT NT_notifier nt_notifier; #endif }; #define JOB_LINKS_PTR(_qp) \ ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links))) #define WTHREAD_LINKS_PTR(_qp) \ ((wthread *) ((char *) (_qp) - offsetof(wthread, links))) #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv) #define JOIN_NOTIFY(_jobp) \ PR_BEGIN_MACRO \ PR_Lock(_jobp->tpool->join_lock); \ _jobp->join_wait = PR_FALSE; \ PR_NotifyCondVar(_jobp->join_cv); \ PR_Unlock(_jobp->tpool->join_lock); \ PR_END_MACRO #define CANCEL_IO_JOB(jobp) \ PR_BEGIN_MACRO \ jobp->cancel_io = PR_FALSE; \ jobp->on_ioq = PR_FALSE; \ PR_REMOVE_AND_INIT_LINK(&jobp->links); \ tp->ioq.cnt--; \ PR_NotifyCondVar(jobp->cancel_cv); \ PR_END_MACRO static void delete_job(PRJob *jobp); static PRThreadPool * alloc_threadpool(void); static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp); static void notify_ioq(PRThreadPool *tp); static void notify_timerq(PRThreadPool *tp); /* * locks are acquired in the following order * * tp->ioq.lock,tp->timerq.lock * | * V * tp->jobq->lock */ /* * worker thread function */ static void wstart(void *arg) { PRThreadPool *tp = (PRThreadPool *) arg; PRCList *head; /* * execute jobs until shutdown */ while (!tp->shutdown) { PRJob *jobp; #ifdef OPT_WINNT BOOL rv; DWORD unused, shutdown; LPOVERLAPPED olp; PR_Lock(tp->jobq.lock); tp->idle_threads++; PR_Unlock(tp->jobq.lock); rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, &unused, &shutdown, &olp, INFINITE); PR_ASSERT(rv); if (shutdown) { break; } jobp = ((NT_notifier *) olp)->jobp; PR_Lock(tp->jobq.lock); tp->idle_threads--; tp->jobq.cnt--; PR_Unlock(tp->jobq.lock); #else PR_Lock(tp->jobq.lock); while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { tp->idle_threads++; PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); tp->idle_threads--; } if (tp->shutdown) { PR_Unlock(tp->jobq.lock); break; } head = PR_LIST_HEAD(&tp->jobq.list); /* * remove job from queue */ PR_REMOVE_AND_INIT_LINK(head); tp->jobq.cnt--; jobp = JOB_LINKS_PTR(head); PR_Unlock(tp->jobq.lock); #endif jobp->job_func(jobp->job_arg); if (!JOINABLE_JOB(jobp)) { delete_job(jobp); } else { JOIN_NOTIFY(jobp); } } PR_Lock(tp->jobq.lock); tp->current_threads--; PR_Unlock(tp->jobq.lock); } /* * add a job to the work queue */ static void add_to_jobq(PRThreadPool *tp, PRJob *jobp) { /* * add to jobq */ #ifdef OPT_WINNT PR_Lock(tp->jobq.lock); tp->jobq.cnt++; PR_Unlock(tp->jobq.lock); /* * notify worker thread(s) */ PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, FALSE, &jobp->nt_notifier.overlapped); #else PR_Lock(tp->jobq.lock); PR_APPEND_LINK(&jobp->links,&tp->jobq.list); tp->jobq.cnt++; if ((tp->idle_threads < tp->jobq.cnt) && (tp->current_threads < tp->max_threads)) { wthread *wthrp; /* * increment thread count and unlock the jobq lock */ tp->current_threads++; PR_Unlock(tp->jobq.lock); /* create new worker thread */ wthrp = PR_NEWZAP(wthread); if (wthrp) { wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); if (NULL == wthrp->thread) { PR_DELETE(wthrp); /* this sets wthrp to NULL */ } } PR_Lock(tp->jobq.lock); if (NULL == wthrp) { tp->current_threads--; } else { PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); } } /* * wakeup a worker thread */ PR_NotifyCondVar(tp->jobq.cv); PR_Unlock(tp->jobq.lock); #endif } /* * io worker thread function */ static void io_wstart(void *arg) { PRThreadPool *tp = (PRThreadPool *) arg; int pollfd_cnt, pollfds_used; int rv; PRCList *qp, *nextqp; PRPollDesc *pollfds = NULL; PRJob **polljobs = NULL; int poll_timeout; PRIntervalTime now; /* * scan io_jobq * construct poll list * call PR_Poll * for all fds, for which poll returns true, move the job to * jobq and wakeup worker thread. */ while (!tp->shutdown) { PRJob *jobp; pollfd_cnt = tp->ioq.cnt + 10; if (pollfd_cnt > tp->ioq.npollfds) { /* * re-allocate pollfd array if the current one is not large * enough */ if (NULL != tp->ioq.pollfds) { PR_Free(tp->ioq.pollfds); } tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * (sizeof(PRPollDesc) + sizeof(PRJob *))); PR_ASSERT(NULL != tp->ioq.pollfds); /* * array of pollfds */ pollfds = tp->ioq.pollfds; tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); /* * parallel array of jobs */ polljobs = tp->ioq.polljobs; tp->ioq.npollfds = pollfd_cnt; } pollfds_used = 0; /* * add the notify fd; used for unblocking io thread(s) */ pollfds[pollfds_used].fd = tp->ioq.notify_fd; pollfds[pollfds_used].in_flags = PR_POLL_READ; pollfds[pollfds_used].out_flags = 0; polljobs[pollfds_used] = NULL; pollfds_used++; /* * fill in the pollfd array */ PR_Lock(tp->ioq.lock); for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { nextqp = qp->next; jobp = JOB_LINKS_PTR(qp); if (jobp->cancel_io) { CANCEL_IO_JOB(jobp); continue; } if (pollfds_used == (pollfd_cnt)) { break; } pollfds[pollfds_used].fd = jobp->iod->socket; pollfds[pollfds_used].in_flags = jobp->io_poll_flags; pollfds[pollfds_used].out_flags = 0; polljobs[pollfds_used] = jobp; pollfds_used++; } if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { qp = tp->ioq.list.next; jobp = JOB_LINKS_PTR(qp); if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) { poll_timeout = PR_INTERVAL_NO_TIMEOUT; } else if (PR_INTERVAL_NO_WAIT == jobp->timeout) { poll_timeout = PR_INTERVAL_NO_WAIT; } else { poll_timeout = jobp->absolute - PR_IntervalNow(); if (poll_timeout <= 0) { /* already timed out */ poll_timeout = PR_INTERVAL_NO_WAIT; } } } else { poll_timeout = PR_INTERVAL_NO_TIMEOUT; } PR_Unlock(tp->ioq.lock); /* * XXXX * should retry if more jobs have been added to the queue? * */ PR_ASSERT(pollfds_used <= pollfd_cnt); rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); if (tp->shutdown) { break; } if (rv > 0) { /* * at least one io event is set */ PRStatus rval_status; PRInt32 index; PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); /* * reset the pollable event, if notified */ if (pollfds[0].out_flags & PR_POLL_READ) { rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); PR_ASSERT(PR_SUCCESS == rval_status); } for(index = 1; index < (pollfds_used); index++) { PRInt16 events = pollfds[index].in_flags; PRInt16 revents = pollfds[index].out_flags; jobp = polljobs[index]; if ((revents & PR_POLL_NVAL) || /* busted in all cases */ (revents & PR_POLL_ERR) || ((events & PR_POLL_WRITE) && (revents & PR_POLL_HUP))) { /* write op & hup */ PR_Lock(tp->ioq.lock); if (jobp->cancel_io) { CANCEL_IO_JOB(jobp); PR_Unlock(tp->ioq.lock); continue; } PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->ioq.cnt--; jobp->on_ioq = PR_FALSE; PR_Unlock(tp->ioq.lock); /* set error */ if (PR_POLL_NVAL & revents) { jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; } else if (PR_POLL_HUP & revents) { jobp->iod->error = PR_CONNECT_RESET_ERROR; } else { jobp->iod->error = PR_IO_ERROR; } /* * add to jobq */ add_to_jobq(tp, jobp); } else if (revents) { /* * add to jobq */ PR_Lock(tp->ioq.lock); if (jobp->cancel_io) { CANCEL_IO_JOB(jobp); PR_Unlock(tp->ioq.lock); continue; } PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->ioq.cnt--; jobp->on_ioq = PR_FALSE; PR_Unlock(tp->ioq.lock); if (jobp->io_op == JOB_IO_CONNECT) { if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) { jobp->iod->error = 0; } else { jobp->iod->error = PR_GetError(); } } else { jobp->iod->error = 0; } add_to_jobq(tp, jobp); } } } /* * timeout processing */ now = PR_IntervalNow(); PR_Lock(tp->ioq.lock); for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { nextqp = qp->next; jobp = JOB_LINKS_PTR(qp); if (jobp->cancel_io) { CANCEL_IO_JOB(jobp); continue; } if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) { break; } if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && ((PRInt32)(jobp->absolute - now) > 0)) { break; } PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->ioq.cnt--; jobp->on_ioq = PR_FALSE; jobp->iod->error = PR_IO_TIMEOUT_ERROR; add_to_jobq(tp, jobp); } PR_Unlock(tp->ioq.lock); } } /* * timer worker thread function */ static void timer_wstart(void *arg) { PRThreadPool *tp = (PRThreadPool *) arg; PRCList *qp; PRIntervalTime timeout; PRIntervalTime now; /* * call PR_WaitCondVar with minimum value of all timeouts */ while (!tp->shutdown) { PRJob *jobp; PR_Lock(tp->timerq.lock); if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { timeout = PR_INTERVAL_NO_TIMEOUT; } else { PRCList *qp; qp = tp->timerq.list.next; jobp = JOB_LINKS_PTR(qp); timeout = jobp->absolute - PR_IntervalNow(); if (timeout <= 0) { timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ } } if (PR_INTERVAL_NO_WAIT != timeout) { PR_WaitCondVar(tp->timerq.cv, timeout); } if (tp->shutdown) { PR_Unlock(tp->timerq.lock); break; } /* * move expired-timer jobs to jobq */ now = PR_IntervalNow(); while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { qp = tp->timerq.list.next; jobp = JOB_LINKS_PTR(qp); if ((PRInt32)(jobp->absolute - now) > 0) { break; } /* * job timed out */ PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->timerq.cnt--; jobp->on_timerq = PR_FALSE; add_to_jobq(tp, jobp); } PR_Unlock(tp->timerq.lock); } } static void delete_threadpool(PRThreadPool *tp) { if (NULL != tp) { if (NULL != tp->shutdown_cv) { PR_DestroyCondVar(tp->shutdown_cv); } if (NULL != tp->jobq.cv) { PR_DestroyCondVar(tp->jobq.cv); } if (NULL != tp->jobq.lock) { PR_DestroyLock(tp->jobq.lock); } if (NULL != tp->join_lock) { PR_DestroyLock(tp->join_lock); } #ifdef OPT_WINNT if (NULL != tp->jobq.nt_completion_port) { CloseHandle(tp->jobq.nt_completion_port); } #endif /* Timer queue */ if (NULL != tp->timerq.cv) { PR_DestroyCondVar(tp->timerq.cv); } if (NULL != tp->timerq.lock) { PR_DestroyLock(tp->timerq.lock); } if (NULL != tp->ioq.lock) { PR_DestroyLock(tp->ioq.lock); } if (NULL != tp->ioq.pollfds) { PR_Free(tp->ioq.pollfds); } if (NULL != tp->ioq.notify_fd) { PR_DestroyPollableEvent(tp->ioq.notify_fd); } PR_Free(tp); } return; } static PRThreadPool * alloc_threadpool(void) { PRThreadPool *tp; tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); if (NULL == tp) { goto failed; } tp->jobq.lock = PR_NewLock(); if (NULL == tp->jobq.lock) { goto failed; } tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); if (NULL == tp->jobq.cv) { goto failed; } tp->join_lock = PR_NewLock(); if (NULL == tp->join_lock) { goto failed; } #ifdef OPT_WINNT tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (NULL == tp->jobq.nt_completion_port) { goto failed; } #endif tp->ioq.lock = PR_NewLock(); if (NULL == tp->ioq.lock) { goto failed; } /* Timer queue */ tp->timerq.lock = PR_NewLock(); if (NULL == tp->timerq.lock) { goto failed; } tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); if (NULL == tp->timerq.cv) { goto failed; } tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); if (NULL == tp->shutdown_cv) { goto failed; } tp->ioq.notify_fd = PR_NewPollableEvent(); if (NULL == tp->ioq.notify_fd) { goto failed; } return tp; failed: delete_threadpool(tp); PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); return NULL; } /* Create thread pool */ PR_IMPLEMENT(PRThreadPool *) PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, PRUint32 stacksize) { PRThreadPool *tp; PRThread *thr; int i; wthread *wthrp; tp = alloc_threadpool(); if (NULL == tp) { return NULL; } tp->init_threads = initial_threads; tp->max_threads = max_threads; tp->stacksize = stacksize; PR_INIT_CLIST(&tp->jobq.list); PR_INIT_CLIST(&tp->ioq.list); PR_INIT_CLIST(&tp->timerq.list); PR_INIT_CLIST(&tp->jobq.wthreads); PR_INIT_CLIST(&tp->ioq.wthreads); PR_INIT_CLIST(&tp->timerq.wthreads); tp->shutdown = PR_FALSE; PR_Lock(tp->jobq.lock); for(i=0; i < initial_threads; ++i) { thr = PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); PR_ASSERT(thr); wthrp = PR_NEWZAP(wthread); PR_ASSERT(wthrp); wthrp->thread = thr; PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); } tp->current_threads = initial_threads; thr = PR_CreateThread(PR_USER_THREAD, io_wstart, tp, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); PR_ASSERT(thr); wthrp = PR_NEWZAP(wthread); PR_ASSERT(wthrp); wthrp->thread = thr; PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, tp, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); PR_ASSERT(thr); wthrp = PR_NEWZAP(wthread); PR_ASSERT(wthrp); wthrp->thread = thr; PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); PR_Unlock(tp->jobq.lock); return tp; } static void delete_job(PRJob *jobp) { if (NULL != jobp) { if (NULL != jobp->join_cv) { PR_DestroyCondVar(jobp->join_cv); jobp->join_cv = NULL; } if (NULL != jobp->cancel_cv) { PR_DestroyCondVar(jobp->cancel_cv); jobp->cancel_cv = NULL; } PR_DELETE(jobp); } } static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp) { PRJob *jobp; jobp = PR_NEWZAP(PRJob); if (NULL == jobp) { goto failed; } if (joinable) { jobp->join_cv = PR_NewCondVar(tp->join_lock); jobp->join_wait = PR_TRUE; if (NULL == jobp->join_cv) { goto failed; } } else { jobp->join_cv = NULL; } #ifdef OPT_WINNT jobp->nt_notifier.jobp = jobp; #endif return jobp; failed: delete_job(jobp); PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); return NULL; } /* queue a job */ PR_IMPLEMENT(PRJob *) PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) { PRJob *jobp; jobp = alloc_job(joinable, tpool); if (NULL == jobp) { return NULL; } jobp->job_func = fn; jobp->job_arg = arg; jobp->tpool = tpool; add_to_jobq(tpool, jobp); return jobp; } /* queue a job, when a socket is readable or writeable */ static PRJob * queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, PRBool joinable, io_op_type op) { PRJob *jobp; PRIntervalTime now; jobp = alloc_job(joinable, tpool); if (NULL == jobp) { return NULL; } /* * Add a new job to io_jobq * wakeup io worker thread */ jobp->job_func = fn; jobp->job_arg = arg; jobp->tpool = tpool; jobp->iod = iod; if (JOB_IO_READ == op) { jobp->io_op = JOB_IO_READ; jobp->io_poll_flags = PR_POLL_READ; } else if (JOB_IO_WRITE == op) { jobp->io_op = JOB_IO_WRITE; jobp->io_poll_flags = PR_POLL_WRITE; } else if (JOB_IO_ACCEPT == op) { jobp->io_op = JOB_IO_ACCEPT; jobp->io_poll_flags = PR_POLL_READ; } else if (JOB_IO_CONNECT == op) { jobp->io_op = JOB_IO_CONNECT; jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; } else { delete_job(jobp); PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return NULL; } jobp->timeout = iod->timeout; if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || (PR_INTERVAL_NO_WAIT == iod->timeout)) { jobp->absolute = iod->timeout; } else { now = PR_IntervalNow(); jobp->absolute = now + iod->timeout; } PR_Lock(tpool->ioq.lock); if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); } else { PRCList *qp; PRJob *tmp_jobp; /* * insert into the timeout-sorted ioq */ for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; qp = qp->prev) { tmp_jobp = JOB_LINKS_PTR(qp); if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { break; } } PR_INSERT_AFTER(&jobp->links,qp); } jobp->on_ioq = PR_TRUE; tpool->ioq.cnt++; /* * notify io worker thread(s) */ PR_Unlock(tpool->ioq.lock); notify_ioq(tpool); return jobp; } /* queue a job, when a socket is readable */ PR_IMPLEMENT(PRJob *) PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, PRBool joinable) { return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); } /* queue a job, when a socket is writeable */ PR_IMPLEMENT(PRJob *) PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, PRBool joinable) { return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); } /* queue a job, when a socket has a pending connection */ PR_IMPLEMENT(PRJob *) PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, PRBool joinable) { return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); } /* queue a job, when a socket can be connected */ PR_IMPLEMENT(PRJob *) PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) { PRStatus rv; PRErrorCode err; rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)) { /* connection pending */ return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); } /* * connection succeeded or failed; add to jobq right away */ if (rv == PR_FAILURE) { iod->error = err; } else { iod->error = 0; } return(PR_QueueJob(tpool, fn, arg, joinable)); } /* queue a job, when a timer expires */ PR_IMPLEMENT(PRJob *) PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, PRJobFn fn, void * arg, PRBool joinable) { PRIntervalTime now; PRJob *jobp; if (PR_INTERVAL_NO_TIMEOUT == timeout) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return NULL; } if (PR_INTERVAL_NO_WAIT == timeout) { /* * no waiting; add to jobq right away */ return(PR_QueueJob(tpool, fn, arg, joinable)); } jobp = alloc_job(joinable, tpool); if (NULL == jobp) { return NULL; } /* * Add a new job to timer_jobq * wakeup timer worker thread */ jobp->job_func = fn; jobp->job_arg = arg; jobp->tpool = tpool; jobp->timeout = timeout; now = PR_IntervalNow(); jobp->absolute = now + timeout; PR_Lock(tpool->timerq.lock); jobp->on_timerq = PR_TRUE; if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); } else { PRCList *qp; PRJob *tmp_jobp; /* * insert into the sorted timer jobq */ for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; qp = qp->prev) { tmp_jobp = JOB_LINKS_PTR(qp); if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { break; } } PR_INSERT_AFTER(&jobp->links,qp); } tpool->timerq.cnt++; /* * notify timer worker thread(s) */ notify_timerq(tpool); PR_Unlock(tpool->timerq.lock); return jobp; } static void notify_timerq(PRThreadPool *tp) { /* * wakeup the timer thread(s) */ PR_NotifyCondVar(tp->timerq.cv); } static void notify_ioq(PRThreadPool *tp) { PRStatus rval_status; /* * wakeup the io thread(s) */ rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); PR_ASSERT(PR_SUCCESS == rval_status); } /* * cancel a job * * XXXX: is this needed? likely to be removed */ PR_IMPLEMENT(PRStatus) PR_CancelJob(PRJob *jobp) { PRStatus rval = PR_FAILURE; PRThreadPool *tp; if (jobp->on_timerq) { /* * now, check again while holding the timerq lock */ tp = jobp->tpool; PR_Lock(tp->timerq.lock); if (jobp->on_timerq) { jobp->on_timerq = PR_FALSE; PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->timerq.cnt--; PR_Unlock(tp->timerq.lock); if (!JOINABLE_JOB(jobp)) { delete_job(jobp); } else { JOIN_NOTIFY(jobp); } rval = PR_SUCCESS; } else { PR_Unlock(tp->timerq.lock); } } else if (jobp->on_ioq) { /* * now, check again while holding the ioq lock */ tp = jobp->tpool; PR_Lock(tp->ioq.lock); if (jobp->on_ioq) { jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); if (NULL == jobp->cancel_cv) { PR_Unlock(tp->ioq.lock); PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); return PR_FAILURE; } /* * mark job 'cancelled' and notify io thread(s) * XXXX: * this assumes there is only one io thread; when there * are multiple threads, the io thread processing this job * must be notified. */ jobp->cancel_io = PR_TRUE; PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ notify_ioq(tp); PR_Lock(tp->ioq.lock); while (jobp->cancel_io) { PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); } PR_Unlock(tp->ioq.lock); PR_ASSERT(!jobp->on_ioq); if (!JOINABLE_JOB(jobp)) { delete_job(jobp); } else { JOIN_NOTIFY(jobp); } rval = PR_SUCCESS; } else { PR_Unlock(tp->ioq.lock); } } if (PR_FAILURE == rval) { PR_SetError(PR_INVALID_STATE_ERROR, 0); } return rval; } /* join a job, wait until completion */ PR_IMPLEMENT(PRStatus) PR_JoinJob(PRJob *jobp) { if (!JOINABLE_JOB(jobp)) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return PR_FAILURE; } PR_Lock(jobp->tpool->join_lock); while(jobp->join_wait) { PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); } PR_Unlock(jobp->tpool->join_lock); delete_job(jobp); return PR_SUCCESS; } /* shutdown threadpool */ PR_IMPLEMENT(PRStatus) PR_ShutdownThreadPool(PRThreadPool *tpool) { PRStatus rval = PR_SUCCESS; PR_Lock(tpool->jobq.lock); tpool->shutdown = PR_TRUE; PR_NotifyAllCondVar(tpool->shutdown_cv); PR_Unlock(tpool->jobq.lock); return rval; } /* * join thread pool * wait for termination of worker threads * reclaim threadpool resources */ PR_IMPLEMENT(PRStatus) PR_JoinThreadPool(PRThreadPool *tpool) { PRStatus rval = PR_SUCCESS; PRCList *head; PRStatus rval_status; PR_Lock(tpool->jobq.lock); while (!tpool->shutdown) { PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); } /* * wakeup worker threads */ #ifdef OPT_WINNT /* * post shutdown notification for all threads */ { int i; for(i=0; i < tpool->current_threads; i++) { PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, TRUE, NULL); } } #else PR_NotifyAllCondVar(tpool->jobq.cv); #endif /* * wakeup io thread(s) */ notify_ioq(tpool); /* * wakeup timer thread(s) */ PR_Lock(tpool->timerq.lock); notify_timerq(tpool); PR_Unlock(tpool->timerq.lock); while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { wthread *wthrp; head = PR_LIST_HEAD(&tpool->jobq.wthreads); PR_REMOVE_AND_INIT_LINK(head); PR_Unlock(tpool->jobq.lock); wthrp = WTHREAD_LINKS_PTR(head); rval_status = PR_JoinThread(wthrp->thread); PR_ASSERT(PR_SUCCESS == rval_status); PR_DELETE(wthrp); PR_Lock(tpool->jobq.lock); } PR_Unlock(tpool->jobq.lock); while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { wthread *wthrp; head = PR_LIST_HEAD(&tpool->ioq.wthreads); PR_REMOVE_AND_INIT_LINK(head); wthrp = WTHREAD_LINKS_PTR(head); rval_status = PR_JoinThread(wthrp->thread); PR_ASSERT(PR_SUCCESS == rval_status); PR_DELETE(wthrp); } while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { wthread *wthrp; head = PR_LIST_HEAD(&tpool->timerq.wthreads); PR_REMOVE_AND_INIT_LINK(head); wthrp = WTHREAD_LINKS_PTR(head); rval_status = PR_JoinThread(wthrp->thread); PR_ASSERT(PR_SUCCESS == rval_status); PR_DELETE(wthrp); } /* * Delete queued jobs */ while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { PRJob *jobp; head = PR_LIST_HEAD(&tpool->jobq.list); PR_REMOVE_AND_INIT_LINK(head); jobp = JOB_LINKS_PTR(head); tpool->jobq.cnt--; delete_job(jobp); } /* delete io jobs */ while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { PRJob *jobp; head = PR_LIST_HEAD(&tpool->ioq.list); PR_REMOVE_AND_INIT_LINK(head); tpool->ioq.cnt--; jobp = JOB_LINKS_PTR(head); delete_job(jobp); } /* delete timer jobs */ while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { PRJob *jobp; head = PR_LIST_HEAD(&tpool->timerq.list); PR_REMOVE_AND_INIT_LINK(head); tpool->timerq.cnt--; jobp = JOB_LINKS_PTR(head); delete_job(jobp); } PR_ASSERT(0 == tpool->jobq.cnt); PR_ASSERT(0 == tpool->ioq.cnt); PR_ASSERT(0 == tpool->timerq.cnt); delete_threadpool(tpool); return rval; }