1 /* $NetBSD: work_thread.c,v 1.8 2020/05/25 20:47:24 christos Exp $ */
2
3 /*
4 * work_thread.c - threads implementation for blocking worker child.
5 */
6 #include <config.h>
7 #include "ntp_workimpl.h"
8
9 #ifdef WORK_THREAD
10
11 #include <stdio.h>
12 #include <ctype.h>
13 #include <signal.h>
14 #ifndef SYS_WINNT
15 #include <pthread.h>
16 #endif
17
18 #include "ntp_stdlib.h"
19 #include "ntp_malloc.h"
20 #include "ntp_syslog.h"
21 #include "ntpd.h"
22 #include "ntp_io.h"
23 #include "ntp_assert.h"
24 #include "ntp_unixtime.h"
25 #include "timespecops.h"
26 #include "ntp_worker.h"
27
28 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
29 #define CHILD_GONE_RESP CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results... Whether this really
 * pays off is debatable.
 */
36 #define WORKITEMS_ALLOC_INC 16
37 #define RESPONSES_ALLOC_INC 4
38
39 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
40 * set the maximum to 256kB. If the minimum goes below the
41 * system-defined minimum stack size, we have to adjust accordingly.
42 */
43 #ifndef THREAD_MINSTACKSIZE
44 # define THREAD_MINSTACKSIZE (64U * 1024)
45 #endif
46 #ifndef __sun
47 #if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
48 # undef THREAD_MINSTACKSIZE
49 # define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
50 #endif
51 #endif
52
53 #ifndef THREAD_MAXSTACKSIZE
54 # define THREAD_MAXSTACKSIZE (256U * 1024)
55 #endif
56 #if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
57 # undef THREAD_MAXSTACKSIZE
58 # define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
59 #endif
60
61 /* need a good integer to store a pointer... */
62 #ifndef UINTPTR_T
63 # if defined(UINTPTR_MAX)
64 # define UINTPTR_T uintptr_t
65 # elif defined(UINT_PTR)
66 # define UINTPTR_T UINT_PTR
67 # else
68 # define UINTPTR_T size_t
69 # endif
70 #endif
71
72
73 #ifdef SYS_WINNT
74
75 # define thread_exit(c) _endthreadex(c)
76 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
77 u_int WINAPI blocking_thread(void *);
78 static BOOL same_os_sema(const sem_ref obj, void * osobj);
79
80 #else
81
82 # define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c))
83 # define tickle_sem sem_post
84 void * blocking_thread(void *);
85 static void block_thread_signals(sigset_t *);
86
87 #endif
88
89 #ifdef WORK_PIPE
90 addremove_io_fd_func addremove_io_fd;
91 #else
92 addremove_io_semaphore_func addremove_io_semaphore;
93 #endif
94
95 static void start_blocking_thread(blocking_child *);
96 static void start_blocking_thread_internal(blocking_child *);
97 static void prepare_child_sems(blocking_child *);
98 static int wait_for_sem(sem_ref, struct timespec *);
99 static int ensure_workitems_empty_slot(blocking_child *);
100 static int ensure_workresp_empty_slot(blocking_child *);
101 static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
102 static void cleanup_after_child(blocking_child *);
103
104 static sema_type worker_mmutex;
105 static sem_ref worker_memlock;
106
107 /* --------------------------------------------------------------------
108 * locking the global worker state table (and other global stuff)
109 */
110 void
worker_global_lock(int inOrOut)111 worker_global_lock(
112 int inOrOut)
113 {
114 if (worker_memlock) {
115 if (inOrOut)
116 wait_for_sem(worker_memlock, NULL);
117 else
118 tickle_sem(worker_memlock);
119 }
120 }
121
/* --------------------------------------------------------------------
 * Implementation isolation wrapper: terminate the calling worker
 * thread, handing 'exitcode' to the platform's thread-exit primitive.
 */
