1 /* -*- c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /* ====================================================================
3  * Copyright (c) 2008 Carnegie Mellon University.  All rights
4  * reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  *
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  *
18  * This work was supported in part by funding from the Defense Advanced
19  * Research Projects Agency and the National Science Foundation of the
20  * United States of America, and the CMU Sphinx Speech Consortium.
21  *
22  * THIS SOFTWARE IS PROVIDED BY CARNEGIE MELLON UNIVERSITY ``AS IS'' AND
23  * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
24  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
25  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY
26  * NOR ITS EMPLOYEES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  *
34  * ====================================================================
35  *
36  */
37 
38 /**
39  * @file sbthread.c
40  * @brief Simple portable thread functions
41  * @author David Huggins-Daines <dhuggins@cs.cmu.edu>
42  */
43 
44 #include <string.h>
45 
46 #include "sphinxbase/sbthread.h"
47 #include "sphinxbase/ckd_alloc.h"
48 #include "sphinxbase/err.h"
49 
50 /*
51  * Platform-specific parts: threads, mutexes, and signals.
52  */
53 #if (defined(_WIN32) || defined(__CYGWIN__)) && !defined(__SYMBIAN32__)
54 #ifndef _WIN32_WINNT
55 #define _WIN32_WINNT 0x0400
56 #endif /* not _WIN32_WINNT */
57 #include <windows.h>
58 
/* Per-thread bookkeeping for the Win32 implementation. */
struct sbthread_s {
    cmd_ln_t *config;   /* Configuration handed to sbthread_start(). */
    sbmsgq_t *msgq;     /* Message queue created for the thread by sbthread_start(). */
    sbthread_main func; /* User entry point called from sbthread_internal_main(). */
    void *arg;          /* Opaque user argument, returned by sbthread_arg(). */
    HANDLE th;          /* Thread handle; reset to NULL once sbthread_wait() joins it. */
    DWORD tid;          /* Thread identifier filled in by CreateThread(). */
};
67 
/* Message queue state (Win32): a byte ring buffer plus a staging buffer. */
struct sbmsgq_s {
    /* Ringbuffer for passing messages. */
    char *data;     /* Ring storage, allocated with `depth` bytes. */
    size_t depth;   /* Size in bytes of both `data` and `msg`. */
    size_t out;     /* Offset in `data` of the next byte to read. */
    size_t nbytes;  /* Bytes currently queued, length headers included. */

    /* Current message is stored here. */
    char *msg;      /* Body of the most recently dequeued message. */
    size_t msglen;  /* Length in bytes of the message held in `msg`. */
    CRITICAL_SECTION mtx; /* Held while the ring buffer is being modified. */
    HANDLE evt;     /* Auto-reset event, set after every send and every receive. */
};
81 
/* Event object (Win32): a thin wrapper around a native event handle. */
struct sbevent_s {
    HANDLE evt; /* Auto-reset event created by sbevent_init(). */
};
85 
/* Mutex object (Win32): a thin wrapper around a critical section. */
struct sbmtx_s {
    CRITICAL_SECTION mtx; /* Initialized in sbmtx_init(), deleted in sbmtx_free(). */
};
89 
/*
 * Native Win32 thread entry point.  The thread parameter is the
 * sbthread_t being started; the user's function is invoked with it and
 * its integer result becomes the thread's exit code.
 */
DWORD WINAPI
sbthread_internal_main(LPVOID arg)
{
    sbthread_t *self = (sbthread_t *)arg;
    int result = (*self->func)(self);

    return (DWORD)result;
}
99 
/*
 * Create and start a new thread (Win32).
 *
 * config: configuration object stored for sbthread_config().
 * func:   function run in the new thread; it receives the sbthread_t.
 * arg:    opaque pointer stored for sbthread_arg().
 *
 * Returns the new thread object, or NULL if either the message queue
 * or the native thread could not be created.
 */
