xref: /openbsd/usr.sbin/rpki-client/rrdp.c (revision 4bdff4be)
1 /*	$OpenBSD: rrdp.c,v 1.32 2023/06/23 11:36:24 claudio Exp $ */
2 /*
3  * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
4  * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 #include <sys/queue.h>
19 #include <sys/stat.h>
20 
21 #include <err.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <poll.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <imsg.h>
29 
30 #include <expat.h>
31 #include <openssl/sha.h>
32 
33 #include "extern.h"
34 #include "rrdp.h"
35 
36 #define MAX_SESSIONS	12
37 #define	READ_BUF_SIZE	(32 * 1024)
38 
39 static struct msgbuf	msgq;
40 
41 #define RRDP_STATE_REQ		0x01
42 #define RRDP_STATE_WAIT		0x02
43 #define RRDP_STATE_PARSE	0x04
44 #define RRDP_STATE_PARSE_ERROR	0x08
45 #define RRDP_STATE_PARSE_DONE	0x10
46 #define RRDP_STATE_HTTP_DONE	0x20
47 #define RRDP_STATE_DONE		(RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE)
48 
49 struct rrdp {
50 	TAILQ_ENTRY(rrdp)	 entry;
51 	unsigned int		 id;
52 	char			*notifyuri;
53 	char			*local;
54 	char			*last_mod;
55 
56 	struct pollfd		*pfd;
57 	int			 infd;
58 	int			 state;
59 	int			 aborted;
60 	unsigned int		 file_pending;
61 	unsigned int		 file_failed;
62 	enum http_result	 res;
63 	enum rrdp_task		 task;
64 
65 	char			 hash[SHA256_DIGEST_LENGTH];
66 	SHA256_CTX		 ctx;
67 
68 	struct rrdp_session	*repository;
69 	struct rrdp_session	*current;
70 	XML_Parser		 parser;
71 	struct notification_xml	*nxml;
72 	struct snapshot_xml	*sxml;
73 	struct delta_xml	*dxml;
74 };
75 
76 static TAILQ_HEAD(, rrdp)	states = TAILQ_HEAD_INITIALIZER(states);
77 
78 char *
79 xstrdup(const char *s)
80 {
81 	char *r;
82 	if ((r = strdup(s)) == NULL)
83 		err(1, "strdup");
84 	return r;
85 }
86 
87 /*
88  * Report back that a RRDP request finished.
89  * ok should only be set to 1 if the cache is now up-to-date.
90  */
91 static void
92 rrdp_done(unsigned int id, int ok)
93 {
94 	enum rrdp_msg type = RRDP_END;
95 	struct ibuf *b;
96 
97 	b = io_new_buffer();
98 	io_simple_buffer(b, &type, sizeof(type));
99 	io_simple_buffer(b, &id, sizeof(id));
100 	io_simple_buffer(b, &ok, sizeof(ok));
101 	io_close_buffer(&msgq, b);
102 }
103 
104 /*
105  * Request an URI to be fetched via HTTPS.
106  * The main process will respond with a RRDP_HTTP_INI which includes
107  * the file descriptor to read from. RRDP_HTTP_FIN is sent at the
108  * end of the request with the HTTP status code and last modified timestamp.
109  * If the request should not set the If-Modified-Since: header then last_mod
110  * should be set to NULL, else it should point to a proper date string.
111  */
112 static void
113 rrdp_http_req(unsigned int id, const char *uri, const char *last_mod)
114 {
115 	enum rrdp_msg type = RRDP_HTTP_REQ;
116 	struct ibuf *b;
117 
118 	b = io_new_buffer();
119 	io_simple_buffer(b, &type, sizeof(type));
120 	io_simple_buffer(b, &id, sizeof(id));
121 	io_str_buffer(b, uri);
122 	io_str_buffer(b, last_mod);
123 	io_close_buffer(&msgq, b);
124 }
125 
126 /*
127  * Send the session state to the main process so it gets stored.
128  */
129 static void
130 rrdp_state_send(struct rrdp *s)
131 {
132 	enum rrdp_msg type = RRDP_SESSION;
133 	struct ibuf *b;
134 
135 	b = io_new_buffer();
136 	io_simple_buffer(b, &type, sizeof(type));
137 	io_simple_buffer(b, &s->id, sizeof(s->id));
138 	rrdp_session_buffer(b, s->current);
139 	io_close_buffer(&msgq, b);
140 }
141 
142 /*
143  * Inform parent to clear the RRDP repository before start of snapshot.
144  */
145 static void
146 rrdp_clear_repo(struct rrdp *s)
147 {
148 	enum rrdp_msg type = RRDP_CLEAR;
149 	struct ibuf *b;
150 
151 	b = io_new_buffer();
152 	io_simple_buffer(b, &type, sizeof(type));
153 	io_simple_buffer(b, &s->id, sizeof(s->id));
154 	io_close_buffer(&msgq, b);
155 }
156 
157 /*
158  * Send a blob of data to the main process to store it in the repository.
159  */
160 void
161 rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml,
162     unsigned char *data, size_t datasz)
163 {
164 	enum rrdp_msg type = RRDP_FILE;
165 	struct ibuf *b;
166 
167 	/* only send files if the fetch did not fail already */
168 	if (s->file_failed == 0) {
169 		b = io_new_buffer();
170 		io_simple_buffer(b, &type, sizeof(type));
171 		io_simple_buffer(b, &s->id, sizeof(s->id));
172 		io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
173 		if (pxml->type != PUB_ADD)
174 			io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
175 		io_str_buffer(b, pxml->uri);
176 		io_buf_buffer(b, data, datasz);
177 		io_close_buffer(&msgq, b);
178 		s->file_pending++;
179 	}
180 }
181 
182 static void
183 rrdp_new(unsigned int id, char *local, char *notify, struct rrdp_session *state)
184 {
185 	struct rrdp *s;
186 
187 	if ((s = calloc(1, sizeof(*s))) == NULL)
188 		err(1, NULL);
189 
190 	s->infd = -1;
191 	s->id = id;
192 	s->local = local;
193 	s->notifyuri = notify;
194 	s->repository = state;
195 	if ((s->current = calloc(1, sizeof(*s->current))) == NULL)
196 		err(1, NULL);
197 
198 	s->state = RRDP_STATE_REQ;
199 	if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
200 		err(1, "XML_ParserCreate");
201 
202 	s->nxml = new_notification_xml(s->parser, s->repository, s->current,
203 	    notify);
204 
205 	TAILQ_INSERT_TAIL(&states, s, entry);
206 }
207 
208 static void
209 rrdp_free(struct rrdp *s)
210 {
211 	if (s == NULL)
212 		return;
213 
214 	TAILQ_REMOVE(&states, s, entry);
215 
216 	free_notification_xml(s->nxml);
217 	free_snapshot_xml(s->sxml);
218 	free_delta_xml(s->dxml);
219 
220 	if (s->parser)
221 		XML_ParserFree(s->parser);
222 	if (s->infd != -1)
223 		close(s->infd);
224 	free(s->notifyuri);
225 	free(s->local);
226 	free(s->last_mod);
227 	rrdp_session_free(s->repository);
228 	rrdp_session_free(s->current);
229 
230 	free(s);
231 }
232 
233 static struct rrdp *
234 rrdp_get(unsigned int id)
235 {
236 	struct rrdp *s;
237 
238 	TAILQ_FOREACH(s, &states, entry)
239 		if (s->id == id)
240 			break;
241 	return s;
242 }
243 
244 static void
245 rrdp_failed(struct rrdp *s)
246 {
247 	unsigned int id = s->id;
248 
249 	/* reset file state before retrying */
250 	s->file_failed = 0;
251 
252 	if (s->task == DELTA && !s->aborted) {
253 		/* fallback to a snapshot as per RFC8182 */
254 		free_delta_xml(s->dxml);
255 		s->dxml = NULL;
256 		rrdp_clear_repo(s);
257 		s->sxml = new_snapshot_xml(s->parser, s->current, s);
258 		s->task = SNAPSHOT;
259 		s->state = RRDP_STATE_REQ;
260 		logx("%s: delta sync failed, fallback to snapshot", s->local);
261 	} else {
262 		/*
263 		 * TODO: update state to track recurring failures
264 		 * and fall back to rsync after a while.
265 		 * This should probably happen in the main process.
266 		 */
267 		rrdp_free(s);
268 		rrdp_done(id, 0);
269 	}
270 }
271 
272 static void
273 rrdp_finished(struct rrdp *s)
274 {
275 	unsigned int id = s->id;
276 
277 	/* check if all parts of the process have finished */
278 	if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE)
279 		return;
280 
281 	/* still some files pending */
282 	if (s->file_pending > 0)
283 		return;
284 
285 	if (s->state & RRDP_STATE_PARSE_ERROR || s->aborted) {
286 		rrdp_failed(s);
287 		return;
288 	}
289 
290 	if (s->res == HTTP_OK) {
291 		XML_Parser p = s->parser;
292 
293 		/*
294 		 * Finalize parsing on success to be sure that
295 		 * all of the XML is correct. Needs to be done here
296 		 * since the call would most probably fail for non
297 		 * successful data fetches.
298 		 */
299 		if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) {
300 			warnx("%s: XML error at line %llu: %s", s->local,
301 			    (unsigned long long)XML_GetCurrentLineNumber(p),
302 			    XML_ErrorString(XML_GetErrorCode(p)));
303 			rrdp_failed(s);
304 			return;
305 		}
306 
307 		/* If a file caused an error fail the update */
308 		if (s->file_failed > 0) {
309 			rrdp_failed(s);
310 			return;
311 		}
312 
313 		switch (s->task) {
314 		case NOTIFICATION:
315 			s->task = notification_done(s->nxml, s->last_mod);
316 			s->last_mod = NULL;
317 			switch (s->task) {
318 			case NOTIFICATION:
319 				logx("%s: repository not modified (%s#%lld)",
320 				    s->local, s->repository->session_id,
321 				    s->repository->serial);
322 				rrdp_state_send(s);
323 				rrdp_free(s);
324 				rrdp_done(id, 1);
325 				break;
326 			case SNAPSHOT:
327 				logx("%s: downloading snapshot (%s#%lld)",
328 				    s->local, s->current->session_id,
329 				    s->current->serial);
330 				rrdp_clear_repo(s);
331 				s->sxml = new_snapshot_xml(p, s->current, s);
332 				s->state = RRDP_STATE_REQ;
333 				break;
334 			case DELTA:
335 				logx("%s: downloading %lld deltas (%s#%lld)",
336 				    s->local,
337 				    s->repository->serial - s->current->serial,
338 				    s->current->session_id, s->current->serial);
339 				s->dxml = new_delta_xml(p, s->current, s);
340 				s->state = RRDP_STATE_REQ;
341 				break;
342 			}
343 			break;
344 		case SNAPSHOT:
345 			rrdp_state_send(s);
346 			rrdp_free(s);
347 			rrdp_done(id, 1);
348 			break;
349 		case DELTA:
350 			if (notification_delta_done(s->nxml)) {
351 				/* finished */
352 				rrdp_state_send(s);
353 				rrdp_free(s);
354 				rrdp_done(id, 1);
355 			} else {
356 				/* reset delta parser for next delta */
357 				free_delta_xml(s->dxml);
358 				s->dxml = new_delta_xml(p, s->current, s);
359 				s->state = RRDP_STATE_REQ;
360 			}
361 			break;
362 		}
363 	} else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
364 		logx("%s: notification file not modified (%s#%lld)", s->local,
365 		    s->repository->session_id, s->repository->serial);
366 		/* no need to update state file */
367 		rrdp_free(s);
368 		rrdp_done(id, 1);
369 	} else {
370 		rrdp_failed(s);
371 	}
372 }
373 
374 static void
375 rrdp_abort_req(struct rrdp *s)
376 {
377 	unsigned int id = s->id;
378 
379 	s->aborted = 1;
380 	if (s->state == RRDP_STATE_REQ) {
381 		/* nothing is pending, just abort */
382 		rrdp_free(s);
383 		rrdp_done(id, 1);
384 		return;
385 	}
386 	if (s->state == RRDP_STATE_WAIT)
387 		/* wait for HTTP_INI which will progress the state */
388 		return;
389 
390 	/*
391 	 * RRDP_STATE_PARSE or later, close infd, abort parser but
392 	 * wait for HTTP_FIN and file_pending to drop to 0.
393 	 */
394 	if (s->infd != -1) {
395 		close(s->infd);
396 		s->infd = -1;
397 		s->state |= RRDP_STATE_PARSE_DONE | RRDP_STATE_PARSE_ERROR;
398 	}
399 	rrdp_finished(s);
400 }
401 
402 static void
403 rrdp_input_handler(int fd)
404 {
405 	static struct ibuf *inbuf;
406 	struct rrdp_session *state;
407 	char *local, *notify, *last_mod;
408 	struct ibuf *b;
409 	struct rrdp *s;
410 	enum rrdp_msg type;
411 	enum http_result res;
412 	unsigned int id;
413 	int ok;
414 
415 	b = io_buf_recvfd(fd, &inbuf);
416 	if (b == NULL)
417 		return;
418 
419 	io_read_buf(b, &type, sizeof(type));
420 	io_read_buf(b, &id, sizeof(id));
421 
422 	switch (type) {
423 	case RRDP_START:
424 		if (ibuf_fd_avail(b))
425 			errx(1, "received unexpected fd");
426 		io_read_str(b, &local);
427 		io_read_str(b, &notify);
428 		state = rrdp_session_read(b);
429 		rrdp_new(id, local, notify, state);
430 		break;
431 	case RRDP_HTTP_INI:
432 		s = rrdp_get(id);
433 		if (s == NULL)
434 			errx(1, "http ini, rrdp session %u does not exist", id);
435 		if (s->state != RRDP_STATE_WAIT)
436 			errx(1, "%s: bad internal state", s->local);
437 		s->infd = ibuf_fd_get(b);
438 		if (s->infd == -1)
439 			errx(1, "expected fd not received");
440 		s->state = RRDP_STATE_PARSE;
441 		if (s->aborted) {
442 			rrdp_abort_req(s);
443 			break;
444 		}
445 		break;
446 	case RRDP_HTTP_FIN:
447 		io_read_buf(b, &res, sizeof(res));
448 		io_read_str(b, &last_mod);
449 		if (ibuf_fd_avail(b))
450 			errx(1, "received unexpected fd");
451 
452 		s = rrdp_get(id);
453 		if (s == NULL)
454 			errx(1, "http fin, rrdp session %u does not exist", id);
455 		if (!(s->state & RRDP_STATE_PARSE))
456 			errx(1, "%s: bad internal state", s->local);
457 		s->state |= RRDP_STATE_HTTP_DONE;
458 		s->res = res;
459 		free(s->last_mod);
460 		s->last_mod = last_mod;
461 		rrdp_finished(s);
462 		break;
463 	case RRDP_FILE:
464 		s = rrdp_get(id);
465 		if (s == NULL)
466 			errx(1, "file, rrdp session %u does not exist", id);;
467 		if (ibuf_fd_avail(b))
468 			errx(1, "received unexpected fd");
469 		io_read_buf(b, &ok, sizeof(ok));
470 		if (ok != 1)
471 			s->file_failed++;
472 		s->file_pending--;
473 		if (s->file_pending == 0)
474 			rrdp_finished(s);
475 		break;
476 	case RRDP_ABORT:
477 		if (ibuf_fd_avail(b))
478 			errx(1, "received unexpected fd");
479 		s = rrdp_get(id);
480 		if (s != NULL)
481 			rrdp_abort_req(s);
482 		break;
483 	default:
484 		errx(1, "unexpected message %d", type);
485 	}
486 	ibuf_free(b);
487 }
488 
489 static void
490 rrdp_data_handler(struct rrdp *s)
491 {
492 	char buf[READ_BUF_SIZE];
493 	XML_Parser p = s->parser;
494 	ssize_t len;
495 
496 	len = read(s->infd, buf, sizeof(buf));
497 	if (len == -1) {
498 		warn("%s: read failure", s->local);
499 		rrdp_abort_req(s);
500 		return;
501 	}
502 	if ((s->state & RRDP_STATE_PARSE) == 0)
503 		errx(1, "%s: bad parser state", s->local);
504 	if (len == 0) {
505 		/* parser stage finished */
506 		close(s->infd);
507 		s->infd = -1;
508 
509 		if (s->task != NOTIFICATION) {
510 			char h[SHA256_DIGEST_LENGTH];
511 
512 			SHA256_Final(h, &s->ctx);
513 			if (memcmp(s->hash, h, sizeof(s->hash)) != 0) {
514 				s->state |= RRDP_STATE_PARSE_ERROR;
515 				warnx("%s: bad message digest", s->local);
516 			}
517 		}
518 
519 		s->state |= RRDP_STATE_PARSE_DONE;
520 		rrdp_finished(s);
521 		return;
522 	}
523 
524 	/* parse and maybe hash the bytes just read */
525 	if (s->task != NOTIFICATION)
526 		SHA256_Update(&s->ctx, buf, len);
527 	if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 &&
528 	    XML_Parse(p, buf, len, 0) != XML_STATUS_OK) {
529 		warnx("%s: parse error at line %llu: %s", s->local,
530 		    (unsigned long long)XML_GetCurrentLineNumber(p),
531 		    XML_ErrorString(XML_GetErrorCode(p)));
532 		s->state |= RRDP_STATE_PARSE_ERROR;
533 	}
534 }
535 
536 void
537 proc_rrdp(int fd)
538 {
539 	struct pollfd pfds[MAX_SESSIONS + 1];
540 	struct rrdp *s, *ns;
541 	size_t i;
542 
543 	if (pledge("stdio recvfd", NULL) == -1)
544 		err(1, "pledge");
545 
546 	msgbuf_init(&msgq);
547 	msgq.fd = fd;
548 
549 	for (;;) {
550 		i = 1;
551 		memset(&pfds, 0, sizeof(pfds));
552 		TAILQ_FOREACH(s, &states, entry) {
553 			if (i >= MAX_SESSIONS + 1) {
554 				/* not enough sessions, wait for better times */
555 				s->pfd = NULL;
556 				continue;
557 			}
558 			/* request new assets when there are free sessions */
559 			if (s->state == RRDP_STATE_REQ) {
560 				const char *uri;
561 				switch (s->task) {
562 				case NOTIFICATION:
563 					rrdp_http_req(s->id, s->notifyuri,
564 					    s->repository->last_mod);
565 					break;
566 				case SNAPSHOT:
567 				case DELTA:
568 					uri = notification_get_next(s->nxml,
569 					    s->hash, sizeof(s->hash),
570 					    s->task);
571 					SHA256_Init(&s->ctx);
572 					rrdp_http_req(s->id, uri, NULL);
573 					break;
574 				}
575 				s->state = RRDP_STATE_WAIT;
576 			}
577 			s->pfd = pfds + i++;
578 			s->pfd->fd = s->infd;
579 			s->pfd->events = POLLIN;
580 		}
581 
582 		/*
583 		 * Update main fd last.
584 		 * The previous loop may have enqueue messages.
585 		 */
586 		pfds[0].fd = fd;
587 		pfds[0].events = POLLIN;
588 		if (msgq.queued)
589 			pfds[0].events |= POLLOUT;
590 
591 		if (poll(pfds, i, INFTIM) == -1) {
592 			if (errno == EINTR)
593 				continue;
594 			err(1, "poll");
595 		}
596 
597 		if (pfds[0].revents & POLLHUP)
598 			break;
599 		if (pfds[0].revents & POLLOUT) {
600 			switch (msgbuf_write(&msgq)) {
601 			case 0:
602 				errx(1, "write: connection closed");
603 			case -1:
604 				err(1, "write");
605 			}
606 		}
607 		if (pfds[0].revents & POLLIN)
608 			rrdp_input_handler(fd);
609 
610 		TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
611 			if (s->pfd == NULL)
612 				continue;
613 			if (s->pfd->revents != 0)
614 				rrdp_data_handler(s);
615 		}
616 	}
617 
618 	exit(0);
619 }
620