1 /* -*- c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /* ====================================================================
3 * Copyright (c) 2008 Carnegie Mellon University. All rights
4 * reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 *
18 * This work was supported in part by funding from the Defense Advanced
19 * Research Projects Agency and the National Science Foundation of the
20 * United States of America, and the CMU Sphinx Speech Consortium.
21 *
22 * THIS SOFTWARE IS PROVIDED BY CARNEGIE MELLON UNIVERSITY ``AS IS'' AND
23 * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
24 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
25 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY
26 * NOR ITS EMPLOYEES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 *
34 * ====================================================================
35 *
36 */
37
38 /**
39 * @file sbthread.c
40 * @brief Simple portable thread functions
41 * @author David Huggins-Daines <dhuggins@cs.cmu.edu>
42 */
43
44 #include <string.h>
45
46 #include "sphinxbase/sbthread.h"
47 #include "sphinxbase/ckd_alloc.h"
48 #include "sphinxbase/err.h"
49
50 /*
51 * Platform-specific parts: threads, mutexes, and signals.
52 */
53 #if (defined(_WIN32) || defined(__CYGWIN__)) && !defined(__SYMBIAN32__)
54 #ifndef _WIN32_WINNT
55 #define _WIN32_WINNT 0x0400
56 #endif /* not _WIN32_WINNT */
57 #include <windows.h>
58
59 struct sbthread_s {
60 cmd_ln_t *config;
61 sbmsgq_t *msgq;
62 sbthread_main func;
63 void *arg;
64 HANDLE th;
65 DWORD tid;
66 };
67
68 struct sbmsgq_s {
69 /* Ringbuffer for passing messages. */
70 char *data;
71 size_t depth;
72 size_t out;
73 size_t nbytes;
74
75 /* Current message is stored here. */
76 char *msg;
77 size_t msglen;
78 CRITICAL_SECTION mtx;
79 HANDLE evt;
80 };
81
82 struct sbevent_s {
83 HANDLE evt;
84 };
85
86 struct sbmtx_s {
87 CRITICAL_SECTION mtx;
88 };
89
90 DWORD WINAPI
sbthread_internal_main(LPVOID arg)91 sbthread_internal_main(LPVOID arg)
92 {
93 sbthread_t *th = (sbthread_t *)arg;
94 int rv;
95
96 rv = (*th->func)(th);
97 return (DWORD)rv;
98 }
99
100 sbthread_t *
sbthread_start(cmd_ln_t * config,sbthread_main func,void * arg)101 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
102 {
103 sbthread_t *th;
104
105 th = ckd_calloc(1, sizeof(*th));
106 th->config = config;
107 th->func = func;
108 th->arg = arg;
109 th->msgq = sbmsgq_init(256);
110 th->th = CreateThread(NULL, 0, sbthread_internal_main, th, 0, &th->tid);
111 if (th->th == NULL) {
112 sbthread_free(th);
113 return NULL;
114 }
115 return th;
116 }
117
118 int
sbthread_wait(sbthread_t * th)119 sbthread_wait(sbthread_t *th)
120 {
121 DWORD rv, exit;
122
123 /* It has already been joined. */
124 if (th->th == NULL)
125 return -1;
126
127 rv = WaitForSingleObject(th->th, INFINITE);
128 if (rv == WAIT_FAILED) {
129 E_ERROR("Failed to join thread: WAIT_FAILED\n");
130 return -1;
131 }
132 GetExitCodeThread(th->th, &exit);
133 CloseHandle(th->th);
134 th->th = NULL;
135 return (int)exit;
136 }
137
138 static DWORD
cond_timed_wait(HANDLE cond,int sec,int nsec)139 cond_timed_wait(HANDLE cond, int sec, int nsec)
140 {
141 DWORD rv;
142 if (sec == -1) {
143 rv = WaitForSingleObject(cond, INFINITE);
144 }
145 else {
146 DWORD ms;
147
148 ms = sec * 1000 + nsec / (1000*1000);
149 rv = WaitForSingleObject(cond, ms);
150 }
151 return rv;
152 }
153
154 /* Updated to use Unicode */
155 sbevent_t *
sbevent_init(void)156 sbevent_init(void)
157 {
158 sbevent_t *evt;
159
160 evt = ckd_calloc(1, sizeof(*evt));
161 evt->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
162 if (evt->evt == NULL) {
163 ckd_free(evt);
164 return NULL;
165 }
166 return evt;
167 }
168
169 void
sbevent_free(sbevent_t * evt)170 sbevent_free(sbevent_t *evt)
171 {
172 CloseHandle(evt->evt);
173 ckd_free(evt);
174 }
175
176 int
sbevent_signal(sbevent_t * evt)177 sbevent_signal(sbevent_t *evt)
178 {
179 return SetEvent(evt->evt) ? 0 : -1;
180 }
181
182 int
sbevent_wait(sbevent_t * evt,int sec,int nsec)183 sbevent_wait(sbevent_t *evt, int sec, int nsec)
184 {
185 DWORD rv;
186
187 rv = cond_timed_wait(evt->evt, sec, nsec);
188 return rv;
189 }
190
191 sbmtx_t *
sbmtx_init(void)192 sbmtx_init(void)
193 {
194 sbmtx_t *mtx;
195
196 mtx = ckd_calloc(1, sizeof(*mtx));
197 InitializeCriticalSection(&mtx->mtx);
198 return mtx;
199 }
200
201 int
sbmtx_trylock(sbmtx_t * mtx)202 sbmtx_trylock(sbmtx_t *mtx)
203 {
204 return TryEnterCriticalSection(&mtx->mtx) ? 0 : -1;
205 }
206
207 int
sbmtx_lock(sbmtx_t * mtx)208 sbmtx_lock(sbmtx_t *mtx)
209 {
210 EnterCriticalSection(&mtx->mtx);
211 return 0;
212 }
213
214 int
sbmtx_unlock(sbmtx_t * mtx)215 sbmtx_unlock(sbmtx_t *mtx)
216 {
217 LeaveCriticalSection(&mtx->mtx);
218 return 0;
219 }
220
221 void
sbmtx_free(sbmtx_t * mtx)222 sbmtx_free(sbmtx_t *mtx)
223 {
224 DeleteCriticalSection(&mtx->mtx);
225 ckd_free(mtx);
226 }
227
228 sbmsgq_t *
sbmsgq_init(size_t depth)229 sbmsgq_init(size_t depth)
230 {
231 sbmsgq_t *msgq;
232
233 msgq = ckd_calloc(1, sizeof(*msgq));
234 msgq->depth = depth;
235 msgq->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
236 if (msgq->evt == NULL) {
237 ckd_free(msgq);
238 return NULL;
239 }
240 InitializeCriticalSection(&msgq->mtx);
241 msgq->data = ckd_calloc(depth, 1);
242 msgq->msg = ckd_calloc(depth, 1);
243 return msgq;
244 }
245
246 void
sbmsgq_free(sbmsgq_t * msgq)247 sbmsgq_free(sbmsgq_t *msgq)
248 {
249 CloseHandle(msgq->evt);
250 ckd_free(msgq->data);
251 ckd_free(msgq->msg);
252 ckd_free(msgq);
253 }
254
255 int
sbmsgq_send(sbmsgq_t * q,size_t len,void const * data)256 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
257 {
258 char const *cdata = (char const *)data;
259 size_t in;
260
261 /* Don't allow things bigger than depth to be sent! */
262 if (len + sizeof(len) > q->depth)
263 return -1;
264
265 if (q->nbytes + len + sizeof(len) > q->depth)
266 WaitForSingleObject(q->evt, INFINITE);
267
268 /* Lock things while we manipulate the buffer (FIXME: this
269 actually should have been atomic with the wait above ...) */
270 EnterCriticalSection(&q->mtx);
271 in = (q->out + q->nbytes) % q->depth;
272 /* First write the size of the message. */
273 if (in + sizeof(len) > q->depth) {
274 /* Handle the annoying case where the size field gets wrapped around. */
275 size_t len1 = q->depth - in;
276 memcpy(q->data + in, &len, len1);
277 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
278 q->nbytes += sizeof(len);
279 in = sizeof(len) - len1;
280 }
281 else {
282 memcpy(q->data + in, &len, sizeof(len));
283 q->nbytes += sizeof(len);
284 in += sizeof(len);
285 }
286
287 /* Now write the message body. */
288 if (in + len > q->depth) {
289 /* Handle wraparound. */
290 size_t len1 = q->depth - in;
291 memcpy(q->data + in, cdata, len1);
292 q->nbytes += len1;
293 cdata += len1;
294 len -= len1;
295 in = 0;
296 }
297 memcpy(q->data + in, cdata, len);
298 q->nbytes += len;
299
300 /* Signal the condition variable. */
301 SetEvent(q->evt);
302 /* Unlock. */
303 LeaveCriticalSection(&q->mtx);
304
305 return 0;
306 }
307
308 void *
sbmsgq_wait(sbmsgq_t * q,size_t * out_len,int sec,int nsec)309 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
310 {
311 char *outptr;
312 size_t len;
313
314 /* Wait for data to be available. */
315 if (q->nbytes == 0) {
316 if (cond_timed_wait(q->evt, sec, nsec) == WAIT_FAILED)
317 /* Timed out or something... */
318 return NULL;
319 }
320 /* Lock to manipulate the queue (FIXME) */
321 EnterCriticalSection(&q->mtx);
322 /* Get the message size. */
323 if (q->out + sizeof(q->msglen) > q->depth) {
324 /* Handle annoying wraparound case. */
325 size_t len1 = q->depth - q->out;
326 memcpy(&q->msglen, q->data + q->out, len1);
327 memcpy(((char *)&q->msglen) + len1, q->data,
328 sizeof(q->msglen) - len1);
329 q->out = sizeof(q->msglen) - len1;
330 }
331 else {
332 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
333 q->out += sizeof(q->msglen);
334 }
335 q->nbytes -= sizeof(q->msglen);
336 /* Get the message body. */
337 outptr = q->msg;
338 len = q->msglen;
339 if (q->out + q->msglen > q->depth) {
340 /* Handle wraparound. */
341 size_t len1 = q->depth - q->out;
342 memcpy(outptr, q->data + q->out, len1);
343 outptr += len1;
344 len -= len1;
345 q->nbytes -= len1;
346 q->out = 0;
347 }
348 memcpy(outptr, q->data + q->out, len);
349 q->nbytes -= len;
350 q->out += len;
351
352 /* Signal the condition variable. */
353 SetEvent(q->evt);
354 /* Unlock. */
355 LeaveCriticalSection(&q->mtx);
356 if (out_len)
357 *out_len = q->msglen;
358 return q->msg;
359 }
360
361 #else /* POSIX */
362 #include <pthread.h>
363 #include <sys/time.h>
364
365 struct sbthread_s {
366 cmd_ln_t *config;
367 sbmsgq_t *msgq;
368 sbthread_main func;
369 void *arg;
370 pthread_t th;
371 };
372
/**
 * Message queue (POSIX implementation).
 *
 * Messages are stored in a ring buffer, each one preceded by its
 * length as a size_t.
 */
