diff options
Diffstat (limited to 'nsprpub/pr/src/misc/prtpool.c')
-rw-r--r-- | nsprpub/pr/src/misc/prtpool.c | 1187 |
1 files changed, 1187 insertions, 0 deletions
diff --git a/nsprpub/pr/src/misc/prtpool.c b/nsprpub/pr/src/misc/prtpool.c new file mode 100644 index 000000000..0671cc19b --- /dev/null +++ b/nsprpub/pr/src/misc/prtpool.c @@ -0,0 +1,1187 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "nspr.h" + +/* + * Thread pools + * Thread pools create and manage threads to provide support for + * scheduling jobs onto one or more threads. + * + */ +#ifdef OPT_WINNT +#include <windows.h> +#endif + +/* + * worker thread + */ +typedef struct wthread { + PRCList links; + PRThread *thread; +} wthread; + +/* + * queue of timer jobs + */ +typedef struct timer_jobq { + PRCList list; + PRLock *lock; + PRCondVar *cv; + PRInt32 cnt; + PRCList wthreads; +} timer_jobq; + +/* + * queue of jobs + */ +typedef struct tp_jobq { + PRCList list; + PRInt32 cnt; + PRLock *lock; + PRCondVar *cv; + PRCList wthreads; +#ifdef OPT_WINNT + HANDLE nt_completion_port; +#endif +} tp_jobq; + +/* + * queue of IO jobs + */ +typedef struct io_jobq { + PRCList list; + PRPollDesc *pollfds; + PRInt32 npollfds; + PRJob **polljobs; + PRLock *lock; + PRInt32 cnt; + PRFileDesc *notify_fd; + PRCList wthreads; +} io_jobq; + +/* + * Threadpool + */ +struct PRThreadPool { + PRInt32 init_threads; + PRInt32 max_threads; + PRInt32 current_threads; + PRInt32 idle_threads; + PRUint32 stacksize; + tp_jobq jobq; + io_jobq ioq; + timer_jobq timerq; + PRLock *join_lock; /* used with jobp->join_cv */ + PRCondVar *shutdown_cv; + PRBool shutdown; +}; + +typedef enum io_op_type + { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; + +#ifdef OPT_WINNT +typedef struct NT_notifier { + OVERLAPPED overlapped; /* must be first */ + PRJob *jobp; +} NT_notifier; +#endif + +struct PRJob { + PRCList links; /* for linking jobs */ + PRBool on_ioq; /* job on ioq */ + PRBool on_timerq; /* job on timerq */ + PRJobFn job_func; + void *job_arg; + PRCondVar *join_cv; + PRBool join_wait; /* == PR_TRUE, when waiting to join */ + PRCondVar *cancel_cv; /* for cancelling IO jobs */ + PRBool cancel_io; /* for cancelling IO jobs */ + PRThreadPool *tpool; /* back pointer to thread pool */ + PRJobIoDesc *iod; + io_op_type io_op; + PRInt16 io_poll_flags; + PRNetAddr *netaddr; + PRIntervalTime timeout; /* relative value */ + PRIntervalTime absolute; +#ifdef OPT_WINNT + NT_notifier nt_notifier; +#endif +}; + +#define JOB_LINKS_PTR(_qp) \ + ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links))) + +#define WTHREAD_LINKS_PTR(_qp) \ + ((wthread *) ((char *) (_qp) - offsetof(wthread, links))) + +#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv) + +#define JOIN_NOTIFY(_jobp) \ + PR_BEGIN_MACRO \ + PR_Lock(_jobp->tpool->join_lock); \ + _jobp->join_wait = PR_FALSE; \ + PR_NotifyCondVar(_jobp->join_cv); \ + PR_Unlock(_jobp->tpool->join_lock); \ + PR_END_MACRO + +#define CANCEL_IO_JOB(jobp) \ + PR_BEGIN_MACRO \ + jobp->cancel_io = PR_FALSE; \ + jobp->on_ioq = PR_FALSE; \ + PR_REMOVE_AND_INIT_LINK(&jobp->links); \ + tp->ioq.cnt--; \ + PR_NotifyCondVar(jobp->cancel_cv); \ + PR_END_MACRO + +static void delete_job(PRJob *jobp); +static PRThreadPool * alloc_threadpool(void); +static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp); +static void notify_ioq(PRThreadPool *tp); +static void notify_timerq(PRThreadPool *tp); + +/* + * locks are acquired in the following order + * + * tp->ioq.lock,tp->timerq.lock + * | + * V + * tp->jobq->lock + */ + +/* + * worker thread function + */ +static void wstart(void *arg) +{ +PRThreadPool *tp = (PRThreadPool *) arg; +PRCList *head; + + /* + * execute jobs until shutdown + */ + while (!tp->shutdown) { + PRJob *jobp; +#ifdef OPT_WINNT + BOOL rv; + DWORD unused, shutdown; + LPOVERLAPPED olp; + + PR_Lock(tp->jobq.lock); + tp->idle_threads++; + PR_Unlock(tp->jobq.lock); + rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, + &unused, &shutdown, &olp, INFINITE); + + PR_ASSERT(rv); + if (shutdown) + break; + jobp = ((NT_notifier *) olp)->jobp; + PR_Lock(tp->jobq.lock); + tp->idle_threads--; + tp->jobq.cnt--; + PR_Unlock(tp->jobq.lock); +#else + + PR_Lock(tp->jobq.lock); + while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { + tp->idle_threads++; + PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); + tp->idle_threads--; + } + if (tp->shutdown) { + PR_Unlock(tp->jobq.lock); + break; + } + head = PR_LIST_HEAD(&tp->jobq.list); + /* + * remove job from queue + */ + PR_REMOVE_AND_INIT_LINK(head); + tp->jobq.cnt--; + jobp = JOB_LINKS_PTR(head); + PR_Unlock(tp->jobq.lock); +#endif + + jobp->job_func(jobp->job_arg); + if (!JOINABLE_JOB(jobp)) { + delete_job(jobp); + } else { + JOIN_NOTIFY(jobp); + } + } + PR_Lock(tp->jobq.lock); + tp->current_threads--; + PR_Unlock(tp->jobq.lock); +} + +/* + * add a job to the work queue + */ +static void +add_to_jobq(PRThreadPool *tp, PRJob *jobp) +{ + /* + * add to jobq + */ +#ifdef OPT_WINNT + PR_Lock(tp->jobq.lock); + tp->jobq.cnt++; + PR_Unlock(tp->jobq.lock); + /* + * notify worker thread(s) + */ + PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, + FALSE, &jobp->nt_notifier.overlapped); +#else + PR_Lock(tp->jobq.lock); + PR_APPEND_LINK(&jobp->links,&tp->jobq.list); + tp->jobq.cnt++; + if ((tp->idle_threads < tp->jobq.cnt) && + (tp->current_threads < tp->max_threads)) { + wthread *wthrp; + /* + * increment thread count and unlock the jobq lock + */ + tp->current_threads++; + PR_Unlock(tp->jobq.lock); + /* create new worker thread */ + wthrp = PR_NEWZAP(wthread); + if (wthrp) { + wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); + if (NULL == wthrp->thread) { + PR_DELETE(wthrp); /* this sets wthrp to NULL */ + } + } + PR_Lock(tp->jobq.lock); + if (NULL == wthrp) { + tp->current_threads--; + } else { + PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); + } + } + /* + * wakeup a worker thread + */ + PR_NotifyCondVar(tp->jobq.cv); + PR_Unlock(tp->jobq.lock); +#endif +} + +/* + * io worker thread function + */ +static void io_wstart(void *arg) +{ +PRThreadPool *tp = (PRThreadPool *) arg; +int pollfd_cnt, pollfds_used; +int rv; +PRCList *qp, *nextqp; +PRPollDesc *pollfds = NULL; +PRJob **polljobs = NULL; +int poll_timeout; +PRIntervalTime now; + + /* + * scan io_jobq + * construct poll list + * call PR_Poll + * for all fds, for which poll returns true, move the job to + * jobq and wakeup worker thread. + */ + while (!tp->shutdown) { + PRJob *jobp; + + pollfd_cnt = tp->ioq.cnt + 10; + if (pollfd_cnt > tp->ioq.npollfds) { + + /* + * re-allocate pollfd array if the current one is not large + * enough + */ + if (NULL != tp->ioq.pollfds) + PR_Free(tp->ioq.pollfds); + tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * + (sizeof(PRPollDesc) + sizeof(PRJob *))); + PR_ASSERT(NULL != tp->ioq.pollfds); + /* + * array of pollfds + */ + pollfds = tp->ioq.pollfds; + tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); + /* + * parallel array of jobs + */ + polljobs = tp->ioq.polljobs; + tp->ioq.npollfds = pollfd_cnt; + } + + pollfds_used = 0; + /* + * add the notify fd; used for unblocking io thread(s) + */ + pollfds[pollfds_used].fd = tp->ioq.notify_fd; + pollfds[pollfds_used].in_flags = PR_POLL_READ; + pollfds[pollfds_used].out_flags = 0; + polljobs[pollfds_used] = NULL; + pollfds_used++; + /* + * fill in the pollfd array + */ + PR_Lock(tp->ioq.lock); + for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { + nextqp = qp->next; + jobp = JOB_LINKS_PTR(qp); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + continue; + } + if (pollfds_used == (pollfd_cnt)) + break; + pollfds[pollfds_used].fd = jobp->iod->socket; + pollfds[pollfds_used].in_flags = jobp->io_poll_flags; + pollfds[pollfds_used].out_flags = 0; + polljobs[pollfds_used] = jobp; + + pollfds_used++; + } + if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { + qp = tp->ioq.list.next; + jobp = JOB_LINKS_PTR(qp); + if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) + poll_timeout = PR_INTERVAL_NO_TIMEOUT; + else if (PR_INTERVAL_NO_WAIT == jobp->timeout) + poll_timeout = PR_INTERVAL_NO_WAIT; + else { + poll_timeout = jobp->absolute - PR_IntervalNow(); + if (poll_timeout <= 0) /* already timed out */ + poll_timeout = PR_INTERVAL_NO_WAIT; + } + } else { + poll_timeout = PR_INTERVAL_NO_TIMEOUT; + } + PR_Unlock(tp->ioq.lock); + + /* + * XXXX + * should retry if more jobs have been added to the queue? + * + */ + PR_ASSERT(pollfds_used <= pollfd_cnt); + rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); + + if (tp->shutdown) { + break; + } + + if (rv > 0) { + /* + * at least one io event is set + */ + PRStatus rval_status; + PRInt32 index; + + PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); + /* + * reset the pollable event, if notified + */ + if (pollfds[0].out_flags & PR_POLL_READ) { + rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); + PR_ASSERT(PR_SUCCESS == rval_status); + } + + for(index = 1; index < (pollfds_used); index++) { + PRInt16 events = pollfds[index].in_flags; + PRInt16 revents = pollfds[index].out_flags; + jobp = polljobs[index]; + + if ((revents & PR_POLL_NVAL) || /* busted in all cases */ + (revents & PR_POLL_ERR) || + ((events & PR_POLL_WRITE) && + (revents & PR_POLL_HUP))) { /* write op & hup */ + PR_Lock(tp->ioq.lock); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + PR_Unlock(tp->ioq.lock); + continue; + } + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->ioq.cnt--; + jobp->on_ioq = PR_FALSE; + PR_Unlock(tp->ioq.lock); + + /* set error */ + if (PR_POLL_NVAL & revents) + jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; + else if (PR_POLL_HUP & revents) + jobp->iod->error = PR_CONNECT_RESET_ERROR; + else + jobp->iod->error = PR_IO_ERROR; + + /* + * add to jobq + */ + add_to_jobq(tp, jobp); + } else if (revents) { + /* + * add to jobq + */ + PR_Lock(tp->ioq.lock); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + PR_Unlock(tp->ioq.lock); + continue; + } + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->ioq.cnt--; + jobp->on_ioq = PR_FALSE; + PR_Unlock(tp->ioq.lock); + + if (jobp->io_op == JOB_IO_CONNECT) { + if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) + jobp->iod->error = 0; + else + jobp->iod->error = PR_GetError(); + } else + jobp->iod->error = 0; + + add_to_jobq(tp, jobp); + } + } + } + /* + * timeout processing + */ + now = PR_IntervalNow(); + PR_Lock(tp->ioq.lock); + for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { + nextqp = qp->next; + jobp = JOB_LINKS_PTR(qp); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + continue; + } + if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) + break; + if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && + ((PRInt32)(jobp->absolute - now) > 0)) + break; + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->ioq.cnt--; + jobp->on_ioq = PR_FALSE; + jobp->iod->error = PR_IO_TIMEOUT_ERROR; + add_to_jobq(tp, jobp); + } + PR_Unlock(tp->ioq.lock); + } +} + +/* + * timer worker thread function + */ +static void timer_wstart(void *arg) +{ +PRThreadPool *tp = (PRThreadPool *) arg; +PRCList *qp; +PRIntervalTime timeout; +PRIntervalTime now; + + /* + * call PR_WaitCondVar with minimum value of all timeouts + */ + while (!tp->shutdown) { + PRJob *jobp; + + PR_Lock(tp->timerq.lock); + if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { + timeout = PR_INTERVAL_NO_TIMEOUT; + } else { + PRCList *qp; + + qp = tp->timerq.list.next; + jobp = JOB_LINKS_PTR(qp); + + timeout = jobp->absolute - PR_IntervalNow(); + if (timeout <= 0) + timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ + } + if (PR_INTERVAL_NO_WAIT != timeout) + PR_WaitCondVar(tp->timerq.cv, timeout); + if (tp->shutdown) { + PR_Unlock(tp->timerq.lock); + break; + } + /* + * move expired-timer jobs to jobq + */ + now = PR_IntervalNow(); + while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { + qp = tp->timerq.list.next; + jobp = JOB_LINKS_PTR(qp); + + if ((PRInt32)(jobp->absolute - now) > 0) { + break; + } + /* + * job timed out + */ + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->timerq.cnt--; + jobp->on_timerq = PR_FALSE; + add_to_jobq(tp, jobp); + } + PR_Unlock(tp->timerq.lock); + } +} + +static void +delete_threadpool(PRThreadPool *tp) +{ + if (NULL != tp) { + if (NULL != tp->shutdown_cv) + PR_DestroyCondVar(tp->shutdown_cv); + if (NULL != tp->jobq.cv) + PR_DestroyCondVar(tp->jobq.cv); + if (NULL != tp->jobq.lock) + PR_DestroyLock(tp->jobq.lock); + if (NULL != tp->join_lock) + PR_DestroyLock(tp->join_lock); +#ifdef OPT_WINNT + if (NULL != tp->jobq.nt_completion_port) + CloseHandle(tp->jobq.nt_completion_port); +#endif + /* Timer queue */ + if (NULL != tp->timerq.cv) + PR_DestroyCondVar(tp->timerq.cv); + if (NULL != tp->timerq.lock) + PR_DestroyLock(tp->timerq.lock); + + if (NULL != tp->ioq.lock) + PR_DestroyLock(tp->ioq.lock); + if (NULL != tp->ioq.pollfds) + PR_Free(tp->ioq.pollfds); + if (NULL != tp->ioq.notify_fd) + PR_DestroyPollableEvent(tp->ioq.notify_fd); + PR_Free(tp); + } + return; +} + +static PRThreadPool * +alloc_threadpool(void) +{ +PRThreadPool *tp; + + tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); + if (NULL == tp) + goto failed; + tp->jobq.lock = PR_NewLock(); + if (NULL == tp->jobq.lock) + goto failed; + tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); + if (NULL == tp->jobq.cv) + goto failed; + tp->join_lock = PR_NewLock(); + if (NULL == tp->join_lock) + goto failed; +#ifdef OPT_WINNT + tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, + NULL, 0, 0); + if (NULL == tp->jobq.nt_completion_port) + goto failed; +#endif + + tp->ioq.lock = PR_NewLock(); + if (NULL == tp->ioq.lock) + goto failed; + + /* Timer queue */ + + tp->timerq.lock = PR_NewLock(); + if (NULL == tp->timerq.lock) + goto failed; + tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); + if (NULL == tp->timerq.cv) + goto failed; + + tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); + if (NULL == tp->shutdown_cv) + goto failed; + tp->ioq.notify_fd = PR_NewPollableEvent(); + if (NULL == tp->ioq.notify_fd) + goto failed; + return tp; +failed: + delete_threadpool(tp); + PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); + return NULL; +} + +/* Create thread pool */ +PR_IMPLEMENT(PRThreadPool *) +PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, + PRUint32 stacksize) +{ +PRThreadPool *tp; +PRThread *thr; +int i; +wthread *wthrp; + + tp = alloc_threadpool(); + if (NULL == tp) + return NULL; + + tp->init_threads = initial_threads; + tp->max_threads = max_threads; + tp->stacksize = stacksize; + PR_INIT_CLIST(&tp->jobq.list); + PR_INIT_CLIST(&tp->ioq.list); + PR_INIT_CLIST(&tp->timerq.list); + PR_INIT_CLIST(&tp->jobq.wthreads); + PR_INIT_CLIST(&tp->ioq.wthreads); + PR_INIT_CLIST(&tp->timerq.wthreads); + tp->shutdown = PR_FALSE; + + PR_Lock(tp->jobq.lock); + for(i=0; i < initial_threads; ++i) { + + thr = PR_CreateThread(PR_USER_THREAD, wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); + PR_ASSERT(thr); + wthrp = PR_NEWZAP(wthread); + PR_ASSERT(wthrp); + wthrp->thread = thr; + PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); + } + tp->current_threads = initial_threads; + + thr = PR_CreateThread(PR_USER_THREAD, io_wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); + PR_ASSERT(thr); + wthrp = PR_NEWZAP(wthread); + PR_ASSERT(wthrp); + wthrp->thread = thr; + PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); + + thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); + PR_ASSERT(thr); + wthrp = PR_NEWZAP(wthread); + PR_ASSERT(wthrp); + wthrp->thread = thr; + PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); + + PR_Unlock(tp->jobq.lock); + return tp; +} + +static void +delete_job(PRJob *jobp) +{ + if (NULL != jobp) { + if (NULL != jobp->join_cv) { + PR_DestroyCondVar(jobp->join_cv); + jobp->join_cv = NULL; + } + if (NULL != jobp->cancel_cv) { + PR_DestroyCondVar(jobp->cancel_cv); + jobp->cancel_cv = NULL; + } + PR_DELETE(jobp); + } +} + +static PRJob * +alloc_job(PRBool joinable, PRThreadPool *tp) +{ + PRJob *jobp; + + jobp = PR_NEWZAP(PRJob); + if (NULL == jobp) + goto failed; + if (joinable) { + jobp->join_cv = PR_NewCondVar(tp->join_lock); + jobp->join_wait = PR_TRUE; + if (NULL == jobp->join_cv) + goto failed; + } else { + jobp->join_cv = NULL; + } +#ifdef OPT_WINNT + jobp->nt_notifier.jobp = jobp; +#endif + return jobp; +failed: + delete_job(jobp); + PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); + return NULL; +} + +/* queue a job */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) +{ + PRJob *jobp; + + jobp = alloc_job(joinable, tpool); + if (NULL == jobp) + return NULL; + + jobp->job_func = fn; + jobp->job_arg = arg; + jobp->tpool = tpool; + + add_to_jobq(tpool, jobp); + return jobp; +} + +/* queue a job, when a socket is readable or writeable */ +static PRJob * +queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, + PRBool joinable, io_op_type op) +{ + PRJob *jobp; + PRIntervalTime now; + + jobp = alloc_job(joinable, tpool); + if (NULL == jobp) { + return NULL; + } + + /* + * Add a new job to io_jobq + * wakeup io worker thread + */ + + jobp->job_func = fn; + jobp->job_arg = arg; + jobp->tpool = tpool; + jobp->iod = iod; + if (JOB_IO_READ == op) { + jobp->io_op = JOB_IO_READ; + jobp->io_poll_flags = PR_POLL_READ; + } else if (JOB_IO_WRITE == op) { + jobp->io_op = JOB_IO_WRITE; + jobp->io_poll_flags = PR_POLL_WRITE; + } else if (JOB_IO_ACCEPT == op) { + jobp->io_op = JOB_IO_ACCEPT; + jobp->io_poll_flags = PR_POLL_READ; + } else if (JOB_IO_CONNECT == op) { + jobp->io_op = JOB_IO_CONNECT; + jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; + } else { + delete_job(jobp); + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); + return NULL; + } + + jobp->timeout = iod->timeout; + if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || + (PR_INTERVAL_NO_WAIT == iod->timeout)) { + jobp->absolute = iod->timeout; + } else { + now = PR_IntervalNow(); + jobp->absolute = now + iod->timeout; + } + + + PR_Lock(tpool->ioq.lock); + + if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || + (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { + PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); + } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { + PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); + } else { + PRCList *qp; + PRJob *tmp_jobp; + /* + * insert into the timeout-sorted ioq + */ + for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; + qp = qp->prev) { + tmp_jobp = JOB_LINKS_PTR(qp); + if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { + break; + } + } + PR_INSERT_AFTER(&jobp->links,qp); + } + + jobp->on_ioq = PR_TRUE; + tpool->ioq.cnt++; + /* + * notify io worker thread(s) + */ + PR_Unlock(tpool->ioq.lock); + notify_ioq(tpool); + return jobp; +} + +/* queue a job, when a socket is readable */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, + PRBool joinable) +{ + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); +} + +/* queue a job, when a socket is writeable */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, + PRBool joinable) +{ + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); +} + + +/* queue a job, when a socket has a pending connection */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, + void * arg, PRBool joinable) +{ + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); +} + +/* queue a job, when a socket can be connected */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, + const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) +{ + PRStatus rv; + PRErrorCode err; + + rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); + if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){ + /* connection pending */ + return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); + } else { + /* + * connection succeeded or failed; add to jobq right away + */ + if (rv == PR_FAILURE) + iod->error = err; + else + iod->error = 0; + return(PR_QueueJob(tpool, fn, arg, joinable)); + } +} + +/* queue a job, when a timer expires */ +PR_IMPLEMENT(PRJob *) +PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, + PRJobFn fn, void * arg, PRBool joinable) +{ + PRIntervalTime now; + PRJob *jobp; + + if (PR_INTERVAL_NO_TIMEOUT == timeout) { + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); + return NULL; + } + if (PR_INTERVAL_NO_WAIT == timeout) { + /* + * no waiting; add to jobq right away + */ + return(PR_QueueJob(tpool, fn, arg, joinable)); + } + jobp = alloc_job(joinable, tpool); + if (NULL == jobp) { + return NULL; + } + + /* + * Add a new job to timer_jobq + * wakeup timer worker thread + */ + + jobp->job_func = fn; + jobp->job_arg = arg; + jobp->tpool = tpool; + jobp->timeout = timeout; + + now = PR_IntervalNow(); + jobp->absolute = now + timeout; + + + PR_Lock(tpool->timerq.lock); + jobp->on_timerq = PR_TRUE; + if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) + PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); + else { + PRCList *qp; + PRJob *tmp_jobp; + /* + * insert into the sorted timer jobq + */ + for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; + qp = qp->prev) { + tmp_jobp = JOB_LINKS_PTR(qp); + if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { + break; + } + } + PR_INSERT_AFTER(&jobp->links,qp); + } + tpool->timerq.cnt++; + /* + * notify timer worker thread(s) + */ + notify_timerq(tpool); + PR_Unlock(tpool->timerq.lock); + return jobp; +} + +static void +notify_timerq(PRThreadPool *tp) +{ + /* + * wakeup the timer thread(s) + */ + PR_NotifyCondVar(tp->timerq.cv); +} + +static void +notify_ioq(PRThreadPool *tp) +{ +PRStatus rval_status; + + /* + * wakeup the io thread(s) + */ + rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); + PR_ASSERT(PR_SUCCESS == rval_status); +} + +/* + * cancel a job + * + * XXXX: is this needed? likely to be removed + */ +PR_IMPLEMENT(PRStatus) +PR_CancelJob(PRJob *jobp) { + + PRStatus rval = PR_FAILURE; + PRThreadPool *tp; + + if (jobp->on_timerq) { + /* + * now, check again while holding the timerq lock + */ + tp = jobp->tpool; + PR_Lock(tp->timerq.lock); + if (jobp->on_timerq) { + jobp->on_timerq = PR_FALSE; + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->timerq.cnt--; + PR_Unlock(tp->timerq.lock); + if (!JOINABLE_JOB(jobp)) { + delete_job(jobp); + } else { + JOIN_NOTIFY(jobp); + } + rval = PR_SUCCESS; + } else + PR_Unlock(tp->timerq.lock); + } else if (jobp->on_ioq) { + /* + * now, check again while holding the ioq lock + */ + tp = jobp->tpool; + PR_Lock(tp->ioq.lock); + if (jobp->on_ioq) { + jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); + if (NULL == jobp->cancel_cv) { + PR_Unlock(tp->ioq.lock); + PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); + return PR_FAILURE; + } + /* + * mark job 'cancelled' and notify io thread(s) + * XXXX: + * this assumes there is only one io thread; when there + * are multiple threads, the io thread processing this job + * must be notified. + */ + jobp->cancel_io = PR_TRUE; + PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ + notify_ioq(tp); + PR_Lock(tp->ioq.lock); + while (jobp->cancel_io) + PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); + PR_Unlock(tp->ioq.lock); + PR_ASSERT(!jobp->on_ioq); + if (!JOINABLE_JOB(jobp)) { + delete_job(jobp); + } else { + JOIN_NOTIFY(jobp); + } + rval = PR_SUCCESS; + } else + PR_Unlock(tp->ioq.lock); + } + if (PR_FAILURE == rval) + PR_SetError(PR_INVALID_STATE_ERROR, 0); + return rval; +} + +/* join a job, wait until completion */ +PR_IMPLEMENT(PRStatus) +PR_JoinJob(PRJob *jobp) +{ + if (!JOINABLE_JOB(jobp)) { + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); + return PR_FAILURE; + } + PR_Lock(jobp->tpool->join_lock); + while(jobp->join_wait) + PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); + PR_Unlock(jobp->tpool->join_lock); + delete_job(jobp); + return PR_SUCCESS; +} + +/* shutdown threadpool */ +PR_IMPLEMENT(PRStatus) +PR_ShutdownThreadPool(PRThreadPool *tpool) +{ +PRStatus rval = PR_SUCCESS; + + PR_Lock(tpool->jobq.lock); + tpool->shutdown = PR_TRUE; + PR_NotifyAllCondVar(tpool->shutdown_cv); + PR_Unlock(tpool->jobq.lock); + + return rval; +} + +/* + * join thread pool + * wait for termination of worker threads + * reclaim threadpool resources + */ +PR_IMPLEMENT(PRStatus) +PR_JoinThreadPool(PRThreadPool *tpool) +{ +PRStatus rval = PR_SUCCESS; +PRCList *head; +PRStatus rval_status; + + PR_Lock(tpool->jobq.lock); + while (!tpool->shutdown) + PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); + + /* + * wakeup worker threads + */ +#ifdef OPT_WINNT + /* + * post shutdown notification for all threads + */ + { + int i; + for(i=0; i < tpool->current_threads; i++) { + PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, + TRUE, NULL); + } + } +#else + PR_NotifyAllCondVar(tpool->jobq.cv); +#endif + + /* + * wakeup io thread(s) + */ + notify_ioq(tpool); + + /* + * wakeup timer thread(s) + */ + PR_Lock(tpool->timerq.lock); + notify_timerq(tpool); + PR_Unlock(tpool->timerq.lock); + + while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { + wthread *wthrp; + + head = PR_LIST_HEAD(&tpool->jobq.wthreads); + PR_REMOVE_AND_INIT_LINK(head); + PR_Unlock(tpool->jobq.lock); + wthrp = WTHREAD_LINKS_PTR(head); + rval_status = PR_JoinThread(wthrp->thread); + PR_ASSERT(PR_SUCCESS == rval_status); + PR_DELETE(wthrp); + PR_Lock(tpool->jobq.lock); + } + PR_Unlock(tpool->jobq.lock); + while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { + wthread *wthrp; + + head = PR_LIST_HEAD(&tpool->ioq.wthreads); + PR_REMOVE_AND_INIT_LINK(head); + wthrp = WTHREAD_LINKS_PTR(head); + rval_status = PR_JoinThread(wthrp->thread); + PR_ASSERT(PR_SUCCESS == rval_status); + PR_DELETE(wthrp); + } + + while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { + wthread *wthrp; + + head = PR_LIST_HEAD(&tpool->timerq.wthreads); + PR_REMOVE_AND_INIT_LINK(head); + wthrp = WTHREAD_LINKS_PTR(head); + rval_status = PR_JoinThread(wthrp->thread); + PR_ASSERT(PR_SUCCESS == rval_status); + PR_DELETE(wthrp); + } + + /* + * Delete queued jobs + */ + while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { + PRJob *jobp; + + head = PR_LIST_HEAD(&tpool->jobq.list); + PR_REMOVE_AND_INIT_LINK(head); + jobp = JOB_LINKS_PTR(head); + tpool->jobq.cnt--; + delete_job(jobp); + } + + /* delete io jobs */ + while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { + PRJob *jobp; + + head = PR_LIST_HEAD(&tpool->ioq.list); + PR_REMOVE_AND_INIT_LINK(head); + tpool->ioq.cnt--; + jobp = JOB_LINKS_PTR(head); + delete_job(jobp); + } + + /* delete timer jobs */ + while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { + PRJob *jobp; + + head = PR_LIST_HEAD(&tpool->timerq.list); + PR_REMOVE_AND_INIT_LINK(head); + tpool->timerq.cnt--; + jobp = JOB_LINKS_PTR(head); + delete_job(jobp); + } + + PR_ASSERT(0 == tpool->jobq.cnt); + PR_ASSERT(0 == tpool->ioq.cnt); + PR_ASSERT(0 == tpool->timerq.cnt); + + delete_threadpool(tpool); + return rval; +} |