sbthread_t *
sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
{
    sbthread_t *th;

    th = ckd_calloc(1, sizeof(*th));
    th->config = config;
    th->func = func;
    th->arg = arg;
    th->msgq = sbmsgq_init(256);
    if (th->msgq == NULL) {
        /* sbmsgq_init() returns NULL when its event cannot be created;
           bail out before a thread can see (or sbthread_free() can
           dereference) a NULL queue. */
        E_ERROR("Failed to create thread message queue\n");
        ckd_free(th);
        return NULL;
    }
    th->th = CreateThread(NULL, 0, sbthread_internal_main, th, 0, &th->tid);
    if (th->th == NULL) {
        E_ERROR("Failed to create thread: %lu\n",
                (unsigned long)GetLastError());
        /* th->th is NULL, so sbthread_free() skips the join and just
           releases the queue and the structure. */
        sbthread_free(th);
        return NULL;
    }
    return th;
}
117 
/*
 * Join a thread and collect its exit code (Win32).
 *
 * Returns -1 if the thread has already been joined (its handle is NULL)
 * or if waiting on the handle fails; otherwise returns the thread's
 * exit code converted to int.  After a successful join the handle is
 * closed and cleared, so subsequent calls return -1.
 */
int
sbthread_wait(sbthread_t *th)
{
    DWORD code;

    if (th->th == NULL) {
        /* It has already been joined. */
        return -1;
    }

    if (WaitForSingleObject(th->th, INFINITE) == WAIT_FAILED) {
        E_ERROR("Failed to join thread: WAIT_FAILED\n");
        return -1;
    }

    GetExitCodeThread(th->th, &code);
    CloseHandle(th->th);
    th->th = NULL;
    return (int)code;
}
137 
/*
 * Wait on a waitable handle with an optional timeout.
 *
 * A `sec` of -1 waits forever; otherwise the timeout is `sec` seconds
 * plus `nsec` nanoseconds, truncated to whole milliseconds.  The result
 * of WaitForSingleObject() is returned unchanged.
 */
static DWORD
cond_timed_wait(HANDLE cond, int sec, int nsec)
{
    DWORD timeout;

    if (sec == -1)
        return WaitForSingleObject(cond, INFINITE);

    timeout = sec * 1000 + nsec / (1000*1000);
    return WaitForSingleObject(cond, timeout);
}
153 
/*
 * Allocate an event object backed by an unnamed, auto-reset,
 * initially non-signalled Win32 event (created through the Unicode
 * entry point CreateEventW).  Returns NULL if the event cannot be
 * created.
 */
sbevent_t *
sbevent_init(void)
{
    sbevent_t *evt = ckd_calloc(1, sizeof(*evt));

    evt->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
    if (evt->evt != NULL)
        return evt;

    ckd_free(evt);
    return NULL;
}
168 
/* Close the underlying Win32 event and release the event object. */
void
sbevent_free(sbevent_t *evt)
{
    HANDLE handle = evt->evt;

    CloseHandle(handle);
    ckd_free(evt);
}
175 
/* Set the event.  Returns 0 if SetEvent() succeeds, -1 otherwise. */
int
sbevent_signal(sbevent_t *evt)
{
    if (SetEvent(evt->evt))
        return 0;
    return -1;
}
181 
/*
 * Wait for the event to be set, with the timeout convention of
 * cond_timed_wait() (sec == -1 waits forever).  The DWORD wait status
 * is returned converted to int.
 */
int
sbevent_wait(sbevent_t *evt, int sec, int nsec)
{
    DWORD status = cond_timed_wait(evt->evt, sec, nsec);

    return status;
}
190 
/* Allocate a mutex object and initialize its critical section. */
sbmtx_t *
sbmtx_init(void)
{
    sbmtx_t *lock = ckd_calloc(1, sizeof(*lock));

    InitializeCriticalSection(&lock->mtx);
    return lock;
}
200 
/*
 * Try to take the mutex without blocking.
 * Returns 0 if the critical section was entered, -1 if it was not.
 */
