/*	$NetBSD: ntp_worker.c,v 1.7 2020/05/25 20:47:24 christos Exp $	*/

/*
 * ntp_worker.c
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORKER

#include <stdio.h>
#include <ctype.h>
#include <signal.h>

#include "iosignal.h"
#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "intreswork.h"


#define CHILD_MAX_IDLE	(3 * 60)	/* seconds, idle worker limit */

blocking_child **	blocking_children;
size_t			blocking_children_alloc;
int			worker_per_query;	/* boolean */
int			intres_req_pending;
volatile u_int		blocking_child_ready_seen;
volatile u_int		blocking_child_ready_done;

#ifndef HAVE_IO_COMPLETION_PORT
/*
 * pipe_socketpair()
 *
 * Provides an AF_UNIX socketpair on systems which have them, otherwise
 * a pair of unidirectional pipes.
 */
int
pipe_socketpair(
	int	caller_fds[2],
	int *	is_pipe
	)
{
	int	rc;
	int	fds[2];
	int	called_pipe;

#ifdef HAVE_SOCKETPAIR
	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]);
#else
	rc = -1;
#endif

	if (-1 == rc) {
		rc = pipe(&fds[0]);
		called_pipe = TRUE;
	} else {
		called_pipe = FALSE;
	}

	if (-1 == rc)
		return rc;

	caller_fds[0] = fds[0];
	caller_fds[1] = fds[1];
	if (is_pipe != NULL)
		*is_pipe = called_pipe;

	return 0;
}
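
/*
 * A minimal usage sketch (illustrative only; error handling is up to
 * the caller):
 *
 *	int fds[2], is_pipe;
 *
 *	if (pipe_socketpair(fds, &is_pipe) < 0)
 *		msyslog(LOG_ERR, "pipe_socketpair: %m");
 *
 * With a real socketpair either descriptor is bidirectional; with the
 * pipe() fallback (is_pipe set), fds[0] is the read end and fds[1] the
 * write end, so two calls are needed for two-way traffic.
 */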

/*
 * close_all_except()
 *
 * Close all file descriptors except the given keep_fd.
 */
void
close_all_except(
	int keep_fd
	)
{
	int fd;

	for (fd = 0; fd < keep_fd; fd++)
		close(fd);

	close_all_beyond(keep_fd);
}


/*
 * close_all_beyond()
 *
 * Close all file descriptors after the given keep_fd, which is the
 * highest fd to keep open.
 */
void
close_all_beyond(
	int keep_fd
	)
{
# ifdef HAVE_CLOSEFROM
	closefrom(keep_fd + 1);
# elif defined(F_CLOSEM)
	/*
	 * From 'Writing Reliable AIX Daemons,' SG24-4946-00,
	 * by Eric Agar (saves us from doing 32767 system
	 * calls)
	 */
	if (fcntl(keep_fd + 1, F_CLOSEM, 0) == -1)
		msyslog(LOG_ERR, "F_CLOSEM(%d): %m", keep_fd + 1);
# else	/* !HAVE_CLOSEFROM && !F_CLOSEM follows */
	int fd;
	int max_fd;

	max_fd = GETDTABLESIZE();
	for (fd = keep_fd + 1; fd < max_fd; fd++)
		close(fd);
# endif	/* !HAVE_CLOSEFROM && !F_CLOSEM */
}
#endif	/* HAVE_IO_COMPLETION_PORT */

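/*
 * available_blocking_child_slot()
 *
 * Returns the index of a blocking_children[] slot that can take a new
 * worker: an empty slot, a slot whose child is marked reusable, or,
 * failing both, the first slot of a freshly grown array.
 */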
u_int
available_blocking_child_slot(void)
{
	const size_t	each = sizeof(blocking_children[0]);
	u_int		slot;
	size_t		prev_alloc;
	size_t		new_alloc;
	size_t		prev_octets;
	size_t		octets;

	for (slot = 0; slot < blocking_children_alloc; slot++) {
		if (NULL == blocking_children[slot])
			return slot;
		if (blocking_children[slot]->reusable) {
			blocking_children[slot]->reusable = FALSE;
			return slot;
		}
	}

	prev_alloc = blocking_children_alloc;
	prev_octets = prev_alloc * each;
	new_alloc = blocking_children_alloc + 4;
	octets = new_alloc * each;
	blocking_children = erealloc_zero(blocking_children, octets,
					  prev_octets);
	blocking_children_alloc = new_alloc;

	/* assume we'll never have enough workers to overflow u_int */
	return (u_int)prev_alloc;
}

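/*
 * queue_blocking_request()
 *
 * Wraps a request in a blocking_pipe_header and hands it to a child
 * worker.  With worker_per_query each request gets its own child;
 * otherwise all requests share a single resolver child, created on
 * first use, whose idle timer is cleared when new work is dispatched.
 */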
int
queue_blocking_request(
	blocking_work_req	rtype,
	void *			req,
	size_t			reqsize,
	blocking_work_callback	done_func,
	void *			context
	)
{
	static u_int		intres_slot = UINT_MAX;
	u_int			child_slot;
	blocking_child *	c;
	blocking_pipe_header	req_hdr;

	req_hdr.octets = sizeof(req_hdr) + reqsize;
	req_hdr.magic_sig = BLOCKING_REQ_MAGIC;
	req_hdr.rtype = rtype;
	req_hdr.done_func = done_func;
	req_hdr.context = context;

	child_slot = UINT_MAX;
	if (worker_per_query || UINT_MAX == intres_slot ||
	    blocking_children[intres_slot]->reusable)
		child_slot = available_blocking_child_slot();
	if (!worker_per_query) {
		if (UINT_MAX == intres_slot)
			intres_slot = child_slot;
		else
			child_slot = intres_slot;
		if (0 == intres_req_pending)
			intres_timeout_req(0);
	}
	intres_req_pending++;
	INSIST(UINT_MAX != child_slot);
	c = blocking_children[child_slot];
	if (NULL == c) {
		c = emalloc_zero(sizeof(*c));
#ifdef WORK_FORK
		c->req_read_pipe = -1;
		c->req_write_pipe = -1;
#endif
#ifdef WORK_PIPE
		c->resp_read_pipe = -1;
		c->resp_write_pipe = -1;
#endif
		blocking_children[child_slot] = c;
	}
	req_hdr.child_idx = child_slot;

	return send_blocking_req_internal(c, &req_hdr, req);
}
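
/*
 * A minimal caller sketch (illustrative only; names are placeholders,
 * and the callback signature shown is the one implied by the call in
 * process_blocking_resp() below):
 *
 *	static void
 *	my_done(blocking_work_req rtype, void *context, size_t respsize,
 *		void *resp)
 *	{
 *		... inspect resp/respsize; context is returned unchanged ...
 *	}
 *
 *	rc = queue_blocking_request(BLOCKING_GETADDRINFO, req, reqsize,
 *				    &my_done, context);
 *
 * where req points to an rtype-specific request block of reqsize octets.
 */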
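
/*
 * queue_blocking_response()
 *
 * Fills in the response header from the matching request and sends the
 * completed response back to the parent.
 */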
int
queue_blocking_response(
	blocking_child *		c,
	blocking_pipe_header *		resp,
	size_t				respsize,
	const blocking_pipe_header *	req
	)
{
	resp->octets = respsize;
	resp->magic_sig = BLOCKING_RESP_MAGIC;
	resp->rtype = req->rtype;
	resp->context = req->context;
	resp->done_func = req->done_func;

	return send_blocking_resp_internal(c, resp);
}

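/*
 * process_blocking_resp()
 *
 * Receives one (or, with threads, all available) completed responses
 * from a child and invokes each request's done_func callback, then
 * either starts the idle timer or, with worker_per_query, asks the
 * child to exit.
 */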
void
process_blocking_resp(
	blocking_child *	c
	)
{
	blocking_pipe_header *	resp;
	void *			data;

	/*
	 * On Windows send_blocking_resp_internal() may signal the
	 * blocking_response_ready event multiple times while we're
	 * processing a response, so always consume all available
	 * responses before returning to test the event again.
	 */
#ifdef WORK_THREAD
	do {
#endif
		resp = receive_blocking_resp_internal(c);
		if (NULL != resp) {
			DEBUG_REQUIRE(BLOCKING_RESP_MAGIC ==
				      resp->magic_sig);
			data = (char *)resp + sizeof(*resp);
			intres_req_pending--;
			(*resp->done_func)(resp->rtype, resp->context,
					   resp->octets - sizeof(*resp),
					   data);
			free(resp);
		}
#ifdef WORK_THREAD
	} while (NULL != resp);
#endif
	if (!worker_per_query && 0 == intres_req_pending)
		intres_timeout_req(CHILD_MAX_IDLE);
	else if (worker_per_query)
		req_child_exit(c);
}

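/*
 * harvest_blocking_responses()
 *
 * Checks the ready-seen/ready-done counters and processes responses
 * from every blocking child whose counter has advanced since the last
 * pass.
 */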
void
harvest_blocking_responses(void)
{
	size_t		idx;
	blocking_child*	cp;
	u_int		scseen, scdone;

	scseen = blocking_child_ready_seen;
	scdone = blocking_child_ready_done;
	if (scdone != scseen) {
		blocking_child_ready_done = scseen;
		for (idx = 0; idx < blocking_children_alloc; idx++) {
			cp = blocking_children[idx];
			if (NULL == cp)
				continue;
			scseen = cp->resp_ready_seen;
			scdone = cp->resp_ready_done;
			if (scdone != scseen) {
				cp->resp_ready_done = scseen;
				process_blocking_resp(cp);
			}
		}
	}
}

/*
 * blocking_child_common() runs as a forked child or a thread.  It
 * loops receiving requests from the parent, dispatching each to the
 * matching blocking_getaddrinfo()/blocking_getnameinfo() handler, and
 * returns when the request stream ends or a handler asks it to stop.
 */
int
blocking_child_common(
	blocking_child	*c
	)
{
	int say_bye;
	blocking_pipe_header *req;

	say_bye = FALSE;
	while (!say_bye) {
		req = receive_blocking_req_internal(c);
		if (NULL == req) {
			say_bye = TRUE;
			continue;
		}

		DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == req->magic_sig);

		switch (req->rtype) {
		case BLOCKING_GETADDRINFO:
			if (blocking_getaddrinfo(c, req))
				say_bye = TRUE;
			break;

		case BLOCKING_GETNAMEINFO:
			if (blocking_getnameinfo(c, req))
				say_bye = TRUE;
			break;

		default:
			msyslog(LOG_ERR, "unknown req %d to blocking worker", req->rtype);
			say_bye = TRUE;
		}

		free(req);
	}

	return 0;
}


/*
 * worker_idle_timer_fired()
 *
 * The parent starts this timer when the last pending response has been
 * received from the child, making it idle, and clears the timer when a
 * request is dispatched to the child.  Once the timer expires, the
 * child is sent packing.
 *
 * This is called when worker_idle_timer is nonzero and less than or
 * equal to current_time.
 */
void
worker_idle_timer_fired(void)
{
	u_int			idx;
	blocking_child *	c;

	DEBUG_REQUIRE(0 == intres_req_pending);

	intres_timeout_req(0);
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c)
			continue;
		req_child_exit(c);
	}
}


#else	/* !WORKER follows */
int ntp_worker_nonempty_compilation_unit;
#endif