1 /*
2 * work_thread.c - threads implementation for blocking worker child.
3 */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6
7 #ifdef WORK_THREAD
8
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 #ifndef SYS_WINNT
13 #include <pthread.h>
14 #endif
15
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
19 #include "ntpd.h"
20 #include "ntp_io.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
25
/* Sentinel pointer values that travel through the queues in place of a
 * real message.  They are compared by value only and must never be
 * dereferenced.  req_child_exit() queues CHILD_EXIT_REQ, and
 * receive_blocking_req_internal() answers it with CHILD_GONE_RESP.
 */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ

/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Lower and upper bound for the worker thread stack size.  64kB as a
 * minimum seems to work, and the maximum is set to 256kB.  Both can be
 * overridden at build time.  Where the system defines its own limits,
 * start_blocking_thread_internal() gives those precedence.
 */
#if !defined(THREAD_MINSTACKSIZE)
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

#if !defined(THREAD_MAXSTACKSIZE)
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif

/* An integer type wide enough to carry a pointer value. */
#if !defined(UINTPTR_T)
# ifdef UINTPTR_MAX
#  define UINTPTR_T uintptr_t
# elif defined(UINT_PTR)
#  define UINTPTR_T UINT_PTR
# else
#  define UINTPTR_T size_t
# endif
#endif
59
60
61 #ifdef SYS_WINNT
62
63 # define thread_exit(c) _endthreadex(c)
64 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
65 u_int WINAPI blocking_thread(void *);
66 static BOOL same_os_sema(const sem_ref obj, void * osobj);
67
68 #else
69
70 # define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c))
71 # define tickle_sem sem_post
72 void * blocking_thread(void *);
73 static void block_thread_signals(sigset_t *);
74
75 #endif
76
77 #ifdef WORK_PIPE
78 addremove_io_fd_func addremove_io_fd;
79 #else
80 addremove_io_semaphore_func addremove_io_semaphore;
81 #endif
82
83 static void start_blocking_thread(blocking_child *);
84 static void start_blocking_thread_internal(blocking_child *);
85 static void prepare_child_sems(blocking_child *);
86 static int wait_for_sem(sem_ref, struct timespec *);
87 static int ensure_workitems_empty_slot(blocking_child *);
88 static int ensure_workresp_empty_slot(blocking_child *);
89 static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
90 static void cleanup_after_child(blocking_child *);
91
92 static sema_type worker_mmutex;
93 static sem_ref worker_memlock;
94
95 /* --------------------------------------------------------------------
96 * locking the global worker state table (and other global stuff)
97 */
98 void
worker_global_lock(int inOrOut)99 worker_global_lock(
100 int inOrOut)
101 {
102 if (worker_memlock) {
103 if (inOrOut)
104 wait_for_sem(worker_memlock, NULL);
105 else
106 tickle_sem(worker_memlock);
107 }
108 }
109
/* --------------------------------------------------------------------
 * Implementation isolation wrapper: terminate the calling worker
 * thread, handing 'exitcode' to the platform's thread exit primitive.
 */