void
exit_worker(
	int	exitcode
	)
{
	/* thread_exit() is a macro, see its #define for both platforms */
	thread_exit(exitcode);
}
132
133 /* --------------------------------------------------------------------
134 * sleep for a given time or until the wakup semaphore is tickled.
135 */
136 int
worker_sleep(blocking_child * c,time_t seconds)137 worker_sleep(
138 blocking_child * c,
139 time_t seconds
140 )
141 {
142 struct timespec until;
143 int rc;
144
145 # ifdef HAVE_CLOCK_GETTIME
146 if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
147 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
148 return -1;
149 }
150 # else
151 if (0 != getclock(TIMEOFDAY, &until)) {
152 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
153 return -1;
154 }
155 # endif
156 until.tv_sec += seconds;
157 rc = wait_for_sem(c->wake_scheduled_sleep, &until);
158 if (0 == rc)
159 return -1;
160 if (-1 == rc && ETIMEDOUT == errno)
161 return 0;
162 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
163 return -1;
164 }
165
166
167 /* --------------------------------------------------------------------
168 * Wake up a worker that takes a nap.
169 */
170 void
interrupt_worker_sleep(void)171 interrupt_worker_sleep(void)
172 {
173 u_int idx;
174 blocking_child * c;
175
176 for (idx = 0; idx < blocking_children_alloc; idx++) {
177 c = blocking_children[idx];
178 if (NULL == c || NULL == c->wake_scheduled_sleep)
179 continue;
180 tickle_sem(c->wake_scheduled_sleep);
181 }
182 }
183
184 /* --------------------------------------------------------------------
185 * Make sure there is an empty slot at the head of the request
186 * queue. Tell if the queue is currently empty.
187 */
188 static int
ensure_workitems_empty_slot(blocking_child * c)189 ensure_workitems_empty_slot(
190 blocking_child *c
191 )
192 {
193 /*
194 ** !!! PRECONDITION: caller holds access lock!
195 **
196 ** This simply tries to increase the size of the buffer if it
197 ** becomes full. The resize operation does *not* maintain the
198 ** order of requests, but that should be irrelevant since the
199 ** processing is considered asynchronous anyway.
200 **
201 ** Return if the buffer is currently empty.
202 */
203
204 static const size_t each =
205 sizeof(blocking_children[0]->workitems[0]);
206
207 size_t new_alloc;
208 size_t slots_used;
209 size_t sidx;
210
211 slots_used = c->head_workitem - c->tail_workitem;
212 if (slots_used >= c->workitems_alloc) {
213 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
214 c->workitems = erealloc(c->workitems, new_alloc * each);
215 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
216 c->workitems[sidx] = NULL;
217 c->tail_workitem = 0;
218 c->head_workitem = c->workitems_alloc;
219 c->workitems_alloc = new_alloc;
220 }
221 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
222 return (0 == slots_used);
223 }
224
225 /* --------------------------------------------------------------------
226 * Make sure there is an empty slot at the head of the response
227 * queue. Tell if the queue is currently empty.
228 */
229 static int
ensure_workresp_empty_slot(blocking_child * c)230 ensure_workresp_empty_slot(
231 blocking_child *c
232 )
233 {
234 /*
235 ** !!! PRECONDITION: caller holds access lock!
236 **
237 ** Works like the companion function above.
238 */
239
240 static const size_t each =
241 sizeof(blocking_children[0]->responses[0]);
242
243 size_t new_alloc;
244 size_t slots_used;
245 size_t sidx;
246
247 slots_used = c->head_response - c->tail_response;
248 if (slots_used >= c->responses_alloc) {
249 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
250 c->responses = erealloc(c->responses, new_alloc * each);
251 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
252 c->responses[sidx] = NULL;
253 c->tail_response = 0;
254 c->head_response = c->responses_alloc;
255 c->responses_alloc = new_alloc;
256 }
257 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
258 return (0 == slots_used);
259 }
260
261
262 /* --------------------------------------------------------------------
263 * queue_req_pointer() - append a work item or idle exit request to
264 * blocking_workitems[]. Employ proper locking.
265 */
266 static int
queue_req_pointer(blocking_child * c,blocking_pipe_header * hdr)267 queue_req_pointer(
268 blocking_child * c,
269 blocking_pipe_header * hdr
270 )
271 {
272 size_t qhead;
273
274 /* >>>> ACCESS LOCKING STARTS >>>> */
275 wait_for_sem(c->accesslock, NULL);
276 ensure_workitems_empty_slot(c);
277 qhead = c->head_workitem;
278 c->workitems[qhead % c->workitems_alloc] = hdr;
279 c->head_workitem = 1 + qhead;
280 tickle_sem(c->accesslock);
281 /* <<<< ACCESS LOCKING ENDS <<<< */
282
283 /* queue consumer wake-up notification */
284 tickle_sem(c->workitems_pending);
285
286 return 0;
287 }
288
289 /* --------------------------------------------------------------------
290 * API function to make sure a worker is running, a proper private copy
291 * of the data is made, the data eneterd into the queue and the worker
292 * is signalled.
293 */
294 int
send_blocking_req_internal(blocking_child * c,blocking_pipe_header * hdr,void * data)295 send_blocking_req_internal(
296 blocking_child * c,
297 blocking_pipe_header * hdr,
298 void * data
299 )
300 {
301 blocking_pipe_header * threadcopy;
302 size_t payload_octets;
303
304 REQUIRE(hdr != NULL);
305 REQUIRE(data != NULL);
306 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
307
308 if (hdr->octets <= sizeof(*hdr))
309 return 1; /* failure */
310 payload_octets = hdr->octets - sizeof(*hdr);
311
312 if (NULL == c->thread_ref)
313 start_blocking_thread(c);
314 threadcopy = emalloc(hdr->octets);
315 memcpy(threadcopy, hdr, sizeof(*hdr));
316 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
317
318 return queue_req_pointer(c, threadcopy);
319 }
320
321 /* --------------------------------------------------------------------
322 * Wait for the 'incoming queue no longer empty' signal, lock the shared
323 * structure and dequeue an item.
324 */
325 blocking_pipe_header *
receive_blocking_req_internal(blocking_child * c)326 receive_blocking_req_internal(
327 blocking_child * c
328 )
329 {
330 blocking_pipe_header * req;
331 size_t qhead, qtail;
332
333 req = NULL;
334 do {
335 /* wait for tickle from the producer side */
336 wait_for_sem(c->workitems_pending, NULL);
337
338 /* >>>> ACCESS LOCKING STARTS >>>> */
339 wait_for_sem(c->accesslock, NULL);
340 qhead = c->head_workitem;
341 do {
342 qtail = c->tail_workitem;
343 if (qhead == qtail)
344 break;
345 c->tail_workitem = qtail + 1;
346 qtail %= c->workitems_alloc;
347 req = c->workitems[qtail];
348 c->workitems[qtail] = NULL;
349 } while (NULL == req);
350 tickle_sem(c->accesslock);
351 /* <<<< ACCESS LOCKING ENDS <<<< */
352
353 } while (NULL == req);
354
355 INSIST(NULL != req);
356 if (CHILD_EXIT_REQ == req) { /* idled out */
357 send_blocking_resp_internal(c, CHILD_GONE_RESP);
358 req = NULL;
359 }
360
361 return req;
362 }
363
364 /* --------------------------------------------------------------------
365 * Push a response into the return queue and eventually tickle the
366 * receiver.
367 */
368 int
send_blocking_resp_internal(blocking_child * c,blocking_pipe_header * resp)369 send_blocking_resp_internal(
370 blocking_child * c,
371 blocking_pipe_header * resp
372 )
373 {
374 size_t qhead;
375 int empty;
376
377 /* >>>> ACCESS LOCKING STARTS >>>> */
378 wait_for_sem(c->accesslock, NULL);
379 empty = ensure_workresp_empty_slot(c);
380 qhead = c->head_response;
381 c->responses[qhead % c->responses_alloc] = resp;
382 c->head_response = 1 + qhead;
383 tickle_sem(c->accesslock);
384 /* <<<< ACCESS LOCKING ENDS <<<< */
385
386 /* queue consumer wake-up notification */
387 if (empty)
388 {
389 # ifdef WORK_PIPE
390 if (1 != write(c->resp_write_pipe, "", 1))
391 msyslog(LOG_WARNING, "async resolver: %s",
392 "failed to notify main thread!");
393 # else
394 tickle_sem(c->responses_pending);
395 # endif
396 }
397 return 0;
398 }
399
400
401 #ifndef WORK_PIPE
402
403 /* --------------------------------------------------------------------
404 * Check if a (Windows-)hanndle to a semaphore is actually the same we
405 * are using inside the sema wrapper.
406 */
407 static BOOL
same_os_sema(const sem_ref obj,void * osh)408 same_os_sema(
409 const sem_ref obj,
410 void* osh
411 )
412 {
413 return obj && osh && (obj->shnd == (HANDLE)osh);
414 }
415
416 /* --------------------------------------------------------------------
417 * Find the shared context that associates to an OS handle and make sure
418 * the data is dequeued and processed.
419 */
420 void
handle_blocking_resp_sem(void * context)421 handle_blocking_resp_sem(
422 void * context
423 )
424 {
425 blocking_child * c;
426 u_int idx;
427
428 c = NULL;
429 for (idx = 0; idx < blocking_children_alloc; idx++) {
430 c = blocking_children[idx];
431 if (c != NULL &&
432 c->thread_ref != NULL &&
433 same_os_sema(c->responses_pending, context))
434 break;
435 }
436 if (idx < blocking_children_alloc)
437 process_blocking_resp(c);
438 }
439 #endif /* !WORK_PIPE */
440
441 /* --------------------------------------------------------------------
442 * Fetch the next response from the return queue. In case of signalling
443 * via pipe, make sure the pipe is flushed, too.
444 */
445 blocking_pipe_header *
receive_blocking_resp_internal(blocking_child * c)446 receive_blocking_resp_internal(
447 blocking_child * c
448 )
449 {
450 blocking_pipe_header * removed;
451 size_t qhead, qtail, slot;
452
453 #ifdef WORK_PIPE
454 int rc;
455 char scratch[32];
456
457 do
458 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
459 while (-1 == rc && EINTR == errno);
460 #endif
461
462 /* >>>> ACCESS LOCKING STARTS >>>> */
463 wait_for_sem(c->accesslock, NULL);
464 qhead = c->head_response;
465 qtail = c->tail_response;
466 for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
467 slot = qtail % c->responses_alloc;
468 removed = c->responses[slot];
469 c->responses[slot] = NULL;
470 }
471 c->tail_response = qtail;
472 tickle_sem(c->accesslock);
473 /* <<<< ACCESS LOCKING ENDS <<<< */
474
475 if (NULL != removed) {
476 DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
477 BLOCKING_RESP_MAGIC == removed->magic_sig);
478 }
479 if (CHILD_GONE_RESP == removed) {
480 cleanup_after_child(c);
481 removed = NULL;
482 }
483
484 return removed;
485 }
486
487 /* --------------------------------------------------------------------
488 * Light up a new worker.
489 */
490 static void
start_blocking_thread(blocking_child * c)491 start_blocking_thread(
492 blocking_child * c
493 )
494 {
495
496 DEBUG_INSIST(!c->reusable);
497
498 prepare_child_sems(c);
499 start_blocking_thread_internal(c);
500 }
501
502 /* --------------------------------------------------------------------
503 * Create a worker thread. There are several differences between POSIX
504 * and Windows, of course -- most notably the Windows thread is no
505 * detached thread, and we keep the handle around until we want to get
506 * rid of the thread. The notification scheme also differs: Windows
507 * makes use of semaphores in both directions, POSIX uses a pipe for
508 * integration with 'select()' or alike.
509 */
510 static void
start_blocking_thread_internal(blocking_child * c)511 start_blocking_thread_internal(
512 blocking_child * c
513 )
514 #ifdef SYS_WINNT
515 {
516 BOOL resumed;
517
518 c->thread_ref = NULL;
519 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
520 c->thr_table[0].thnd =
521 (HANDLE)_beginthreadex(
522 NULL,
523 0,
524 &blocking_thread,
525 c,
526 CREATE_SUSPENDED,
527 NULL);
528
529 if (NULL == c->thr_table[0].thnd) {
530 msyslog(LOG_ERR, "start blocking thread failed: %m");
531 exit(-1);
532 }
533 /* remember the thread priority is only within the process class */
534 if (!SetThreadPriority(c->thr_table[0].thnd,
535 THREAD_PRIORITY_BELOW_NORMAL))
536 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
537
538 resumed = ResumeThread(c->thr_table[0].thnd);
539 DEBUG_INSIST(resumed);
540 c->thread_ref = &c->thr_table[0];
541 }
542 #else /* pthreads start_blocking_thread_internal() follows */
543 {
544 # ifdef NEED_PTHREAD_INIT
545 static int pthread_init_called;
546 # endif
547 pthread_attr_t thr_attr;
548 int rc;
549 int pipe_ends[2]; /* read then write */
550 int is_pipe;
551 int flags;
552 size_t ostacksize;
553 size_t nstacksize;
554 sigset_t saved_sig_mask;
555
556 c->thread_ref = NULL;
557
558 # ifdef NEED_PTHREAD_INIT
559 /*
560 * from lib/isc/unix/app.c:
561 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
562 */
563 if (!pthread_init_called) {
564 pthread_init();
565 pthread_init_called = TRUE;
566 }
567 # endif
568
569 rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
570 if (0 != rc) {
571 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
572 exit(1);
573 }
574 c->resp_read_pipe = move_fd(pipe_ends[0]);
575 c->resp_write_pipe = move_fd(pipe_ends[1]);
576 c->ispipe = is_pipe;
577 flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
578 if (-1 == flags) {
579 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
580 exit(1);
581 }
582 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
583 if (-1 == rc) {
584 msyslog(LOG_ERR,
585 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
586 exit(1);
587 }
588 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
589 pthread_attr_init(&thr_attr);
590 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
591 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
592 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
593 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
594 if (0 != rc) {
595 msyslog(LOG_ERR,
596 "start_blocking_thread: pthread_attr_getstacksize() -> %s",
597 strerror(rc));
598 } else {
599 if (ostacksize < THREAD_MINSTACKSIZE)
600 nstacksize = THREAD_MINSTACKSIZE;
601 else if (ostacksize > THREAD_MAXSTACKSIZE)
602 nstacksize = THREAD_MAXSTACKSIZE;
603 else
604 nstacksize = ostacksize;
605 if (nstacksize != ostacksize)
606 rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
607 if (0 != rc)
608 msyslog(LOG_ERR,
609 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
610 (u_long)ostacksize, (u_long)nstacksize,
611 strerror(rc));
612 }
613 #else
614 UNUSED_ARG(nstacksize);
615 UNUSED_ARG(ostacksize);
616 #endif
617 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
618 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
619 #endif
620 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
621 block_thread_signals(&saved_sig_mask);
622 rc = pthread_create(&c->thr_table[0], &thr_attr,
623 &blocking_thread, c);
624 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
625 pthread_attr_destroy(&thr_attr);
626 if (0 != rc) {
627 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
628 strerror(rc));
629 exit(1);
630 }
631 c->thread_ref = &c->thr_table[0];
632 }
633 #endif
634
635 /* --------------------------------------------------------------------
636 * block_thread_signals()
637 *
638 * Temporarily block signals used by ntpd main thread, so that signal
639 * mask inherited by child threads leaves them blocked. Returns prior
640 * active signal mask via pmask, to be restored by the main thread
641 * after pthread_create().
642 */
643 #ifndef SYS_WINNT
644 void
block_thread_signals(sigset_t * pmask)645 block_thread_signals(
646 sigset_t * pmask
647 )
648 {
649 sigset_t block;
650
651 sigemptyset(&block);
652 # ifdef HAVE_SIGNALED_IO
653 # ifdef SIGIO
654 sigaddset(&block, SIGIO);
655 # endif
656 # ifdef SIGPOLL
657 sigaddset(&block, SIGPOLL);
658 # endif
659 # endif /* HAVE_SIGNALED_IO */
660 sigaddset(&block, SIGALRM);
661 sigaddset(&block, MOREDEBUGSIG);
662 sigaddset(&block, LESSDEBUGSIG);
663 # ifdef SIGDIE1
664 sigaddset(&block, SIGDIE1);
665 # endif
666 # ifdef SIGDIE2
667 sigaddset(&block, SIGDIE2);
668 # endif
669 # ifdef SIGDIE3
670 sigaddset(&block, SIGDIE3);
671 # endif
672 # ifdef SIGDIE4
673 sigaddset(&block, SIGDIE4);
674 # endif
675 # ifdef SIGBUS
676 sigaddset(&block, SIGBUS);
677 # endif
678 sigemptyset(pmask);
679 pthread_sigmask(SIG_BLOCK, &block, pmask);
680 }
681 #endif /* !SYS_WINNT */
682
683
684 /* --------------------------------------------------------------------
685 * Create & destroy semaphores. This is sufficiently different between
686 * POSIX and Windows to warrant wrapper functions and close enough to
687 * use the concept of synchronization via semaphore for all platforms.
688 */
689 static sem_ref
create_sema(sema_type * semptr,u_int inival,u_int maxval)690 create_sema(
691 sema_type* semptr,
692 u_int inival,
693 u_int maxval)
694 {
695 #ifdef SYS_WINNT
696
697 long svini, svmax;
698 if (NULL != semptr) {
699 svini = (inival < LONG_MAX)
700 ? (long)inival : LONG_MAX;
701 svmax = (maxval < LONG_MAX && maxval > 0)
702 ? (long)maxval : LONG_MAX;
703 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
704 if (NULL == semptr->shnd)
705 semptr = NULL;
706 }
707
708 #else
709
710 (void)maxval;
711 if (semptr && sem_init(semptr, FALSE, inival))
712 semptr = NULL;
713
714 #endif
715
716 return semptr;
717 }
718
719 /* ------------------------------------------------------------------ */
720 static sem_ref
delete_sema(sem_ref obj)721 delete_sema(
722 sem_ref obj)
723 {
724
725 # ifdef SYS_WINNT
726
727 if (obj) {
728 if (obj->shnd)
729 CloseHandle(obj->shnd);
730 obj->shnd = NULL;
731 }
732
733 # else
734
735 if (obj)
736 sem_destroy(obj);
737
738 # endif
739
740 return NULL;
741 }
742
743 /* --------------------------------------------------------------------
744 * prepare_child_sems()
745 *
746 * create sync & access semaphores
747 *
748 * All semaphores are cleared, only the access semaphore has 1 unit.
749 * Childs wait on 'workitems_pending', then grabs 'sema_access'
750 * and dequeues jobs. When done, 'sema_access' is given one unit back.
751 *
752 * The producer grabs 'sema_access', manages the queue, restores
753 * 'sema_access' and puts one unit into 'workitems_pending'.
754 *
755 * The story goes the same for the response queue.
756 */
757 static void
prepare_child_sems(blocking_child * c)758 prepare_child_sems(
759 blocking_child *c
760 )
761 {
762 if (NULL == worker_memlock)
763 worker_memlock = create_sema(&worker_mmutex, 1, 1);
764
765 c->accesslock = create_sema(&c->sem_table[0], 1, 1);
766 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
767 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
768 # ifndef WORK_PIPE
769 c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
770 # endif
771 }
772
773 /* --------------------------------------------------------------------
774 * wait for semaphore. Where the wait can be interrupted, it will
775 * internally resume -- When this function returns, there is either no
776 * semaphore at all, a timeout occurred, or the caller could
777 * successfully take a token from the semaphore.
778 *
779 * For untimed wait, not checking the result of this function at all is
780 * definitely an option.
781 */
782 static int
wait_for_sem(sem_ref sem,struct timespec * timeout)783 wait_for_sem(
784 sem_ref sem,
785 struct timespec * timeout /* wall-clock */
786 )
787 #ifdef SYS_WINNT
788 {
789 struct timespec now;
790 struct timespec delta;
791 DWORD msec;
792 DWORD rc;
793
794 if (!(sem && sem->shnd)) {
795 errno = EINVAL;
796 return -1;
797 }
798
799 if (NULL == timeout) {
800 msec = INFINITE;
801 } else {
802 getclock(TIMEOFDAY, &now);
803 delta = sub_tspec(*timeout, now);
804 if (delta.tv_sec < 0) {
805 msec = 0;
806 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
807 msec = INFINITE;
808 } else {
809 msec = 1000 * (DWORD)delta.tv_sec;
810 msec += delta.tv_nsec / (1000 * 1000);
811 }
812 }
813 rc = WaitForSingleObject(sem->shnd, msec);
814 if (WAIT_OBJECT_0 == rc)
815 return 0;
816 if (WAIT_TIMEOUT == rc) {
817 errno = ETIMEDOUT;
818 return -1;
819 }
820 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
821 errno = EFAULT;
822 return -1;
823 }
824 #else /* pthreads wait_for_sem() follows */
825 {
826 int rc = -1;
827
828 if (sem) do {
829 if (NULL == timeout)
830 rc = sem_wait(sem);
831 else
832 rc = sem_timedwait(sem, timeout);
833 } while (rc == -1 && errno == EINTR);
834 else
835 errno = EINVAL;
836
837 return rc;
838 }
839 #endif
840
841 /* --------------------------------------------------------------------
842 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
843 * calling conventions under Windows and POSIX-defined signature
844 * otherwise.
845 */
846 #ifdef SYS_WINNT
847 u_int WINAPI
848 #else
849 void *
850 #endif
blocking_thread(void * ThreadArg)851 blocking_thread(
852 void * ThreadArg
853 )
854 {
855 blocking_child *c;
856
857 c = ThreadArg;
858 exit_worker(blocking_child_common(c));
859
860 /* NOTREACHED */
861 return 0;
862 }
863
864 /* --------------------------------------------------------------------
865 * req_child_exit() runs in the parent.
866 *
867 * This function is called from from the idle timer, too, and possibly
868 * without a thread being there any longer. Since we have folded up our
869 * tent in that case and all the semaphores are already gone, we simply
870 * ignore this request in this case.
871 *
872 * Since the existence of the semaphores is controlled exclusively by
873 * the parent, there's no risk of data race here.
874 */
875 int
req_child_exit(blocking_child * c)876 req_child_exit(
877 blocking_child *c
878 )
879 {
880 return (c->accesslock)
881 ? queue_req_pointer(c, CHILD_EXIT_REQ)
882 : 0;
883 }
884
885 /* --------------------------------------------------------------------
886 * cleanup_after_child() runs in parent.
887 */
888 static void
cleanup_after_child(blocking_child * c)889 cleanup_after_child(
890 blocking_child * c
891 )
892 {
893 DEBUG_INSIST(!c->reusable);
894
895 # ifdef SYS_WINNT
896 /* The thread was not created in detached state, so we better
897 * clean up.
898 */
899 if (c->thread_ref && c->thread_ref->thnd) {
900 WaitForSingleObject(c->thread_ref->thnd, INFINITE);
901 INSIST(CloseHandle(c->thread_ref->thnd));
902 c->thread_ref->thnd = NULL;
903 }
904 # endif
905 c->thread_ref = NULL;
906
907 /* remove semaphores and (if signalling vi IO) pipes */
908
909 c->accesslock = delete_sema(c->accesslock);
910 c->workitems_pending = delete_sema(c->workitems_pending);
911 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
912
913 # ifdef WORK_PIPE
914 DEBUG_INSIST(-1 != c->resp_read_pipe);
915 DEBUG_INSIST(-1 != c->resp_write_pipe);
916 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
917 close(c->resp_write_pipe);
918 close(c->resp_read_pipe);
919 c->resp_write_pipe = -1;
920 c->resp_read_pipe = -1;
921 # else
922 DEBUG_INSIST(NULL != c->responses_pending);
923 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
924 c->responses_pending = delete_sema(c->responses_pending);
925 # endif
926
927 /* Is it necessary to check if there are pending requests and
928 * responses? If so, and if there are, what to do with them?
929 */
930
931 /* re-init buffer index sequencers */
932 c->head_workitem = 0;
933 c->tail_workitem = 0;
934 c->head_response = 0;
935 c->tail_response = 0;
936
937 c->reusable = TRUE;
938 }
939
940
941 #else /* !WORK_THREAD follows */
942 char work_thread_nonempty_compilation_unit;
943 #endif
944