1 /*-
2  * Copyright (c) 2012 The FreeBSD Foundation
3  * All rights reserved.
4  *
5  * This software was developed by Pawel Jakub Dawidek under sponsorship from
6  * the FreeBSD Foundation.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  */
29 
30 #include <config/config.h>
31 
32 #include <sys/param.h>
33 #if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
34 #include <sys/endian.h>
35 #else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
36 #ifdef HAVE_MACHINE_ENDIAN_H
37 #include <machine/endian.h>
38 #else /* !HAVE_MACHINE_ENDIAN_H */
39 #ifdef HAVE_ENDIAN_H
40 #include <endian.h>
41 #else /* !HAVE_ENDIAN_H */
42 #error "No supported endian.h"
43 #endif /* !HAVE_ENDIAN_H */
44 #endif /* !HAVE_MACHINE_ENDIAN_H */
45 #include <compat/endian.h>
46 #endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
47 #include <sys/queue.h>
48 #include <sys/stat.h>
49 #include <sys/wait.h>
50 
51 #include <stdio.h>
52 #include <stdlib.h>
53 #include <unistd.h>
54 
55 #include <ctype.h>
56 #include <dirent.h>
57 #include <err.h>
58 #include <errno.h>
59 #include <fcntl.h>
60 #ifdef HAVE_LIBUTIL_H
61 #include <libutil.h>
62 #endif
63 #include <signal.h>
64 #include <string.h>
65 #include <strings.h>
66 
#include <openssl/crypto.h>
#include <openssl/hmac.h>
68 
69 #ifndef HAVE_SIGTIMEDWAIT
70 #include "sigtimedwait.h"
71 #endif
72 
73 #include "auditdistd.h"
74 #include "pjdlog.h"
75 #include "proto.h"
76 #include "sandbox.h"
77 #include "subr.h"
78 #include "synch.h"
79 #include "trail.h"
80 
/* Configuration and host this sender process serves; set in adist_sender(). */
static struct adist_config *adcfg;
static struct adist_host *adhost;

/*
 * adist_remote_lock guards adhost->adh_remote: threads using the connection
 * take it for reading, sender_connect() and sender_disconnect() take it for
 * writing.  adist_remote_mtx additionally guards the connection/reset state
 * consumed by read_thread_wait(), which sleeps on adist_remote_cond until
 * sender_connect() publishes a connection.
 */
static pthread_rwlock_t adist_remote_lock;
static pthread_mutex_t adist_remote_mtx;
static pthread_cond_t adist_remote_cond;
/* Trail directory handle the read thread reads audit trail files from. */
static struct trail *adist_trail;

/*
 * Request queues, each with its own lock and condition variable:
 *  - free: preallocated request objects not currently in use,
 *  - send: filled requests waiting for send_thread(),
 *  - recv: requests being or already sent, waiting for the receiver's
 *    reply in recv_thread().
 */
