diff options
Diffstat (limited to 'nsprpub/pr/src/misc/prtpool.c')
-rw-r--r-- | nsprpub/pr/src/misc/prtpool.c | 1919 |
1 files changed, 984 insertions, 935 deletions
diff --git a/nsprpub/pr/src/misc/prtpool.c b/nsprpub/pr/src/misc/prtpool.c index c2cc9c803..69f588ef6 100644 --- a/nsprpub/pr/src/misc/prtpool.c +++ b/nsprpub/pr/src/misc/prtpool.c @@ -7,8 +7,8 @@ /* * Thread pools - * Thread pools create and manage threads to provide support for - * scheduling jobs onto one or more threads. + * Thread pools create and manage threads to provide support for + * scheduling jobs onto one or more threads. * */ #ifdef OPT_WINNT @@ -19,32 +19,32 @@ * worker thread */ typedef struct wthread { - PRCList links; - PRThread *thread; + PRCList links; + PRThread *thread; } wthread; /* * queue of timer jobs */ typedef struct timer_jobq { - PRCList list; - PRLock *lock; - PRCondVar *cv; - PRInt32 cnt; - PRCList wthreads; + PRCList list; + PRLock *lock; + PRCondVar *cv; + PRInt32 cnt; + PRCList wthreads; } timer_jobq; /* * queue of jobs */ typedef struct tp_jobq { - PRCList list; - PRInt32 cnt; - PRLock *lock; - PRCondVar *cv; - PRCList wthreads; + PRCList list; + PRInt32 cnt; + PRLock *lock; + PRCondVar *cv; + PRCList wthreads; #ifdef OPT_WINNT - HANDLE nt_completion_port; + HANDLE nt_completion_port; #endif } tp_jobq; @@ -52,62 +52,62 @@ typedef struct tp_jobq { * queue of IO jobs */ typedef struct io_jobq { - PRCList list; - PRPollDesc *pollfds; - PRInt32 npollfds; - PRJob **polljobs; - PRLock *lock; - PRInt32 cnt; - PRFileDesc *notify_fd; - PRCList wthreads; + PRCList list; + PRPollDesc *pollfds; + PRInt32 npollfds; + PRJob **polljobs; + PRLock *lock; + PRInt32 cnt; + PRFileDesc *notify_fd; + PRCList wthreads; } io_jobq; /* * Threadpool */ struct PRThreadPool { - PRInt32 init_threads; - PRInt32 max_threads; - PRInt32 current_threads; - PRInt32 idle_threads; - PRUint32 stacksize; - tp_jobq jobq; - io_jobq ioq; - timer_jobq timerq; - PRLock *join_lock; /* used with jobp->join_cv */ - PRCondVar *shutdown_cv; - PRBool shutdown; + PRInt32 init_threads; + PRInt32 max_threads; + PRInt32 current_threads; + PRInt32 idle_threads; + PRUint32 stacksize; + tp_jobq jobq; + io_jobq ioq; + timer_jobq timerq; + PRLock *join_lock; /* used with jobp->join_cv */ + PRCondVar *shutdown_cv; + PRBool shutdown; }; typedef enum io_op_type - { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; +{ JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; #ifdef OPT_WINNT typedef struct NT_notifier { - OVERLAPPED overlapped; /* must be first */ - PRJob *jobp; + OVERLAPPED overlapped; /* must be first */ + PRJob *jobp; } NT_notifier; #endif struct PRJob { - PRCList links; /* for linking jobs */ - PRBool on_ioq; /* job on ioq */ - PRBool on_timerq; /* job on timerq */ - PRJobFn job_func; - void *job_arg; - PRCondVar *join_cv; - PRBool join_wait; /* == PR_TRUE, when waiting to join */ - PRCondVar *cancel_cv; /* for cancelling IO jobs */ - PRBool cancel_io; /* for cancelling IO jobs */ - PRThreadPool *tpool; /* back pointer to thread pool */ - PRJobIoDesc *iod; - io_op_type io_op; - PRInt16 io_poll_flags; - PRNetAddr *netaddr; - PRIntervalTime timeout; /* relative value */ - PRIntervalTime absolute; + PRCList links; /* for linking jobs */ + PRBool on_ioq; /* job on ioq */ + PRBool on_timerq; /* job on timerq */ + PRJobFn job_func; + void *job_arg; + PRCondVar *join_cv; + PRBool join_wait; /* == PR_TRUE, when waiting to join */ + PRCondVar *cancel_cv; /* for cancelling IO jobs */ + PRBool cancel_io; /* for cancelling IO jobs */ + PRThreadPool *tpool; /* back pointer to thread pool */ + PRJobIoDesc *iod; + io_op_type io_op; + PRInt16 io_poll_flags; + PRNetAddr *netaddr; + PRIntervalTime timeout; /* relative value */ + PRIntervalTime absolute; #ifdef OPT_WINNT - NT_notifier nt_notifier; + NT_notifier nt_notifier; #endif }; @@ -119,22 +119,22 @@ struct PRJob { #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv) -#define JOIN_NOTIFY(_jobp) \ - PR_BEGIN_MACRO \ - PR_Lock(_jobp->tpool->join_lock); \ - _jobp->join_wait = PR_FALSE; \ - PR_NotifyCondVar(_jobp->join_cv); \ - PR_Unlock(_jobp->tpool->join_lock); \ - PR_END_MACRO - -#define CANCEL_IO_JOB(jobp) \ - PR_BEGIN_MACRO \ - jobp->cancel_io = PR_FALSE; \ - jobp->on_ioq = PR_FALSE; \ - PR_REMOVE_AND_INIT_LINK(&jobp->links); \ - tp->ioq.cnt--; \ - PR_NotifyCondVar(jobp->cancel_cv); \ - PR_END_MACRO +#define JOIN_NOTIFY(_jobp) \ + PR_BEGIN_MACRO \ + PR_Lock(_jobp->tpool->join_lock); \ + _jobp->join_wait = PR_FALSE; \ + PR_NotifyCondVar(_jobp->join_cv); \ + PR_Unlock(_jobp->tpool->join_lock); \ + PR_END_MACRO + +#define CANCEL_IO_JOB(jobp) \ + PR_BEGIN_MACRO \ + jobp->cancel_io = PR_FALSE; \ + jobp->on_ioq = PR_FALSE; \ + PR_REMOVE_AND_INIT_LINK(&jobp->links); \ + tp->ioq.cnt--; \ + PR_NotifyCondVar(jobp->cancel_cv); \ + PR_END_MACRO static void delete_job(PRJob *jobp); static PRThreadPool * alloc_threadpool(void); @@ -145,10 +145,10 @@ static void notify_timerq(PRThreadPool *tp); /* * locks are acquired in the following order * - * tp->ioq.lock,tp->timerq.lock - * | - * V - * tp->jobq->lock + * tp->ioq.lock,tp->timerq.lock + * | + * V + * tp->jobq->lock */ /* @@ -156,65 +156,66 @@ static void notify_timerq(PRThreadPool *tp); */ static void wstart(void *arg) { -PRThreadPool *tp = (PRThreadPool *) arg; -PRCList *head; - - /* - * execute jobs until shutdown - */ - while (!tp->shutdown) { - PRJob *jobp; + PRThreadPool *tp = (PRThreadPool *) arg; + PRCList *head; + + /* + * execute jobs until shutdown + */ + while (!tp->shutdown) { + PRJob *jobp; #ifdef OPT_WINNT - BOOL rv; - DWORD unused, shutdown; - LPOVERLAPPED olp; - - PR_Lock(tp->jobq.lock); - tp->idle_threads++; - PR_Unlock(tp->jobq.lock); - rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, - &unused, &shutdown, &olp, INFINITE); - - PR_ASSERT(rv); - if (shutdown) - break; - jobp = ((NT_notifier *) olp)->jobp; - PR_Lock(tp->jobq.lock); - tp->idle_threads--; - tp->jobq.cnt--; - PR_Unlock(tp->jobq.lock); + BOOL rv; + DWORD unused, shutdown; + LPOVERLAPPED olp; + + PR_Lock(tp->jobq.lock); + tp->idle_threads++; + PR_Unlock(tp->jobq.lock); + rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, + &unused, &shutdown, &olp, INFINITE); + + PR_ASSERT(rv); + if (shutdown) { + break; + } + jobp = ((NT_notifier *) olp)->jobp; + PR_Lock(tp->jobq.lock); + tp->idle_threads--; + tp->jobq.cnt--; + PR_Unlock(tp->jobq.lock); #else - PR_Lock(tp->jobq.lock); - while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { - tp->idle_threads++; - PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); - tp->idle_threads--; - } - if (tp->shutdown) { - PR_Unlock(tp->jobq.lock); - break; - } - head = PR_LIST_HEAD(&tp->jobq.list); - /* - * remove job from queue - */ - PR_REMOVE_AND_INIT_LINK(head); - tp->jobq.cnt--; - jobp = JOB_LINKS_PTR(head); - PR_Unlock(tp->jobq.lock); + PR_Lock(tp->jobq.lock); + while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { + tp->idle_threads++; + PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); + tp->idle_threads--; + } + if (tp->shutdown) { + PR_Unlock(tp->jobq.lock); + break; + } + head = PR_LIST_HEAD(&tp->jobq.list); + /* + * remove job from queue + */ + PR_REMOVE_AND_INIT_LINK(head); + tp->jobq.cnt--; + jobp = JOB_LINKS_PTR(head); + PR_Unlock(tp->jobq.lock); #endif - jobp->job_func(jobp->job_arg); - if (!JOINABLE_JOB(jobp)) { - delete_job(jobp); - } else { - JOIN_NOTIFY(jobp); - } - } - PR_Lock(tp->jobq.lock); - tp->current_threads--; - PR_Unlock(tp->jobq.lock); + jobp->job_func(jobp->job_arg); + if (!JOINABLE_JOB(jobp)) { + delete_job(jobp); + } else { + JOIN_NOTIFY(jobp); + } + } + PR_Lock(tp->jobq.lock); + tp->current_threads--; + PR_Unlock(tp->jobq.lock); } /* @@ -223,52 +224,52 @@ PRCList *head; static void add_to_jobq(PRThreadPool *tp, PRJob *jobp) { - /* - * add to jobq - */ + /* + * add to jobq + */ #ifdef OPT_WINNT - PR_Lock(tp->jobq.lock); - tp->jobq.cnt++; - PR_Unlock(tp->jobq.lock); - /* - * notify worker thread(s) - */ - PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, - FALSE, &jobp->nt_notifier.overlapped); + PR_Lock(tp->jobq.lock); + tp->jobq.cnt++; + PR_Unlock(tp->jobq.lock); + /* + * notify worker thread(s) + */ + PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, + FALSE, &jobp->nt_notifier.overlapped); #else - PR_Lock(tp->jobq.lock); - PR_APPEND_LINK(&jobp->links,&tp->jobq.list); - tp->jobq.cnt++; - if ((tp->idle_threads < tp->jobq.cnt) && - (tp->current_threads < tp->max_threads)) { - wthread *wthrp; - /* - * increment thread count and unlock the jobq lock - */ - tp->current_threads++; - PR_Unlock(tp->jobq.lock); - /* create new worker thread */ - wthrp = PR_NEWZAP(wthread); - if (wthrp) { - wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, - tp, PR_PRIORITY_NORMAL, - PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); - if (NULL == wthrp->thread) { - PR_DELETE(wthrp); /* this sets wthrp to NULL */ - } - } - PR_Lock(tp->jobq.lock); - if (NULL == wthrp) { - tp->current_threads--; - } else { - PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); - } - } - /* - * wakeup a worker thread - */ - PR_NotifyCondVar(tp->jobq.cv); - PR_Unlock(tp->jobq.lock); + PR_Lock(tp->jobq.lock); + PR_APPEND_LINK(&jobp->links,&tp->jobq.list); + tp->jobq.cnt++; + if ((tp->idle_threads < tp->jobq.cnt) && + (tp->current_threads < tp->max_threads)) { + wthread *wthrp; + /* + * increment thread count and unlock the jobq lock + */ + tp->current_threads++; + PR_Unlock(tp->jobq.lock); + /* create new worker thread */ + wthrp = PR_NEWZAP(wthread); + if (wthrp) { + wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); + if (NULL == wthrp->thread) { + PR_DELETE(wthrp); /* this sets wthrp to NULL */ + } + } + PR_Lock(tp->jobq.lock); + if (NULL == wthrp) { + tp->current_threads--; + } else { + PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); + } + } + /* + * wakeup a worker thread + */ + PR_NotifyCondVar(tp->jobq.cv); + PR_Unlock(tp->jobq.lock); #endif } @@ -277,207 +278,220 @@ add_to_jobq(PRThreadPool *tp, PRJob *jobp) */ static void io_wstart(void *arg) { -PRThreadPool *tp = (PRThreadPool *) arg; -int pollfd_cnt, pollfds_used; -int rv; -PRCList *qp, *nextqp; -PRPollDesc *pollfds = NULL; -PRJob **polljobs = NULL; -int poll_timeout; -PRIntervalTime now; - - /* - * scan io_jobq - * construct poll list - * call PR_Poll - * for all fds, for which poll returns true, move the job to - * jobq and wakeup worker thread. - */ - while (!tp->shutdown) { - PRJob *jobp; - - pollfd_cnt = tp->ioq.cnt + 10; - if (pollfd_cnt > tp->ioq.npollfds) { - - /* - * re-allocate pollfd array if the current one is not large - * enough - */ - if (NULL != tp->ioq.pollfds) - PR_Free(tp->ioq.pollfds); - tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * - (sizeof(PRPollDesc) + sizeof(PRJob *))); - PR_ASSERT(NULL != tp->ioq.pollfds); - /* - * array of pollfds - */ - pollfds = tp->ioq.pollfds; - tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); - /* - * parallel array of jobs - */ - polljobs = tp->ioq.polljobs; - tp->ioq.npollfds = pollfd_cnt; - } - - pollfds_used = 0; - /* - * add the notify fd; used for unblocking io thread(s) - */ - pollfds[pollfds_used].fd = tp->ioq.notify_fd; - pollfds[pollfds_used].in_flags = PR_POLL_READ; - pollfds[pollfds_used].out_flags = 0; - polljobs[pollfds_used] = NULL; - pollfds_used++; - /* - * fill in the pollfd array - */ - PR_Lock(tp->ioq.lock); - for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { - nextqp = qp->next; - jobp = JOB_LINKS_PTR(qp); - if (jobp->cancel_io) { - CANCEL_IO_JOB(jobp); - continue; - } - if (pollfds_used == (pollfd_cnt)) - break; - pollfds[pollfds_used].fd = jobp->iod->socket; - pollfds[pollfds_used].in_flags = jobp->io_poll_flags; - pollfds[pollfds_used].out_flags = 0; - polljobs[pollfds_used] = jobp; - - pollfds_used++; - } - if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { - qp = tp->ioq.list.next; - jobp = JOB_LINKS_PTR(qp); - if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) - poll_timeout = PR_INTERVAL_NO_TIMEOUT; - else if (PR_INTERVAL_NO_WAIT == jobp->timeout) - poll_timeout = PR_INTERVAL_NO_WAIT; - else { - poll_timeout = jobp->absolute - PR_IntervalNow(); - if (poll_timeout <= 0) /* already timed out */ - poll_timeout = PR_INTERVAL_NO_WAIT; - } - } else { - poll_timeout = PR_INTERVAL_NO_TIMEOUT; - } - PR_Unlock(tp->ioq.lock); - - /* - * XXXX - * should retry if more jobs have been added to the queue? - * - */ - PR_ASSERT(pollfds_used <= pollfd_cnt); - rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); - - if (tp->shutdown) { - break; - } - - if (rv > 0) { - /* - * at least one io event is set - */ - PRStatus rval_status; - PRInt32 index; - - PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); - /* - * reset the pollable event, if notified - */ - if (pollfds[0].out_flags & PR_POLL_READ) { - rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); - PR_ASSERT(PR_SUCCESS == rval_status); - } - - for(index = 1; index < (pollfds_used); index++) { + PRThreadPool *tp = (PRThreadPool *) arg; + int pollfd_cnt, pollfds_used; + int rv; + PRCList *qp, *nextqp; + PRPollDesc *pollfds = NULL; + PRJob **polljobs = NULL; + int poll_timeout; + PRIntervalTime now; + + /* + * scan io_jobq + * construct poll list + * call PR_Poll + * for all fds, for which poll returns true, move the job to + * jobq and wakeup worker thread. + */ + while (!tp->shutdown) { + PRJob *jobp; + + pollfd_cnt = tp->ioq.cnt + 10; + if (pollfd_cnt > tp->ioq.npollfds) { + + /* + * re-allocate pollfd array if the current one is not large + * enough + */ + if (NULL != tp->ioq.pollfds) { + PR_Free(tp->ioq.pollfds); + } + tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * + (sizeof(PRPollDesc) + sizeof(PRJob *))); + PR_ASSERT(NULL != tp->ioq.pollfds); + /* + * array of pollfds + */ + pollfds = tp->ioq.pollfds; + tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); + /* + * parallel array of jobs + */ + polljobs = tp->ioq.polljobs; + tp->ioq.npollfds = pollfd_cnt; + } + + pollfds_used = 0; + /* + * add the notify fd; used for unblocking io thread(s) + */ + pollfds[pollfds_used].fd = tp->ioq.notify_fd; + pollfds[pollfds_used].in_flags = PR_POLL_READ; + pollfds[pollfds_used].out_flags = 0; + polljobs[pollfds_used] = NULL; + pollfds_used++; + /* + * fill in the pollfd array + */ + PR_Lock(tp->ioq.lock); + for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { + nextqp = qp->next; + jobp = JOB_LINKS_PTR(qp); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + continue; + } + if (pollfds_used == (pollfd_cnt)) { + break; + } + pollfds[pollfds_used].fd = jobp->iod->socket; + pollfds[pollfds_used].in_flags = jobp->io_poll_flags; + pollfds[pollfds_used].out_flags = 0; + polljobs[pollfds_used] = jobp; + + pollfds_used++; + } + if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { + qp = tp->ioq.list.next; + jobp = JOB_LINKS_PTR(qp); + if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) { + poll_timeout = PR_INTERVAL_NO_TIMEOUT; + } + else if (PR_INTERVAL_NO_WAIT == jobp->timeout) { + poll_timeout = PR_INTERVAL_NO_WAIT; + } + else { + poll_timeout = jobp->absolute - PR_IntervalNow(); + if (poll_timeout <= 0) { /* already timed out */ + poll_timeout = PR_INTERVAL_NO_WAIT; + } + } + } else { + poll_timeout = PR_INTERVAL_NO_TIMEOUT; + } + PR_Unlock(tp->ioq.lock); + + /* + * XXXX + * should retry if more jobs have been added to the queue? + * + */ + PR_ASSERT(pollfds_used <= pollfd_cnt); + rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); + + if (tp->shutdown) { + break; + } + + if (rv > 0) { + /* + * at least one io event is set + */ + PRStatus rval_status; + PRInt32 index; + + PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); + /* + * reset the pollable event, if notified + */ + if (pollfds[0].out_flags & PR_POLL_READ) { + rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); + PR_ASSERT(PR_SUCCESS == rval_status); + } + + for(index = 1; index < (pollfds_used); index++) { PRInt16 events = pollfds[index].in_flags; - PRInt16 revents = pollfds[index].out_flags; - jobp = polljobs[index]; + PRInt16 revents = pollfds[index].out_flags; + jobp = polljobs[index]; if ((revents & PR_POLL_NVAL) || /* busted in all cases */ - (revents & PR_POLL_ERR) || - ((events & PR_POLL_WRITE) && - (revents & PR_POLL_HUP))) { /* write op & hup */ - PR_Lock(tp->ioq.lock); - if (jobp->cancel_io) { - CANCEL_IO_JOB(jobp); - PR_Unlock(tp->ioq.lock); - continue; - } - PR_REMOVE_AND_INIT_LINK(&jobp->links); - tp->ioq.cnt--; - jobp->on_ioq = PR_FALSE; - PR_Unlock(tp->ioq.lock); - - /* set error */ - if (PR_POLL_NVAL & revents) - jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; - else if (PR_POLL_HUP & revents) - jobp->iod->error = PR_CONNECT_RESET_ERROR; - else - jobp->iod->error = PR_IO_ERROR; - - /* - * add to jobq - */ - add_to_jobq(tp, jobp); - } else if (revents) { - /* - * add to jobq - */ - PR_Lock(tp->ioq.lock); - if (jobp->cancel_io) { - CANCEL_IO_JOB(jobp); - PR_Unlock(tp->ioq.lock); - continue; - } - PR_REMOVE_AND_INIT_LINK(&jobp->links); - tp->ioq.cnt--; - jobp->on_ioq = PR_FALSE; - PR_Unlock(tp->ioq.lock); - - if (jobp->io_op == JOB_IO_CONNECT) { - if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) - jobp->iod->error = 0; - else - jobp->iod->error = PR_GetError(); - } else - jobp->iod->error = 0; - - add_to_jobq(tp, jobp); - } - } - } - /* - * timeout processing - */ - now = PR_IntervalNow(); - PR_Lock(tp->ioq.lock); - for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { - nextqp = qp->next; - jobp = JOB_LINKS_PTR(qp); - if (jobp->cancel_io) { - CANCEL_IO_JOB(jobp); - continue; - } - if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) - break; - if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && - ((PRInt32)(jobp->absolute - now) > 0)) - break; - PR_REMOVE_AND_INIT_LINK(&jobp->links); - tp->ioq.cnt--; - jobp->on_ioq = PR_FALSE; - jobp->iod->error = PR_IO_TIMEOUT_ERROR; - add_to_jobq(tp, jobp); - } - PR_Unlock(tp->ioq.lock); - } + (revents & PR_POLL_ERR) || + ((events & PR_POLL_WRITE) && + (revents & PR_POLL_HUP))) { /* write op & hup */ + PR_Lock(tp->ioq.lock); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + PR_Unlock(tp->ioq.lock); + continue; + } + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->ioq.cnt--; + jobp->on_ioq = PR_FALSE; + PR_Unlock(tp->ioq.lock); + + /* set error */ + if (PR_POLL_NVAL & revents) { + jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; + } + else if (PR_POLL_HUP & revents) { + jobp->iod->error = PR_CONNECT_RESET_ERROR; + } + else { + jobp->iod->error = PR_IO_ERROR; + } + + /* + * add to jobq + */ + add_to_jobq(tp, jobp); + } else if (revents) { + /* + * add to jobq + */ + PR_Lock(tp->ioq.lock); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + PR_Unlock(tp->ioq.lock); + continue; + } + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->ioq.cnt--; + jobp->on_ioq = PR_FALSE; + PR_Unlock(tp->ioq.lock); + + if (jobp->io_op == JOB_IO_CONNECT) { + if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) { + jobp->iod->error = 0; + } + else { + jobp->iod->error = PR_GetError(); + } + } else { + jobp->iod->error = 0; + } + + add_to_jobq(tp, jobp); + } + } + } + /* + * timeout processing + */ + now = PR_IntervalNow(); + PR_Lock(tp->ioq.lock); + for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { + nextqp = qp->next; + jobp = JOB_LINKS_PTR(qp); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + continue; + } + if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) { + break; + } + if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && + ((PRInt32)(jobp->absolute - now) > 0)) { + break; + } + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->ioq.cnt--; + jobp->on_ioq = PR_FALSE; + jobp->iod->error = PR_IO_TIMEOUT_ERROR; + add_to_jobq(tp, jobp); + } + PR_Unlock(tp->ioq.lock); + } } /* @@ -485,391 +499,419 @@ PRIntervalTime now; */ static void timer_wstart(void *arg) { -PRThreadPool *tp = (PRThreadPool *) arg; -PRCList *qp; -PRIntervalTime timeout; -PRIntervalTime now; - - /* - * call PR_WaitCondVar with minimum value of all timeouts - */ - while (!tp->shutdown) { - PRJob *jobp; - - PR_Lock(tp->timerq.lock); - if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { - timeout = PR_INTERVAL_NO_TIMEOUT; - } else { - PRCList *qp; - - qp = tp->timerq.list.next; - jobp = JOB_LINKS_PTR(qp); - - timeout = jobp->absolute - PR_IntervalNow(); - if (timeout <= 0) - timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ - } - if (PR_INTERVAL_NO_WAIT != timeout) - PR_WaitCondVar(tp->timerq.cv, timeout); - if (tp->shutdown) { - PR_Unlock(tp->timerq.lock); - break; - } - /* - * move expired-timer jobs to jobq - */ - now = PR_IntervalNow(); - while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { - qp = tp->timerq.list.next; - jobp = JOB_LINKS_PTR(qp); - - if ((PRInt32)(jobp->absolute - now) > 0) { - break; - } - /* - * job timed out - */ - PR_REMOVE_AND_INIT_LINK(&jobp->links); - tp->timerq.cnt--; - jobp->on_timerq = PR_FALSE; - add_to_jobq(tp, jobp); - } - PR_Unlock(tp->timerq.lock); - } + PRThreadPool *tp = (PRThreadPool *) arg; + PRCList *qp; + PRIntervalTime timeout; + PRIntervalTime now; + + /* + * call PR_WaitCondVar with minimum value of all timeouts + */ + while (!tp->shutdown) { + PRJob *jobp; + + PR_Lock(tp->timerq.lock); + if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { + timeout = PR_INTERVAL_NO_TIMEOUT; + } else { + PRCList *qp; + + qp = tp->timerq.list.next; + jobp = JOB_LINKS_PTR(qp); + + timeout = jobp->absolute - PR_IntervalNow(); + if (timeout <= 0) { + timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ + } + } + if (PR_INTERVAL_NO_WAIT != timeout) { + PR_WaitCondVar(tp->timerq.cv, timeout); + } + if (tp->shutdown) { + PR_Unlock(tp->timerq.lock); + break; + } + /* + * move expired-timer jobs to jobq + */ + now = PR_IntervalNow(); + while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { + qp = tp->timerq.list.next; + jobp = JOB_LINKS_PTR(qp); + + if ((PRInt32)(jobp->absolute - now) > 0) { + break; + } + /* + * job timed out + */ + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->timerq.cnt--; + jobp->on_timerq = PR_FALSE; + add_to_jobq(tp, jobp); + } + PR_Unlock(tp->timerq.lock); + } } static void delete_threadpool(PRThreadPool *tp) { - if (NULL != tp) { - if (NULL != tp->shutdown_cv) - PR_DestroyCondVar(tp->shutdown_cv); - if (NULL != tp->jobq.cv) - PR_DestroyCondVar(tp->jobq.cv); - if (NULL != tp->jobq.lock) - PR_DestroyLock(tp->jobq.lock); - if (NULL != tp->join_lock) - PR_DestroyLock(tp->join_lock); + if (NULL != tp) { + if (NULL != tp->shutdown_cv) { + PR_DestroyCondVar(tp->shutdown_cv); + } + if (NULL != tp->jobq.cv) { + PR_DestroyCondVar(tp->jobq.cv); + } + if (NULL != tp->jobq.lock) { + PR_DestroyLock(tp->jobq.lock); + } + if (NULL != tp->join_lock) { + PR_DestroyLock(tp->join_lock); + } #ifdef OPT_WINNT - if (NULL != tp->jobq.nt_completion_port) - CloseHandle(tp->jobq.nt_completion_port); + if (NULL != tp->jobq.nt_completion_port) { + CloseHandle(tp->jobq.nt_completion_port); + } #endif - /* Timer queue */ - if (NULL != tp->timerq.cv) - PR_DestroyCondVar(tp->timerq.cv); - if (NULL != tp->timerq.lock) - PR_DestroyLock(tp->timerq.lock); - - if (NULL != tp->ioq.lock) - PR_DestroyLock(tp->ioq.lock); - if (NULL != tp->ioq.pollfds) - PR_Free(tp->ioq.pollfds); - if (NULL != tp->ioq.notify_fd) - PR_DestroyPollableEvent(tp->ioq.notify_fd); - PR_Free(tp); - } - return; + /* Timer queue */ + if (NULL != tp->timerq.cv) { + PR_DestroyCondVar(tp->timerq.cv); + } + if (NULL != tp->timerq.lock) { + PR_DestroyLock(tp->timerq.lock); + } + + if (NULL != tp->ioq.lock) { + PR_DestroyLock(tp->ioq.lock); + } + if (NULL != tp->ioq.pollfds) { + PR_Free(tp->ioq.pollfds); + } + if (NULL != tp->ioq.notify_fd) { + PR_DestroyPollableEvent(tp->ioq.notify_fd); + } + PR_Free(tp); + } + return; } static PRThreadPool * alloc_threadpool(void) { -PRThreadPool *tp; - - tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); - if (NULL == tp) - goto failed; - tp->jobq.lock = PR_NewLock(); - if (NULL == tp->jobq.lock) - goto failed; - tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); - if (NULL == tp->jobq.cv) - goto failed; - tp->join_lock = PR_NewLock(); - if (NULL == tp->join_lock) - goto failed; + PRThreadPool *tp; + + tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); + if (NULL == tp) { + goto failed; + } + tp->jobq.lock = PR_NewLock(); + if (NULL == tp->jobq.lock) { + goto failed; + } + tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); + if (NULL == tp->jobq.cv) { + goto failed; + } + tp->join_lock = PR_NewLock(); + if (NULL == tp->join_lock) { + goto failed; + } #ifdef OPT_WINNT - tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, - NULL, 0, 0); - if (NULL == tp->jobq.nt_completion_port) - goto failed; + tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, + NULL, 0, 0); + if (NULL == tp->jobq.nt_completion_port) { + goto failed; + } #endif - tp->ioq.lock = PR_NewLock(); - if (NULL == tp->ioq.lock) - goto failed; - - /* Timer queue */ - - tp->timerq.lock = PR_NewLock(); - if (NULL == tp->timerq.lock) - goto failed; - tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); - if (NULL == tp->timerq.cv) - goto failed; - - tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); - if (NULL == tp->shutdown_cv) - goto failed; - tp->ioq.notify_fd = PR_NewPollableEvent(); - if (NULL == tp->ioq.notify_fd) - goto failed; - return tp; + tp->ioq.lock = PR_NewLock(); + if (NULL == tp->ioq.lock) { + goto failed; + } + + /* Timer queue */ + + tp->timerq.lock = PR_NewLock(); + if (NULL == tp->timerq.lock) { + goto failed; + } + tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); + if (NULL == tp->timerq.cv) { + goto failed; + } + + tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); + if (NULL == tp->shutdown_cv) { + goto failed; + } + tp->ioq.notify_fd = PR_NewPollableEvent(); + if (NULL == tp->ioq.notify_fd) { + goto failed; + } + return tp; failed: - delete_threadpool(tp); - PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); - return NULL; + delete_threadpool(tp); + PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); + return NULL; } /* Create thread pool */ PR_IMPLEMENT(PRThreadPool *) PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, - PRUint32 stacksize) + PRUint32 stacksize) { -PRThreadPool *tp; -PRThread *thr; -int i; -wthread *wthrp; - - tp = alloc_threadpool(); - if (NULL == tp) - return NULL; - - tp->init_threads = initial_threads; - tp->max_threads = max_threads; - tp->stacksize = stacksize; - PR_INIT_CLIST(&tp->jobq.list); - PR_INIT_CLIST(&tp->ioq.list); - PR_INIT_CLIST(&tp->timerq.list); - PR_INIT_CLIST(&tp->jobq.wthreads); - PR_INIT_CLIST(&tp->ioq.wthreads); - PR_INIT_CLIST(&tp->timerq.wthreads); - tp->shutdown = PR_FALSE; - - PR_Lock(tp->jobq.lock); - for(i=0; i < initial_threads; ++i) { - - thr = PR_CreateThread(PR_USER_THREAD, wstart, - tp, PR_PRIORITY_NORMAL, - PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); - PR_ASSERT(thr); - wthrp = PR_NEWZAP(wthread); - PR_ASSERT(wthrp); - wthrp->thread = thr; - PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); - } - tp->current_threads = initial_threads; - - thr = PR_CreateThread(PR_USER_THREAD, io_wstart, - tp, PR_PRIORITY_NORMAL, - PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); - PR_ASSERT(thr); - wthrp = PR_NEWZAP(wthread); - PR_ASSERT(wthrp); - wthrp->thread = thr; - PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); - - thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, - tp, PR_PRIORITY_NORMAL, - PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); - PR_ASSERT(thr); - wthrp = PR_NEWZAP(wthread); - PR_ASSERT(wthrp); - wthrp->thread = thr; - PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); - - PR_Unlock(tp->jobq.lock); - return tp; + PRThreadPool *tp; + PRThread *thr; + int i; + wthread *wthrp; + + tp = alloc_threadpool(); + if (NULL == tp) { + return NULL; + } + + tp->init_threads = initial_threads; + tp->max_threads = max_threads; + tp->stacksize = stacksize; + PR_INIT_CLIST(&tp->jobq.list); + PR_INIT_CLIST(&tp->ioq.list); + PR_INIT_CLIST(&tp->timerq.list); + PR_INIT_CLIST(&tp->jobq.wthreads); + PR_INIT_CLIST(&tp->ioq.wthreads); + PR_INIT_CLIST(&tp->timerq.wthreads); + tp->shutdown = PR_FALSE; + + PR_Lock(tp->jobq.lock); + for(i=0; i < initial_threads; ++i) { + + thr = PR_CreateThread(PR_USER_THREAD, wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); + PR_ASSERT(thr); + wthrp = PR_NEWZAP(wthread); + PR_ASSERT(wthrp); + wthrp->thread = thr; + PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); + } + tp->current_threads = initial_threads; + + thr = PR_CreateThread(PR_USER_THREAD, io_wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); + PR_ASSERT(thr); + wthrp = PR_NEWZAP(wthread); + PR_ASSERT(wthrp); + wthrp->thread = thr; + PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); + + thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, + tp, PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); + PR_ASSERT(thr); + wthrp = PR_NEWZAP(wthread); + PR_ASSERT(wthrp); + wthrp->thread = thr; + PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); + + PR_Unlock(tp->jobq.lock); + return tp; } static void delete_job(PRJob *jobp) { - if (NULL != jobp) { - if (NULL != jobp->join_cv) { - PR_DestroyCondVar(jobp->join_cv); - jobp->join_cv = NULL; - } - if (NULL != jobp->cancel_cv) { - PR_DestroyCondVar(jobp->cancel_cv); - jobp->cancel_cv = NULL; - } - PR_DELETE(jobp); - } + if (NULL != jobp) { + if (NULL != jobp->join_cv) { + PR_DestroyCondVar(jobp->join_cv); + jobp->join_cv = NULL; + } + if (NULL != jobp->cancel_cv) { + PR_DestroyCondVar(jobp->cancel_cv); + jobp->cancel_cv = NULL; + } + PR_DELETE(jobp); + } } static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp) { - PRJob *jobp; - - jobp = PR_NEWZAP(PRJob); - if (NULL == jobp) - goto failed; - if (joinable) { - jobp->join_cv = PR_NewCondVar(tp->join_lock); - jobp->join_wait = PR_TRUE; - if (NULL == jobp->join_cv) - goto failed; - } else { - jobp->join_cv = NULL; - } + PRJob *jobp; + + jobp = PR_NEWZAP(PRJob); + if (NULL == jobp) { + goto failed; + } + if (joinable) { + jobp->join_cv = PR_NewCondVar(tp->join_lock); + jobp->join_wait = PR_TRUE; + if (NULL == jobp->join_cv) { + goto failed; + } + } else { + jobp->join_cv = NULL; + } #ifdef OPT_WINNT - jobp->nt_notifier.jobp = jobp; + jobp->nt_notifier.jobp = jobp; #endif - return jobp; + return jobp; failed: - delete_job(jobp); - PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); - return NULL; + delete_job(jobp); + PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); + return NULL; } /* queue a job */ PR_IMPLEMENT(PRJob *) PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) { - PRJob *jobp; + PRJob *jobp; - jobp = alloc_job(joinable, tpool); - if (NULL == jobp) - return NULL; + jobp = alloc_job(joinable, tpool); + if (NULL == jobp) { + return NULL; + } - jobp->job_func = fn; - jobp->job_arg = arg; - jobp->tpool = tpool; + jobp->job_func = fn; + jobp->job_arg = arg; + jobp->tpool = tpool; - add_to_jobq(tpool, jobp); - return jobp; + add_to_jobq(tpool, jobp); + return jobp; } /* queue a job, when a socket is readable or writeable */ static PRJob * queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, - PRBool joinable, io_op_type op) + PRBool joinable, io_op_type op) { - PRJob *jobp; - PRIntervalTime now; - - jobp = alloc_job(joinable, tpool); - if (NULL == jobp) { - return NULL; - } - - /* - * Add a new job to io_jobq - * wakeup io worker thread - */ - - jobp->job_func = fn; - jobp->job_arg = arg; - jobp->tpool = tpool; - jobp->iod = iod; - if (JOB_IO_READ == op) { - jobp->io_op = JOB_IO_READ; - jobp->io_poll_flags = PR_POLL_READ; - } else if (JOB_IO_WRITE == op) { - jobp->io_op = JOB_IO_WRITE; - jobp->io_poll_flags = PR_POLL_WRITE; - } else if (JOB_IO_ACCEPT == op) { - jobp->io_op = JOB_IO_ACCEPT; - jobp->io_poll_flags = PR_POLL_READ; - } else if (JOB_IO_CONNECT == op) { - jobp->io_op = JOB_IO_CONNECT; - jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; - } else { - delete_job(jobp); - PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); - return NULL; - } - - jobp->timeout = iod->timeout; - if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || - (PR_INTERVAL_NO_WAIT == iod->timeout)) { - jobp->absolute = iod->timeout; - } else { - now = PR_IntervalNow(); - jobp->absolute = now + iod->timeout; - } - - - PR_Lock(tpool->ioq.lock); - - if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || - (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { - PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); - } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { - PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); - } else { - PRCList *qp; - PRJob *tmp_jobp; - /* - * insert into the timeout-sorted ioq - */ - for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; - qp = qp->prev) { - tmp_jobp = JOB_LINKS_PTR(qp); - if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { - break; - } - } - PR_INSERT_AFTER(&jobp->links,qp); - } - - jobp->on_ioq = PR_TRUE; - tpool->ioq.cnt++; - /* - * notify io worker thread(s) - */ - PR_Unlock(tpool->ioq.lock); - notify_ioq(tpool); - return jobp; + PRJob *jobp; + PRIntervalTime now; + + jobp = alloc_job(joinable, tpool); + if (NULL == jobp) { + return NULL; + } + + /* + * Add a new job to io_jobq + * wakeup io worker thread + */ + + jobp->job_func = fn; + jobp->job_arg = arg; + jobp->tpool = tpool; + jobp->iod = iod; + if (JOB_IO_READ == op) { + jobp->io_op = JOB_IO_READ; + jobp->io_poll_flags = PR_POLL_READ; + } else if (JOB_IO_WRITE == op) { + jobp->io_op = JOB_IO_WRITE; + jobp->io_poll_flags = PR_POLL_WRITE; + } else if (JOB_IO_ACCEPT == op) { + jobp->io_op = JOB_IO_ACCEPT; + jobp->io_poll_flags = PR_POLL_READ; + } else if (JOB_IO_CONNECT == op) { + jobp->io_op = JOB_IO_CONNECT; + jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; + } else { + delete_job(jobp); + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); + return NULL; + } + + jobp->timeout = iod->timeout; + if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || + (PR_INTERVAL_NO_WAIT == iod->timeout)) { + jobp->absolute = iod->timeout; + } else { + now = PR_IntervalNow(); + jobp->absolute = now + iod->timeout; + } + + + PR_Lock(tpool->ioq.lock); + + if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || + (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { + PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); + } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { + PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); + } else { + PRCList *qp; + PRJob *tmp_jobp; + /* + * insert into the timeout-sorted ioq + */ + for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; + qp = qp->prev) { + tmp_jobp = JOB_LINKS_PTR(qp); + if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { + break; + } + } + PR_INSERT_AFTER(&jobp->links,qp); + } + + jobp->on_ioq = PR_TRUE; + tpool->ioq.cnt++; + /* + * notify io worker thread(s) + */ + PR_Unlock(tpool->ioq.lock); + notify_ioq(tpool); + return jobp; } /* queue a job, when a socket is readable */ PR_IMPLEMENT(PRJob *) PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, - PRBool joinable) + PRBool joinable) { - return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); } /* queue a job, when a socket is writeable */ PR_IMPLEMENT(PRJob *) PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, - PRBool joinable) + PRBool joinable) { - return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); } /* queue a job, when a socket has a pending connection */ PR_IMPLEMENT(PRJob *) PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, - void * arg, PRBool joinable) + void * arg, PRBool joinable) { - return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); } /* queue a job, when a socket can be connected */ PR_IMPLEMENT(PRJob *) PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, - const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) + const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) { - PRStatus rv; - PRErrorCode err; - - rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); - if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){ - /* connection pending */ - return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); - } + PRStatus rv; + PRErrorCode err; + + rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); + if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)) { + /* connection pending */ + return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); + } /* * connection succeeded or failed; add to jobq right away */ - if (rv == PR_FAILURE) - iod->error = err; - else - iod->error = 0; + if (rv == PR_FAILURE) { + iod->error = err; + } + else { + iod->error = 0; + } return(PR_QueueJob(tpool, fn, arg, joinable)); } @@ -877,311 +919,318 @@ PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, /* queue a job, when a timer expires */ PR_IMPLEMENT(PRJob *) PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, - PRJobFn fn, void * arg, PRBool joinable) + PRJobFn fn, void * arg, PRBool joinable) { - PRIntervalTime now; - PRJob *jobp; - - if (PR_INTERVAL_NO_TIMEOUT == timeout) { - PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); - return NULL; - } - if (PR_INTERVAL_NO_WAIT == timeout) { - /* - * no waiting; add to jobq right away - */ - return(PR_QueueJob(tpool, fn, arg, joinable)); - } - jobp = alloc_job(joinable, tpool); - if (NULL == jobp) { - return NULL; - } - - /* - * Add a new job to timer_jobq - * wakeup timer worker thread - */ - - jobp->job_func = fn; - jobp->job_arg = arg; - jobp->tpool = tpool; - jobp->timeout = timeout; - - now = PR_IntervalNow(); - jobp->absolute = now + timeout; - - - PR_Lock(tpool->timerq.lock); - jobp->on_timerq = PR_TRUE; - if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) - PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); - else { - PRCList *qp; - PRJob *tmp_jobp; - /* - * insert into the sorted timer jobq - */ - for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; - qp = qp->prev) { - tmp_jobp = JOB_LINKS_PTR(qp); - if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { - break; - } - } - PR_INSERT_AFTER(&jobp->links,qp); - } - tpool->timerq.cnt++; - /* - * notify timer worker thread(s) - */ - notify_timerq(tpool); - PR_Unlock(tpool->timerq.lock); - return jobp; + PRIntervalTime now; + PRJob *jobp; + + if (PR_INTERVAL_NO_TIMEOUT == timeout) { + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); + return NULL; + } + if (PR_INTERVAL_NO_WAIT == timeout) { + /* + * no waiting; add to jobq right away + */ + return(PR_QueueJob(tpool, fn, arg, joinable)); + } + jobp = alloc_job(joinable, tpool); + if (NULL == jobp) { + return NULL; + } + + /* + * Add a new job to timer_jobq + * wakeup timer worker thread + */ + + jobp->job_func = fn; + jobp->job_arg = arg; + jobp->tpool = tpool; + jobp->timeout = timeout; + + now = PR_IntervalNow(); + jobp->absolute = now + timeout; + + + PR_Lock(tpool->timerq.lock); + jobp->on_timerq = PR_TRUE; + if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { + PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); + } + else { + PRCList *qp; + PRJob *tmp_jobp; + /* + * insert into the sorted timer jobq + */ + for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; + qp = qp->prev) { + tmp_jobp = JOB_LINKS_PTR(qp); + if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { + break; + } + } + PR_INSERT_AFTER(&jobp->links,qp); + } + tpool->timerq.cnt++; + /* + * notify timer worker thread(s) + */ + notify_timerq(tpool); + PR_Unlock(tpool->timerq.lock); + return jobp; } static void notify_timerq(PRThreadPool *tp) { - /* - * wakeup the timer thread(s) - */ - PR_NotifyCondVar(tp->timerq.cv); + /* + * wakeup the timer thread(s) + */ + PR_NotifyCondVar(tp->timerq.cv); } static void notify_ioq(PRThreadPool *tp) { -PRStatus rval_status; + PRStatus rval_status; - /* - * wakeup the io thread(s) - */ - rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); - PR_ASSERT(PR_SUCCESS == rval_status); + /* + * wakeup the io thread(s) + */ + rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); + PR_ASSERT(PR_SUCCESS == rval_status); } /* * cancel a job * - * XXXX: is this needed? likely to be removed + * XXXX: is this needed? likely to be removed */ PR_IMPLEMENT(PRStatus) PR_CancelJob(PRJob *jobp) { - PRStatus rval = PR_FAILURE; - PRThreadPool *tp; - - if (jobp->on_timerq) { - /* - * now, check again while holding the timerq lock - */ - tp = jobp->tpool; - PR_Lock(tp->timerq.lock); - if (jobp->on_timerq) { - jobp->on_timerq = PR_FALSE; - PR_REMOVE_AND_INIT_LINK(&jobp->links); - tp->timerq.cnt--; - PR_Unlock(tp->timerq.lock); - if (!JOINABLE_JOB(jobp)) { - delete_job(jobp); - } else { - JOIN_NOTIFY(jobp); - } - rval = PR_SUCCESS; - } else - PR_Unlock(tp->timerq.lock); - } else if (jobp->on_ioq) { - /* - * now, check again while holding the ioq lock - */ - tp = jobp->tpool; - PR_Lock(tp->ioq.lock); - if (jobp->on_ioq) { - jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); - if (NULL == jobp->cancel_cv) { - PR_Unlock(tp->ioq.lock); - PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); - return PR_FAILURE; - } - /* - * mark job 'cancelled' and notify io thread(s) - * XXXX: - * this assumes there is only one io thread; when there - * are multiple threads, the io thread processing this job - * must be notified. - */ - jobp->cancel_io = PR_TRUE; - PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ - notify_ioq(tp); - PR_Lock(tp->ioq.lock); - while (jobp->cancel_io) - PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); - PR_Unlock(tp->ioq.lock); - PR_ASSERT(!jobp->on_ioq); - if (!JOINABLE_JOB(jobp)) { - delete_job(jobp); - } else { - JOIN_NOTIFY(jobp); - } - rval = PR_SUCCESS; - } else - PR_Unlock(tp->ioq.lock); - } - if (PR_FAILURE == rval) - PR_SetError(PR_INVALID_STATE_ERROR, 0); - return rval; + PRStatus rval = PR_FAILURE; + PRThreadPool *tp; + + if (jobp->on_timerq) { + /* + * now, check again while holding the timerq lock + */ + tp = jobp->tpool; + PR_Lock(tp->timerq.lock); + if (jobp->on_timerq) { + jobp->on_timerq = PR_FALSE; + PR_REMOVE_AND_INIT_LINK(&jobp->links); + tp->timerq.cnt--; + PR_Unlock(tp->timerq.lock); + if (!JOINABLE_JOB(jobp)) { + delete_job(jobp); + } else { + JOIN_NOTIFY(jobp); + } + rval = PR_SUCCESS; + } else { + PR_Unlock(tp->timerq.lock); + } + } else if (jobp->on_ioq) { + /* + * now, check again while holding the ioq lock + */ + tp = jobp->tpool; + PR_Lock(tp->ioq.lock); + if (jobp->on_ioq) { + jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); + if (NULL == jobp->cancel_cv) { + PR_Unlock(tp->ioq.lock); + PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); + return PR_FAILURE; + } + /* + * mark job 'cancelled' and notify io thread(s) + * XXXX: + * this assumes there is only one io thread; when there + * are multiple threads, the io thread processing this job + * must be notified. + */ + jobp->cancel_io = PR_TRUE; + PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ + notify_ioq(tp); + PR_Lock(tp->ioq.lock); + while (jobp->cancel_io) { + PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); + } + PR_Unlock(tp->ioq.lock); + PR_ASSERT(!jobp->on_ioq); + if (!JOINABLE_JOB(jobp)) { + delete_job(jobp); + } else { + JOIN_NOTIFY(jobp); + } + rval = PR_SUCCESS; + } else { + PR_Unlock(tp->ioq.lock); + } + } + if (PR_FAILURE == rval) { + PR_SetError(PR_INVALID_STATE_ERROR, 0); + } + return rval; } /* join a job, wait until completion */ PR_IMPLEMENT(PRStatus) PR_JoinJob(PRJob *jobp) { - if (!JOINABLE_JOB(jobp)) { - PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); - return PR_FAILURE; - } - PR_Lock(jobp->tpool->join_lock); - while(jobp->join_wait) - PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); - PR_Unlock(jobp->tpool->join_lock); - delete_job(jobp); - return PR_SUCCESS; + if (!JOINABLE_JOB(jobp)) { + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); + return PR_FAILURE; + } + PR_Lock(jobp->tpool->join_lock); + while(jobp->join_wait) { + PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); + } + PR_Unlock(jobp->tpool->join_lock); + delete_job(jobp); + return PR_SUCCESS; } /* shutdown threadpool */ PR_IMPLEMENT(PRStatus) PR_ShutdownThreadPool(PRThreadPool *tpool) { -PRStatus rval = PR_SUCCESS; + PRStatus rval = PR_SUCCESS; - PR_Lock(tpool->jobq.lock); - tpool->shutdown = PR_TRUE; - PR_NotifyAllCondVar(tpool->shutdown_cv); - PR_Unlock(tpool->jobq.lock); + PR_Lock(tpool->jobq.lock); + tpool->shutdown = PR_TRUE; + PR_NotifyAllCondVar(tpool->shutdown_cv); + PR_Unlock(tpool->jobq.lock); - return rval; + return rval; } /* * join thread pool - * wait for termination of worker threads - * reclaim threadpool resources + * wait for termination of worker threads + * reclaim threadpool resources */ PR_IMPLEMENT(PRStatus) PR_JoinThreadPool(PRThreadPool *tpool) { -PRStatus rval = PR_SUCCESS; -PRCList *head; -PRStatus rval_status; + PRStatus rval = PR_SUCCESS; + PRCList *head; + PRStatus rval_status; - PR_Lock(tpool->jobq.lock); - while (!tpool->shutdown) - PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); + PR_Lock(tpool->jobq.lock); + while (!tpool->shutdown) { + PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); + } - /* - * wakeup worker threads - */ + /* + * wakeup worker threads + */ #ifdef OPT_WINNT - /* - * post shutdown notification for all threads - */ - { - int i; - for(i=0; i < tpool->current_threads; i++) { - PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, - TRUE, NULL); - } - } + /* + * post shutdown notification for all threads + */ + { + int i; + for(i=0; i < tpool->current_threads; i++) { + PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, + TRUE, NULL); + } + } #else - PR_NotifyAllCondVar(tpool->jobq.cv); + PR_NotifyAllCondVar(tpool->jobq.cv); #endif - /* - * wakeup io thread(s) - */ - notify_ioq(tpool); - - /* - * wakeup timer thread(s) - */ - PR_Lock(tpool->timerq.lock); - notify_timerq(tpool); - PR_Unlock(tpool->timerq.lock); - - while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { - wthread *wthrp; - - head = PR_LIST_HEAD(&tpool->jobq.wthreads); - PR_REMOVE_AND_INIT_LINK(head); - PR_Unlock(tpool->jobq.lock); - wthrp = WTHREAD_LINKS_PTR(head); - rval_status = PR_JoinThread(wthrp->thread); - PR_ASSERT(PR_SUCCESS == rval_status); - PR_DELETE(wthrp); - PR_Lock(tpool->jobq.lock); - } - PR_Unlock(tpool->jobq.lock); - while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { - wthread *wthrp; - - head = PR_LIST_HEAD(&tpool->ioq.wthreads); - PR_REMOVE_AND_INIT_LINK(head); - wthrp = WTHREAD_LINKS_PTR(head); - rval_status = PR_JoinThread(wthrp->thread); - PR_ASSERT(PR_SUCCESS == rval_status); - PR_DELETE(wthrp); - } - - while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { - wthread *wthrp; - - head = PR_LIST_HEAD(&tpool->timerq.wthreads); - PR_REMOVE_AND_INIT_LINK(head); - wthrp = WTHREAD_LINKS_PTR(head); - rval_status = PR_JoinThread(wthrp->thread); - PR_ASSERT(PR_SUCCESS == rval_status); - PR_DELETE(wthrp); - } - - /* - * Delete queued jobs - */ - while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { - PRJob *jobp; - - head = PR_LIST_HEAD(&tpool->jobq.list); - PR_REMOVE_AND_INIT_LINK(head); - jobp = JOB_LINKS_PTR(head); - tpool->jobq.cnt--; - delete_job(jobp); - } - - /* delete io jobs */ - while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { - PRJob *jobp; - - head = PR_LIST_HEAD(&tpool->ioq.list); - PR_REMOVE_AND_INIT_LINK(head); - tpool->ioq.cnt--; - jobp = JOB_LINKS_PTR(head); - delete_job(jobp); - } - - /* delete timer jobs */ - while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { - PRJob *jobp; - - head = PR_LIST_HEAD(&tpool->timerq.list); - PR_REMOVE_AND_INIT_LINK(head); - tpool->timerq.cnt--; - jobp = JOB_LINKS_PTR(head); - delete_job(jobp); - } - - PR_ASSERT(0 == tpool->jobq.cnt); - PR_ASSERT(0 == tpool->ioq.cnt); - PR_ASSERT(0 == tpool->timerq.cnt); - - delete_threadpool(tpool); - return rval; + /* + * wakeup io thread(s) + */ + notify_ioq(tpool); + + /* + * wakeup timer thread(s) + */ + PR_Lock(tpool->timerq.lock); + notify_timerq(tpool); + PR_Unlock(tpool->timerq.lock); + + while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { + wthread *wthrp; + + head = PR_LIST_HEAD(&tpool->jobq.wthreads); + PR_REMOVE_AND_INIT_LINK(head); + PR_Unlock(tpool->jobq.lock); + wthrp = WTHREAD_LINKS_PTR(head); + rval_status = PR_JoinThread(wthrp->thread); + PR_ASSERT(PR_SUCCESS == rval_status); + PR_DELETE(wthrp); + PR_Lock(tpool->jobq.lock); + } + PR_Unlock(tpool->jobq.lock); + while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { + wthread *wthrp; + + head = PR_LIST_HEAD(&tpool->ioq.wthreads); + PR_REMOVE_AND_INIT_LINK(head); + wthrp = WTHREAD_LINKS_PTR(head); + rval_status = PR_JoinThread(wthrp->thread); + PR_ASSERT(PR_SUCCESS == rval_status); + PR_DELETE(wthrp); + } + + while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { + wthread *wthrp; + + head = PR_LIST_HEAD(&tpool->timerq.wthreads); + PR_REMOVE_AND_INIT_LINK(head); + wthrp = WTHREAD_LINKS_PTR(head); + rval_status = PR_JoinThread(wthrp->thread); + PR_ASSERT(PR_SUCCESS == rval_status); + PR_DELETE(wthrp); + } + + /* + * Delete queued jobs + */ + while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { + PRJob *jobp; + + head = PR_LIST_HEAD(&tpool->jobq.list); + PR_REMOVE_AND_INIT_LINK(head); + jobp = JOB_LINKS_PTR(head); + tpool->jobq.cnt--; + delete_job(jobp); + } + + /* delete io jobs */ + while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { + PRJob *jobp; + + head = PR_LIST_HEAD(&tpool->ioq.list); + PR_REMOVE_AND_INIT_LINK(head); + tpool->ioq.cnt--; + jobp = JOB_LINKS_PTR(head); + delete_job(jobp); + } + + /* delete timer jobs */ + while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { + PRJob *jobp; + + head = PR_LIST_HEAD(&tpool->timerq.list); + PR_REMOVE_AND_INIT_LINK(head); + tpool->timerq.cnt--; + jobp = JOB_LINKS_PTR(head); + delete_job(jobp); + } + + PR_ASSERT(0 == tpool->jobq.cnt); + PR_ASSERT(0 == tpool->ioq.cnt); + PR_ASSERT(0 == tpool->timerq.cnt); + + delete_threadpool(tpool); + return rval; } |