summaryrefslogtreecommitdiffstats
path: root/nsprpub/pr/src/misc/prtpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'nsprpub/pr/src/misc/prtpool.c')
-rw-r--r--nsprpub/pr/src/misc/prtpool.c1919
1 files changed, 984 insertions, 935 deletions
diff --git a/nsprpub/pr/src/misc/prtpool.c b/nsprpub/pr/src/misc/prtpool.c
index c2cc9c803..69f588ef6 100644
--- a/nsprpub/pr/src/misc/prtpool.c
+++ b/nsprpub/pr/src/misc/prtpool.c
@@ -7,8 +7,8 @@
/*
* Thread pools
- * Thread pools create and manage threads to provide support for
- * scheduling jobs onto one or more threads.
+ * Thread pools create and manage threads to provide support for
+ * scheduling jobs onto one or more threads.
*
*/
#ifdef OPT_WINNT
@@ -19,32 +19,32 @@
* worker thread
*/
typedef struct wthread {
- PRCList links;
- PRThread *thread;
+ PRCList links;
+ PRThread *thread;
} wthread;
/*
* queue of timer jobs
*/
typedef struct timer_jobq {
- PRCList list;
- PRLock *lock;
- PRCondVar *cv;
- PRInt32 cnt;
- PRCList wthreads;
+ PRCList list;
+ PRLock *lock;
+ PRCondVar *cv;
+ PRInt32 cnt;
+ PRCList wthreads;
} timer_jobq;
/*
* queue of jobs
*/
typedef struct tp_jobq {
- PRCList list;
- PRInt32 cnt;
- PRLock *lock;
- PRCondVar *cv;
- PRCList wthreads;
+ PRCList list;
+ PRInt32 cnt;
+ PRLock *lock;
+ PRCondVar *cv;
+ PRCList wthreads;
#ifdef OPT_WINNT
- HANDLE nt_completion_port;
+ HANDLE nt_completion_port;
#endif
} tp_jobq;
@@ -52,62 +52,62 @@ typedef struct tp_jobq {
* queue of IO jobs
*/
typedef struct io_jobq {
- PRCList list;
- PRPollDesc *pollfds;
- PRInt32 npollfds;
- PRJob **polljobs;
- PRLock *lock;
- PRInt32 cnt;
- PRFileDesc *notify_fd;
- PRCList wthreads;
+ PRCList list;
+ PRPollDesc *pollfds;
+ PRInt32 npollfds;
+ PRJob **polljobs;
+ PRLock *lock;
+ PRInt32 cnt;
+ PRFileDesc *notify_fd;
+ PRCList wthreads;
} io_jobq;
/*
* Threadpool
*/
struct PRThreadPool {
- PRInt32 init_threads;
- PRInt32 max_threads;
- PRInt32 current_threads;
- PRInt32 idle_threads;
- PRUint32 stacksize;
- tp_jobq jobq;
- io_jobq ioq;
- timer_jobq timerq;
- PRLock *join_lock; /* used with jobp->join_cv */
- PRCondVar *shutdown_cv;
- PRBool shutdown;
+ PRInt32 init_threads;
+ PRInt32 max_threads;
+ PRInt32 current_threads;
+ PRInt32 idle_threads;
+ PRUint32 stacksize;
+ tp_jobq jobq;
+ io_jobq ioq;
+ timer_jobq timerq;
+ PRLock *join_lock; /* used with jobp->join_cv */
+ PRCondVar *shutdown_cv;
+ PRBool shutdown;
};
typedef enum io_op_type
- { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;
+{ JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;
#ifdef OPT_WINNT
typedef struct NT_notifier {
- OVERLAPPED overlapped; /* must be first */
- PRJob *jobp;
+ OVERLAPPED overlapped; /* must be first */
+ PRJob *jobp;
} NT_notifier;
#endif
struct PRJob {
- PRCList links; /* for linking jobs */
- PRBool on_ioq; /* job on ioq */
- PRBool on_timerq; /* job on timerq */
- PRJobFn job_func;
- void *job_arg;
- PRCondVar *join_cv;
- PRBool join_wait; /* == PR_TRUE, when waiting to join */
- PRCondVar *cancel_cv; /* for cancelling IO jobs */
- PRBool cancel_io; /* for cancelling IO jobs */
- PRThreadPool *tpool; /* back pointer to thread pool */
- PRJobIoDesc *iod;
- io_op_type io_op;
- PRInt16 io_poll_flags;
- PRNetAddr *netaddr;
- PRIntervalTime timeout; /* relative value */
- PRIntervalTime absolute;
+ PRCList links; /* for linking jobs */
+ PRBool on_ioq; /* job on ioq */
+ PRBool on_timerq; /* job on timerq */
+ PRJobFn job_func;
+ void *job_arg;
+ PRCondVar *join_cv;
+ PRBool join_wait; /* == PR_TRUE, when waiting to join */
+ PRCondVar *cancel_cv; /* for cancelling IO jobs */
+ PRBool cancel_io; /* for cancelling IO jobs */
+ PRThreadPool *tpool; /* back pointer to thread pool */
+ PRJobIoDesc *iod;
+ io_op_type io_op;
+ PRInt16 io_poll_flags;
+ PRNetAddr *netaddr;
+ PRIntervalTime timeout; /* relative value */
+ PRIntervalTime absolute;
#ifdef OPT_WINNT
- NT_notifier nt_notifier;
+ NT_notifier nt_notifier;
#endif
};
@@ -119,22 +119,22 @@ struct PRJob {
#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)
-#define JOIN_NOTIFY(_jobp) \
- PR_BEGIN_MACRO \
- PR_Lock(_jobp->tpool->join_lock); \
- _jobp->join_wait = PR_FALSE; \
- PR_NotifyCondVar(_jobp->join_cv); \
- PR_Unlock(_jobp->tpool->join_lock); \
- PR_END_MACRO
-
-#define CANCEL_IO_JOB(jobp) \
- PR_BEGIN_MACRO \
- jobp->cancel_io = PR_FALSE; \
- jobp->on_ioq = PR_FALSE; \
- PR_REMOVE_AND_INIT_LINK(&jobp->links); \
- tp->ioq.cnt--; \
- PR_NotifyCondVar(jobp->cancel_cv); \
- PR_END_MACRO
+#define JOIN_NOTIFY(_jobp) \
+ PR_BEGIN_MACRO \
+ PR_Lock(_jobp->tpool->join_lock); \
+ _jobp->join_wait = PR_FALSE; \
+ PR_NotifyCondVar(_jobp->join_cv); \
+ PR_Unlock(_jobp->tpool->join_lock); \
+ PR_END_MACRO
+
+#define CANCEL_IO_JOB(jobp) \
+ PR_BEGIN_MACRO \
+ jobp->cancel_io = PR_FALSE; \
+ jobp->on_ioq = PR_FALSE; \
+ PR_REMOVE_AND_INIT_LINK(&jobp->links); \
+ tp->ioq.cnt--; \
+ PR_NotifyCondVar(jobp->cancel_cv); \
+ PR_END_MACRO
static void delete_job(PRJob *jobp);
static PRThreadPool * alloc_threadpool(void);
@@ -145,10 +145,10 @@ static void notify_timerq(PRThreadPool *tp);
/*
* locks are acquired in the following order
*
- * tp->ioq.lock,tp->timerq.lock
- * |
- * V
- * tp->jobq->lock
+ * tp->ioq.lock,tp->timerq.lock
+ * |
+ * V
+ * tp->jobq->lock
*/
/*
@@ -156,65 +156,66 @@ static void notify_timerq(PRThreadPool *tp);
*/
static void wstart(void *arg)
{
-PRThreadPool *tp = (PRThreadPool *) arg;
-PRCList *head;
-
- /*
- * execute jobs until shutdown
- */
- while (!tp->shutdown) {
- PRJob *jobp;
+ PRThreadPool *tp = (PRThreadPool *) arg;
+ PRCList *head;
+
+ /*
+ * execute jobs until shutdown
+ */
+ while (!tp->shutdown) {
+ PRJob *jobp;
#ifdef OPT_WINNT
- BOOL rv;
- DWORD unused, shutdown;
- LPOVERLAPPED olp;
-
- PR_Lock(tp->jobq.lock);
- tp->idle_threads++;
- PR_Unlock(tp->jobq.lock);
- rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
- &unused, &shutdown, &olp, INFINITE);
-
- PR_ASSERT(rv);
- if (shutdown)
- break;
- jobp = ((NT_notifier *) olp)->jobp;
- PR_Lock(tp->jobq.lock);
- tp->idle_threads--;
- tp->jobq.cnt--;
- PR_Unlock(tp->jobq.lock);
+ BOOL rv;
+ DWORD unused, shutdown;
+ LPOVERLAPPED olp;
+
+ PR_Lock(tp->jobq.lock);
+ tp->idle_threads++;
+ PR_Unlock(tp->jobq.lock);
+ rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
+ &unused, &shutdown, &olp, INFINITE);
+
+ PR_ASSERT(rv);
+ if (shutdown) {
+ break;
+ }
+ jobp = ((NT_notifier *) olp)->jobp;
+ PR_Lock(tp->jobq.lock);
+ tp->idle_threads--;
+ tp->jobq.cnt--;
+ PR_Unlock(tp->jobq.lock);
#else
- PR_Lock(tp->jobq.lock);
- while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
- tp->idle_threads++;
- PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
- tp->idle_threads--;
- }
- if (tp->shutdown) {
- PR_Unlock(tp->jobq.lock);
- break;
- }
- head = PR_LIST_HEAD(&tp->jobq.list);
- /*
- * remove job from queue
- */
- PR_REMOVE_AND_INIT_LINK(head);
- tp->jobq.cnt--;
- jobp = JOB_LINKS_PTR(head);
- PR_Unlock(tp->jobq.lock);
+ PR_Lock(tp->jobq.lock);
+ while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
+ tp->idle_threads++;
+ PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
+ tp->idle_threads--;
+ }
+ if (tp->shutdown) {
+ PR_Unlock(tp->jobq.lock);
+ break;
+ }
+ head = PR_LIST_HEAD(&tp->jobq.list);
+ /*
+ * remove job from queue
+ */
+ PR_REMOVE_AND_INIT_LINK(head);
+ tp->jobq.cnt--;
+ jobp = JOB_LINKS_PTR(head);
+ PR_Unlock(tp->jobq.lock);
#endif
- jobp->job_func(jobp->job_arg);
- if (!JOINABLE_JOB(jobp)) {
- delete_job(jobp);
- } else {
- JOIN_NOTIFY(jobp);
- }
- }
- PR_Lock(tp->jobq.lock);
- tp->current_threads--;
- PR_Unlock(tp->jobq.lock);
+ jobp->job_func(jobp->job_arg);
+ if (!JOINABLE_JOB(jobp)) {
+ delete_job(jobp);
+ } else {
+ JOIN_NOTIFY(jobp);
+ }
+ }
+ PR_Lock(tp->jobq.lock);
+ tp->current_threads--;
+ PR_Unlock(tp->jobq.lock);
}
/*
@@ -223,52 +224,52 @@ PRCList *head;
static void
add_to_jobq(PRThreadPool *tp, PRJob *jobp)
{
- /*
- * add to jobq
- */
+ /*
+ * add to jobq
+ */
#ifdef OPT_WINNT
- PR_Lock(tp->jobq.lock);
- tp->jobq.cnt++;
- PR_Unlock(tp->jobq.lock);
- /*
- * notify worker thread(s)
- */
- PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
- FALSE, &jobp->nt_notifier.overlapped);
+ PR_Lock(tp->jobq.lock);
+ tp->jobq.cnt++;
+ PR_Unlock(tp->jobq.lock);
+ /*
+ * notify worker thread(s)
+ */
+ PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
+ FALSE, &jobp->nt_notifier.overlapped);
#else
- PR_Lock(tp->jobq.lock);
- PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
- tp->jobq.cnt++;
- if ((tp->idle_threads < tp->jobq.cnt) &&
- (tp->current_threads < tp->max_threads)) {
- wthread *wthrp;
- /*
- * increment thread count and unlock the jobq lock
- */
- tp->current_threads++;
- PR_Unlock(tp->jobq.lock);
- /* create new worker thread */
- wthrp = PR_NEWZAP(wthread);
- if (wthrp) {
- wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
- tp, PR_PRIORITY_NORMAL,
- PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
- if (NULL == wthrp->thread) {
- PR_DELETE(wthrp); /* this sets wthrp to NULL */
- }
- }
- PR_Lock(tp->jobq.lock);
- if (NULL == wthrp) {
- tp->current_threads--;
- } else {
- PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
- }
- }
- /*
- * wakeup a worker thread
- */
- PR_NotifyCondVar(tp->jobq.cv);
- PR_Unlock(tp->jobq.lock);
+ PR_Lock(tp->jobq.lock);
+ PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
+ tp->jobq.cnt++;
+ if ((tp->idle_threads < tp->jobq.cnt) &&
+ (tp->current_threads < tp->max_threads)) {
+ wthread *wthrp;
+ /*
+ * increment thread count and unlock the jobq lock
+ */
+ tp->current_threads++;
+ PR_Unlock(tp->jobq.lock);
+ /* create new worker thread */
+ wthrp = PR_NEWZAP(wthread);
+ if (wthrp) {
+ wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
+ tp, PR_PRIORITY_NORMAL,
+ PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
+ if (NULL == wthrp->thread) {
+ PR_DELETE(wthrp); /* this sets wthrp to NULL */
+ }
+ }
+ PR_Lock(tp->jobq.lock);
+ if (NULL == wthrp) {
+ tp->current_threads--;
+ } else {
+ PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
+ }
+ }
+ /*
+ * wakeup a worker thread
+ */
+ PR_NotifyCondVar(tp->jobq.cv);
+ PR_Unlock(tp->jobq.lock);
#endif
}
@@ -277,207 +278,220 @@ add_to_jobq(PRThreadPool *tp, PRJob *jobp)
*/
static void io_wstart(void *arg)
{
-PRThreadPool *tp = (PRThreadPool *) arg;
-int pollfd_cnt, pollfds_used;
-int rv;
-PRCList *qp, *nextqp;
-PRPollDesc *pollfds = NULL;
-PRJob **polljobs = NULL;
-int poll_timeout;
-PRIntervalTime now;
-
- /*
- * scan io_jobq
- * construct poll list
- * call PR_Poll
- * for all fds, for which poll returns true, move the job to
- * jobq and wakeup worker thread.
- */
- while (!tp->shutdown) {
- PRJob *jobp;
-
- pollfd_cnt = tp->ioq.cnt + 10;
- if (pollfd_cnt > tp->ioq.npollfds) {
-
- /*
- * re-allocate pollfd array if the current one is not large
- * enough
- */
- if (NULL != tp->ioq.pollfds)
- PR_Free(tp->ioq.pollfds);
- tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
- (sizeof(PRPollDesc) + sizeof(PRJob *)));
- PR_ASSERT(NULL != tp->ioq.pollfds);
- /*
- * array of pollfds
- */
- pollfds = tp->ioq.pollfds;
- tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
- /*
- * parallel array of jobs
- */
- polljobs = tp->ioq.polljobs;
- tp->ioq.npollfds = pollfd_cnt;
- }
-
- pollfds_used = 0;
- /*
- * add the notify fd; used for unblocking io thread(s)
- */
- pollfds[pollfds_used].fd = tp->ioq.notify_fd;
- pollfds[pollfds_used].in_flags = PR_POLL_READ;
- pollfds[pollfds_used].out_flags = 0;
- polljobs[pollfds_used] = NULL;
- pollfds_used++;
- /*
- * fill in the pollfd array
- */
- PR_Lock(tp->ioq.lock);
- for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
- nextqp = qp->next;
- jobp = JOB_LINKS_PTR(qp);
- if (jobp->cancel_io) {
- CANCEL_IO_JOB(jobp);
- continue;
- }
- if (pollfds_used == (pollfd_cnt))
- break;
- pollfds[pollfds_used].fd = jobp->iod->socket;
- pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
- pollfds[pollfds_used].out_flags = 0;
- polljobs[pollfds_used] = jobp;
-
- pollfds_used++;
- }
- if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
- qp = tp->ioq.list.next;
- jobp = JOB_LINKS_PTR(qp);
- if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
- poll_timeout = PR_INTERVAL_NO_TIMEOUT;
- else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
- poll_timeout = PR_INTERVAL_NO_WAIT;
- else {
- poll_timeout = jobp->absolute - PR_IntervalNow();
- if (poll_timeout <= 0) /* already timed out */
- poll_timeout = PR_INTERVAL_NO_WAIT;
- }
- } else {
- poll_timeout = PR_INTERVAL_NO_TIMEOUT;
- }
- PR_Unlock(tp->ioq.lock);
-
- /*
- * XXXX
- * should retry if more jobs have been added to the queue?
- *
- */
- PR_ASSERT(pollfds_used <= pollfd_cnt);
- rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
-
- if (tp->shutdown) {
- break;
- }
-
- if (rv > 0) {
- /*
- * at least one io event is set
- */
- PRStatus rval_status;
- PRInt32 index;
-
- PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
- /*
- * reset the pollable event, if notified
- */
- if (pollfds[0].out_flags & PR_POLL_READ) {
- rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
- PR_ASSERT(PR_SUCCESS == rval_status);
- }
-
- for(index = 1; index < (pollfds_used); index++) {
+ PRThreadPool *tp = (PRThreadPool *) arg;
+ int pollfd_cnt, pollfds_used;
+ int rv;
+ PRCList *qp, *nextqp;
+ PRPollDesc *pollfds = NULL;
+ PRJob **polljobs = NULL;
+ int poll_timeout;
+ PRIntervalTime now;
+
+ /*
+ * scan io_jobq
+ * construct poll list
+ * call PR_Poll
+ * for all fds, for which poll returns true, move the job to
+ * jobq and wakeup worker thread.
+ */
+ while (!tp->shutdown) {
+ PRJob *jobp;
+
+ pollfd_cnt = tp->ioq.cnt + 10;
+ if (pollfd_cnt > tp->ioq.npollfds) {
+
+ /*
+ * re-allocate pollfd array if the current one is not large
+ * enough
+ */
+ if (NULL != tp->ioq.pollfds) {
+ PR_Free(tp->ioq.pollfds);
+ }
+ tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
+ (sizeof(PRPollDesc) + sizeof(PRJob *)));
+ PR_ASSERT(NULL != tp->ioq.pollfds);
+ /*
+ * array of pollfds
+ */
+ pollfds = tp->ioq.pollfds;
+ tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
+ /*
+ * parallel array of jobs
+ */
+ polljobs = tp->ioq.polljobs;
+ tp->ioq.npollfds = pollfd_cnt;
+ }
+
+ pollfds_used = 0;
+ /*
+ * add the notify fd; used for unblocking io thread(s)
+ */
+ pollfds[pollfds_used].fd = tp->ioq.notify_fd;
+ pollfds[pollfds_used].in_flags = PR_POLL_READ;
+ pollfds[pollfds_used].out_flags = 0;
+ polljobs[pollfds_used] = NULL;
+ pollfds_used++;
+ /*
+ * fill in the pollfd array
+ */
+ PR_Lock(tp->ioq.lock);
+ for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
+ nextqp = qp->next;
+ jobp = JOB_LINKS_PTR(qp);
+ if (jobp->cancel_io) {
+ CANCEL_IO_JOB(jobp);
+ continue;
+ }
+ if (pollfds_used == (pollfd_cnt)) {
+ break;
+ }
+ pollfds[pollfds_used].fd = jobp->iod->socket;
+ pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
+ pollfds[pollfds_used].out_flags = 0;
+ polljobs[pollfds_used] = jobp;
+
+ pollfds_used++;
+ }
+ if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
+ qp = tp->ioq.list.next;
+ jobp = JOB_LINKS_PTR(qp);
+ if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
+ poll_timeout = PR_INTERVAL_NO_TIMEOUT;
+ }
+ else if (PR_INTERVAL_NO_WAIT == jobp->timeout) {
+ poll_timeout = PR_INTERVAL_NO_WAIT;
+ }
+ else {
+ poll_timeout = jobp->absolute - PR_IntervalNow();
+ if (poll_timeout <= 0) { /* already timed out */
+ poll_timeout = PR_INTERVAL_NO_WAIT;
+ }
+ }
+ } else {
+ poll_timeout = PR_INTERVAL_NO_TIMEOUT;
+ }
+ PR_Unlock(tp->ioq.lock);
+
+ /*
+ * XXXX
+ * should retry if more jobs have been added to the queue?
+ *
+ */
+ PR_ASSERT(pollfds_used <= pollfd_cnt);
+ rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
+
+ if (tp->shutdown) {
+ break;
+ }
+
+ if (rv > 0) {
+ /*
+ * at least one io event is set
+ */
+ PRStatus rval_status;
+ PRInt32 index;
+
+ PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
+ /*
+ * reset the pollable event, if notified
+ */
+ if (pollfds[0].out_flags & PR_POLL_READ) {
+ rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
+ PR_ASSERT(PR_SUCCESS == rval_status);
+ }
+
+ for(index = 1; index < (pollfds_used); index++) {
PRInt16 events = pollfds[index].in_flags;
- PRInt16 revents = pollfds[index].out_flags;
- jobp = polljobs[index];
+ PRInt16 revents = pollfds[index].out_flags;
+ jobp = polljobs[index];
if ((revents & PR_POLL_NVAL) || /* busted in all cases */
- (revents & PR_POLL_ERR) ||
- ((events & PR_POLL_WRITE) &&
- (revents & PR_POLL_HUP))) { /* write op & hup */
- PR_Lock(tp->ioq.lock);
- if (jobp->cancel_io) {
- CANCEL_IO_JOB(jobp);
- PR_Unlock(tp->ioq.lock);
- continue;
- }
- PR_REMOVE_AND_INIT_LINK(&jobp->links);
- tp->ioq.cnt--;
- jobp->on_ioq = PR_FALSE;
- PR_Unlock(tp->ioq.lock);
-
- /* set error */
- if (PR_POLL_NVAL & revents)
- jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
- else if (PR_POLL_HUP & revents)
- jobp->iod->error = PR_CONNECT_RESET_ERROR;
- else
- jobp->iod->error = PR_IO_ERROR;
-
- /*
- * add to jobq
- */
- add_to_jobq(tp, jobp);
- } else if (revents) {
- /*
- * add to jobq
- */
- PR_Lock(tp->ioq.lock);
- if (jobp->cancel_io) {
- CANCEL_IO_JOB(jobp);
- PR_Unlock(tp->ioq.lock);
- continue;
- }
- PR_REMOVE_AND_INIT_LINK(&jobp->links);
- tp->ioq.cnt--;
- jobp->on_ioq = PR_FALSE;
- PR_Unlock(tp->ioq.lock);
-
- if (jobp->io_op == JOB_IO_CONNECT) {
- if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
- jobp->iod->error = 0;
- else
- jobp->iod->error = PR_GetError();
- } else
- jobp->iod->error = 0;
-
- add_to_jobq(tp, jobp);
- }
- }
- }
- /*
- * timeout processing
- */
- now = PR_IntervalNow();
- PR_Lock(tp->ioq.lock);
- for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
- nextqp = qp->next;
- jobp = JOB_LINKS_PTR(qp);
- if (jobp->cancel_io) {
- CANCEL_IO_JOB(jobp);
- continue;
- }
- if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
- break;
- if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
- ((PRInt32)(jobp->absolute - now) > 0))
- break;
- PR_REMOVE_AND_INIT_LINK(&jobp->links);
- tp->ioq.cnt--;
- jobp->on_ioq = PR_FALSE;
- jobp->iod->error = PR_IO_TIMEOUT_ERROR;
- add_to_jobq(tp, jobp);
- }
- PR_Unlock(tp->ioq.lock);
- }
+ (revents & PR_POLL_ERR) ||
+ ((events & PR_POLL_WRITE) &&
+ (revents & PR_POLL_HUP))) { /* write op & hup */
+ PR_Lock(tp->ioq.lock);
+ if (jobp->cancel_io) {
+ CANCEL_IO_JOB(jobp);
+ PR_Unlock(tp->ioq.lock);
+ continue;
+ }
+ PR_REMOVE_AND_INIT_LINK(&jobp->links);
+ tp->ioq.cnt--;
+ jobp->on_ioq = PR_FALSE;
+ PR_Unlock(tp->ioq.lock);
+
+ /* set error */
+ if (PR_POLL_NVAL & revents) {
+ jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
+ }
+ else if (PR_POLL_HUP & revents) {
+ jobp->iod->error = PR_CONNECT_RESET_ERROR;
+ }
+ else {
+ jobp->iod->error = PR_IO_ERROR;
+ }
+
+ /*
+ * add to jobq
+ */
+ add_to_jobq(tp, jobp);
+ } else if (revents) {
+ /*
+ * add to jobq
+ */
+ PR_Lock(tp->ioq.lock);
+ if (jobp->cancel_io) {
+ CANCEL_IO_JOB(jobp);
+ PR_Unlock(tp->ioq.lock);
+ continue;
+ }
+ PR_REMOVE_AND_INIT_LINK(&jobp->links);
+ tp->ioq.cnt--;
+ jobp->on_ioq = PR_FALSE;
+ PR_Unlock(tp->ioq.lock);
+
+ if (jobp->io_op == JOB_IO_CONNECT) {
+ if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) {
+ jobp->iod->error = 0;
+ }
+ else {
+ jobp->iod->error = PR_GetError();
+ }
+ } else {
+ jobp->iod->error = 0;
+ }
+
+ add_to_jobq(tp, jobp);
+ }
+ }
+ }
+ /*
+ * timeout processing
+ */
+ now = PR_IntervalNow();
+ PR_Lock(tp->ioq.lock);
+ for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
+ nextqp = qp->next;
+ jobp = JOB_LINKS_PTR(qp);
+ if (jobp->cancel_io) {
+ CANCEL_IO_JOB(jobp);
+ continue;
+ }
+ if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
+ break;
+ }
+ if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
+ ((PRInt32)(jobp->absolute - now) > 0)) {
+ break;
+ }
+ PR_REMOVE_AND_INIT_LINK(&jobp->links);
+ tp->ioq.cnt--;
+ jobp->on_ioq = PR_FALSE;
+ jobp->iod->error = PR_IO_TIMEOUT_ERROR;
+ add_to_jobq(tp, jobp);
+ }
+ PR_Unlock(tp->ioq.lock);
+ }
}
/*
@@ -485,391 +499,419 @@ PRIntervalTime now;
*/
static void timer_wstart(void *arg)
{
-PRThreadPool *tp = (PRThreadPool *) arg;
-PRCList *qp;
-PRIntervalTime timeout;
-PRIntervalTime now;
-
- /*
- * call PR_WaitCondVar with minimum value of all timeouts
- */
- while (!tp->shutdown) {
- PRJob *jobp;
-
- PR_Lock(tp->timerq.lock);
- if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
- timeout = PR_INTERVAL_NO_TIMEOUT;
- } else {
- PRCList *qp;
-
- qp = tp->timerq.list.next;
- jobp = JOB_LINKS_PTR(qp);
-
- timeout = jobp->absolute - PR_IntervalNow();
- if (timeout <= 0)
- timeout = PR_INTERVAL_NO_WAIT; /* already timed out */
- }
- if (PR_INTERVAL_NO_WAIT != timeout)
- PR_WaitCondVar(tp->timerq.cv, timeout);
- if (tp->shutdown) {
- PR_Unlock(tp->timerq.lock);
- break;
- }
- /*
- * move expired-timer jobs to jobq
- */
- now = PR_IntervalNow();
- while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
- qp = tp->timerq.list.next;
- jobp = JOB_LINKS_PTR(qp);
-
- if ((PRInt32)(jobp->absolute - now) > 0) {
- break;
- }
- /*
- * job timed out
- */
- PR_REMOVE_AND_INIT_LINK(&jobp->links);
- tp->timerq.cnt--;
- jobp->on_timerq = PR_FALSE;
- add_to_jobq(tp, jobp);
- }
- PR_Unlock(tp->timerq.lock);
- }
+ PRThreadPool *tp = (PRThreadPool *) arg;
+ PRCList *qp;
+ PRIntervalTime timeout;
+ PRIntervalTime now;
+
+ /*
+ * call PR_WaitCondVar with minimum value of all timeouts
+ */
+ while (!tp->shutdown) {
+ PRJob *jobp;
+
+ PR_Lock(tp->timerq.lock);
+ if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
+ timeout = PR_INTERVAL_NO_TIMEOUT;
+ } else {
+ PRCList *qp;
+
+ qp = tp->timerq.list.next;
+ jobp = JOB_LINKS_PTR(qp);
+
+ timeout = jobp->absolute - PR_IntervalNow();
+ if (timeout <= 0) {
+ timeout = PR_INTERVAL_NO_WAIT; /* already timed out */
+ }
+ }
+ if (PR_INTERVAL_NO_WAIT != timeout) {
+ PR_WaitCondVar(tp->timerq.cv, timeout);
+ }
+ if (tp->shutdown) {
+ PR_Unlock(tp->timerq.lock);
+ break;
+ }
+ /*
+ * move expired-timer jobs to jobq
+ */
+ now = PR_IntervalNow();
+ while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
+ qp = tp->timerq.list.next;
+ jobp = JOB_LINKS_PTR(qp);
+
+ if ((PRInt32)(jobp->absolute - now) > 0) {
+ break;
+ }
+ /*
+ * job timed out
+ */
+ PR_REMOVE_AND_INIT_LINK(&jobp->links);
+ tp->timerq.cnt--;
+ jobp->on_timerq = PR_FALSE;
+ add_to_jobq(tp, jobp);
+ }
+ PR_Unlock(tp->timerq.lock);
+ }
}
static void
delete_threadpool(PRThreadPool *tp)
{
- if (NULL != tp) {
- if (NULL != tp->shutdown_cv)
- PR_DestroyCondVar(tp->shutdown_cv);
- if (NULL != tp->jobq.cv)
- PR_DestroyCondVar(tp->jobq.cv);
- if (NULL != tp->jobq.lock)
- PR_DestroyLock(tp->jobq.lock);
- if (NULL != tp->join_lock)
- PR_DestroyLock(tp->join_lock);
+ if (NULL != tp) {
+ if (NULL != tp->shutdown_cv) {
+ PR_DestroyCondVar(tp->shutdown_cv);
+ }
+ if (NULL != tp->jobq.cv) {
+ PR_DestroyCondVar(tp->jobq.cv);
+ }
+ if (NULL != tp->jobq.lock) {
+ PR_DestroyLock(tp->jobq.lock);
+ }
+ if (NULL != tp->join_lock) {
+ PR_DestroyLock(tp->join_lock);
+ }
#ifdef OPT_WINNT
- if (NULL != tp->jobq.nt_completion_port)
- CloseHandle(tp->jobq.nt_completion_port);
+ if (NULL != tp->jobq.nt_completion_port) {
+ CloseHandle(tp->jobq.nt_completion_port);
+ }
#endif
- /* Timer queue */
- if (NULL != tp->timerq.cv)
- PR_DestroyCondVar(tp->timerq.cv);
- if (NULL != tp->timerq.lock)
- PR_DestroyLock(tp->timerq.lock);
-
- if (NULL != tp->ioq.lock)
- PR_DestroyLock(tp->ioq.lock);
- if (NULL != tp->ioq.pollfds)
- PR_Free(tp->ioq.pollfds);
- if (NULL != tp->ioq.notify_fd)
- PR_DestroyPollableEvent(tp->ioq.notify_fd);
- PR_Free(tp);
- }
- return;
+ /* Timer queue */
+ if (NULL != tp->timerq.cv) {
+ PR_DestroyCondVar(tp->timerq.cv);
+ }
+ if (NULL != tp->timerq.lock) {
+ PR_DestroyLock(tp->timerq.lock);
+ }
+
+ if (NULL != tp->ioq.lock) {
+ PR_DestroyLock(tp->ioq.lock);
+ }
+ if (NULL != tp->ioq.pollfds) {
+ PR_Free(tp->ioq.pollfds);
+ }
+ if (NULL != tp->ioq.notify_fd) {
+ PR_DestroyPollableEvent(tp->ioq.notify_fd);
+ }
+ PR_Free(tp);
+ }
+ return;
}
static PRThreadPool *
alloc_threadpool(void)
{
-PRThreadPool *tp;
-
- tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
- if (NULL == tp)
- goto failed;
- tp->jobq.lock = PR_NewLock();
- if (NULL == tp->jobq.lock)
- goto failed;
- tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
- if (NULL == tp->jobq.cv)
- goto failed;
- tp->join_lock = PR_NewLock();
- if (NULL == tp->join_lock)
- goto failed;
+ PRThreadPool *tp;
+
+ tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
+ if (NULL == tp) {
+ goto failed;
+ }
+ tp->jobq.lock = PR_NewLock();
+ if (NULL == tp->jobq.lock) {
+ goto failed;
+ }
+ tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
+ if (NULL == tp->jobq.cv) {
+ goto failed;
+ }
+ tp->join_lock = PR_NewLock();
+ if (NULL == tp->join_lock) {
+ goto failed;
+ }
#ifdef OPT_WINNT
- tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
- NULL, 0, 0);
- if (NULL == tp->jobq.nt_completion_port)
- goto failed;
+ tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
+ NULL, 0, 0);
+ if (NULL == tp->jobq.nt_completion_port) {
+ goto failed;
+ }
#endif
- tp->ioq.lock = PR_NewLock();
- if (NULL == tp->ioq.lock)
- goto failed;
-
- /* Timer queue */
-
- tp->timerq.lock = PR_NewLock();
- if (NULL == tp->timerq.lock)
- goto failed;
- tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
- if (NULL == tp->timerq.cv)
- goto failed;
-
- tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
- if (NULL == tp->shutdown_cv)
- goto failed;
- tp->ioq.notify_fd = PR_NewPollableEvent();
- if (NULL == tp->ioq.notify_fd)
- goto failed;
- return tp;
+ tp->ioq.lock = PR_NewLock();
+ if (NULL == tp->ioq.lock) {
+ goto failed;
+ }
+
+ /* Timer queue */
+
+ tp->timerq.lock = PR_NewLock();
+ if (NULL == tp->timerq.lock) {
+ goto failed;
+ }
+ tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
+ if (NULL == tp->timerq.cv) {
+ goto failed;
+ }
+
+ tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
+ if (NULL == tp->shutdown_cv) {
+ goto failed;
+ }
+ tp->ioq.notify_fd = PR_NewPollableEvent();
+ if (NULL == tp->ioq.notify_fd) {
+ goto failed;
+ }
+ return tp;
failed:
- delete_threadpool(tp);
- PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
- return NULL;
+ delete_threadpool(tp);
+ PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+ return NULL;
}
/* Create thread pool */
PR_IMPLEMENT(PRThreadPool *)
PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
- PRUint32 stacksize)
+ PRUint32 stacksize)
{
-PRThreadPool *tp;
-PRThread *thr;
-int i;
-wthread *wthrp;
-
- tp = alloc_threadpool();
- if (NULL == tp)
- return NULL;
-
- tp->init_threads = initial_threads;
- tp->max_threads = max_threads;
- tp->stacksize = stacksize;
- PR_INIT_CLIST(&tp->jobq.list);
- PR_INIT_CLIST(&tp->ioq.list);
- PR_INIT_CLIST(&tp->timerq.list);
- PR_INIT_CLIST(&tp->jobq.wthreads);
- PR_INIT_CLIST(&tp->ioq.wthreads);
- PR_INIT_CLIST(&tp->timerq.wthreads);
- tp->shutdown = PR_FALSE;
-
- PR_Lock(tp->jobq.lock);
- for(i=0; i < initial_threads; ++i) {
-
- thr = PR_CreateThread(PR_USER_THREAD, wstart,
- tp, PR_PRIORITY_NORMAL,
- PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
- PR_ASSERT(thr);
- wthrp = PR_NEWZAP(wthread);
- PR_ASSERT(wthrp);
- wthrp->thread = thr;
- PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
- }
- tp->current_threads = initial_threads;
-
- thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
- tp, PR_PRIORITY_NORMAL,
- PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
- PR_ASSERT(thr);
- wthrp = PR_NEWZAP(wthread);
- PR_ASSERT(wthrp);
- wthrp->thread = thr;
- PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
-
- thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
- tp, PR_PRIORITY_NORMAL,
- PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
- PR_ASSERT(thr);
- wthrp = PR_NEWZAP(wthread);
- PR_ASSERT(wthrp);
- wthrp->thread = thr;
- PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
-
- PR_Unlock(tp->jobq.lock);
- return tp;
+ PRThreadPool *tp;
+ PRThread *thr;
+ int i;
+ wthread *wthrp;
+
+ tp = alloc_threadpool();
+ if (NULL == tp) {
+ return NULL;
+ }
+
+ tp->init_threads = initial_threads;
+ tp->max_threads = max_threads;
+ tp->stacksize = stacksize;
+ PR_INIT_CLIST(&tp->jobq.list);
+ PR_INIT_CLIST(&tp->ioq.list);
+ PR_INIT_CLIST(&tp->timerq.list);
+ PR_INIT_CLIST(&tp->jobq.wthreads);
+ PR_INIT_CLIST(&tp->ioq.wthreads);
+ PR_INIT_CLIST(&tp->timerq.wthreads);
+ tp->shutdown = PR_FALSE;
+
+ PR_Lock(tp->jobq.lock);
+ for(i=0; i < initial_threads; ++i) {
+
+ thr = PR_CreateThread(PR_USER_THREAD, wstart,
+ tp, PR_PRIORITY_NORMAL,
+ PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
+ PR_ASSERT(thr);
+ wthrp = PR_NEWZAP(wthread);
+ PR_ASSERT(wthrp);
+ wthrp->thread = thr;
+ PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
+ }
+ tp->current_threads = initial_threads;
+
+ thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
+ tp, PR_PRIORITY_NORMAL,
+ PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
+ PR_ASSERT(thr);
+ wthrp = PR_NEWZAP(wthread);
+ PR_ASSERT(wthrp);
+ wthrp->thread = thr;
+ PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
+
+ thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
+ tp, PR_PRIORITY_NORMAL,
+ PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
+ PR_ASSERT(thr);
+ wthrp = PR_NEWZAP(wthread);
+ PR_ASSERT(wthrp);
+ wthrp->thread = thr;
+ PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
+
+ PR_Unlock(tp->jobq.lock);
+ return tp;
}
static void
delete_job(PRJob *jobp)
{
- if (NULL != jobp) {
- if (NULL != jobp->join_cv) {
- PR_DestroyCondVar(jobp->join_cv);
- jobp->join_cv = NULL;
- }
- if (NULL != jobp->cancel_cv) {
- PR_DestroyCondVar(jobp->cancel_cv);
- jobp->cancel_cv = NULL;
- }
- PR_DELETE(jobp);
- }
+ if (NULL != jobp) {
+ if (NULL != jobp->join_cv) {
+ PR_DestroyCondVar(jobp->join_cv);
+ jobp->join_cv = NULL;
+ }
+ if (NULL != jobp->cancel_cv) {
+ PR_DestroyCondVar(jobp->cancel_cv);
+ jobp->cancel_cv = NULL;
+ }
+ PR_DELETE(jobp);
+ }
}
static PRJob *
alloc_job(PRBool joinable, PRThreadPool *tp)
{
- PRJob *jobp;
-
- jobp = PR_NEWZAP(PRJob);
- if (NULL == jobp)
- goto failed;
- if (joinable) {
- jobp->join_cv = PR_NewCondVar(tp->join_lock);
- jobp->join_wait = PR_TRUE;
- if (NULL == jobp->join_cv)
- goto failed;
- } else {
- jobp->join_cv = NULL;
- }
+ PRJob *jobp;
+
+ jobp = PR_NEWZAP(PRJob);
+ if (NULL == jobp) {
+ goto failed;
+ }
+ if (joinable) {
+ jobp->join_cv = PR_NewCondVar(tp->join_lock);
+ jobp->join_wait = PR_TRUE;
+ if (NULL == jobp->join_cv) {
+ goto failed;
+ }
+ } else {
+ jobp->join_cv = NULL;
+ }
#ifdef OPT_WINNT
- jobp->nt_notifier.jobp = jobp;
+ jobp->nt_notifier.jobp = jobp;
#endif
- return jobp;
+ return jobp;
failed:
- delete_job(jobp);
- PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
- return NULL;
+ delete_job(jobp);
+ PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+ return NULL;
}
/* queue a job */
PR_IMPLEMENT(PRJob *)
PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
{
- PRJob *jobp;
+ PRJob *jobp;
- jobp = alloc_job(joinable, tpool);
- if (NULL == jobp)
- return NULL;
+ jobp = alloc_job(joinable, tpool);
+ if (NULL == jobp) {
+ return NULL;
+ }
- jobp->job_func = fn;
- jobp->job_arg = arg;
- jobp->tpool = tpool;
+ jobp->job_func = fn;
+ jobp->job_arg = arg;
+ jobp->tpool = tpool;
- add_to_jobq(tpool, jobp);
- return jobp;
+ add_to_jobq(tpool, jobp);
+ return jobp;
}
/* queue a job, when a socket is readable or writeable */
static PRJob *
queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
- PRBool joinable, io_op_type op)
+ PRBool joinable, io_op_type op)
{
- PRJob *jobp;
- PRIntervalTime now;
-
- jobp = alloc_job(joinable, tpool);
- if (NULL == jobp) {
- return NULL;
- }
-
- /*
- * Add a new job to io_jobq
- * wakeup io worker thread
- */
-
- jobp->job_func = fn;
- jobp->job_arg = arg;
- jobp->tpool = tpool;
- jobp->iod = iod;
- if (JOB_IO_READ == op) {
- jobp->io_op = JOB_IO_READ;
- jobp->io_poll_flags = PR_POLL_READ;
- } else if (JOB_IO_WRITE == op) {
- jobp->io_op = JOB_IO_WRITE;
- jobp->io_poll_flags = PR_POLL_WRITE;
- } else if (JOB_IO_ACCEPT == op) {
- jobp->io_op = JOB_IO_ACCEPT;
- jobp->io_poll_flags = PR_POLL_READ;
- } else if (JOB_IO_CONNECT == op) {
- jobp->io_op = JOB_IO_CONNECT;
- jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
- } else {
- delete_job(jobp);
- PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
- return NULL;
- }
-
- jobp->timeout = iod->timeout;
- if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
- (PR_INTERVAL_NO_WAIT == iod->timeout)) {
- jobp->absolute = iod->timeout;
- } else {
- now = PR_IntervalNow();
- jobp->absolute = now + iod->timeout;
- }
-
-
- PR_Lock(tpool->ioq.lock);
-
- if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
- (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
- PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
- } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
- PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
- } else {
- PRCList *qp;
- PRJob *tmp_jobp;
- /*
- * insert into the timeout-sorted ioq
- */
- for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
- qp = qp->prev) {
- tmp_jobp = JOB_LINKS_PTR(qp);
- if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
- break;
- }
- }
- PR_INSERT_AFTER(&jobp->links,qp);
- }
-
- jobp->on_ioq = PR_TRUE;
- tpool->ioq.cnt++;
- /*
- * notify io worker thread(s)
- */
- PR_Unlock(tpool->ioq.lock);
- notify_ioq(tpool);
- return jobp;
+ PRJob *jobp;
+ PRIntervalTime now;
+
+ jobp = alloc_job(joinable, tpool);
+ if (NULL == jobp) {
+ return NULL;
+ }
+
+ /*
+ * Add a new job to io_jobq
+ * wakeup io worker thread
+ */
+
+ jobp->job_func = fn;
+ jobp->job_arg = arg;
+ jobp->tpool = tpool;
+ jobp->iod = iod;
+ if (JOB_IO_READ == op) {
+ jobp->io_op = JOB_IO_READ;
+ jobp->io_poll_flags = PR_POLL_READ;
+ } else if (JOB_IO_WRITE == op) {
+ jobp->io_op = JOB_IO_WRITE;
+ jobp->io_poll_flags = PR_POLL_WRITE;
+ } else if (JOB_IO_ACCEPT == op) {
+ jobp->io_op = JOB_IO_ACCEPT;
+ jobp->io_poll_flags = PR_POLL_READ;
+ } else if (JOB_IO_CONNECT == op) {
+ jobp->io_op = JOB_IO_CONNECT;
+ jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
+ } else {
+ delete_job(jobp);
+ PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+ return NULL;
+ }
+
+ jobp->timeout = iod->timeout;
+ if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
+ (PR_INTERVAL_NO_WAIT == iod->timeout)) {
+ jobp->absolute = iod->timeout;
+ } else {
+ now = PR_IntervalNow();
+ jobp->absolute = now + iod->timeout;
+ }
+
+
+ PR_Lock(tpool->ioq.lock);
+
+ if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
+ (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
+ PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
+ } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
+ PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
+ } else {
+ PRCList *qp;
+ PRJob *tmp_jobp;
+ /*
+ * insert into the timeout-sorted ioq
+ */
+ for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
+ qp = qp->prev) {
+ tmp_jobp = JOB_LINKS_PTR(qp);
+ if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
+ break;
+ }
+ }
+ PR_INSERT_AFTER(&jobp->links,qp);
+ }
+
+ jobp->on_ioq = PR_TRUE;
+ tpool->ioq.cnt++;
+ /*
+ * notify io worker thread(s)
+ */
+ PR_Unlock(tpool->ioq.lock);
+ notify_ioq(tpool);
+ return jobp;
}
/* queue a job, when a socket is readable */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
- PRBool joinable)
+ PRBool joinable)
{
- return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
+ return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
}
/* queue a job, when a socket is writeable */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
- PRBool joinable)
+ PRBool joinable)
{
- return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
+ return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
}
/* queue a job, when a socket has a pending connection */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
- void * arg, PRBool joinable)
+ void * arg, PRBool joinable)
{
- return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
+ return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
}
/* queue a job, when a socket can be connected */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
- const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
+ const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
{
- PRStatus rv;
- PRErrorCode err;
-
- rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
- if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
- /* connection pending */
- return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
- }
+ PRStatus rv;
+ PRErrorCode err;
+
+ rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
+ if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)) {
+ /* connection pending */
+ return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
+ }
/*
* connection succeeded or failed; add to jobq right away
*/
- if (rv == PR_FAILURE)
- iod->error = err;
- else
- iod->error = 0;
+ if (rv == PR_FAILURE) {
+ iod->error = err;
+ }
+ else {
+ iod->error = 0;
+ }
return(PR_QueueJob(tpool, fn, arg, joinable));
}
@@ -877,311 +919,318 @@ PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
/* queue a job, when a timer expires */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
- PRJobFn fn, void * arg, PRBool joinable)
+ PRJobFn fn, void * arg, PRBool joinable)
{
- PRIntervalTime now;
- PRJob *jobp;
-
- if (PR_INTERVAL_NO_TIMEOUT == timeout) {
- PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
- return NULL;
- }
- if (PR_INTERVAL_NO_WAIT == timeout) {
- /*
- * no waiting; add to jobq right away
- */
- return(PR_QueueJob(tpool, fn, arg, joinable));
- }
- jobp = alloc_job(joinable, tpool);
- if (NULL == jobp) {
- return NULL;
- }
-
- /*
- * Add a new job to timer_jobq
- * wakeup timer worker thread
- */
-
- jobp->job_func = fn;
- jobp->job_arg = arg;
- jobp->tpool = tpool;
- jobp->timeout = timeout;
-
- now = PR_IntervalNow();
- jobp->absolute = now + timeout;
-
-
- PR_Lock(tpool->timerq.lock);
- jobp->on_timerq = PR_TRUE;
- if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
- PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
- else {
- PRCList *qp;
- PRJob *tmp_jobp;
- /*
- * insert into the sorted timer jobq
- */
- for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
- qp = qp->prev) {
- tmp_jobp = JOB_LINKS_PTR(qp);
- if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
- break;
- }
- }
- PR_INSERT_AFTER(&jobp->links,qp);
- }
- tpool->timerq.cnt++;
- /*
- * notify timer worker thread(s)
- */
- notify_timerq(tpool);
- PR_Unlock(tpool->timerq.lock);
- return jobp;
+ PRIntervalTime now;
+ PRJob *jobp;
+
+ if (PR_INTERVAL_NO_TIMEOUT == timeout) {
+ PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+ return NULL;
+ }
+ if (PR_INTERVAL_NO_WAIT == timeout) {
+ /*
+ * no waiting; add to jobq right away
+ */
+ return(PR_QueueJob(tpool, fn, arg, joinable));
+ }
+ jobp = alloc_job(joinable, tpool);
+ if (NULL == jobp) {
+ return NULL;
+ }
+
+ /*
+ * Add a new job to timer_jobq
+ * wakeup timer worker thread
+ */
+
+ jobp->job_func = fn;
+ jobp->job_arg = arg;
+ jobp->tpool = tpool;
+ jobp->timeout = timeout;
+
+ now = PR_IntervalNow();
+ jobp->absolute = now + timeout;
+
+
+ PR_Lock(tpool->timerq.lock);
+ jobp->on_timerq = PR_TRUE;
+ if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
+ PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
+ }
+ else {
+ PRCList *qp;
+ PRJob *tmp_jobp;
+ /*
+ * insert into the sorted timer jobq
+ */
+ for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
+ qp = qp->prev) {
+ tmp_jobp = JOB_LINKS_PTR(qp);
+ if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
+ break;
+ }
+ }
+ PR_INSERT_AFTER(&jobp->links,qp);
+ }
+ tpool->timerq.cnt++;
+ /*
+ * notify timer worker thread(s)
+ */
+ notify_timerq(tpool);
+ PR_Unlock(tpool->timerq.lock);
+ return jobp;
}
static void
notify_timerq(PRThreadPool *tp)
{
- /*
- * wakeup the timer thread(s)
- */
- PR_NotifyCondVar(tp->timerq.cv);
+ /*
+ * wakeup the timer thread(s)
+ */
+ PR_NotifyCondVar(tp->timerq.cv);
}
static void
notify_ioq(PRThreadPool *tp)
{
-PRStatus rval_status;
+ PRStatus rval_status;
- /*
- * wakeup the io thread(s)
- */
- rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
- PR_ASSERT(PR_SUCCESS == rval_status);
+ /*
+ * wakeup the io thread(s)
+ */
+ rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
+ PR_ASSERT(PR_SUCCESS == rval_status);
}
/*
* cancel a job
*
- * XXXX: is this needed? likely to be removed
+ * XXXX: is this needed? likely to be removed
*/
PR_IMPLEMENT(PRStatus)
PR_CancelJob(PRJob *jobp) {
- PRStatus rval = PR_FAILURE;
- PRThreadPool *tp;
-
- if (jobp->on_timerq) {
- /*
- * now, check again while holding the timerq lock
- */
- tp = jobp->tpool;
- PR_Lock(tp->timerq.lock);
- if (jobp->on_timerq) {
- jobp->on_timerq = PR_FALSE;
- PR_REMOVE_AND_INIT_LINK(&jobp->links);
- tp->timerq.cnt--;
- PR_Unlock(tp->timerq.lock);
- if (!JOINABLE_JOB(jobp)) {
- delete_job(jobp);
- } else {
- JOIN_NOTIFY(jobp);
- }
- rval = PR_SUCCESS;
- } else
- PR_Unlock(tp->timerq.lock);
- } else if (jobp->on_ioq) {
- /*
- * now, check again while holding the ioq lock
- */
- tp = jobp->tpool;
- PR_Lock(tp->ioq.lock);
- if (jobp->on_ioq) {
- jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
- if (NULL == jobp->cancel_cv) {
- PR_Unlock(tp->ioq.lock);
- PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
- return PR_FAILURE;
- }
- /*
- * mark job 'cancelled' and notify io thread(s)
- * XXXX:
- * this assumes there is only one io thread; when there
- * are multiple threads, the io thread processing this job
- * must be notified.
- */
- jobp->cancel_io = PR_TRUE;
- PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
- notify_ioq(tp);
- PR_Lock(tp->ioq.lock);
- while (jobp->cancel_io)
- PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
- PR_Unlock(tp->ioq.lock);
- PR_ASSERT(!jobp->on_ioq);
- if (!JOINABLE_JOB(jobp)) {
- delete_job(jobp);
- } else {
- JOIN_NOTIFY(jobp);
- }
- rval = PR_SUCCESS;
- } else
- PR_Unlock(tp->ioq.lock);
- }
- if (PR_FAILURE == rval)
- PR_SetError(PR_INVALID_STATE_ERROR, 0);
- return rval;
+ PRStatus rval = PR_FAILURE;
+ PRThreadPool *tp;
+
+ if (jobp->on_timerq) {
+ /*
+ * now, check again while holding the timerq lock
+ */
+ tp = jobp->tpool;
+ PR_Lock(tp->timerq.lock);
+ if (jobp->on_timerq) {
+ jobp->on_timerq = PR_FALSE;
+ PR_REMOVE_AND_INIT_LINK(&jobp->links);
+ tp->timerq.cnt--;
+ PR_Unlock(tp->timerq.lock);
+ if (!JOINABLE_JOB(jobp)) {
+ delete_job(jobp);
+ } else {
+ JOIN_NOTIFY(jobp);
+ }
+ rval = PR_SUCCESS;
+ } else {
+ PR_Unlock(tp->timerq.lock);
+ }
+ } else if (jobp->on_ioq) {
+ /*
+ * now, check again while holding the ioq lock
+ */
+ tp = jobp->tpool;
+ PR_Lock(tp->ioq.lock);
+ if (jobp->on_ioq) {
+ jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
+ if (NULL == jobp->cancel_cv) {
+ PR_Unlock(tp->ioq.lock);
+ PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
+ return PR_FAILURE;
+ }
+ /*
+ * mark job 'cancelled' and notify io thread(s)
+ * XXXX:
+ * this assumes there is only one io thread; when there
+ * are multiple threads, the io thread processing this job
+ * must be notified.
+ */
+ jobp->cancel_io = PR_TRUE;
+ PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
+ notify_ioq(tp);
+ PR_Lock(tp->ioq.lock);
+ while (jobp->cancel_io) {
+ PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
+ }
+ PR_Unlock(tp->ioq.lock);
+ PR_ASSERT(!jobp->on_ioq);
+ if (!JOINABLE_JOB(jobp)) {
+ delete_job(jobp);
+ } else {
+ JOIN_NOTIFY(jobp);
+ }
+ rval = PR_SUCCESS;
+ } else {
+ PR_Unlock(tp->ioq.lock);
+ }
+ }
+ if (PR_FAILURE == rval) {
+ PR_SetError(PR_INVALID_STATE_ERROR, 0);
+ }
+ return rval;
}
/* join a job, wait until completion */
PR_IMPLEMENT(PRStatus)
PR_JoinJob(PRJob *jobp)
{
- if (!JOINABLE_JOB(jobp)) {
- PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
- return PR_FAILURE;
- }
- PR_Lock(jobp->tpool->join_lock);
- while(jobp->join_wait)
- PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
- PR_Unlock(jobp->tpool->join_lock);
- delete_job(jobp);
- return PR_SUCCESS;
+ if (!JOINABLE_JOB(jobp)) {
+ PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+ return PR_FAILURE;
+ }
+ PR_Lock(jobp->tpool->join_lock);
+ while(jobp->join_wait) {
+ PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
+ }
+ PR_Unlock(jobp->tpool->join_lock);
+ delete_job(jobp);
+ return PR_SUCCESS;
}
/* shutdown threadpool */
PR_IMPLEMENT(PRStatus)
PR_ShutdownThreadPool(PRThreadPool *tpool)
{
-PRStatus rval = PR_SUCCESS;
+ PRStatus rval = PR_SUCCESS;
- PR_Lock(tpool->jobq.lock);
- tpool->shutdown = PR_TRUE;
- PR_NotifyAllCondVar(tpool->shutdown_cv);
- PR_Unlock(tpool->jobq.lock);
+ PR_Lock(tpool->jobq.lock);
+ tpool->shutdown = PR_TRUE;
+ PR_NotifyAllCondVar(tpool->shutdown_cv);
+ PR_Unlock(tpool->jobq.lock);
- return rval;
+ return rval;
}
/*
* join thread pool
- * wait for termination of worker threads
- * reclaim threadpool resources
+ * wait for termination of worker threads
+ * reclaim threadpool resources
*/
PR_IMPLEMENT(PRStatus)
PR_JoinThreadPool(PRThreadPool *tpool)
{
-PRStatus rval = PR_SUCCESS;
-PRCList *head;
-PRStatus rval_status;
+ PRStatus rval = PR_SUCCESS;
+ PRCList *head;
+ PRStatus rval_status;
- PR_Lock(tpool->jobq.lock);
- while (!tpool->shutdown)
- PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
+ PR_Lock(tpool->jobq.lock);
+ while (!tpool->shutdown) {
+ PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
+ }
- /*
- * wakeup worker threads
- */
+ /*
+ * wakeup worker threads
+ */
#ifdef OPT_WINNT
- /*
- * post shutdown notification for all threads
- */
- {
- int i;
- for(i=0; i < tpool->current_threads; i++) {
- PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
- TRUE, NULL);
- }
- }
+ /*
+ * post shutdown notification for all threads
+ */
+ {
+ int i;
+ for(i=0; i < tpool->current_threads; i++) {
+ PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
+ TRUE, NULL);
+ }
+ }
#else
- PR_NotifyAllCondVar(tpool->jobq.cv);
+ PR_NotifyAllCondVar(tpool->jobq.cv);
#endif
- /*
- * wakeup io thread(s)
- */
- notify_ioq(tpool);
-
- /*
- * wakeup timer thread(s)
- */
- PR_Lock(tpool->timerq.lock);
- notify_timerq(tpool);
- PR_Unlock(tpool->timerq.lock);
-
- while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
- wthread *wthrp;
-
- head = PR_LIST_HEAD(&tpool->jobq.wthreads);
- PR_REMOVE_AND_INIT_LINK(head);
- PR_Unlock(tpool->jobq.lock);
- wthrp = WTHREAD_LINKS_PTR(head);
- rval_status = PR_JoinThread(wthrp->thread);
- PR_ASSERT(PR_SUCCESS == rval_status);
- PR_DELETE(wthrp);
- PR_Lock(tpool->jobq.lock);
- }
- PR_Unlock(tpool->jobq.lock);
- while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
- wthread *wthrp;
-
- head = PR_LIST_HEAD(&tpool->ioq.wthreads);
- PR_REMOVE_AND_INIT_LINK(head);
- wthrp = WTHREAD_LINKS_PTR(head);
- rval_status = PR_JoinThread(wthrp->thread);
- PR_ASSERT(PR_SUCCESS == rval_status);
- PR_DELETE(wthrp);
- }
-
- while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
- wthread *wthrp;
-
- head = PR_LIST_HEAD(&tpool->timerq.wthreads);
- PR_REMOVE_AND_INIT_LINK(head);
- wthrp = WTHREAD_LINKS_PTR(head);
- rval_status = PR_JoinThread(wthrp->thread);
- PR_ASSERT(PR_SUCCESS == rval_status);
- PR_DELETE(wthrp);
- }
-
- /*
- * Delete queued jobs
- */
- while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
- PRJob *jobp;
-
- head = PR_LIST_HEAD(&tpool->jobq.list);
- PR_REMOVE_AND_INIT_LINK(head);
- jobp = JOB_LINKS_PTR(head);
- tpool->jobq.cnt--;
- delete_job(jobp);
- }
-
- /* delete io jobs */
- while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
- PRJob *jobp;
-
- head = PR_LIST_HEAD(&tpool->ioq.list);
- PR_REMOVE_AND_INIT_LINK(head);
- tpool->ioq.cnt--;
- jobp = JOB_LINKS_PTR(head);
- delete_job(jobp);
- }
-
- /* delete timer jobs */
- while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
- PRJob *jobp;
-
- head = PR_LIST_HEAD(&tpool->timerq.list);
- PR_REMOVE_AND_INIT_LINK(head);
- tpool->timerq.cnt--;
- jobp = JOB_LINKS_PTR(head);
- delete_job(jobp);
- }
-
- PR_ASSERT(0 == tpool->jobq.cnt);
- PR_ASSERT(0 == tpool->ioq.cnt);
- PR_ASSERT(0 == tpool->timerq.cnt);
-
- delete_threadpool(tpool);
- return rval;
+ /*
+ * wakeup io thread(s)
+ */
+ notify_ioq(tpool);
+
+ /*
+ * wakeup timer thread(s)
+ */
+ PR_Lock(tpool->timerq.lock);
+ notify_timerq(tpool);
+ PR_Unlock(tpool->timerq.lock);
+
+ while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
+ wthread *wthrp;
+
+ head = PR_LIST_HEAD(&tpool->jobq.wthreads);
+ PR_REMOVE_AND_INIT_LINK(head);
+ PR_Unlock(tpool->jobq.lock);
+ wthrp = WTHREAD_LINKS_PTR(head);
+ rval_status = PR_JoinThread(wthrp->thread);
+ PR_ASSERT(PR_SUCCESS == rval_status);
+ PR_DELETE(wthrp);
+ PR_Lock(tpool->jobq.lock);
+ }
+ PR_Unlock(tpool->jobq.lock);
+ while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
+ wthread *wthrp;
+
+ head = PR_LIST_HEAD(&tpool->ioq.wthreads);
+ PR_REMOVE_AND_INIT_LINK(head);
+ wthrp = WTHREAD_LINKS_PTR(head);
+ rval_status = PR_JoinThread(wthrp->thread);
+ PR_ASSERT(PR_SUCCESS == rval_status);
+ PR_DELETE(wthrp);
+ }
+
+ while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
+ wthread *wthrp;
+
+ head = PR_LIST_HEAD(&tpool->timerq.wthreads);
+ PR_REMOVE_AND_INIT_LINK(head);
+ wthrp = WTHREAD_LINKS_PTR(head);
+ rval_status = PR_JoinThread(wthrp->thread);
+ PR_ASSERT(PR_SUCCESS == rval_status);
+ PR_DELETE(wthrp);
+ }
+
+ /*
+ * Delete queued jobs
+ */
+ while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
+ PRJob *jobp;
+
+ head = PR_LIST_HEAD(&tpool->jobq.list);
+ PR_REMOVE_AND_INIT_LINK(head);
+ jobp = JOB_LINKS_PTR(head);
+ tpool->jobq.cnt--;
+ delete_job(jobp);
+ }
+
+ /* delete io jobs */
+ while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
+ PRJob *jobp;
+
+ head = PR_LIST_HEAD(&tpool->ioq.list);
+ PR_REMOVE_AND_INIT_LINK(head);
+ tpool->ioq.cnt--;
+ jobp = JOB_LINKS_PTR(head);
+ delete_job(jobp);
+ }
+
+ /* delete timer jobs */
+ while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
+ PRJob *jobp;
+
+ head = PR_LIST_HEAD(&tpool->timerq.list);
+ PR_REMOVE_AND_INIT_LINK(head);
+ tpool->timerq.cnt--;
+ jobp = JOB_LINKS_PTR(head);
+ delete_job(jobp);
+ }
+
+ PR_ASSERT(0 == tpool->jobq.cnt);
+ PR_ASSERT(0 == tpool->ioq.cnt);
+ PR_ASSERT(0 == tpool->timerq.cnt);
+
+ delete_threadpool(tpool);
+ return rval;
}