int
sbmtx_trylock(sbmtx_t *mtx)
{
    if (TryEnterCriticalSection(&mtx->mtx))
        return 0;
    return -1;
}
206 
/* Take the mutex, blocking until it is available.  Always returns 0. */
int
sbmtx_lock(sbmtx_t *mtx)
{
    CRITICAL_SECTION *cs = &mtx->mtx;

    EnterCriticalSection(cs);
    return 0;
}
213 
/* Release the mutex.  Always returns 0. */
int
sbmtx_unlock(sbmtx_t *mtx)
{
    CRITICAL_SECTION *cs = &mtx->mtx;

    LeaveCriticalSection(cs);
    return 0;
}
220 
/* Delete the critical section and release the mutex object. */
void
sbmtx_free(sbmtx_t *mtx)
{
    CRITICAL_SECTION *cs = &mtx->mtx;

    DeleteCriticalSection(cs);
    ckd_free(mtx);
}
227 
/*
 * Create a message queue whose ring buffer and staging buffer are each
 * `depth` bytes.  Returns NULL if the signalling event cannot be
 * created.
 */
sbmsgq_t *
sbmsgq_init(size_t depth)
{
    sbmsgq_t *q = ckd_calloc(1, sizeof(*q));

    q->depth = depth;
    q->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
    if (q->evt == NULL)
        goto fail;

    InitializeCriticalSection(&q->mtx);
    q->data = ckd_calloc(depth, 1);
    q->msg = ckd_calloc(depth, 1);
    return q;

fail:
    ckd_free(q);
    return NULL;
}
245 
/*
 * Destroy a message queue: close its event, delete its critical
 * section, and release both buffers and the queue itself.
 */
void
sbmsgq_free(sbmsgq_t *msgq)
{
    CloseHandle(msgq->evt);
    /* Pairs with InitializeCriticalSection() in sbmsgq_init(); without
       it the resources held by the critical section are never
       released. */
    DeleteCriticalSection(&msgq->mtx);
    ckd_free(msgq->data);
    ckd_free(msgq->msg);
    ckd_free(msgq);
}
254 
/*
 * Append one message to the queue (Win32), blocking until the ring
 * buffer has room for it.
 *
 * q:    destination queue.
 * len:  number of bytes in the message body.
 * data: message body.
 *
 * Each message is stored as a size_t length header followed by the
 * body; either part may wrap around the end of the ring.
 *
 * Returns 0 on success, -1 if the message (header included) can never
 * fit in the queue or if waiting for space fails.
 */
int
sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
{
    char const *cdata = (char const *)data;
    size_t in;

    /* Don't allow things bigger than depth to be sent!  Phrased as a
       subtraction so that a huge len cannot wrap around and slip past
       the check. */
    if (q->depth < sizeof(len) || len > q->depth - sizeof(len))
        return -1;

    /* Check for free space under the lock, and re-check after every
       wake-up: the auto-reset event is also set by our own previous
       send, so a single wait does not prove that space was freed.
       NOTE(review): the unlock-then-wait step is still not atomic, as
       the original FIXME observed. */
    EnterCriticalSection(&q->mtx);
    while (q->nbytes + len + sizeof(len) > q->depth) {
        LeaveCriticalSection(&q->mtx);
        if (WaitForSingleObject(q->evt, INFINITE) != WAIT_OBJECT_0)
            return -1;
        EnterCriticalSection(&q->mtx);
    }

    in = (q->out + q->nbytes) % q->depth;
    /* First write the size of the message. */
    if (in + sizeof(len) > q->depth) {
        /* Handle the annoying case where the size field gets wrapped around. */
        size_t len1 = q->depth - in;
        memcpy(q->data + in, &len, len1);
        memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
        q->nbytes += sizeof(len);
        in = sizeof(len) - len1;
    }
    else {
        memcpy(q->data + in, &len, sizeof(len));
        q->nbytes += sizeof(len);
        in += sizeof(len);
    }

    /* Now write the message body. */
    if (in + len > q->depth) {
        /* Handle wraparound. */
        size_t len1 = q->depth - in;
        memcpy(q->data + in, cdata, len1);
        q->nbytes += len1;
        cdata += len1;
        len -= len1;
        in = 0;
    }
    memcpy(q->data + in, cdata, len);
    q->nbytes += len;

    /* Signal the condition variable. */
    SetEvent(q->evt);
    /* Unlock. */
    LeaveCriticalSection(&q->mtx);

    return 0;
}
307 
/*
 * Remove the next message from the queue (Win32), waiting for one to
 * arrive if the queue is empty.
 *
 * q:       queue to read from.
 * out_len: if non-NULL, receives the length of the message body.
 * sec:     timeout in seconds, or -1 to wait forever.
 * nsec:    additional timeout in nanoseconds.
 *
 * Returns a pointer to the queue's internal copy of the message body
 * (overwritten by the next call), or NULL if no message arrived before
 * the wait timed out or failed.
 */