struct sbmsgq_s {
    /* Ring buffer used to pass messages. */
    char *data;            /* Ring buffer storage (depth bytes). */
    size_t depth;          /* Size in bytes of both data and msg. */
    size_t out;            /* Offset in data of the next byte to read. */
    size_t nbytes;         /* Number of bytes currently queued. */

    /* The most recently received message is copied here. */
    char *msg;             /* Buffer holding the current message body. */
    size_t msglen;         /* Length in bytes of the current message. */

    pthread_mutex_t mtx;   /* Held while the queue is manipulated. */
    pthread_cond_t cond;   /* Signalled after each send and receive. */
};
386
/**
 * Event object (POSIX implementation): a flag guarded by a mutex,
 * with a condition variable to wake waiters.
 */
struct sbevent_s {
    pthread_mutex_t mtx;   /* Protects signalled. */
    pthread_cond_t cond;   /* Signalled by sbevent_signal(). */
    int signalled;         /* Set by sbevent_signal(), cleared by a successful sbevent_wait(). */
};
392
/**
 * Mutex object (POSIX implementation): a thin wrapper around a
 * pthread mutex.
 */
struct sbmtx_s {
    pthread_mutex_t mtx;  /* Mutex initialised by sbmtx_init(). */
};
396
397 static void *
sbthread_internal_main(void * arg)398 sbthread_internal_main(void *arg)
399 {
400 sbthread_t *th = (sbthread_t *)arg;
401 int rv;
402
403 rv = (*th->func)(th);
404 return (void *)(long)rv;
405 }
406
407 sbthread_t *
sbthread_start(cmd_ln_t * config,sbthread_main func,void * arg)408 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
409 {
410 sbthread_t *th;
411 int rv;
412
413 th = ckd_calloc(1, sizeof(*th));
414 th->config = config;
415 th->func = func;
416 th->arg = arg;
417 th->msgq = sbmsgq_init(1024);
418 if ((rv = pthread_create(&th->th, NULL, &sbthread_internal_main, th)) != 0) {
419 E_ERROR("Failed to create thread: %d\n", rv);
420 sbthread_free(th);
421 return NULL;
422 }
423 return th;
424 }
425
426 int
sbthread_wait(sbthread_t * th)427 sbthread_wait(sbthread_t *th)
428 {
429 void *exit;
430 int rv;
431
432 /* It has already been joined. */
433 if (th->th == (pthread_t)-1)
434 return -1;
435
436 rv = pthread_join(th->th, &exit);
437 if (rv != 0) {
438 E_ERROR("Failed to join thread: %d\n", rv);
439 return -1;
440 }
441 th->th = (pthread_t)-1;
442 return (int)(long)exit;
443 }
444
445 sbmsgq_t *
sbmsgq_init(size_t depth)446 sbmsgq_init(size_t depth)
447 {
448 sbmsgq_t *msgq;
449
450 msgq = ckd_calloc(1, sizeof(*msgq));
451 msgq->depth = depth;
452 if (pthread_cond_init(&msgq->cond, NULL) != 0) {
453 ckd_free(msgq);
454 return NULL;
455 }
456 if (pthread_mutex_init(&msgq->mtx, NULL) != 0) {
457 pthread_cond_destroy(&msgq->cond);
458 ckd_free(msgq);
459 return NULL;
460 }
461 msgq->data = ckd_calloc(depth, 1);
462 msgq->msg = ckd_calloc(depth, 1);
463 return msgq;
464 }
465
466 void
sbmsgq_free(sbmsgq_t * msgq)467 sbmsgq_free(sbmsgq_t *msgq)
468 {
469 pthread_mutex_destroy(&msgq->mtx);
470 pthread_cond_destroy(&msgq->cond);
471 ckd_free(msgq->data);
472 ckd_free(msgq->msg);
473 ckd_free(msgq);
474 }
475
476 int
sbmsgq_send(sbmsgq_t * q,size_t len,void const * data)477 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
478 {
479 size_t in;
480
481 /* Don't allow things bigger than depth to be sent! */
482 if (len + sizeof(len) > q->depth)
483 return -1;
484
485 /* Lock the condition variable while we manipulate the buffer. */
486 pthread_mutex_lock(&q->mtx);
487 if (q->nbytes + len + sizeof(len) > q->depth) {
488 /* Unlock and wait for space to be available. */
489 if (pthread_cond_wait(&q->cond, &q->mtx) != 0) {
490 /* Timed out, don't send anything. */
491 pthread_mutex_unlock(&q->mtx);
492 return -1;
493 }
494 /* Condition is now locked again. */
495 }
496 in = (q->out + q->nbytes) % q->depth;
497
498 /* First write the size of the message. */
499 if (in + sizeof(len) > q->depth) {
500 /* Handle the annoying case where the size field gets wrapped around. */
501 size_t len1 = q->depth - in;
502 memcpy(q->data + in, &len, len1);
503 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
504 q->nbytes += sizeof(len);
505 in = sizeof(len) - len1;
506 }
507 else {
508 memcpy(q->data + in, &len, sizeof(len));
509 q->nbytes += sizeof(len);
510 in += sizeof(len);
511 }
512
513 /* Now write the message body. */
514 if (in + len > q->depth) {
515 /* Handle wraparound. */
516 size_t len1 = q->depth - in;
517 memcpy(q->data + in, data, len1);
518 q->nbytes += len1;
519 data = (char const *)data + len1;
520 len -= len1;
521 in = 0;
522 }
523 memcpy(q->data + in, data, len);
524 q->nbytes += len;
525
526 /* Signal the condition variable. */
527 pthread_cond_signal(&q->cond);
528 /* Unlock it, we have nothing else to do. */
529 pthread_mutex_unlock(&q->mtx);
530 return 0;
531 }
532
/**
 * Wait on a condition variable with an optional relative timeout.
 *
 * @param cond Condition variable to wait on.
 * @param mtx Mutex associated with cond; must be locked by the caller.
 * @param sec Seconds to wait, or -1 to wait forever.
 * @param nsec Additional nanoseconds to wait; ignored when sec is -1.
 * @return The result of pthread_cond_wait() or
 *         pthread_cond_timedwait(): 0 when woken, an error number
 *         (such as ETIMEDOUT) otherwise.
 */
