SphinxBase 5prealpha
sbthread.c
Go to the documentation of this file.
1/* -*- c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/* ====================================================================
3 * Copyright (c) 2008 Carnegie Mellon University. All rights
4 * reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 *
18 * This work was supported in part by funding from the Defense Advanced
19 * Research Projects Agency and the National Science Foundation of the
20 * United States of America, and the CMU Sphinx Speech Consortium.
21 *
22 * THIS SOFTWARE IS PROVIDED BY CARNEGIE MELLON UNIVERSITY ``AS IS'' AND
23 * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
24 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
25 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY
26 * NOR ITS EMPLOYEES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 *
34 * ====================================================================
35 *
36 */
37
44#include <string.h>
45
46#include "sphinxbase/sbthread.h"
48#include "sphinxbase/err.h"
49
50/*
51 * Platform-specific parts: threads, mutexes, and signals.
52 */
53#if (defined(_WIN32) || defined(__CYGWIN__)) && !defined(__SYMBIAN32__)
54#define _WIN32_WINNT 0x0400
55#include <windows.h>
56
57struct sbthread_s {
58 cmd_ln_t *config;
59 sbmsgq_t *msgq;
60 sbthread_main func;
61 void *arg;
62 HANDLE th;
63 DWORD tid;
64};
65
66struct sbmsgq_s {
67 /* Ringbuffer for passing messages. */
68 char *data;
69 size_t depth;
70 size_t out;
71 size_t nbytes;
72
73 /* Current message is stored here. */
74 char *msg;
75 size_t msglen;
76 CRITICAL_SECTION mtx;
77 HANDLE evt;
78};
79
80struct sbevent_s {
81 HANDLE evt;
82};
83
84struct sbmtx_s {
85 CRITICAL_SECTION mtx;
86};
87
88DWORD WINAPI
89sbthread_internal_main(LPVOID arg)
90{
91 sbthread_t *th = (sbthread_t *)arg;
92 int rv;
93
94 rv = (*th->func)(th);
95 return (DWORD)rv;
96}
97
99sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
100{
101 sbthread_t *th;
102
103 th = ckd_calloc(1, sizeof(*th));
104 th->config = config;
105 th->func = func;
106 th->arg = arg;
107 th->msgq = sbmsgq_init(256);
108 th->th = CreateThread(NULL, 0, sbthread_internal_main, th, 0, &th->tid);
109 if (th->th == NULL) {
110 sbthread_free(th);
111 return NULL;
112 }
113 return th;
114}
115
116int
118{
119 DWORD rv, exit;
120
121 /* It has already been joined. */
122 if (th->th == NULL)
123 return -1;
124
125 rv = WaitForSingleObject(th->th, INFINITE);
126 if (rv == WAIT_FAILED) {
127 E_ERROR("Failed to join thread: WAIT_FAILED\n");
128 return -1;
129 }
130 GetExitCodeThread(th->th, &exit);
131 CloseHandle(th->th);
132 th->th = NULL;
133 return (int)exit;
134}
135
136static DWORD
137cond_timed_wait(HANDLE cond, int sec, int nsec)
138{
139 DWORD rv;
140 if (sec == -1) {
141 rv = WaitForSingleObject(cond, INFINITE);
142 }
143 else {
144 DWORD ms;
145
146 ms = sec * 1000 + nsec / (1000*1000);
147 rv = WaitForSingleObject(cond, ms);
148 }
149 return rv;
150}
151
152/* Updated to use Unicode */
153sbevent_t *
154sbevent_init(void)
155{
156 sbevent_t *evt;
157
158 evt = ckd_calloc(1, sizeof(*evt));
159 evt->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
160 if (evt->evt == NULL) {
161 ckd_free(evt);
162 return NULL;
163 }
164 return evt;
165}
166
167void
169{
170 CloseHandle(evt->evt);
171 ckd_free(evt);
172}
173
174int
176{
177 return SetEvent(evt->evt) ? 0 : -1;
178}
179
180int
181sbevent_wait(sbevent_t *evt, int sec, int nsec)
182{
183 DWORD rv;
184
185 rv = cond_timed_wait(evt->evt, sec, nsec);
186 return rv;
187}
188
189sbmtx_t *
190sbmtx_init(void)
191{
192 sbmtx_t *mtx;
193
194 mtx = ckd_calloc(1, sizeof(*mtx));
195 InitializeCriticalSection(&mtx->mtx);
196 return mtx;
197}
198
199int
201{
202 return TryEnterCriticalSection(&mtx->mtx) ? 0 : -1;
203}
204
205int
207{
208 EnterCriticalSection(&mtx->mtx);
209 return 0;
210}
211
212int
214{
215 LeaveCriticalSection(&mtx->mtx);
216 return 0;
217}
218
219void
221{
222 DeleteCriticalSection(&mtx->mtx);
223 ckd_free(mtx);
224}
225
226sbmsgq_t *
227sbmsgq_init(size_t depth)
228{
229 sbmsgq_t *msgq;
230
231 msgq = ckd_calloc(1, sizeof(*msgq));
232 msgq->depth = depth;
233 msgq->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
234 if (msgq->evt == NULL) {
235 ckd_free(msgq);
236 return NULL;
237 }
238 InitializeCriticalSection(&msgq->mtx);
239 msgq->data = ckd_calloc(depth, 1);
240 msgq->msg = ckd_calloc(depth, 1);
241 return msgq;
242}
243
244void
246{
247 CloseHandle(msgq->evt);
248 ckd_free(msgq->data);
249 ckd_free(msgq->msg);
250 ckd_free(msgq);
251}
252
253int
254sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
255{
256 char const *cdata = (char const *)data;
257 size_t in;
258
259 /* Don't allow things bigger than depth to be sent! */
260 if (len + sizeof(len) > q->depth)
261 return -1;
262
263 if (q->nbytes + len + sizeof(len) > q->depth)
264 WaitForSingleObject(q->evt, INFINITE);
265
266 /* Lock things while we manipulate the buffer (FIXME: this
267 actually should have been atomic with the wait above ...) */
268 EnterCriticalSection(&q->mtx);
269 in = (q->out + q->nbytes) % q->depth;
270 /* First write the size of the message. */
271 if (in + sizeof(len) > q->depth) {
272 /* Handle the annoying case where the size field gets wrapped around. */
273 size_t len1 = q->depth - in;
274 memcpy(q->data + in, &len, len1);
275 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
276 q->nbytes += sizeof(len);
277 in = sizeof(len) - len1;
278 }
279 else {
280 memcpy(q->data + in, &len, sizeof(len));
281 q->nbytes += sizeof(len);
282 in += sizeof(len);
283 }
284
285 /* Now write the message body. */
286 if (in + len > q->depth) {
287 /* Handle wraparound. */
288 size_t len1 = q->depth - in;
289 memcpy(q->data + in, cdata, len1);
290 q->nbytes += len1;
291 cdata += len1;
292 len -= len1;
293 in = 0;
294 }
295 memcpy(q->data + in, cdata, len);
296 q->nbytes += len;
297
298 /* Signal the condition variable. */
299 SetEvent(q->evt);
300 /* Unlock. */
301 LeaveCriticalSection(&q->mtx);
302
303 return 0;
304}
305
306void *
307sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
308{
309 char *outptr;
310 size_t len;
311
312 /* Wait for data to be available. */
313 if (q->nbytes == 0) {
314 if (cond_timed_wait(q->evt, sec, nsec) == WAIT_FAILED)
315 /* Timed out or something... */
316 return NULL;
317 }
318 /* Lock to manipulate the queue (FIXME) */
319 EnterCriticalSection(&q->mtx);
320 /* Get the message size. */
321 if (q->out + sizeof(q->msglen) > q->depth) {
322 /* Handle annoying wraparound case. */
323 size_t len1 = q->depth - q->out;
324 memcpy(&q->msglen, q->data + q->out, len1);
325 memcpy(((char *)&q->msglen) + len1, q->data,
326 sizeof(q->msglen) - len1);
327 q->out = sizeof(q->msglen) - len1;
328 }
329 else {
330 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
331 q->out += sizeof(q->msglen);
332 }
333 q->nbytes -= sizeof(q->msglen);
334 /* Get the message body. */
335 outptr = q->msg;
336 len = q->msglen;
337 if (q->out + q->msglen > q->depth) {
338 /* Handle wraparound. */
339 size_t len1 = q->depth - q->out;
340 memcpy(outptr, q->data + q->out, len1);
341 outptr += len1;
342 len -= len1;
343 q->nbytes -= len1;
344 q->out = 0;
345 }
346 memcpy(outptr, q->data + q->out, len);
347 q->nbytes -= len;
348 q->out += len;
349
350 /* Signal the condition variable. */
351 SetEvent(q->evt);
352 /* Unlock. */
353 LeaveCriticalSection(&q->mtx);
354 if (out_len)
355 *out_len = q->msglen;
356 return q->msg;
357}
358
359#else /* POSIX */
360#include <pthread.h>
361#include <sys/time.h>
362
364 cmd_ln_t *config;
365 sbmsgq_t *msgq;
366 sbthread_main func;
367 void *arg;
368 pthread_t th;
369};
370
371struct sbmsgq_s {
372 /* Ringbuffer for passing messages. */
373 char *data;
374 size_t depth;
375 size_t out;
376 size_t nbytes;
377
378 /* Current message is stored here. */
379 char *msg;
380 size_t msglen;
381 pthread_mutex_t mtx;
382 pthread_cond_t cond;
383};
384
385struct sbevent_s {
386 pthread_mutex_t mtx;
387 pthread_cond_t cond;
388 int signalled;
389};
390
391struct sbmtx_s {
392 pthread_mutex_t mtx;
393};
394
395static void *
396sbthread_internal_main(void *arg)
397{
398 sbthread_t *th = (sbthread_t *)arg;
399 int rv;
400
401 rv = (*th->func)(th);
402 return (void *)(long)rv;
403}
404
406sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
407{
408 sbthread_t *th;
409 int rv;
410
411 th = ckd_calloc(1, sizeof(*th));
412 th->config = config;
413 th->func = func;
414 th->arg = arg;
415 th->msgq = sbmsgq_init(1024);
416 if ((rv = pthread_create(&th->th, NULL, &sbthread_internal_main, th)) != 0) {
417 E_ERROR("Failed to create thread: %d\n", rv);
418 sbthread_free(th);
419 return NULL;
420 }
421 return th;
422}
423
424int
426{
427 void *exit;
428 int rv;
429
430 /* It has already been joined. */
431 if (th->th == (pthread_t)-1)
432 return -1;
433
434 rv = pthread_join(th->th, &exit);
435 if (rv != 0) {
436 E_ERROR("Failed to join thread: %d\n", rv);
437 return -1;
438 }
439 th->th = (pthread_t)-1;
440 return (int)(long)exit;
441}
442
443sbmsgq_t *
444sbmsgq_init(size_t depth)
445{
446 sbmsgq_t *msgq;
447
448 msgq = ckd_calloc(1, sizeof(*msgq));
449 msgq->depth = depth;
450 if (pthread_cond_init(&msgq->cond, NULL) != 0) {
451 ckd_free(msgq);
452 return NULL;
453 }
454 if (pthread_mutex_init(&msgq->mtx, NULL) != 0) {
455 pthread_cond_destroy(&msgq->cond);
456 ckd_free(msgq);
457 return NULL;
458 }
459 msgq->data = ckd_calloc(depth, 1);
460 msgq->msg = ckd_calloc(depth, 1);
461 return msgq;
462}
463
464void
466{
467 pthread_mutex_destroy(&msgq->mtx);
468 pthread_cond_destroy(&msgq->cond);
469 ckd_free(msgq->data);
470 ckd_free(msgq->msg);
471 ckd_free(msgq);
472}
473
474int
475sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
476{
477 size_t in;
478
479 /* Don't allow things bigger than depth to be sent! */
480 if (len + sizeof(len) > q->depth)
481 return -1;
482
483 /* Lock the condition variable while we manipulate the buffer. */
484 pthread_mutex_lock(&q->mtx);
485 if (q->nbytes + len + sizeof(len) > q->depth) {
486 /* Unlock and wait for space to be available. */
487 if (pthread_cond_wait(&q->cond, &q->mtx) != 0) {
488 /* Timed out, don't send anything. */
489 pthread_mutex_unlock(&q->mtx);
490 return -1;
491 }
492 /* Condition is now locked again. */
493 }
494 in = (q->out + q->nbytes) % q->depth;
495
496 /* First write the size of the message. */
497 if (in + sizeof(len) > q->depth) {
498 /* Handle the annoying case where the size field gets wrapped around. */
499 size_t len1 = q->depth - in;
500 memcpy(q->data + in, &len, len1);
501 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
502 q->nbytes += sizeof(len);
503 in = sizeof(len) - len1;
504 }
505 else {
506 memcpy(q->data + in, &len, sizeof(len));
507 q->nbytes += sizeof(len);
508 in += sizeof(len);
509 }
510
511 /* Now write the message body. */
512 if (in + len > q->depth) {
513 /* Handle wraparound. */
514 size_t len1 = q->depth - in;
515 memcpy(q->data + in, data, len1);
516 q->nbytes += len1;
517 data = (char const *)data + len1;
518 len -= len1;
519 in = 0;
520 }
521 memcpy(q->data + in, data, len);
522 q->nbytes += len;
523
524 /* Signal the condition variable. */
525 pthread_cond_signal(&q->cond);
526 /* Unlock it, we have nothing else to do. */
527 pthread_mutex_unlock(&q->mtx);
528 return 0;
529}
530
531static int
532cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mtx, int sec, int nsec)
533{
534 int rv;
535 if (sec == -1) {
536 rv = pthread_cond_wait(cond, mtx);
537 }
538 else {
539 struct timeval now;
540 struct timespec end;
541
542 gettimeofday(&now, NULL);
543 end.tv_sec = now.tv_sec + sec;
544 end.tv_nsec = now.tv_usec * 1000 + nsec;
545 if (end.tv_nsec > (1000*1000*1000)) {
546 sec += end.tv_nsec / (1000*1000*1000);
547 end.tv_nsec = end.tv_nsec % (1000*1000*1000);
548 }
549 rv = pthread_cond_timedwait(cond, mtx, &end);
550 }
551 return rv;
552}
553
554void *
555sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
556{
557 char *outptr;
558 size_t len;
559
560 /* Lock the condition variable while we manipulate nmsg. */
561 pthread_mutex_lock(&q->mtx);
562 if (q->nbytes == 0) {
563 /* Unlock the condition variable and wait for a signal. */
564 if (cond_timed_wait(&q->cond, &q->mtx, sec, nsec) != 0) {
565 /* Timed out or something... */
566 pthread_mutex_unlock(&q->mtx);
567 return NULL;
568 }
569 /* Condition variable is now locked again. */
570 }
571 /* Get the message size. */
572 if (q->out + sizeof(q->msglen) > q->depth) {
573 /* Handle annoying wraparound case. */
574 size_t len1 = q->depth - q->out;
575 memcpy(&q->msglen, q->data + q->out, len1);
576 memcpy(((char *)&q->msglen) + len1, q->data,
577 sizeof(q->msglen) - len1);
578 q->out = sizeof(q->msglen) - len1;
579 }
580 else {
581 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
582 q->out += sizeof(q->msglen);
583 }
584 q->nbytes -= sizeof(q->msglen);
585 /* Get the message body. */
586 outptr = q->msg;
587 len = q->msglen;
588 if (q->out + q->msglen > q->depth) {
589 /* Handle wraparound. */
590 size_t len1 = q->depth - q->out;
591 memcpy(outptr, q->data + q->out, len1);
592 outptr += len1;
593 len -= len1;
594 q->nbytes -= len1;
595 q->out = 0;
596 }
597 memcpy(outptr, q->data + q->out, len);
598 q->nbytes -= len;
599 q->out += len;
600
601 /* Signal the condition variable. */
602 pthread_cond_signal(&q->cond);
603 /* Unlock the condition variable, we are done. */
604 pthread_mutex_unlock(&q->mtx);
605 if (out_len)
606 *out_len = q->msglen;
607 return q->msg;
608}
609
610sbevent_t *
612{
613 sbevent_t *evt;
614 int rv;
615
616 evt = ckd_calloc(1, sizeof(*evt));
617 if ((rv = pthread_mutex_init(&evt->mtx, NULL)) != 0) {
618 E_ERROR("Failed to initialize mutex: %d\n", rv);
619 ckd_free(evt);
620 return NULL;
621 }
622 if ((rv = pthread_cond_init(&evt->cond, NULL)) != 0) {
623 E_ERROR_SYSTEM("Failed to initialize mutex: %d\n", rv);
624 pthread_mutex_destroy(&evt->mtx);
625 ckd_free(evt);
626 return NULL;
627 }
628 return evt;
629}
630
631void
633{
634 pthread_mutex_destroy(&evt->mtx);
635 pthread_cond_destroy(&evt->cond);
636 ckd_free(evt);
637}
638
639int
641{
642 int rv;
643
644 pthread_mutex_lock(&evt->mtx);
645 evt->signalled = TRUE;
646 rv = pthread_cond_signal(&evt->cond);
647 pthread_mutex_unlock(&evt->mtx);
648 return rv;
649}
650
651int
652sbevent_wait(sbevent_t *evt, int sec, int nsec)
653{
654 int rv = 0;
655
656 /* Lock the mutex before we check its signalled state. */
657 pthread_mutex_lock(&evt->mtx);
658 /* If it's not signalled, then wait until it is. */
659 if (!evt->signalled)
660 rv = cond_timed_wait(&evt->cond, &evt->mtx, sec, nsec);
661 /* Set its state to unsignalled if we were successful. */
662 if (rv == 0)
663 evt->signalled = FALSE;
664 /* And unlock its mutex. */
665 pthread_mutex_unlock(&evt->mtx);
666
667 return rv;
668}
669
670sbmtx_t *
672{
673 sbmtx_t *mtx;
674
675 mtx = ckd_calloc(1, sizeof(*mtx));
676 if (pthread_mutex_init(&mtx->mtx, NULL) != 0) {
677 ckd_free(mtx);
678 return NULL;
679 }
680 return mtx;
681}
682
683int
685{
686 return pthread_mutex_trylock(&mtx->mtx);
687}
688
689int
691{
692 return pthread_mutex_lock(&mtx->mtx);
693}
694
695int
697{
698 return pthread_mutex_unlock(&mtx->mtx);
699}
700
701void
703{
704 pthread_mutex_destroy(&mtx->mtx);
705 ckd_free(mtx);
706}
707#endif /* not WIN32 */
708
709cmd_ln_t *
711{
712 return th->config;
713}
714
715void *
717{
718 return th->arg;
719}
720
721sbmsgq_t *
723{
724 return th->msgq;
725}
726
727int
728sbthread_send(sbthread_t *th, size_t len, void const *data)
729{
730 return sbmsgq_send(th->msgq, len, data);
731}
732
733void
735{
736 sbthread_wait(th);
737 sbmsgq_free(th->msgq);
738 ckd_free(th);
739}
Sphinx's memory allocation/deallocation routines.
SPHINXBASE_EXPORT void ckd_free(void *ptr)
Test and free a 1-D array.
Definition ckd_alloc.c:244
#define ckd_calloc(n, sz)
Macros to simplify the use of above functions.
Definition ckd_alloc.h:248
Implementation of logging routines.
#define E_ERROR(...)
Print error message to error log.
Definition err.h:104
#define E_ERROR_SYSTEM(...)
Print error text; Call perror("");.
Definition err.h:99
int sbmtx_lock(sbmtx_t *mtx)
Acquire a mutex.
Definition sbthread.c:690
sbthread_t * sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
Start a new thread.
Definition sbthread.c:406
sbmtx_t * sbmtx_init(void)
Create a mutex.
Definition sbthread.c:671
void * sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
Wait for a message from a queue.
Definition sbthread.c:555
sbmsgq_t * sbthread_msgq(sbthread_t *th)
Get message queue from a thread.
Definition sbthread.c:722
void sbthread_free(sbthread_t *th)
Free a thread object.
Definition sbthread.c:734
sbevent_t * sbevent_init(void)
Initialize an event.
Definition sbthread.c:611
void * sbthread_arg(sbthread_t *th)
Get argument pointer from a thread.
Definition sbthread.c:716
int sbevent_signal(sbevent_t *evt)
Signal an event.
Definition sbthread.c:640
sbmsgq_t * sbmsgq_init(size_t depth)
Create a message queue.
Definition sbthread.c:444
int sbevent_wait(sbevent_t *evt, int sec, int nsec)
Wait for an event to be signalled.
Definition sbthread.c:652
void sbmsgq_free(sbmsgq_t *msgq)
Free a message queue.
Definition sbthread.c:465
int sbmtx_trylock(sbmtx_t *mtx)
Try to acquire a mutex.
Definition sbthread.c:684
int sbmtx_unlock(sbmtx_t *mtx)
Release a mutex.
Definition sbthread.c:696
void sbevent_free(sbevent_t *evt)
Free an event.
Definition sbthread.c:632
cmd_ln_t * sbthread_config(sbthread_t *th)
Get configuration object from a thread.
Definition sbthread.c:710
int sbthread_wait(sbthread_t *th)
Wait for a thread to complete.
Definition sbthread.c:425
int sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
Post a message to a queue.
Definition sbthread.c:475
void sbmtx_free(sbmtx_t *mtx)
Dispose of a mutex.
Definition sbthread.c:702
int sbthread_send(sbthread_t *th, size_t len, void const *data)
Send an asynchronous message to a thread.
Definition sbthread.c:728
Simple portable thread functions.
int(* sbthread_main)(sbthread_t *th)
Entry point for a thread.
Definition sbthread.h:82
Opaque structure used to hold the results of command-line parsing.