void *
sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
{
    char *outptr;
    size_t len;

    /* Wait for data to be available.  The emptiness check is made under
       the lock and repeated after every wake-up: the auto-reset event
       is also set by our own previous receive, so it can be signalled
       while the queue is still empty.  Anything other than
       WAIT_OBJECT_0 (timeout included) means there is nothing to read.
       Each retry starts a fresh timeout period.
       NOTE(review): the unlock-then-wait step is still not atomic, as
       the original FIXME observed. */
    EnterCriticalSection(&q->mtx);
    while (q->nbytes == 0) {
        LeaveCriticalSection(&q->mtx);
        if (cond_timed_wait(q->evt, sec, nsec) != WAIT_OBJECT_0)
            /* Timed out or something... */
            return NULL;
        EnterCriticalSection(&q->mtx);
    }

    /* Get the message size. */
    if (q->out + sizeof(q->msglen) > q->depth) {
        /* Handle annoying wraparound case. */
        size_t len1 = q->depth - q->out;
        memcpy(&q->msglen, q->data + q->out, len1);
        memcpy(((char *)&q->msglen) + len1, q->data,
               sizeof(q->msglen) - len1);
        q->out = sizeof(q->msglen) - len1;
    }
    else {
        memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
        q->out += sizeof(q->msglen);
    }
    q->nbytes -= sizeof(q->msglen);
    /* Get the message body. */
    outptr = q->msg;
    len = q->msglen;
    if (q->out + q->msglen > q->depth) {
        /* Handle wraparound. */
        size_t len1 = q->depth - q->out;
        memcpy(outptr, q->data + q->out, len1);
        outptr += len1;
        len -= len1;
        q->nbytes -= len1;
        q->out = 0;
    }
    memcpy(outptr, q->data + q->out, len);
    q->nbytes -= len;
    q->out += len;

    /* Signal the condition variable. */
    SetEvent(q->evt);
    /* Unlock. */
    LeaveCriticalSection(&q->mtx);
    if (out_len)
        *out_len = q->msglen;
    return q->msg;
}
360 
361 #else /* POSIX */
362 #include <pthread.h>
363 #include <sys/time.h>
364 
/* Per-thread bookkeeping for the POSIX implementation. */
struct sbthread_s {
    cmd_ln_t *config;   /* Configuration handed to sbthread_start(). */
    sbmsgq_t *msgq;     /* Message queue created for the thread by sbthread_start(). */
    sbthread_main func; /* User entry point called from sbthread_internal_main(). */
    void *arg;          /* Opaque user argument, returned by sbthread_arg(). */
    pthread_t th;       /* Thread handle; set to (pthread_t)-1 once sbthread_wait() joins it. */
};
372 
/* Message queue state (POSIX): a byte ring buffer plus a staging buffer. */
struct sbmsgq_s {
    /* Ringbuffer for passing messages. */
    char *data;     /* Ring storage, allocated with `depth` bytes. */
    size_t depth;   /* Size in bytes of both `data` and `msg`. */
    size_t out;     /* Offset in `data` of the next byte to read. */
    size_t nbytes;  /* Bytes currently queued, length headers included. */