static int
cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mtx, int sec, int nsec)
{
    enum { NSEC_PER_SEC = 1000 * 1000 * 1000 };
    struct timeval now;
    struct timespec end;

    if (sec == -1)
        return pthread_cond_wait(cond, mtx);

    /* Build the absolute deadline.  Whole seconds contained in nsec
     * are moved into tv_sec first so the nanosecond sum below stays
     * under 2 * NSEC_PER_SEC and cannot overflow a 32-bit long. */
    gettimeofday(&now, NULL);
    end.tv_sec = now.tv_sec + sec + nsec / NSEC_PER_SEC;
    end.tv_nsec = now.tv_usec * 1000 + nsec % NSEC_PER_SEC;

    /* Normalise: tv_nsec must lie in [0, NSEC_PER_SEC), and any carry
     * belongs in the deadline's tv_sec (not in the sec argument). */
    if (end.tv_nsec >= NSEC_PER_SEC) {
        end.tv_sec += 1;
        end.tv_nsec -= NSEC_PER_SEC;
    }
    else if (end.tv_nsec < 0) {
        end.tv_sec -= 1;
        end.tv_nsec += NSEC_PER_SEC;
    }
    return pthread_cond_timedwait(cond, mtx, &end);
}
555
556 void *
sbmsgq_wait(sbmsgq_t * q,size_t * out_len,int sec,int nsec)557 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
558 {
559 char *outptr;
560 size_t len;
561
562 /* Lock the condition variable while we manipulate nmsg. */
563 pthread_mutex_lock(&q->mtx);
564 if (q->nbytes == 0) {
565 /* Unlock the condition variable and wait for a signal. */
566 if (cond_timed_wait(&q->cond, &q->mtx, sec, nsec) != 0) {
567 /* Timed out or something... */
568 pthread_mutex_unlock(&q->mtx);
569 return NULL;
570 }
571 /* Condition variable is now locked again. */
572 }
573 /* Get the message size. */
574 if (q->out + sizeof(q->msglen) > q->depth) {
575 /* Handle annoying wraparound case. */
576 size_t len1 = q->depth - q->out;
577 memcpy(&q->msglen, q->data + q->out, len1);
578 memcpy(((char *)&q->msglen) + len1, q->data,
579 sizeof(q->msglen) - len1);
580 q->out = sizeof(q->msglen) - len1;
581 }
582 else {
583 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
584 q->out += sizeof(q->msglen);
585 }
586 q->nbytes -= sizeof(q->msglen);
587 /* Get the message body. */
588 outptr = q->msg;
589 len = q->msglen;
590 if (q->out + q->msglen > q->depth) {
591 /* Handle wraparound. */
592 size_t len1 = q->depth - q->out;
593 memcpy(outptr, q->data + q->out, len1);
594 outptr += len1;
595 len -= len1;
596 q->nbytes -= len1;
597 q->out = 0;
598 }
599 memcpy(outptr, q->data + q->out, len);
600 q->nbytes -= len;
601 q->out += len;
602
603 /* Signal the condition variable. */
604 pthread_cond_signal(&q->cond);
605 /* Unlock the condition variable, we are done. */
606 pthread_mutex_unlock(&q->mtx);
607 if (out_len)
608 *out_len = q->msglen;
609 return q->msg;
610 }
611
612 sbevent_t *
sbevent_init(void)613 sbevent_init(void)
614 {
615 sbevent_t *evt;
616 int rv;
617
618 evt = ckd_calloc(1, sizeof(*evt));
619 if ((rv = pthread_mutex_init(&evt->mtx, NULL)) != 0) {
620 E_ERROR("Failed to initialize mutex: %d\n", rv);
621 ckd_free(evt);
622 return NULL;
623 }
624 if ((rv = pthread_cond_init(&evt->cond, NULL)) != 0) {
625 E_ERROR_SYSTEM("Failed to initialize mutex: %d\n", rv);
626 pthread_mutex_destroy(&evt->mtx);
627 ckd_free(evt);
628 return NULL;
629 }
630 return evt;
631 }
632
633 void
sbevent_free(sbevent_t * evt)634 sbevent_free(sbevent_t *evt)
635 {
636 pthread_mutex_destroy(&evt->mtx);
637 pthread_cond_destroy(&evt->cond);
638 ckd_free(evt);
639 }
640
641 int
sbevent_signal(sbevent_t * evt)642 sbevent_signal(sbevent_t *evt)
643 {
644 int rv;
645
646 pthread_mutex_lock(&evt->mtx);
647 evt->signalled = TRUE;
648 rv = pthread_cond_signal(&evt->cond);
649 pthread_mutex_unlock(&evt->mtx);
650 return rv;
651 }
652
653 int
sbevent_wait(sbevent_t * evt,int sec,int nsec)654 sbevent_wait(sbevent_t *evt, int sec, int nsec)
655 {
656 int rv = 0;
657
658 /* Lock the mutex before we check its signalled state. */
659 pthread_mutex_lock(&evt->mtx);
660 /* If it's not signalled, then wait until it is. */
661 if (!evt->signalled)
662 rv = cond_timed_wait(&evt->cond, &evt->mtx, sec, nsec);
663 /* Set its state to unsignalled if we were successful. */
664 if (rv == 0)
665 evt->signalled = FALSE;
666 /* And unlock its mutex. */
667 pthread_mutex_unlock(&evt->mtx);
668
669 return rv;
670 }
671
672 sbmtx_t *
sbmtx_init(void)673 sbmtx_init(void)
674 {
675 sbmtx_t *mtx;
676
677 mtx = ckd_calloc(1, sizeof(*mtx));
678 if (pthread_mutex_init(&mtx->mtx, NULL) != 0) {
679 ckd_free(mtx);
680 return NULL;
681 }
682 return mtx;
683 }
684
685 int
sbmtx_trylock(sbmtx_t * mtx)686 sbmtx_trylock(sbmtx_t *mtx)
687 {
688 return pthread_mutex_trylock(&mtx->mtx);
689 }
690
691 int
sbmtx_lock(sbmtx_t * mtx)692 sbmtx_lock(sbmtx_t *mtx)
693 {
694 return pthread_mutex_lock(&mtx->mtx);
695 }
696
697 int
sbmtx_unlock(sbmtx_t * mtx)698 sbmtx_unlock(sbmtx_t *mtx)
699 {
700 return pthread_mutex_unlock(&mtx->mtx);
701 }
702
703 void
sbmtx_free(sbmtx_t * mtx)704 sbmtx_free(sbmtx_t *mtx)
705 {
706 pthread_mutex_destroy(&mtx->mtx);
707 ckd_free(mtx);
708 }
709 #endif /* not WIN32 */
710
711 cmd_ln_t *
sbthread_config(sbthread_t * th)712 sbthread_config(sbthread_t *th)
713 {
714 return th->config;
715 }
716
717 void *
sbthread_arg(sbthread_t * th)718 sbthread_arg(sbthread_t *th)
719 {
720 return th->arg;
721 }
722
723 sbmsgq_t *
sbthread_msgq(sbthread_t * th)724 sbthread_msgq(sbthread_t *th)
725 {
726 return th->msgq;
727 }
728
729 int
sbthread_send(sbthread_t * th,size_t len,void const * data)730 sbthread_send(sbthread_t *th, size_t len, void const *data)
731 {
732 return sbmsgq_send(th->msgq, len, data);
733 }
734
735 void
sbthread_free(sbthread_t * th)736 sbthread_free(sbthread_t *th)
737 {
738 sbthread_wait(th);
739 sbmsgq_free(th->msgq);
740 ckd_free(th);
741 }
742