void
exit_worker(
	int	exitcode
	)
{
	/* thread_exit is a macro selecting the platform call */
	thread_exit(exitcode);
}
120
121 /* --------------------------------------------------------------------
122 * sleep for a given time or until the wakup semaphore is tickled.
123 */
124 int
worker_sleep(blocking_child * c,time_t seconds)125 worker_sleep(
126 blocking_child * c,
127 time_t seconds
128 )
129 {
130 struct timespec until;
131 int rc;
132
133 # ifdef HAVE_CLOCK_GETTIME
134 if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
135 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
136 return -1;
137 }
138 # else
139 if (0 != getclock(TIMEOFDAY, &until)) {
140 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
141 return -1;
142 }
143 # endif
144 until.tv_sec += seconds;
145 rc = wait_for_sem(c->wake_scheduled_sleep, &until);
146 if (0 == rc)
147 return -1;
148 if (-1 == rc && ETIMEDOUT == errno)
149 return 0;
150 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
151 return -1;
152 }
153
154
155 /* --------------------------------------------------------------------
156 * Wake up a worker that takes a nap.
157 */
158 void
interrupt_worker_sleep(void)159 interrupt_worker_sleep(void)
160 {
161 u_int idx;
162 blocking_child * c;
163
164 for (idx = 0; idx < blocking_children_alloc; idx++) {
165 c = blocking_children[idx];
166 if (NULL == c || NULL == c->wake_scheduled_sleep)
167 continue;
168 tickle_sem(c->wake_scheduled_sleep);
169 }
170 }
171
172 /* --------------------------------------------------------------------
173 * Make sure there is an empty slot at the head of the request
174 * queue. Tell if the queue is currently empty.
175 */
176 static int
ensure_workitems_empty_slot(blocking_child * c)177 ensure_workitems_empty_slot(
178 blocking_child *c
179 )
180 {
181 /*
182 ** !!! PRECONDITION: caller holds access lock!
183 **
184 ** This simply tries to increase the size of the buffer if it
185 ** becomes full. The resize operation does *not* maintain the
186 ** order of requests, but that should be irrelevant since the
187 ** processing is considered asynchronous anyway.
188 **
189 ** Return if the buffer is currently empty.
190 */
191
192 static const size_t each =
193 sizeof(blocking_children[0]->workitems[0]);
194
195 size_t new_alloc;
196 size_t slots_used;
197 size_t sidx;
198
199 slots_used = c->head_workitem - c->tail_workitem;
200 if (slots_used >= c->workitems_alloc) {
201 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
202 c->workitems = erealloc(c->workitems, new_alloc * each);
203 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
204 c->workitems[sidx] = NULL;
205 c->tail_workitem = 0;
206 c->head_workitem = c->workitems_alloc;
207 c->workitems_alloc = new_alloc;
208 }
209 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
210 return (0 == slots_used);
211 }
212
213 /* --------------------------------------------------------------------
214 * Make sure there is an empty slot at the head of the response
215 * queue. Tell if the queue is currently empty.
216 */
217 static int
ensure_workresp_empty_slot(blocking_child * c)218 ensure_workresp_empty_slot(
219 blocking_child *c
220 )
221 {
222 /*
223 ** !!! PRECONDITION: caller holds access lock!
224 **
225 ** Works like the companion function above.
226 */
227
228 static const size_t each =
229 sizeof(blocking_children[0]->responses[0]);
230
231 size_t new_alloc;
232 size_t slots_used;
233 size_t sidx;
234
235 slots_used = c->head_response - c->tail_response;
236 if (slots_used >= c->responses_alloc) {
237 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
238 c->responses = erealloc(c->responses, new_alloc * each);
239 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
240 c->responses[sidx] = NULL;
241 c->tail_response = 0;
242 c->head_response = c->responses_alloc;
243 c->responses_alloc = new_alloc;
244 }
245 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
246 return (0 == slots_used);
247 }
248
249
250 /* --------------------------------------------------------------------
251 * queue_req_pointer() - append a work item or idle exit request to
252 * blocking_workitems[]. Employ proper locking.
253 */
254 static int
queue_req_pointer(blocking_child * c,blocking_pipe_header * hdr)255 queue_req_pointer(
256 blocking_child * c,
257 blocking_pipe_header * hdr
258 )
259 {
260 size_t qhead;
261
262 /* >>>> ACCESS LOCKING STARTS >>>> */
263 wait_for_sem(c->accesslock, NULL);
264 ensure_workitems_empty_slot(c);
265 qhead = c->head_workitem;
266 c->workitems[qhead % c->workitems_alloc] = hdr;
267 c->head_workitem = 1 + qhead;
268 tickle_sem(c->accesslock);
269 /* <<<< ACCESS LOCKING ENDS <<<< */
270
271 /* queue consumer wake-up notification */
272 tickle_sem(c->workitems_pending);
273
274 return 0;
275 }
276
277 /* --------------------------------------------------------------------
278 * API function to make sure a worker is running, a proper private copy
279 * of the data is made, the data eneterd into the queue and the worker
280 * is signalled.
281 */
282 int
send_blocking_req_internal(blocking_child * c,blocking_pipe_header * hdr,void * data)283 send_blocking_req_internal(
284 blocking_child * c,
285 blocking_pipe_header * hdr,
286 void * data
287 )
288 {
289 blocking_pipe_header * threadcopy;
290 size_t payload_octets;
291
292 REQUIRE(hdr != NULL);
293 REQUIRE(data != NULL);
294 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
295
296 if (hdr->octets <= sizeof(*hdr))
297 return 1; /* failure */
298 payload_octets = hdr->octets - sizeof(*hdr);
299
300 if (NULL == c->thread_ref)
301 start_blocking_thread(c);
302 threadcopy = emalloc(hdr->octets);
303 memcpy(threadcopy, hdr, sizeof(*hdr));
304 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
305
306 return queue_req_pointer(c, threadcopy);
307 }
308
309 /* --------------------------------------------------------------------
310 * Wait for the 'incoming queue no longer empty' signal, lock the shared
311 * structure and dequeue an item.
312 */
313 blocking_pipe_header *
receive_blocking_req_internal(blocking_child * c)314 receive_blocking_req_internal(
315 blocking_child * c
316 )
317 {
318 blocking_pipe_header * req;
319 size_t qhead, qtail;
320
321 req = NULL;
322 do {
323 /* wait for tickle from the producer side */
324 wait_for_sem(c->workitems_pending, NULL);
325
326 /* >>>> ACCESS LOCKING STARTS >>>> */
327 wait_for_sem(c->accesslock, NULL);
328 qhead = c->head_workitem;
329 do {
330 qtail = c->tail_workitem;
331 if (qhead == qtail)
332 break;
333 c->tail_workitem = qtail + 1;
334 qtail %= c->workitems_alloc;
335 req = c->workitems[qtail];
336 c->workitems[qtail] = NULL;
337 } while (NULL == req);
338 tickle_sem(c->accesslock);
339 /* <<<< ACCESS LOCKING ENDS <<<< */
340
341 } while (NULL == req);
342
343 INSIST(NULL != req);
344 if (CHILD_EXIT_REQ == req) { /* idled out */
345 send_blocking_resp_internal(c, CHILD_GONE_RESP);
346 req = NULL;
347 }
348
349 return req;
350 }
351
352 /* --------------------------------------------------------------------
353 * Push a response into the return queue and eventually tickle the
354 * receiver.
355 */
356 int
send_blocking_resp_internal(blocking_child * c,blocking_pipe_header * resp)357 send_blocking_resp_internal(
358 blocking_child * c,
359 blocking_pipe_header * resp
360 )
361 {
362 size_t qhead;
363 int empty;
364
365 /* >>>> ACCESS LOCKING STARTS >>>> */
366 wait_for_sem(c->accesslock, NULL);
367 empty = ensure_workresp_empty_slot(c);
368 qhead = c->head_response;
369 c->responses[qhead % c->responses_alloc] = resp;
370 c->head_response = 1 + qhead;
371 tickle_sem(c->accesslock);
372 /* <<<< ACCESS LOCKING ENDS <<<< */
373
374 /* queue consumer wake-up notification */
375 if (empty)
376 {
377 # ifdef WORK_PIPE
378 if (1 != write(c->resp_write_pipe, "", 1))
379 msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo"
380 " failed to notify main thread!",
381 (BLOCKING_GETNAMEINFO == resp->rtype)
382 ? "name"
383 : "addr"
384 );
385 # else
386 tickle_sem(c->responses_pending);
387 # endif
388 }
389 return 0;
390 }
391
392
393 #ifndef WORK_PIPE
394
395 /* --------------------------------------------------------------------
396 * Check if a (Windows-)handle to a semaphore is actually the same we
397 * are using inside the sema wrapper.
398 */
399 static BOOL
same_os_sema(const sem_ref obj,void * osh)400 same_os_sema(
401 const sem_ref obj,
402 void* osh
403 )
404 {
405 return obj && osh && (obj->shnd == (HANDLE)osh);
406 }
407
408 /* --------------------------------------------------------------------
409 * Find the shared context that associates to an OS handle and make sure
410 * the data is dequeued and processed.
411 */
412 void
handle_blocking_resp_sem(void * context)413 handle_blocking_resp_sem(
414 void * context
415 )
416 {
417 blocking_child * c;
418 u_int idx;
419
420 c = NULL;
421 for (idx = 0; idx < blocking_children_alloc; idx++) {
422 c = blocking_children[idx];
423 if (c != NULL &&
424 c->thread_ref != NULL &&
425 same_os_sema(c->responses_pending, context))
426 break;
427 }
428 if (idx < blocking_children_alloc)
429 process_blocking_resp(c);
430 }
431 #endif /* !WORK_PIPE */
432
433 /* --------------------------------------------------------------------
434 * Fetch the next response from the return queue. In case of signalling
435 * via pipe, make sure the pipe is flushed, too.
436 */
437 blocking_pipe_header *
receive_blocking_resp_internal(blocking_child * c)438 receive_blocking_resp_internal(
439 blocking_child * c
440 )
441 {
442 blocking_pipe_header * removed;
443 size_t qhead, qtail, slot;
444
445 #ifdef WORK_PIPE
446 int rc;
447 char scratch[32];
448
449 do
450 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
451 while (-1 == rc && EINTR == errno);
452 #endif
453
454 /* >>>> ACCESS LOCKING STARTS >>>> */
455 wait_for_sem(c->accesslock, NULL);
456 qhead = c->head_response;
457 qtail = c->tail_response;
458 for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
459 slot = qtail % c->responses_alloc;
460 removed = c->responses[slot];
461 c->responses[slot] = NULL;
462 }
463 c->tail_response = qtail;
464 tickle_sem(c->accesslock);
465 /* <<<< ACCESS LOCKING ENDS <<<< */
466
467 if (NULL != removed) {
468 DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
469 BLOCKING_RESP_MAGIC == removed->magic_sig);
470 }
471 if (CHILD_GONE_RESP == removed) {
472 cleanup_after_child(c);
473 removed = NULL;
474 }
475
476 return removed;
477 }
478
479 /* --------------------------------------------------------------------
480 * Light up a new worker.
481 */
482 static void
start_blocking_thread(blocking_child * c)483 start_blocking_thread(
484 blocking_child * c
485 )
486 {
487
488 DEBUG_INSIST(!c->reusable);
489
490 prepare_child_sems(c);
491 start_blocking_thread_internal(c);
492 }
493
494 /* --------------------------------------------------------------------
495 * Create a worker thread. There are several differences between POSIX
496 * and Windows, of course -- most notably the Windows thread is a
497 * detached thread, and we keep the handle around until we want to get
498 * rid of the thread. The notification scheme also differs: Windows
499 * makes use of semaphores in both directions, POSIX uses a pipe for
500 * integration with 'select()' or alike.
501 */
502 static void
start_blocking_thread_internal(blocking_child * c)503 start_blocking_thread_internal(
504 blocking_child * c
505 )
506 #ifdef SYS_WINNT
507 {
508 BOOL resumed;
509
510 c->thread_ref = NULL;
511 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
512 c->thr_table[0].thnd =
513 (HANDLE)_beginthreadex(
514 NULL,
515 0,
516 &blocking_thread,
517 c,
518 CREATE_SUSPENDED,
519 NULL);
520
521 if (NULL == c->thr_table[0].thnd) {
522 msyslog(LOG_ERR, "start blocking thread failed: %m");
523 exit(-1);
524 }
525 /* remember the thread priority is only within the process class */
526 if (!SetThreadPriority(c->thr_table[0].thnd,
527 THREAD_PRIORITY_BELOW_NORMAL)) {
528 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
529 }
530 if (NULL != pSetThreadDescription) {
531 (*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
532 }
533 resumed = ResumeThread(c->thr_table[0].thnd);
534 DEBUG_INSIST(resumed);
535 c->thread_ref = &c->thr_table[0];
536 }
537 #else /* pthreads start_blocking_thread_internal() follows */
538 {
539 # ifdef NEED_PTHREAD_INIT
540 static int pthread_init_called;
541 # endif
542 pthread_attr_t thr_attr;
543 int rc;
544 int pipe_ends[2]; /* read then write */
545 int is_pipe;
546 int flags;
547 size_t ostacksize;
548 size_t nstacksize;
549 sigset_t saved_sig_mask;
550
551 c->thread_ref = NULL;
552
553 # ifdef NEED_PTHREAD_INIT
554 /*
555 * from lib/isc/unix/app.c:
556 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
557 */
558 if (!pthread_init_called) {
559 pthread_init();
560 pthread_init_called = TRUE;
561 }
562 # endif
563
564 rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
565 if (0 != rc) {
566 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
567 exit(1);
568 }
569 c->resp_read_pipe = move_fd(pipe_ends[0]);
570 c->resp_write_pipe = move_fd(pipe_ends[1]);
571 c->ispipe = is_pipe;
572 flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
573 if (-1 == flags) {
574 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
575 exit(1);
576 }
577 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
578 if (-1 == rc) {
579 msyslog(LOG_ERR,
580 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
581 exit(1);
582 }
583 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
584 pthread_attr_init(&thr_attr);
585 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
586 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
587 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
588 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
589 if (0 != rc) {
590 msyslog(LOG_ERR,
591 "start_blocking_thread: pthread_attr_getstacksize() -> %s",
592 strerror(rc));
593 } else {
594 nstacksize = ostacksize;
595 /* order is important here: first clamp on upper limit,
596 * and the PTHREAD min stack size is ultimate override!
597 */
598 if (nstacksize > THREAD_MAXSTACKSIZE)
599 nstacksize = THREAD_MAXSTACKSIZE;
600 # ifdef PTHREAD_STACK_MAX
601 if (nstacksize > PTHREAD_STACK_MAX)
602 nstacksize = PTHREAD_STACK_MAX;
603 # endif
604
605 /* now clamp on lower stack limit. */
606 if (nstacksize < THREAD_MINSTACKSIZE)
607 nstacksize = THREAD_MINSTACKSIZE;
608 # ifdef PTHREAD_STACK_MIN
609 if (nstacksize < PTHREAD_STACK_MIN)
610 nstacksize = PTHREAD_STACK_MIN;
611 # endif
612
613 if (nstacksize != ostacksize)
614 rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
615 if (0 != rc)
616 msyslog(LOG_ERR,
617 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
618 (u_long)ostacksize, (u_long)nstacksize,
619 strerror(rc));
620 }
621 #else
622 UNUSED_ARG(nstacksize);
623 UNUSED_ARG(ostacksize);
624 #endif
625 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
626 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
627 #endif
628 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
629 block_thread_signals(&saved_sig_mask);
630 rc = pthread_create(&c->thr_table[0], &thr_attr,
631 &blocking_thread, c);
632 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
633 pthread_attr_destroy(&thr_attr);
634 if (0 != rc) {
635 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
636 strerror(rc));
637 exit(1);
638 }
639 c->thread_ref = &c->thr_table[0];
640 }
641 #endif
642
643 /* --------------------------------------------------------------------
644 * block_thread_signals()
645 *
646 * Temporarily block signals used by ntpd main thread, so that signal
647 * mask inherited by child threads leaves them blocked. Returns prior
648 * active signal mask via pmask, to be restored by the main thread
649 * after pthread_create().
650 */
651 #ifndef SYS_WINNT
652 void
block_thread_signals(sigset_t * pmask)653 block_thread_signals(
654 sigset_t * pmask
655 )
656 {
657 sigset_t block;
658
659 sigemptyset(&block);
660 # ifdef HAVE_SIGNALED_IO
661 # ifdef SIGIO
662 sigaddset(&block, SIGIO);
663 # endif
664 # ifdef SIGPOLL
665 sigaddset(&block, SIGPOLL);
666 # endif
667 # endif /* HAVE_SIGNALED_IO */
668 sigaddset(&block, SIGALRM);
669 sigaddset(&block, MOREDEBUGSIG);
670 sigaddset(&block, LESSDEBUGSIG);
671 # ifdef SIGDIE1
672 sigaddset(&block, SIGDIE1);
673 # endif
674 # ifdef SIGDIE2
675 sigaddset(&block, SIGDIE2);
676 # endif
677 # ifdef SIGDIE3
678 sigaddset(&block, SIGDIE3);
679 # endif
680 # ifdef SIGDIE4
681 sigaddset(&block, SIGDIE4);
682 # endif
683 # ifdef SIGBUS
684 sigaddset(&block, SIGBUS);
685 # endif
686 sigemptyset(pmask);
687 pthread_sigmask(SIG_BLOCK, &block, pmask);
688 }
689 #endif /* !SYS_WINNT */
690
691
692 /* --------------------------------------------------------------------
693 * Create & destroy semaphores. This is sufficiently different between
694 * POSIX and Windows to warrant wrapper functions and close enough to
695 * use the concept of synchronization via semaphore for all platforms.
696 */
697 static sem_ref
create_sema(sema_type * semptr,u_int inival,u_int maxval)698 create_sema(
699 sema_type* semptr,
700 u_int inival,
701 u_int maxval)
702 {
703 #ifdef SYS_WINNT
704
705 long svini, svmax;
706 if (NULL != semptr) {
707 svini = (inival < LONG_MAX)
708 ? (long)inival : LONG_MAX;
709 svmax = (maxval < LONG_MAX && maxval > 0)
710 ? (long)maxval : LONG_MAX;
711 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
712 if (NULL == semptr->shnd)
713 semptr = NULL;
714 }
715
716 #else
717
718 (void)maxval;
719 if (semptr && sem_init(semptr, FALSE, inival))
720 semptr = NULL;
721
722 #endif
723
724 return semptr;
725 }
726
727 /* ------------------------------------------------------------------ */
728 static sem_ref
delete_sema(sem_ref obj)729 delete_sema(
730 sem_ref obj)
731 {
732
733 # ifdef SYS_WINNT
734
735 if (obj) {
736 if (obj->shnd)
737 CloseHandle(obj->shnd);
738 obj->shnd = NULL;
739 }
740
741 # else
742
743 if (obj)
744 sem_destroy(obj);
745
746 # endif
747
748 return NULL;
749 }
750
751 /* --------------------------------------------------------------------
752 * prepare_child_sems()
753 *
754 * create sync & access semaphores
755 *
756 * All semaphores are cleared, only the access semaphore has 1 unit.
757 * Childs wait on 'workitems_pending', then grabs 'sema_access'
758 * and dequeues jobs. When done, 'sema_access' is given one unit back.
759 *
760 * The producer grabs 'sema_access', manages the queue, restores
761 * 'sema_access' and puts one unit into 'workitems_pending'.
762 *
763 * The story goes the same for the response queue.
764 */
765 static void
prepare_child_sems(blocking_child * c)766 prepare_child_sems(
767 blocking_child *c
768 )
769 {
770 if (NULL == worker_memlock)
771 worker_memlock = create_sema(&worker_mmutex, 1, 1);
772
773 c->accesslock = create_sema(&c->sem_table[0], 1, 1);
774 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
775 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
776 # ifndef WORK_PIPE
777 c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
778 # endif
779 }
780
781 /* --------------------------------------------------------------------
782 * wait for semaphore. Where the wait can be interrupted, it will
783 * internally resume -- When this function returns, there is either no
784 * semaphore at all, a timeout occurred, or the caller could
785 * successfully take a token from the semaphore.
786 *
787 * For untimed wait, not checking the result of this function at all is
788 * definitely an option.
789 */
790 static int
wait_for_sem(sem_ref sem,struct timespec * timeout)791 wait_for_sem(
792 sem_ref sem,
793 struct timespec * timeout /* wall-clock */
794 )
795 #ifdef SYS_WINNT
796 {
797 struct timespec now;
798 struct timespec delta;
799 DWORD msec;
800 DWORD rc;
801
802 if (!(sem && sem->shnd)) {
803 errno = EINVAL;
804 return -1;
805 }
806
807 if (NULL == timeout) {
808 msec = INFINITE;
809 } else {
810 getclock(TIMEOFDAY, &now);
811 delta = sub_tspec(*timeout, now);
812 if (delta.tv_sec < 0) {
813 msec = 0;
814 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
815 msec = INFINITE;
816 } else {
817 msec = 1000 * (DWORD)delta.tv_sec;
818 msec += delta.tv_nsec / (1000 * 1000);
819 }
820 }
821 rc = WaitForSingleObject(sem->shnd, msec);
822 if (WAIT_OBJECT_0 == rc)
823 return 0;
824 if (WAIT_TIMEOUT == rc) {
825 errno = ETIMEDOUT;
826 return -1;
827 }
828 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
829 errno = EFAULT;
830 return -1;
831 }
832 #else /* pthreads wait_for_sem() follows */
833 {
834 int rc = -1;
835
836 if (sem) do {
837 if (NULL == timeout)
838 rc = sem_wait(sem);
839 else
840 rc = sem_timedwait(sem, timeout);
841 } while (rc == -1 && errno == EINTR);
842 else
843 errno = EINVAL;
844
845 return rc;
846 }
847 #endif
848
849 /* --------------------------------------------------------------------
850 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
851 * calling conventions under Windows and POSIX-defined signature
852 * otherwise.
853 */
854 #ifdef SYS_WINNT
855 u_int WINAPI
856 #else
857 void *
858 #endif
blocking_thread(void * ThreadArg)859 blocking_thread(
860 void * ThreadArg
861 )
862 {
863 blocking_child *c;
864
865 c = ThreadArg;
866 exit_worker(blocking_child_common(c));
867
868 /* NOTREACHED */
869 return 0;
870 }
871
872 /* --------------------------------------------------------------------
873 * req_child_exit() runs in the parent.
874 *
875 * This function is called from from the idle timer, too, and possibly
876 * without a thread being there any longer. Since we have folded up our
877 * tent in that case and all the semaphores are already gone, we simply
878 * ignore this request in this case.
879 *
880 * Since the existence of the semaphores is controlled exclusively by
881 * the parent, there's no risk of data race here.
882 */
883 int
req_child_exit(blocking_child * c)884 req_child_exit(
885 blocking_child *c
886 )
887 {
888 return (c->accesslock)
889 ? queue_req_pointer(c, CHILD_EXIT_REQ)
890 : 0;
891 }
892
893 /* --------------------------------------------------------------------
894 * cleanup_after_child() runs in parent.
895 */
896 static void
cleanup_after_child(blocking_child * c)897 cleanup_after_child(
898 blocking_child * c
899 )
900 {
901 DEBUG_INSIST(!c->reusable);
902
903 # ifdef SYS_WINNT
904 /* The thread was not created in detached state, so we better
905 * clean up.
906 */
907 if (c->thread_ref && c->thread_ref->thnd) {
908 WaitForSingleObject(c->thread_ref->thnd, INFINITE);
909 INSIST(CloseHandle(c->thread_ref->thnd));
910 c->thread_ref->thnd = NULL;
911 }
912 # endif
913 c->thread_ref = NULL;
914
915 /* remove semaphores and (if signalling vi IO) pipes */
916
917 c->accesslock = delete_sema(c->accesslock);
918 c->workitems_pending = delete_sema(c->workitems_pending);
919 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
920
921 # ifdef WORK_PIPE
922 DEBUG_INSIST(-1 != c->resp_read_pipe);
923 DEBUG_INSIST(-1 != c->resp_write_pipe);
924 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
925 close(c->resp_write_pipe);
926 close(c->resp_read_pipe);
927 c->resp_write_pipe = -1;
928 c->resp_read_pipe = -1;
929 # else
930 DEBUG_INSIST(NULL != c->responses_pending);
931 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
932 c->responses_pending = delete_sema(c->responses_pending);
933 # endif
934
935 /* Is it necessary to check if there are pending requests and
936 * responses? If so, and if there are, what to do with them?
937 */
938
939 /* re-init buffer index sequencers */
940 c->head_workitem = 0;
941 c->tail_workitem = 0;
942 c->head_response = 0;
943 c->tail_response = 0;
944
945 c->reusable = TRUE;
946 }
947
948
949 #else /* !WORK_THREAD follows */
950 char work_thread_nonempty_compilation_unit;
951 #endif
952