    /* Current message is stored here. */
    char *msg;      /* Body of the most recently dequeued message. */
    size_t msglen;  /* Length in bytes of the message held in `msg`. */
    pthread_mutex_t mtx;    /* Held while the queue state is read or modified. */
    pthread_cond_t cond;    /* Signalled after every send and every receive. */
};
386 
/* Event object (POSIX): a flag protected by a mutex/condition pair. */
struct sbevent_s {
    pthread_mutex_t mtx;    /* Guards `signalled`. */
    pthread_cond_t cond;    /* Signalled by sbevent_signal(). */
    int signalled;          /* Set by sbevent_signal(), cleared by a successful sbevent_wait(). */
};
392 
/* Mutex object (POSIX): a thin wrapper around a pthread mutex. */
struct sbmtx_s {
    pthread_mutex_t mtx;    /* Initialized in sbmtx_init(), destroyed in sbmtx_free(). */
};
396 
397 static void *
sbthread_internal_main(void * arg)398 sbthread_internal_main(void *arg)
399 {
400     sbthread_t *th = (sbthread_t *)arg;
401     int rv;
402 
403     rv = (*th->func)(th);
404     return (void *)(long)rv;
405 }
406 
407 sbthread_t *
sbthread_start(cmd_ln_t * config,sbthread_main func,void * arg)408 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
409 {
410     sbthread_t *th;
411     int rv;
412 
413     th = ckd_calloc(1, sizeof(*th));
414     th->config = config;
415     th->func = func;
416     th->arg = arg;
417     th->msgq = sbmsgq_init(1024);
418     if ((rv = pthread_create(&th->th, NULL, &sbthread_internal_main, th)) != 0) {
419         E_ERROR("Failed to create thread: %d\n", rv);
420         sbthread_free(th);
421         return NULL;
422     }
423     return th;
424 }
425 
426 int
sbthread_wait(sbthread_t * th)427 sbthread_wait(sbthread_t *th)
428 {
429     void *exit;
430     int rv;
431 
432     /* It has already been joined. */
433     if (th->th == (pthread_t)-1)
434         return -1;
435 
436     rv = pthread_join(th->th, &exit);
437     if (rv != 0) {
438         E_ERROR("Failed to join thread: %d\n", rv);
439         return -1;
440     }
441     th->th = (pthread_t)-1;
442     return (int)(long)exit;
443 }
444 
445 sbmsgq_t *
sbmsgq_init(size_t depth)446 sbmsgq_init(size_t depth)
447 {
448     sbmsgq_t *msgq;
449 
450     msgq = ckd_calloc(1, sizeof(*msgq));
451     msgq->depth = depth;
452     if (pthread_cond_init(&msgq->cond, NULL) != 0) {
453         ckd_free(msgq);
454         return NULL;
455     }
456     if (pthread_mutex_init(&msgq->mtx, NULL) != 0) {
457         pthread_cond_destroy(&msgq->cond);
458         ckd_free(msgq);
459         return NULL;
460     }
461     msgq->data = ckd_calloc(depth, 1);
462     msgq->msg = ckd_calloc(depth, 1);
463     return msgq;
464 }
465 
466 void
sbmsgq_free(sbmsgq_t * msgq)467 sbmsgq_free(sbmsgq_t *msgq)
468 {
469     pthread_mutex_destroy(&msgq->mtx);
470     pthread_cond_destroy(&msgq->cond);
471     ckd_free(msgq->data);
472     ckd_free(msgq->msg);
473     ckd_free(msgq);
474 }
475 
476 int
sbmsgq_send(sbmsgq_t * q,size_t len,void const * data)477 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
478 {
479     size_t in;
480 
481     /* Don't allow things bigger than depth to be sent! */
482     if (len + sizeof(len) > q->depth)
483         return -1;
484 
485     /* Lock the condition variable while we manipulate the buffer. */
486     pthread_mutex_lock(&q->mtx);
487     if (q->nbytes + len + sizeof(len) > q->depth) {
488         /* Unlock and wait for space to be available. */
489         if (pthread_cond_wait(&q->cond, &q->mtx) != 0) {
490             /* Timed out, don't send anything. */
491             pthread_mutex_unlock(&q->mtx);
492             return -1;
493         }
494         /* Condition is now locked again. */
495     }
496     in = (q->out + q->nbytes) % q->depth;
497 
498     /* First write the size of the message. */
499     if (in + sizeof(len) > q->depth) {
500         /* Handle the annoying case where the size field gets wrapped around. */
501         size_t len1 = q->depth - in;
502         memcpy(q->data + in, &len, len1);
503         memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
504         q->nbytes += sizeof(len);
505         in = sizeof(len) - len1;
506     }
507     else {
508         memcpy(q->data + in, &len, sizeof(len));
509         q->nbytes += sizeof(len);
510         in += sizeof(len);
511     }
512 
513     /* Now write the message body. */
514     if (in + len > q->depth) {
515         /* Handle wraparound. */
516         size_t len1 = q->depth - in;
517         memcpy(q->data + in, data, len1);
518         q->nbytes += len1;
519         data = (char const *)data + len1;
520         len -= len1;
521         in = 0;
522     }
523     memcpy(q->data + in, data, len);
524     q->nbytes += len;
525 
526     /* Signal the condition variable. */
527     pthread_cond_signal(&q->cond);
528     /* Unlock it, we have nothing else to do. */
529     pthread_mutex_unlock(&q->mtx);
530     return 0;
531 }
532 
/*
 * Wait on a condition variable with an optional relative timeout.
 *
 * cond: condition variable to wait on.
 * mtx:  mutex associated with `cond`; must be locked by the caller.
 * sec:  timeout in seconds, or -1 to wait with no timeout.
 * nsec: additional timeout in nanoseconds.
 *
 * Returns the result of pthread_cond_wait() or
 * pthread_cond_timedwait(): 0 when woken, otherwise an error number
 * such as ETIMEDOUT.
 */