static TAILQ_HEAD(, adreq) adist_free_list;
static pthread_mutex_t adist_free_list_lock;
static pthread_cond_t adist_free_list_cond;
static TAILQ_HEAD(, adreq) adist_send_list;
static pthread_mutex_t adist_send_list_lock;
static pthread_cond_t adist_send_list_cond;
static TAILQ_HEAD(, adreq) adist_recv_list;
static pthread_mutex_t adist_recv_list_lock;
static pthread_cond_t adist_recv_list_cond;
98 
99 static void
100 init_environment(void)
101 {
102 	struct adreq *adreq;
103 	unsigned int ii;
104 
105 	rw_init(&adist_remote_lock);
106 	mtx_init(&adist_remote_mtx);
107 	cv_init(&adist_remote_cond);
108 	TAILQ_INIT(&adist_free_list);
109 	mtx_init(&adist_free_list_lock);
110 	cv_init(&adist_free_list_cond);
111 	TAILQ_INIT(&adist_send_list);
112 	mtx_init(&adist_send_list_lock);
113 	cv_init(&adist_send_list_cond);
114 	TAILQ_INIT(&adist_recv_list);
115 	mtx_init(&adist_recv_list_lock);
116 	cv_init(&adist_recv_list_cond);
117 
118 	for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
119 		adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
120 		if (adreq == NULL) {
121 			pjdlog_exitx(EX_TEMPFAIL,
122 			    "Unable to allocate %zu bytes of memory for adreq object.",
123 			    sizeof(*adreq) + ADIST_BUF_SIZE);
124 		}
125 		adreq->adr_byteorder = ADIST_BYTEORDER;
126 		adreq->adr_cmd = ADIST_CMD_UNDEFINED;
127 		adreq->adr_seq = 0;
128 		adreq->adr_datasize = 0;
129 		TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
130 	}
131 }
132 
133 static int
134 sender_connect(void)
135 {
136 	unsigned char rnd[32], hash[32], resp[32];
137 	struct proto_conn *conn;
138 	char welcome[8];
139 	int16_t val;
140 
141 	val = 1;
142 	if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
143 		pjdlog_exit(EX_TEMPFAIL,
144 		    "Unable to send connection request to parent");
145 	}
146 	if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
147 		pjdlog_exit(EX_TEMPFAIL,
148 		    "Unable to receive reply to connection request from parent");
149 	}
150 	if (val != 0) {
151 		errno = val;
152 		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
153 		    adhost->adh_remoteaddr);
154 		return (-1);
155 	}
156 	if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
157 		pjdlog_exit(EX_TEMPFAIL,
158 		    "Unable to receive connection from parent");
159 	}
160 	if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
161 		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
162 		    adhost->adh_remoteaddr);
163 		proto_close(conn);
164 		return (-1);
165 	}
166 	pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
167 	/* Error in setting timeout is not critical, but why should it fail? */
168 	if (proto_timeout(conn, adcfg->adc_timeout) < 0)
169 		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
170 	else
171 		pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
172 
173 	/* Exchange welcome message, which includes version number. */
174 	(void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
175 	if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
176 		pjdlog_errno(LOG_WARNING,
177 		    "Unable to send welcome message to %s",
178 		    adhost->adh_remoteaddr);
179 		proto_close(conn);
180 		return (-1);
181 	}
182 	pjdlog_debug(1, "Welcome message sent (%s).", welcome);
183 	bzero(welcome, sizeof(welcome));
184 	if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
185 		pjdlog_errno(LOG_WARNING,
186 		    "Unable to receive welcome message from %s",
187 		    adhost->adh_remoteaddr);
188 		proto_close(conn);
189 		return (-1);
190 	}
191 	if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
192 	    !isdigit(welcome[6]) || welcome[7] != '\0') {
193 		pjdlog_warning("Invalid welcome message from %s.",
194 		    adhost->adh_remoteaddr);
195 		proto_close(conn);
196 		return (-1);
197 	}
198 	pjdlog_debug(1, "Welcome message received (%s).", welcome);
199 	/*
200 	 * Receiver can only reply with version number lower or equal to
201 	 * the one we sent.
202 	 */
203 	adhost->adh_version = atoi(welcome + 5);
204 	if (adhost->adh_version > ADIST_VERSION) {
205 		pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
206 		    adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
207 		proto_close(conn);
208 		return (-1);
209 	}
210 
211 	pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
212 	    adhost->adh_remoteaddr);
213 
214 	if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
215 		pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
216 		    adhost->adh_remoteaddr);
217 		proto_close(conn);
218 		return (-1);
219 	}
220 	pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
221 
222 	if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
223 		pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
224 		    adhost->adh_remoteaddr);
225 		proto_close(conn);
226 		return (-1);
227 	}
228 	pjdlog_debug(1, "Challenge received.");
229 
230 	if (HMAC(EVP_sha256(), adhost->adh_password,
231 	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
232 	    NULL) == NULL) {
233 		pjdlog_warning("Unable to generate response.");
234 		proto_close(conn);
235 		return (-1);
236 	}
237 	pjdlog_debug(1, "Response generated.");
238 
239 	if (proto_send(conn, hash, sizeof(hash)) == -1) {
240 		pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
241 		    adhost->adh_remoteaddr);
242 		proto_close(conn);
243 		return (-1);
244 	}
245 	pjdlog_debug(1, "Response sent.");
246 
247 	if (adist_random(rnd, sizeof(rnd)) == -1) {
248 		pjdlog_warning("Unable to generate challenge.");
249 		proto_close(conn);
250 		return (-1);
251 	}
252 	pjdlog_debug(1, "Challenge generated.");
253 
254 	if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
255 		pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
256 		    adhost->adh_remoteaddr);
257 		proto_close(conn);
258 		return (-1);
259 	}
260 	pjdlog_debug(1, "Challenge sent.");
261 
262 	if (proto_recv(conn, resp, sizeof(resp)) == -1) {
263 		pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
264 		    adhost->adh_remoteaddr);
265 		proto_close(conn);
266 		return (-1);
267 	}
268 	pjdlog_debug(1, "Response received.");
269 
270 	if (HMAC(EVP_sha256(), adhost->adh_password,
271 	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
272 	    NULL) == NULL) {
273 		pjdlog_warning("Unable to generate hash.");
274 		proto_close(conn);
275 		return (-1);
276 	}
277 	pjdlog_debug(1, "Hash generated.");
278 
279 	if (memcmp(resp, hash, sizeof(hash)) != 0) {
280 		pjdlog_warning("Invalid response from %s (wrong password?).",
281 		    adhost->adh_remoteaddr);
282 		proto_close(conn);
283 		return (-1);
284 	}
285 	pjdlog_info("Receiver authenticated.");
286 
287 	if (proto_recv(conn, &adhost->adh_trail_offset,
288 	    sizeof(adhost->adh_trail_offset)) == -1) {
289 		pjdlog_errno(LOG_WARNING,
290 		    "Unable to receive size of the most recent trail file from %s",
291 		    adhost->adh_remoteaddr);
292 		proto_close(conn);
293 		return (-1);
294 	}
295 	adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
296 	if (proto_recv(conn, &adhost->adh_trail_name,
297 	    sizeof(adhost->adh_trail_name)) == -1) {
298 		pjdlog_errno(LOG_WARNING,
299 		    "Unable to receive name of the most recent trail file from %s",
300 		    adhost->adh_remoteaddr);
301 		proto_close(conn);
302 		return (-1);
303 	}
304 	pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
305 	    adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
306 
307 	rw_wlock(&adist_remote_lock);
308 	mtx_lock(&adist_remote_mtx);
309 	PJDLOG_ASSERT(adhost->adh_remote == NULL);
310 	PJDLOG_ASSERT(conn != NULL);
311 	adhost->adh_remote = conn;
312 	mtx_unlock(&adist_remote_mtx);
313 	rw_unlock(&adist_remote_lock);
314 	cv_signal(&adist_remote_cond);
315 
316 	return (0);
317 }
318 
319 static void
320 sender_disconnect(void)
321 {
322 
323 	rw_wlock(&adist_remote_lock);
324 	/*
325 	 * Check for a race between dropping rlock and acquiring wlock -
326 	 * another thread can close connection in-between.
327 	 */
328 	if (adhost->adh_remote == NULL) {
329 		rw_unlock(&adist_remote_lock);
330 		return;
331 	}
332 	pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
333 	proto_close(adhost->adh_remote);
334 	mtx_lock(&adist_remote_mtx);
335 	adhost->adh_remote = NULL;
336 	adhost->adh_reset = true;
337 	adhost->adh_trail_name[0] = '\0';
338 	adhost->adh_trail_offset = 0;
339 	mtx_unlock(&adist_remote_mtx);
340 	rw_unlock(&adist_remote_lock);
341 
342 	pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
343 
344 	/* Move all in-flight requests back onto free list. */
345 	mtx_lock(&adist_free_list_lock);
346 	mtx_lock(&adist_send_list_lock);
347 	TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
348 	mtx_unlock(&adist_send_list_lock);
349 	mtx_lock(&adist_recv_list_lock);
350 	TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
351 	mtx_unlock(&adist_recv_list_lock);
352 	mtx_unlock(&adist_free_list_lock);
353 }
354 
355 static void
356 adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
357     size_t size)
358 {
359 	static uint64_t seq = 1;
360 
361 	PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
362 
363 	switch (cmd) {
364 	case ADIST_CMD_OPEN:
365 	case ADIST_CMD_CLOSE:
366 		PJDLOG_ASSERT(data != NULL && size == 0);
367 		size = strlen(data) + 1;
368 		break;
369 	case ADIST_CMD_APPEND:
370 		PJDLOG_ASSERT(data != NULL && size > 0);
371 		break;
372 	case ADIST_CMD_KEEPALIVE:
373 	case ADIST_CMD_ERROR:
374 		PJDLOG_ASSERT(data == NULL && size == 0);
375 		break;
376 	default:
377 		PJDLOG_ABORT("Invalid command (%hhu).", cmd);
378 	}
379 
380 	adreq->adr_cmd = cmd;
381 	adreq->adr_seq = seq++;
382 	adreq->adr_datasize = size;
383 	/* Don't copy if data is already in out buffer. */
384 	if (data != NULL && data != adreq->adr_data)
385 		bcopy(data, adreq->adr_data, size);
386 }
387 
/*
 * Make sure the read thread has an open trail file to read from.
 *
 * If a reset was requested (adh_reset is set by sender_disconnect() and by
 * adist_sender() at startup), close the current trail file, reset the
 * trail state, wait until a connection to the receiver exists and restart
 * from the trail file name and offset the receiver reported.  Then, while
 * no trail file is open, wait for the trail directory (wait_for_dir()) and
 * try the next file.
 *
 * Returns true if a new trail file was opened (the caller then sends an
 * ADIST_CMD_OPEN request first), false to continue with the current file.
 */
