diff options
Diffstat (limited to 'media/sphinxbase/src/libsphinxbase/util/sbthread.c')
-rw-r--r-- | media/sphinxbase/src/libsphinxbase/util/sbthread.c | 741 |
1 files changed, 741 insertions, 0 deletions
diff --git a/media/sphinxbase/src/libsphinxbase/util/sbthread.c b/media/sphinxbase/src/libsphinxbase/util/sbthread.c new file mode 100644 index 000000000..28bf77356 --- /dev/null +++ b/media/sphinxbase/src/libsphinxbase/util/sbthread.c @@ -0,0 +1,741 @@ +/* -*- c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* ==================================================================== + * Copyright (c) 2008 Carnegie Mellon University. All rights + * reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * + * This work was supported in part by funding from the Defense Advanced + * Research Projects Agency and the National Science Foundation of the + * United States of America, and the CMU Sphinx Speech Consortium. + * + * THIS SOFTWARE IS PROVIDED BY CARNEGIE MELLON UNIVERSITY ``AS IS'' AND + * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY + * NOR ITS EMPLOYEES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * ==================================================================== + * + */ + +/** + * @file sbthread.c + * @brief Simple portable thread functions + * @author David Huggins-Daines <dhuggins@cs.cmu.edu> + */ + +#include <string.h> + +#include "sphinxbase/sbthread.h" +#include "sphinxbase/ckd_alloc.h" +#include "sphinxbase/err.h" + +/* + * Platform-specific parts: threads, mutexes, and signals. + */ +#if (defined(_WIN32) || defined(__CYGWIN__)) && !defined(__SYMBIAN32__) +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0400 +#endif /* not _WIN32_WINNT */ +#include <windows.h> + +struct sbthread_s { + cmd_ln_t *config; + sbmsgq_t *msgq; + sbthread_main func; + void *arg; + HANDLE th; + DWORD tid; +}; + +struct sbmsgq_s { + /* Ringbuffer for passing messages. */ + char *data; + size_t depth; + size_t out; + size_t nbytes; + + /* Current message is stored here. */ + char *msg; + size_t msglen; + CRITICAL_SECTION mtx; + HANDLE evt; +}; + +struct sbevent_s { + HANDLE evt; +}; + +struct sbmtx_s { + CRITICAL_SECTION mtx; +}; + +DWORD WINAPI +sbthread_internal_main(LPVOID arg) +{ + sbthread_t *th = (sbthread_t *)arg; + int rv; + + rv = (*th->func)(th); + return (DWORD)rv; +} + +sbthread_t * +sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg) +{ + sbthread_t *th; + + th = ckd_calloc(1, sizeof(*th)); + th->config = config; + th->func = func; + th->arg = arg; + th->msgq = sbmsgq_init(256); + th->th = CreateThread(NULL, 0, sbthread_internal_main, th, 0, &th->tid); + if (th->th == NULL) { + sbthread_free(th); + return NULL; + } + return th; +} + +int +sbthread_wait(sbthread_t *th) +{ + DWORD rv, exit; + + /* It has already been joined. */ + if (th->th == NULL) + return -1; + + rv = WaitForSingleObject(th->th, INFINITE); + if (rv == WAIT_FAILED) { + E_ERROR("Failed to join thread: WAIT_FAILED\n"); + return -1; + } + GetExitCodeThread(th->th, &exit); + CloseHandle(th->th); + th->th = NULL; + return (int)exit; +} + +static DWORD +cond_timed_wait(HANDLE cond, int sec, int nsec) +{ + DWORD rv; + if (sec == -1) { + rv = WaitForSingleObject(cond, INFINITE); + } + else { + DWORD ms; + + ms = sec * 1000 + nsec / (1000*1000); + rv = WaitForSingleObject(cond, ms); + } + return rv; +} + +/* Updated to use Unicode */ +sbevent_t * +sbevent_init(void) +{ + sbevent_t *evt; + + evt = ckd_calloc(1, sizeof(*evt)); + evt->evt = CreateEventW(NULL, FALSE, FALSE, NULL); + if (evt->evt == NULL) { + ckd_free(evt); + return NULL; + } + return evt; +} + +void +sbevent_free(sbevent_t *evt) +{ + CloseHandle(evt->evt); + ckd_free(evt); +} + +int +sbevent_signal(sbevent_t *evt) +{ + return SetEvent(evt->evt) ? 0 : -1; +} + +int +sbevent_wait(sbevent_t *evt, int sec, int nsec) +{ + DWORD rv; + + rv = cond_timed_wait(evt->evt, sec, nsec); + return rv; +} + +sbmtx_t * +sbmtx_init(void) +{ + sbmtx_t *mtx; + + mtx = ckd_calloc(1, sizeof(*mtx)); + InitializeCriticalSection(&mtx->mtx); + return mtx; +} + +int +sbmtx_trylock(sbmtx_t *mtx) +{ + return TryEnterCriticalSection(&mtx->mtx) ? 0 : -1; +} + +int +sbmtx_lock(sbmtx_t *mtx) +{ + EnterCriticalSection(&mtx->mtx); + return 0; +} + +int +sbmtx_unlock(sbmtx_t *mtx) +{ + LeaveCriticalSection(&mtx->mtx); + return 0; +} + +void +sbmtx_free(sbmtx_t *mtx) +{ + DeleteCriticalSection(&mtx->mtx); + ckd_free(mtx); +} + +sbmsgq_t * +sbmsgq_init(size_t depth) +{ + sbmsgq_t *msgq; + + msgq = ckd_calloc(1, sizeof(*msgq)); + msgq->depth = depth; + msgq->evt = CreateEventW(NULL, FALSE, FALSE, NULL); + if (msgq->evt == NULL) { + ckd_free(msgq); + return NULL; + } + InitializeCriticalSection(&msgq->mtx); + msgq->data = ckd_calloc(depth, 1); + msgq->msg = ckd_calloc(depth, 1); + return msgq; +} + +void +sbmsgq_free(sbmsgq_t *msgq) +{ + CloseHandle(msgq->evt); + ckd_free(msgq->data); + ckd_free(msgq->msg); + ckd_free(msgq); +} + +int +sbmsgq_send(sbmsgq_t *q, size_t len, void const *data) +{ + char const *cdata = (char const *)data; + size_t in; + + /* Don't allow things bigger than depth to be sent! */ + if (len + sizeof(len) > q->depth) + return -1; + + if (q->nbytes + len + sizeof(len) > q->depth) + WaitForSingleObject(q->evt, INFINITE); + + /* Lock things while we manipulate the buffer (FIXME: this + actually should have been atomic with the wait above ...) */ + EnterCriticalSection(&q->mtx); + in = (q->out + q->nbytes) % q->depth; + /* First write the size of the message. */ + if (in + sizeof(len) > q->depth) { + /* Handle the annoying case where the size field gets wrapped around. */ + size_t len1 = q->depth - in; + memcpy(q->data + in, &len, len1); + memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1); + q->nbytes += sizeof(len); + in = sizeof(len) - len1; + } + else { + memcpy(q->data + in, &len, sizeof(len)); + q->nbytes += sizeof(len); + in += sizeof(len); + } + + /* Now write the message body. */ + if (in + len > q->depth) { + /* Handle wraparound. */ + size_t len1 = q->depth - in; + memcpy(q->data + in, cdata, len1); + q->nbytes += len1; + cdata += len1; + len -= len1; + in = 0; + } + memcpy(q->data + in, cdata, len); + q->nbytes += len; + + /* Signal the condition variable. */ + SetEvent(q->evt); + /* Unlock. */ + LeaveCriticalSection(&q->mtx); + + return 0; +} + +void * +sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec) +{ + char *outptr; + size_t len; + + /* Wait for data to be available. */ + if (q->nbytes == 0) { + if (cond_timed_wait(q->evt, sec, nsec) == WAIT_FAILED) + /* Timed out or something... */ + return NULL; + } + /* Lock to manipulate the queue (FIXME) */ + EnterCriticalSection(&q->mtx); + /* Get the message size. */ + if (q->out + sizeof(q->msglen) > q->depth) { + /* Handle annoying wraparound case. */ + size_t len1 = q->depth - q->out; + memcpy(&q->msglen, q->data + q->out, len1); + memcpy(((char *)&q->msglen) + len1, q->data, + sizeof(q->msglen) - len1); + q->out = sizeof(q->msglen) - len1; + } + else { + memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen)); + q->out += sizeof(q->msglen); + } + q->nbytes -= sizeof(q->msglen); + /* Get the message body. */ + outptr = q->msg; + len = q->msglen; + if (q->out + q->msglen > q->depth) { + /* Handle wraparound. */ + size_t len1 = q->depth - q->out; + memcpy(outptr, q->data + q->out, len1); + outptr += len1; + len -= len1; + q->nbytes -= len1; + q->out = 0; + } + memcpy(outptr, q->data + q->out, len); + q->nbytes -= len; + q->out += len; + + /* Signal the condition variable. */ + SetEvent(q->evt); + /* Unlock. */ + LeaveCriticalSection(&q->mtx); + if (out_len) + *out_len = q->msglen; + return q->msg; +} + +#else /* POSIX */ +#include <pthread.h> +#include <sys/time.h> + +struct sbthread_s { + cmd_ln_t *config; + sbmsgq_t *msgq; + sbthread_main func; + void *arg; + pthread_t th; +}; + +struct sbmsgq_s { + /* Ringbuffer for passing messages. */ + char *data; + size_t depth; + size_t out; + size_t nbytes; + + /* Current message is stored here. */ + char *msg; + size_t msglen; + pthread_mutex_t mtx; + pthread_cond_t cond; +}; + +struct sbevent_s { + pthread_mutex_t mtx; + pthread_cond_t cond; + int signalled; +}; + +struct sbmtx_s { + pthread_mutex_t mtx; +}; + +static void * +sbthread_internal_main(void *arg) +{ + sbthread_t *th = (sbthread_t *)arg; + int rv; + + rv = (*th->func)(th); + return (void *)(long)rv; +} + +sbthread_t * +sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg) +{ + sbthread_t *th; + int rv; + + th = ckd_calloc(1, sizeof(*th)); + th->config = config; + th->func = func; + th->arg = arg; + th->msgq = sbmsgq_init(1024); + if ((rv = pthread_create(&th->th, NULL, &sbthread_internal_main, th)) != 0) { + E_ERROR("Failed to create thread: %d\n", rv); + sbthread_free(th); + return NULL; + } + return th; +} + +int +sbthread_wait(sbthread_t *th) +{ + void *exit; + int rv; + + /* It has already been joined. */ + if (th->th == (pthread_t)-1) + return -1; + + rv = pthread_join(th->th, &exit); + if (rv != 0) { + E_ERROR("Failed to join thread: %d\n", rv); + return -1; + } + th->th = (pthread_t)-1; + return (int)(long)exit; +} + +sbmsgq_t * +sbmsgq_init(size_t depth) +{ + sbmsgq_t *msgq; + + msgq = ckd_calloc(1, sizeof(*msgq)); + msgq->depth = depth; + if (pthread_cond_init(&msgq->cond, NULL) != 0) { + ckd_free(msgq); + return NULL; + } + if (pthread_mutex_init(&msgq->mtx, NULL) != 0) { + pthread_cond_destroy(&msgq->cond); + ckd_free(msgq); + return NULL; + } + msgq->data = ckd_calloc(depth, 1); + msgq->msg = ckd_calloc(depth, 1); + return msgq; +} + +void +sbmsgq_free(sbmsgq_t *msgq) +{ + pthread_mutex_destroy(&msgq->mtx); + pthread_cond_destroy(&msgq->cond); + ckd_free(msgq->data); + ckd_free(msgq->msg); + ckd_free(msgq); +} + +int +sbmsgq_send(sbmsgq_t *q, size_t len, void const *data) +{ + size_t in; + + /* Don't allow things bigger than depth to be sent! */ + if (len + sizeof(len) > q->depth) + return -1; + + /* Lock the condition variable while we manipulate the buffer. */ + pthread_mutex_lock(&q->mtx); + if (q->nbytes + len + sizeof(len) > q->depth) { + /* Unlock and wait for space to be available. */ + if (pthread_cond_wait(&q->cond, &q->mtx) != 0) { + /* Timed out, don't send anything. */ + pthread_mutex_unlock(&q->mtx); + return -1; + } + /* Condition is now locked again. */ + } + in = (q->out + q->nbytes) % q->depth; + + /* First write the size of the message. */ + if (in + sizeof(len) > q->depth) { + /* Handle the annoying case where the size field gets wrapped around. */ + size_t len1 = q->depth - in; + memcpy(q->data + in, &len, len1); + memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1); + q->nbytes += sizeof(len); + in = sizeof(len) - len1; + } + else { + memcpy(q->data + in, &len, sizeof(len)); + q->nbytes += sizeof(len); + in += sizeof(len); + } + + /* Now write the message body. */ + if (in + len > q->depth) { + /* Handle wraparound. */ + size_t len1 = q->depth - in; + memcpy(q->data + in, data, len1); + q->nbytes += len1; + data = (char const *)data + len1; + len -= len1; + in = 0; + } + memcpy(q->data + in, data, len); + q->nbytes += len; + + /* Signal the condition variable. */ + pthread_cond_signal(&q->cond); + /* Unlock it, we have nothing else to do. */ + pthread_mutex_unlock(&q->mtx); + return 0; +} + +static int +cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mtx, int sec, int nsec) +{ + int rv; + if (sec == -1) { + rv = pthread_cond_wait(cond, mtx); + } + else { + struct timeval now; + struct timespec end; + + gettimeofday(&now, NULL); + end.tv_sec = now.tv_sec + sec; + end.tv_nsec = now.tv_usec * 1000 + nsec; + if (end.tv_nsec > (1000*1000*1000)) { + sec += end.tv_nsec / (1000*1000*1000); + end.tv_nsec = end.tv_nsec % (1000*1000*1000); + } + rv = pthread_cond_timedwait(cond, mtx, &end); + } + return rv; +} + +void * +sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec) +{ + char *outptr; + size_t len; + + /* Lock the condition variable while we manipulate nmsg. */ + pthread_mutex_lock(&q->mtx); + if (q->nbytes == 0) { + /* Unlock the condition variable and wait for a signal. */ + if (cond_timed_wait(&q->cond, &q->mtx, sec, nsec) != 0) { + /* Timed out or something... */ + pthread_mutex_unlock(&q->mtx); + return NULL; + } + /* Condition variable is now locked again. */ + } + /* Get the message size. */ + if (q->out + sizeof(q->msglen) > q->depth) { + /* Handle annoying wraparound case. */ + size_t len1 = q->depth - q->out; + memcpy(&q->msglen, q->data + q->out, len1); + memcpy(((char *)&q->msglen) + len1, q->data, + sizeof(q->msglen) - len1); + q->out = sizeof(q->msglen) - len1; + } + else { + memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen)); + q->out += sizeof(q->msglen); + } + q->nbytes -= sizeof(q->msglen); + /* Get the message body. */ + outptr = q->msg; + len = q->msglen; + if (q->out + q->msglen > q->depth) { + /* Handle wraparound. */ + size_t len1 = q->depth - q->out; + memcpy(outptr, q->data + q->out, len1); + outptr += len1; + len -= len1; + q->nbytes -= len1; + q->out = 0; + } + memcpy(outptr, q->data + q->out, len); + q->nbytes -= len; + q->out += len; + + /* Signal the condition variable. */ + pthread_cond_signal(&q->cond); + /* Unlock the condition variable, we are done. */ + pthread_mutex_unlock(&q->mtx); + if (out_len) + *out_len = q->msglen; + return q->msg; +} + +sbevent_t * +sbevent_init(void) +{ + sbevent_t *evt; + int rv; + + evt = ckd_calloc(1, sizeof(*evt)); + if ((rv = pthread_mutex_init(&evt->mtx, NULL)) != 0) { + E_ERROR("Failed to initialize mutex: %d\n", rv); + ckd_free(evt); + return NULL; + } + if ((rv = pthread_cond_init(&evt->cond, NULL)) != 0) { + E_ERROR_SYSTEM("Failed to initialize mutex: %d\n", rv); + pthread_mutex_destroy(&evt->mtx); + ckd_free(evt); + return NULL; + } + return evt; +} + +void +sbevent_free(sbevent_t *evt) +{ + pthread_mutex_destroy(&evt->mtx); + pthread_cond_destroy(&evt->cond); + ckd_free(evt); +} + +int +sbevent_signal(sbevent_t *evt) +{ + int rv; + + pthread_mutex_lock(&evt->mtx); + evt->signalled = TRUE; + rv = pthread_cond_signal(&evt->cond); + pthread_mutex_unlock(&evt->mtx); + return rv; +} + +int +sbevent_wait(sbevent_t *evt, int sec, int nsec) +{ + int rv = 0; + + /* Lock the mutex before we check its signalled state. */ + pthread_mutex_lock(&evt->mtx); + /* If it's not signalled, then wait until it is. */ + if (!evt->signalled) + rv = cond_timed_wait(&evt->cond, &evt->mtx, sec, nsec); + /* Set its state to unsignalled if we were successful. */ + if (rv == 0) + evt->signalled = FALSE; + /* And unlock its mutex. */ + pthread_mutex_unlock(&evt->mtx); + + return rv; +} + +sbmtx_t * +sbmtx_init(void) +{ + sbmtx_t *mtx; + + mtx = ckd_calloc(1, sizeof(*mtx)); + if (pthread_mutex_init(&mtx->mtx, NULL) != 0) { + ckd_free(mtx); + return NULL; + } + return mtx; +} + +int +sbmtx_trylock(sbmtx_t *mtx) +{ + return pthread_mutex_trylock(&mtx->mtx); +} + +int +sbmtx_lock(sbmtx_t *mtx) +{ + return pthread_mutex_lock(&mtx->mtx); +} + +int +sbmtx_unlock(sbmtx_t *mtx) +{ + return pthread_mutex_unlock(&mtx->mtx); +} + +void +sbmtx_free(sbmtx_t *mtx) +{ + pthread_mutex_destroy(&mtx->mtx); + ckd_free(mtx); +} +#endif /* not WIN32 */ + +cmd_ln_t * +sbthread_config(sbthread_t *th) +{ + return th->config; +} + +void * +sbthread_arg(sbthread_t *th) +{ + return th->arg; +} + +sbmsgq_t * +sbthread_msgq(sbthread_t *th) +{ + return th->msgq; +} + +int +sbthread_send(sbthread_t *th, size_t len, void const *data) +{ + return sbmsgq_send(th->msgq, len, data); +} + +void +sbthread_free(sbthread_t *th) +{ + sbthread_wait(th); + sbmsgq_free(th->msgq); + ckd_free(th); +} |