static int
cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mtx, int sec, int nsec)
{
    struct timeval now;
    struct timespec end;

    if (sec == -1)
        return pthread_cond_wait(cond, mtx);

    /* pthread_cond_timedwait() takes an absolute deadline. */
    gettimeofday(&now, NULL);
    end.tv_sec = now.tv_sec + sec;
    end.tv_nsec = now.tv_usec * 1000 + nsec;
    /* Normalize: tv_nsec must stay below one second, and the overflow
       has to be carried into the deadline's seconds (not into `sec`),
       otherwise the deadline lands up to a second too early. */
    if (end.tv_nsec >= (1000*1000*1000)) {
        end.tv_sec += end.tv_nsec / (1000*1000*1000);
        end.tv_nsec = end.tv_nsec % (1000*1000*1000);
    }
    return pthread_cond_timedwait(cond, mtx, &end);
}
555 
556 void *
sbmsgq_wait(sbmsgq_t * q,size_t * out_len,int sec,int nsec)557 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
558 {
559     char *outptr;
560     size_t len;
561 
562     /* Lock the condition variable while we manipulate nmsg. */
563     pthread_mutex_lock(&q->mtx);
564     if (q->nbytes == 0) {
565         /* Unlock the condition variable and wait for a signal. */
566         if (cond_timed_wait(&q->cond, &q->mtx, sec, nsec) != 0) {
567             /* Timed out or something... */
568             pthread_mutex_unlock(&q->mtx);
569             return NULL;
570         }
571         /* Condition variable is now locked again. */
572     }
573     /* Get the message size. */
574     if (q->out + sizeof(q->msglen) > q->depth) {
575         /* Handle annoying wraparound case. */
576         size_t len1 = q->depth - q->out;
577         memcpy(&q->msglen, q->data + q->out, len1);
578         memcpy(((char *)&q->msglen) + len1, q->data,
579                sizeof(q->msglen) - len1);
580         q->out = sizeof(q->msglen) - len1;
581     }
582     else {
583         memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
584         q->out += sizeof(q->msglen);
585     }
586     q->nbytes -= sizeof(q->msglen);
587     /* Get the message body. */
588     outptr = q->msg;
589     len = q->msglen;
590     if (q->out + q->msglen > q->depth) {
591         /* Handle wraparound. */
592         size_t len1 = q->depth - q->out;
593         memcpy(outptr, q->data + q->out, len1);
594         outptr += len1;
595         len -= len1;
596         q->nbytes -= len1;
597         q->out = 0;
598     }
599     memcpy(outptr, q->data + q->out, len);
600     q->nbytes -= len;
601     q->out += len;
602 
603     /* Signal the condition variable. */
604     pthread_cond_signal(&q->cond);
605     /* Unlock the condition variable, we are done. */
606     pthread_mutex_unlock(&q->mtx);
607     if (out_len)
608         *out_len = q->msglen;
609     return q->msg;
610 }
611 
612 sbevent_t *
sbevent_init(void)613 sbevent_init(void)
614 {
615     sbevent_t *evt;
616     int rv;
617 
618     evt = ckd_calloc(1, sizeof(*evt));
619     if ((rv = pthread_mutex_init(&evt->mtx, NULL)) != 0) {
620         E_ERROR("Failed to initialize mutex: %d\n", rv);
621         ckd_free(evt);
622         return NULL;
623     }
624     if ((rv = pthread_cond_init(&evt->cond, NULL)) != 0) {
625         E_ERROR_SYSTEM("Failed to initialize mutex: %d\n", rv);
626         pthread_mutex_destroy(&evt->mtx);
627         ckd_free(evt);
628         return NULL;
629     }
630     return evt;
631 }
632 
633 void
sbevent_free(sbevent_t * evt)634 sbevent_free(sbevent_t *evt)
635 {
636     pthread_mutex_destroy(&evt->mtx);
637     pthread_cond_destroy(&evt->cond);
638     ckd_free(evt);
639 }
640 
641 int
sbevent_signal(sbevent_t * evt)642 sbevent_signal(sbevent_t *evt)
643 {
644     int rv;
645 
646     pthread_mutex_lock(&evt->mtx);
647     evt->signalled = TRUE;
648     rv = pthread_cond_signal(&evt->cond);
649     pthread_mutex_unlock(&evt->mtx);
650     return rv;
651 }
652 
653 int
sbevent_wait(sbevent_t * evt,int sec,int nsec)654 sbevent_wait(sbevent_t *evt, int sec, int nsec)
655 {
656     int rv = 0;
657 
658     /* Lock the mutex before we check its signalled state. */
659     pthread_mutex_lock(&evt->mtx);
660     /* If it's not signalled, then wait until it is. */
661     if (!evt->signalled)
662         rv = cond_timed_wait(&evt->cond, &evt->mtx, sec, nsec);
663     /* Set its state to unsignalled if we were successful. */
664     if (rv == 0)
665         evt->signalled = FALSE;
666     /* And unlock its mutex. */
667     pthread_mutex_unlock(&evt->mtx);
668 
669     return rv;
670 }
671 
672 sbmtx_t *
sbmtx_init(void)673 sbmtx_init(void)
674 {
675     sbmtx_t *mtx;
676 
677     mtx = ckd_calloc(1, sizeof(*mtx));
678     if (pthread_mutex_init(&mtx->mtx, NULL) != 0) {
679         ckd_free(mtx);
680         return NULL;
681     }
682     return mtx;
683 }
684 
685 int
sbmtx_trylock(sbmtx_t * mtx)686 sbmtx_trylock(sbmtx_t *mtx)
687 {
688     return pthread_mutex_trylock(&mtx->mtx);
689 }
690 
691 int
sbmtx_lock(sbmtx_t * mtx)692 sbmtx_lock(sbmtx_t *mtx)
693 {
694     return pthread_mutex_lock(&mtx->mtx);
695 }
696 
697 int
sbmtx_unlock(sbmtx_t * mtx)698 sbmtx_unlock(sbmtx_t *mtx)
699 {
700     return pthread_mutex_unlock(&mtx->mtx);
701 }
702 
703 void
sbmtx_free(sbmtx_t * mtx)704 sbmtx_free(sbmtx_t *mtx)
705 {
706     pthread_mutex_destroy(&mtx->mtx);
707     ckd_free(mtx);
708 }
709 #endif /* not WIN32 */
710 
711 cmd_ln_t *
sbthread_config(sbthread_t * th)712 sbthread_config(sbthread_t *th)
713 {
714     return th->config;
715 }
716 
717 void *
sbthread_arg(sbthread_t * th)718 sbthread_arg(sbthread_t *th)
719 {
720     return th->arg;
721 }
722 
723 sbmsgq_t *
sbthread_msgq(sbthread_t * th)724 sbthread_msgq(sbthread_t *th)
725 {
726     return th->msgq;
727 }
728 
729 int
sbthread_send(sbthread_t * th,size_t len,void const * data)730 sbthread_send(sbthread_t *th, size_t len, void const *data)
731 {
732     return sbmsgq_send(th->msgq, len, data);
733 }
734 
735 void
sbthread_free(sbthread_t * th)736 sbthread_free(sbthread_t *th)
737 {
738     sbthread_wait(th);
739     sbmsgq_free(th->msgq);
740     ckd_free(th);
741 }
742