static bool
read_thread_wait(void)
{
	bool newfile = false;

	mtx_lock(&adist_remote_mtx);
	if (adhost->adh_reset) {
	/* Also entered via "goto reset" below, with adist_remote_mtx held. */
reset:
		adhost->adh_reset = false;
		if (trail_filefd(adist_trail) != -1)
			trail_close(adist_trail);
		trail_reset(adist_trail);
		/*
		 * cv_wait() drops adist_remote_mtx while sleeping;
		 * sender_connect() signals adist_remote_cond once it has
		 * published a connection.
		 */
		while (adhost->adh_remote == NULL)
			cv_wait(&adist_remote_cond, &adist_remote_mtx);
		/* Resume from where the receiver says it stopped. */
		trail_start(adist_trail, adhost->adh_trail_name,
		    adhost->adh_trail_offset);
		newfile = true;
	}
	mtx_unlock(&adist_remote_mtx);
	while (trail_filefd(adist_trail) == -1) {
		newfile = true;
		wait_for_dir();
		/*
		 * We may have been disconnected and reconnected in the
		 * meantime, check if reset is set.
		 */
		mtx_lock(&adist_remote_mtx);
		if (adhost->adh_reset)
			goto reset;
		mtx_unlock(&adist_remote_mtx);
		if (trail_filefd(adist_trail) == -1)
			trail_next(adist_trail);
	}
	if (newfile) {
		pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
		    adhost->adh_directory,
		    trail_filename(adist_trail));
		/* Failure is ignored here (result cast to void). */
		(void)wait_for_file_init(trail_filefd(adist_trail));
	}
	return (newfile);
}
429 
430 static void *
431 read_thread(void *arg __unused)
432 {
433 	struct adreq *adreq;
434 	ssize_t done;
435 	bool newfile;
436 
437 	pjdlog_debug(1, "%s started.", __func__);
438 
439 	for (;;) {
440 		newfile = read_thread_wait();
441 		QUEUE_TAKE(adreq, &adist_free_list, 0);
442 		if (newfile) {
443 			adreq_fill(adreq, ADIST_CMD_OPEN,
444 			    trail_filename(adist_trail), 0);
445 			newfile = false;
446 			goto move;
447 		}
448 
449 		done = read(trail_filefd(adist_trail), adreq->adr_data,
450 		    ADIST_BUF_SIZE);
451 		if (done == -1) {
452 			off_t offset;
453 			int error;
454 
455 			error = errno;
456 			offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
457 			errno = error;
458 			pjdlog_errno(LOG_ERR,
459 			    "Error while reading \"%s/%s\" at offset %jd",
460 			    adhost->adh_directory, trail_filename(adist_trail),
461 			    offset);
462 			trail_close(adist_trail);
463 			adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
464 			goto move;
465 		} else if (done == 0) {
466 			/* End of file. */
467 			pjdlog_debug(3, "End of \"%s/%s\".",
468 			    adhost->adh_directory, trail_filename(adist_trail));
469 			if (!trail_switch(adist_trail)) {
470 				/* More audit records can arrive. */
471 				mtx_lock(&adist_free_list_lock);
472 				TAILQ_INSERT_TAIL(&adist_free_list, adreq,
473 				    adr_next);
474 				mtx_unlock(&adist_free_list_lock);
475 				wait_for_file();
476 				continue;
477 			}
478 			adreq_fill(adreq, ADIST_CMD_CLOSE,
479 			    trail_filename(adist_trail), 0);
480 			trail_close(adist_trail);
481 			goto move;
482 		}
483 
484 		adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
485 move:
486 		pjdlog_debug(3,
487 		    "read thread: Moving request %p to the send queue (%hhu).",
488 		    adreq, adreq->adr_cmd);
489 		QUEUE_INSERT(adreq, &adist_send_list);
490 	}
491 	/* NOTREACHED */
492 	return (NULL);
493 }
494 
495 static void
496 keepalive_send(void)
497 {
498 	struct adreq *adreq;
499 
500 	rw_rlock(&adist_remote_lock);
501 	if (adhost->adh_remote == NULL) {
502 		rw_unlock(&adist_remote_lock);
503 		return;
504 	}
505 	rw_unlock(&adist_remote_lock);
506 
507 	mtx_lock(&adist_free_list_lock);
508 	adreq = TAILQ_FIRST(&adist_free_list);
509 	if (adreq != NULL)
510 		TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
511 	mtx_unlock(&adist_free_list_lock);
512 	if (adreq == NULL)
513 		return;
514 
515 	adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
516 
517 	QUEUE_INSERT(adreq, &adist_send_list);
518 
519 	pjdlog_debug(3, "keepalive_send: Request sent.");
520 }
521 
522 /*
523  * Thread sends request to secondary node.
524  */
525 static void *
526 send_thread(void *arg __unused)
527 {
528 	time_t lastcheck, now;
529 	struct adreq *adreq;
530 
531 	pjdlog_debug(1, "%s started.", __func__);
532 
533 	lastcheck = time(NULL);
534 
535 	for (;;) {
536 		pjdlog_debug(3, "send thread: Taking request.");
537 		for (;;) {
538 			QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
539 			if (adreq != NULL)
540 				break;
541 			now = time(NULL);
542 			if (lastcheck + ADIST_KEEPALIVE <= now) {
543 				keepalive_send();
544 				lastcheck = now;
545 			}
546 		}
547 		PJDLOG_ASSERT(adreq != NULL);
548 		pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
549 		    adreq->adr_cmd);
550 		/*
551 		 * Protect connection from disappearing.
552 		 */
553 		rw_rlock(&adist_remote_lock);
554 		/*
555 		 * Move the request to the recv queue first to avoid race
556 		 * where the recv thread receives the reply before we move
557 		 * the request to the recv queue.
558 		 */
559 		QUEUE_INSERT(adreq, &adist_recv_list);
560 		if (adhost->adh_remote == NULL ||
561 		    proto_send(adhost->adh_remote, &adreq->adr_packet,
562 		    ADPKT_SIZE(adreq)) == -1) {
563 			rw_unlock(&adist_remote_lock);
564 			pjdlog_debug(1,
565 			    "send thread: (%p) Unable to send request.", adreq);
566 			if (adhost->adh_remote != NULL)
567 				sender_disconnect();
568 			continue;
569 		} else {
570 			pjdlog_debug(3, "Request %p sent successfully.", adreq);
571 			adreq_log(LOG_DEBUG, 2, -1, adreq,
572 			    "send: (%p) Request sent: ", adreq);
573 			rw_unlock(&adist_remote_lock);
574 		}
575 	}
576 	/* NOTREACHED */
577 	return (NULL);
578 }
579 
580 static void
581 adrep_decode_header(struct adrep *adrep)
582 {
583 
584 	/* Byte-swap only is the receiver is using different byte order. */
585 	if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
586 		adrep->adrp_byteorder = ADIST_BYTEORDER;
587 		adrep->adrp_seq = bswap64(adrep->adrp_seq);
588 		adrep->adrp_error = bswap16(adrep->adrp_error);
589 	}
590 }
591 
592 /*
593  * Thread receives answer from secondary node and passes it to ggate_send
594  * thread.
595  */
596 static void *
597 recv_thread(void *arg __unused)
598 {
599 	struct adrep adrep;
600 	struct adreq *adreq;
601 
602 	pjdlog_debug(1, "%s started.", __func__);
603 
604 	for (;;) {
605 		/* Wait until there is anything to receive. */
606 		QUEUE_WAIT(&adist_recv_list);
607 		pjdlog_debug(3, "recv thread: Got something.");
608 		rw_rlock(&adist_remote_lock);
609 		if (adhost->adh_remote == NULL) {
610 			/*
611 			 * Connection is dead.
612 			 * XXX: We shouldn't be here.
613 			 */
614 			rw_unlock(&adist_remote_lock);
615 			continue;
616 		}
617 		if (proto_recv(adhost->adh_remote, &adrep,
618 		    sizeof(adrep)) == -1) {
619 			rw_unlock(&adist_remote_lock);
620 			pjdlog_errno(LOG_ERR, "Unable to receive reply");
621 			sender_disconnect();
622 			continue;
623 		}
624 		rw_unlock(&adist_remote_lock);
625 		adrep_decode_header(&adrep);
626 		/*
627 		 * Find the request that was just confirmed.
628 		 */
629 		mtx_lock(&adist_recv_list_lock);
630 		TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
631 			if (adreq->adr_seq == adrep.adrp_seq) {
632 				TAILQ_REMOVE(&adist_recv_list, adreq,
633 				    adr_next);
634 				break;
635 			}
636 		}
637 		if (adreq == NULL) {
638 			/*
639 			 * If we disconnected in the meantime, just continue.
640 			 * On disconnect sender_disconnect() clears the queue,
641 			 * we can use that.
642 			 */
643 			if (TAILQ_EMPTY(&adist_recv_list)) {
644 				mtx_unlock(&adist_recv_list_lock);
645 				continue;
646 			}
647 			mtx_unlock(&adist_recv_list_lock);
648 			pjdlog_error("Found no request matching received 'seq' field (%ju).",
649 			    (uintmax_t)adrep.adrp_seq);
650 			sender_disconnect();
651 			continue;
652 		}
653 		mtx_unlock(&adist_recv_list_lock);
654 		adreq_log(LOG_DEBUG, 2, -1, adreq,
655 		    "recv thread: (%p) Request confirmed: ", adreq);
656 		pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
657 		    adreq->adr_cmd);
658 		if (adrep.adrp_error != 0) {
659 			pjdlog_error("Receiver returned error (%s), disconnecting.",
660 			    adist_errstr((int)adrep.adrp_error));
661 			sender_disconnect();
662 			continue;
663 		}
664 		if (adreq->adr_cmd == ADIST_CMD_CLOSE)
665 			trail_unlink(adist_trail, adreq->adr_data);
666 		pjdlog_debug(3, "Request received successfully.");
667 		QUEUE_INSERT(adreq, &adist_free_list);
668 	}
669 	/* NOTREACHED */
670 	return (NULL);
671 }
672 
673 static void
674 guard_check_connection(void)
675 {
676 
677 	PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
678 
679 	rw_rlock(&adist_remote_lock);
680 	if (adhost->adh_remote != NULL) {
681 		rw_unlock(&adist_remote_lock);
682 		pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
683 		    adhost->adh_remoteaddr);
684 		return;
685 	}
686 
687 	/*
688 	 * Upgrade the lock. It doesn't have to be atomic as no other thread
689 	 * can change connection status from disconnected to connected.
690 	 */
691 	rw_unlock(&adist_remote_lock);
692 	pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
693 	    adhost->adh_remoteaddr);
694 	if (sender_connect() == 0) {
695 		pjdlog_info("Successfully reconnected to %s.",
696 		    adhost->adh_remoteaddr);
697 	} else {
698 		pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
699 		    adhost->adh_remoteaddr);
700 	}
701 }
702 
703 /*
704  * Thread guards remote connections and reconnects when needed, handles
705  * signals, etc.
706  */
707 static void *
708 guard_thread(void *arg __unused)
709 {
710 	struct timespec timeout;
711 	time_t lastcheck, now;
712 	sigset_t mask;
713 	int signo;
714 
715 	lastcheck = time(NULL);
716 
717 	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
718 	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
719 	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
720 
721 	timeout.tv_sec = ADIST_KEEPALIVE;
722 	timeout.tv_nsec = 0;
723 	signo = -1;
724 
725 	for (;;) {
726 		switch (signo) {
727 		case SIGINT:
728 		case SIGTERM:
729 			sigexit_received = true;
730 			pjdlog_exitx(EX_OK,
731 			    "Termination signal received, exiting.");
732 			break;
733 		default:
734 			break;
735 		}
736 
737 		pjdlog_debug(3, "remote_guard: Checking connections.");
738 		now = time(NULL);
739 		if (lastcheck + ADIST_KEEPALIVE <= now) {
740 			guard_check_connection();
741 			lastcheck = now;
742 		}
743 		signo = sigtimedwait(&mask, NULL, &timeout);
744 	}
745 	/* NOTREACHED */
746 	return (NULL);
747 }
748 
749 void
750 adist_sender(struct adist_config *config, struct adist_host *adh)
751 {
752 	pthread_t td;
753 	pid_t pid;
754 	int error, mode, debuglevel;
755 
756 	/*
757 	 * Create communication channel for sending connection requests from
758 	 * child to parent.
759 	 */
760 	if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
761 		pjdlog_errno(LOG_ERR,
762 		    "Unable to create connection sockets between child and parent");
763 		return;
764 	}
765 
766 	pid = fork();
767 	if (pid == -1) {
768 		pjdlog_errno(LOG_ERR, "Unable to fork");
769 		proto_close(adh->adh_conn);
770 		adh->adh_conn = NULL;
771 		return;
772 	}
773 
774 	if (pid > 0) {
775 		/* This is parent. */
776 		adh->adh_worker_pid = pid;
777 		/* Declare that we are receiver. */
778 		proto_recv(adh->adh_conn, NULL, 0);
779 		return;
780 	}
781 
782 	adcfg = config;
783 	adhost = adh;
784 
785 	mode = pjdlog_mode_get();
786 	debuglevel = pjdlog_debug_get();
787 
788 	/* Declare that we are sender. */
789 	proto_send(adhost->adh_conn, NULL, 0);
790 
791 	descriptors_cleanup(adhost);
792 
793 #ifdef TODO
794 	descriptors_assert(adhost, mode);
795 #endif
796 
797 	pjdlog_init(mode);
798 	pjdlog_debug_set(debuglevel);
799 	pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
800 	    role2str(adhost->adh_role));
801 #ifdef HAVE_SETPROCTITLE
802 	setproctitle("[%s] (%s) ", adhost->adh_name,
803 	    role2str(adhost->adh_role));
804 #endif
805 
806 	/*
807 	 * The sender process should be able to remove entries from its
808 	 * trail directory, but it should not be able to write to the
809 	 * trail files, only read from them.
810 	 */
811 	adist_trail = trail_new(adhost->adh_directory, false);
812 	if (adist_trail == NULL)
813 		exit(EX_OSFILE);
814 
815 	if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
816 	    role2str(adhost->adh_role), adhost->adh_name) != 0) {
817 		exit(EX_CONFIG);
818 	}
819 	pjdlog_info("Privileges successfully dropped.");
820 
821 	/*
822 	 * We can ignore wait_for_dir_init() failures. It will fall back to
823 	 * using sleep(3).
824 	 */
825 	(void)wait_for_dir_init(trail_dirfd(adist_trail));
826 
827 	init_environment();
828 	if (sender_connect() == 0) {
829 		pjdlog_info("Successfully connected to %s.",
830 		    adhost->adh_remoteaddr);
831 	}
832 	adhost->adh_reset = true;
833 
834 	/*
835 	 * Create the guard thread first, so we can handle signals from the
836 	 * very begining.
837 	 */
838 	error = pthread_create(&td, NULL, guard_thread, NULL);
839 	PJDLOG_ASSERT(error == 0);
840 	error = pthread_create(&td, NULL, send_thread, NULL);
841 	PJDLOG_ASSERT(error == 0);
842 	error = pthread_create(&td, NULL, recv_thread, NULL);
843 	PJDLOG_ASSERT(error == 0);
844 	(void)read_thread(NULL);
845 }
846