xref: /openbsd/usr.sbin/rpki-client/rrdp.c (revision b5fa5d51)
1 /*	$OpenBSD: rrdp.c,v 1.39 2024/11/21 13:32:27 claudio Exp $ */
2 /*
3  * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
4  * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 #include <sys/queue.h>
19 #include <sys/stat.h>
20 
21 #include <err.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <poll.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <imsg.h>
29 
30 #include <expat.h>
31 #include <openssl/sha.h>
32 
33 #include "extern.h"
34 #include "rrdp.h"
35 
36 #define MAX_SESSIONS	32
37 #define	READ_BUF_SIZE	(32 * 1024)
38 
39 static struct msgbuf	*msgq;
40 
41 #define RRDP_STATE_REQ		0x01
42 #define RRDP_STATE_WAIT		0x02
43 #define RRDP_STATE_PARSE	0x04
44 #define RRDP_STATE_PARSE_ERROR	0x08
45 #define RRDP_STATE_PARSE_DONE	0x10
46 #define RRDP_STATE_HTTP_DONE	0x20
47 #define RRDP_STATE_DONE		(RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE)
48 
49 struct rrdp {
50 	TAILQ_ENTRY(rrdp)	 entry;
51 	unsigned int		 id;
52 	char			*notifyuri;
53 	char			*local;
54 	char			*last_mod;
55 
56 	struct pollfd		*pfd;
57 	int			 infd;
58 	int			 state;
59 	int			 aborted;
60 	unsigned int		 file_pending;
61 	unsigned int		 file_failed;
62 	enum http_result	 res;
63 	enum rrdp_task		 task;
64 
65 	char			 hash[SHA256_DIGEST_LENGTH];
66 	SHA256_CTX		 ctx;
67 
68 	struct rrdp_session	*repository;
69 	struct rrdp_session	*current;
70 	XML_Parser		 parser;
71 	struct notification_xml	*nxml;
72 	struct snapshot_xml	*sxml;
73 	struct delta_xml	*dxml;
74 };
75 
76 static TAILQ_HEAD(, rrdp)	states = TAILQ_HEAD_INITIALIZER(states);
77 
78 char *
xstrdup(const char * s)79 xstrdup(const char *s)
80 {
81 	char *r;
82 	if ((r = strdup(s)) == NULL)
83 		err(1, "strdup");
84 	return r;
85 }
86 
87 /*
88  * Report back that a RRDP request finished.
89  * ok should only be set to 1 if the cache is now up-to-date.
90  */
91 static void
rrdp_done(unsigned int id,int ok)92 rrdp_done(unsigned int id, int ok)
93 {
94 	enum rrdp_msg type = RRDP_END;
95 	struct ibuf *b;
96 
97 	b = io_new_buffer();
98 	io_simple_buffer(b, &type, sizeof(type));
99 	io_simple_buffer(b, &id, sizeof(id));
100 	io_simple_buffer(b, &ok, sizeof(ok));
101 	io_close_buffer(msgq, b);
102 }
103 
104 /*
105  * Request an URI to be fetched via HTTPS.
106  * The main process will respond with a RRDP_HTTP_INI which includes
107  * the file descriptor to read from. RRDP_HTTP_FIN is sent at the
108  * end of the request with the HTTP status code and last modified timestamp.
109  * If the request should not set the If-Modified-Since: header then last_mod
110  * should be set to NULL, else it should point to a proper date string.
111  */
112 static void
rrdp_http_req(unsigned int id,const char * uri,const char * last_mod)113 rrdp_http_req(unsigned int id, const char *uri, const char *last_mod)
114 {
115 	enum rrdp_msg type = RRDP_HTTP_REQ;
116 	struct ibuf *b;
117 
118 	b = io_new_buffer();
119 	io_simple_buffer(b, &type, sizeof(type));
120 	io_simple_buffer(b, &id, sizeof(id));
121 	io_str_buffer(b, uri);
122 	io_str_buffer(b, last_mod);
123 	io_close_buffer(msgq, b);
124 }
125 
126 /*
127  * Send the session state to the main process so it gets stored.
128  */
129 static void
rrdp_state_send(struct rrdp * s)130 rrdp_state_send(struct rrdp *s)
131 {
132 	enum rrdp_msg type = RRDP_SESSION;
133 	struct ibuf *b;
134 
135 	b = io_new_buffer();
136 	io_simple_buffer(b, &type, sizeof(type));
137 	io_simple_buffer(b, &s->id, sizeof(s->id));
138 	rrdp_session_buffer(b, s->current);
139 	io_close_buffer(msgq, b);
140 }
141 
142 /*
143  * Inform parent to clear the RRDP repository before start of snapshot.
144  */
145 static void
rrdp_clear_repo(struct rrdp * s)146 rrdp_clear_repo(struct rrdp *s)
147 {
148 	enum rrdp_msg type = RRDP_CLEAR;
149 	struct ibuf *b;
150 
151 	b = io_new_buffer();
152 	io_simple_buffer(b, &type, sizeof(type));
153 	io_simple_buffer(b, &s->id, sizeof(s->id));
154 	io_close_buffer(msgq, b);
155 }
156 
157 /*
158  * Send a blob of data to the main process to store it in the repository.
159  */
160 void
rrdp_publish_file(struct rrdp * s,struct publish_xml * pxml,unsigned char * data,size_t datasz)161 rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml,
162     unsigned char *data, size_t datasz)
163 {
164 	enum rrdp_msg type = RRDP_FILE;
165 	struct ibuf *b;
166 
167 	/* only send files if the fetch did not fail already */
168 	if (s->file_failed == 0) {
169 		b = io_new_buffer();
170 		io_simple_buffer(b, &type, sizeof(type));
171 		io_simple_buffer(b, &s->id, sizeof(s->id));
172 		io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
173 		if (pxml->type != PUB_ADD)
174 			io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
175 		io_str_buffer(b, pxml->uri);
176 		io_buf_buffer(b, data, datasz);
177 		io_close_buffer(msgq, b);
178 		s->file_pending++;
179 	}
180 }
181 
182 static void
rrdp_new(unsigned int id,char * local,char * notify,struct rrdp_session * state)183 rrdp_new(unsigned int id, char *local, char *notify, struct rrdp_session *state)
184 {
185 	struct rrdp *s;
186 
187 	if ((s = calloc(1, sizeof(*s))) == NULL)
188 		err(1, NULL);
189 
190 	s->infd = -1;
191 	s->id = id;
192 	s->local = local;
193 	s->notifyuri = notify;
194 	s->repository = state;
195 	if ((s->current = calloc(1, sizeof(*s->current))) == NULL)
196 		err(1, NULL);
197 
198 	s->state = RRDP_STATE_REQ;
199 	if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
200 		err(1, "XML_ParserCreate");
201 
202 	s->nxml = new_notification_xml(s->parser, s->repository, s->current,
203 	    notify);
204 
205 	TAILQ_INSERT_TAIL(&states, s, entry);
206 }
207 
208 static void
rrdp_free(struct rrdp * s)209 rrdp_free(struct rrdp *s)
210 {
211 	if (s == NULL)
212 		return;
213 
214 	TAILQ_REMOVE(&states, s, entry);
215 
216 	free_notification_xml(s->nxml);
217 	free_snapshot_xml(s->sxml);
218 	free_delta_xml(s->dxml);
219 
220 	if (s->parser)
221 		XML_ParserFree(s->parser);
222 	if (s->infd != -1)
223 		close(s->infd);
224 	free(s->notifyuri);
225 	free(s->local);
226 	free(s->last_mod);
227 	rrdp_session_free(s->repository);
228 	rrdp_session_free(s->current);
229 
230 	free(s);
231 }
232 
233 static struct rrdp *
rrdp_get(unsigned int id)234 rrdp_get(unsigned int id)
235 {
236 	struct rrdp *s;
237 
238 	TAILQ_FOREACH(s, &states, entry)
239 		if (s->id == id)
240 			break;
241 	return s;
242 }
243 
244 static void
rrdp_failed(struct rrdp * s)245 rrdp_failed(struct rrdp *s)
246 {
247 	unsigned int id = s->id;
248 
249 	/* reset file state before retrying */
250 	s->file_failed = 0;
251 
252 	if (s->task == DELTA && !s->aborted) {
253 		/* fallback to a snapshot as per RFC8182 */
254 		free_delta_xml(s->dxml);
255 		s->dxml = NULL;
256 		rrdp_clear_repo(s);
257 		s->sxml = new_snapshot_xml(s->parser, s->current, s);
258 		s->task = SNAPSHOT;
259 		s->state = RRDP_STATE_REQ;
260 		logx("%s: delta sync failed, fallback to snapshot", s->local);
261 	} else {
262 		/*
263 		 * TODO: update state to track recurring failures
264 		 * and fall back to rsync after a while.
265 		 * This should probably happen in the main process.
266 		 */
267 		rrdp_free(s);
268 		rrdp_done(id, 0);
269 	}
270 }
271 
272 static void
rrdp_finished(struct rrdp * s)273 rrdp_finished(struct rrdp *s)
274 {
275 	unsigned int id = s->id;
276 
277 	/* check if all parts of the process have finished */
278 	if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE)
279 		return;
280 
281 	/* still some files pending */
282 	if (s->file_pending > 0)
283 		return;
284 
285 	if (s->state & RRDP_STATE_PARSE_ERROR || s->aborted) {
286 		rrdp_failed(s);
287 		return;
288 	}
289 
290 	if (s->res == HTTP_OK) {
291 		XML_Parser p = s->parser;
292 
293 		/*
294 		 * Finalize parsing on success to be sure that
295 		 * all of the XML is correct. Needs to be done here
296 		 * since the call would most probably fail for non
297 		 * successful data fetches.
298 		 */
299 		if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) {
300 			warnx("%s: XML error at line %llu: %s", s->local,
301 			    (unsigned long long)XML_GetCurrentLineNumber(p),
302 			    XML_ErrorString(XML_GetErrorCode(p)));
303 			rrdp_failed(s);
304 			return;
305 		}
306 
307 		/* If a file caused an error fail the update */
308 		if (s->file_failed > 0) {
309 			rrdp_failed(s);
310 			return;
311 		}
312 
313 		switch (s->task) {
314 		case NOTIFICATION:
315 			s->task = notification_done(s->nxml, s->last_mod);
316 			s->last_mod = NULL;
317 			switch (s->task) {
318 			case NOTIFICATION:
319 				logx("%s: repository not modified (%s#%lld)",
320 				    s->local, s->repository->session_id,
321 				    s->repository->serial);
322 				rrdp_state_send(s);
323 				rrdp_free(s);
324 				rrdp_done(id, 1);
325 				break;
326 			case SNAPSHOT:
327 				logx("%s: downloading snapshot (%s#%lld)",
328 				    s->local, s->current->session_id,
329 				    s->current->serial);
330 				rrdp_clear_repo(s);
331 				s->sxml = new_snapshot_xml(p, s->current, s);
332 				s->state = RRDP_STATE_REQ;
333 				break;
334 			case DELTA:
335 				logx("%s: downloading %lld deltas (%s#%lld)",
336 				    s->local,
337 				    s->repository->serial - s->current->serial,
338 				    s->current->session_id, s->current->serial);
339 				s->dxml = new_delta_xml(p, s->current, s);
340 				s->state = RRDP_STATE_REQ;
341 				break;
342 			}
343 			break;
344 		case SNAPSHOT:
345 			rrdp_state_send(s);
346 			rrdp_free(s);
347 			rrdp_done(id, 1);
348 			break;
349 		case DELTA:
350 			if (notification_delta_done(s->nxml)) {
351 				/* finished */
352 				rrdp_state_send(s);
353 				rrdp_free(s);
354 				rrdp_done(id, 1);
355 			} else {
356 				/* reset delta parser for next delta */
357 				free_delta_xml(s->dxml);
358 				s->dxml = new_delta_xml(p, s->current, s);
359 				s->state = RRDP_STATE_REQ;
360 			}
361 			break;
362 		}
363 	} else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
364 		logx("%s: notification file not modified (%s#%lld)", s->local,
365 		    s->repository->session_id, s->repository->serial);
366 		/* no need to update state file */
367 		rrdp_free(s);
368 		rrdp_done(id, 1);
369 	} else {
370 		rrdp_failed(s);
371 	}
372 }
373 
374 static void
rrdp_abort_req(struct rrdp * s)375 rrdp_abort_req(struct rrdp *s)
376 {
377 	unsigned int id = s->id;
378 
379 	s->aborted = 1;
380 	if (s->state == RRDP_STATE_REQ) {
381 		/* nothing is pending, just abort */
382 		rrdp_free(s);
383 		rrdp_done(id, 1);
384 		return;
385 	}
386 	if (s->state == RRDP_STATE_WAIT)
387 		/* wait for HTTP_INI which will progress the state */
388 		return;
389 
390 	/*
391 	 * RRDP_STATE_PARSE or later, close infd, abort parser but
392 	 * wait for HTTP_FIN and file_pending to drop to 0.
393 	 */
394 	if (s->infd != -1) {
395 		close(s->infd);
396 		s->infd = -1;
397 		s->state |= RRDP_STATE_PARSE_DONE | RRDP_STATE_PARSE_ERROR;
398 	}
399 	rrdp_finished(s);
400 }
401 
402 static void
rrdp_input_handler(struct ibuf * b)403 rrdp_input_handler(struct ibuf *b)
404 {
405 	struct rrdp_session *state;
406 	char *local, *notify, *last_mod;
407 	struct rrdp *s;
408 	enum rrdp_msg type;
409 	enum http_result res;
410 	unsigned int id;
411 	int ok;
412 
413 	io_read_buf(b, &type, sizeof(type));
414 	io_read_buf(b, &id, sizeof(id));
415 
416 	switch (type) {
417 	case RRDP_START:
418 		if (ibuf_fd_avail(b))
419 			errx(1, "received unexpected fd");
420 		io_read_str(b, &local);
421 		io_read_str(b, &notify);
422 		state = rrdp_session_read(b);
423 		rrdp_new(id, local, notify, state);
424 		break;
425 	case RRDP_HTTP_INI:
426 		s = rrdp_get(id);
427 		if (s == NULL)
428 			errx(1, "http ini, rrdp session %u does not exist", id);
429 		if (s->state != RRDP_STATE_WAIT)
430 			errx(1, "%s: bad internal state", s->local);
431 		s->infd = ibuf_fd_get(b);
432 		if (s->infd == -1)
433 			errx(1, "expected fd not received");
434 		s->state = RRDP_STATE_PARSE;
435 		if (s->aborted) {
436 			rrdp_abort_req(s);
437 			break;
438 		}
439 		break;
440 	case RRDP_HTTP_FIN:
441 		io_read_buf(b, &res, sizeof(res));
442 		io_read_str(b, &last_mod);
443 		if (ibuf_fd_avail(b))
444 			errx(1, "received unexpected fd");
445 
446 		s = rrdp_get(id);
447 		if (s == NULL)
448 			errx(1, "http fin, rrdp session %u does not exist", id);
449 		if (!(s->state & RRDP_STATE_PARSE))
450 			errx(1, "%s: bad internal state", s->local);
451 		s->state |= RRDP_STATE_HTTP_DONE;
452 		s->res = res;
453 		free(s->last_mod);
454 		s->last_mod = last_mod;
455 		rrdp_finished(s);
456 		break;
457 	case RRDP_FILE:
458 		s = rrdp_get(id);
459 		if (s == NULL)
460 			errx(1, "file, rrdp session %u does not exist", id);
461 		if (ibuf_fd_avail(b))
462 			errx(1, "received unexpected fd");
463 		io_read_buf(b, &ok, sizeof(ok));
464 		if (ok != 1)
465 			s->file_failed++;
466 		s->file_pending--;
467 		if (s->file_pending == 0)
468 			rrdp_finished(s);
469 		break;
470 	case RRDP_ABORT:
471 		if (ibuf_fd_avail(b))
472 			errx(1, "received unexpected fd");
473 		s = rrdp_get(id);
474 		if (s != NULL)
475 			rrdp_abort_req(s);
476 		break;
477 	default:
478 		errx(1, "unexpected message %d", type);
479 	}
480 }
481 
482 static void
rrdp_data_handler(struct rrdp * s)483 rrdp_data_handler(struct rrdp *s)
484 {
485 	char buf[READ_BUF_SIZE];
486 	XML_Parser p = s->parser;
487 	ssize_t len;
488 
489 	len = read(s->infd, buf, sizeof(buf));
490 	if (len == -1) {
491 		warn("%s: read failure", s->local);
492 		rrdp_abort_req(s);
493 		return;
494 	}
495 	if ((s->state & RRDP_STATE_PARSE) == 0)
496 		errx(1, "%s: bad parser state", s->local);
497 	if (len == 0) {
498 		/* parser stage finished */
499 		close(s->infd);
500 		s->infd = -1;
501 
502 		if (s->task != NOTIFICATION) {
503 			char h[SHA256_DIGEST_LENGTH];
504 
505 			SHA256_Final(h, &s->ctx);
506 			if (memcmp(s->hash, h, sizeof(s->hash)) != 0) {
507 				s->state |= RRDP_STATE_PARSE_ERROR;
508 				warnx("%s: bad message digest", s->local);
509 			}
510 		}
511 
512 		s->state |= RRDP_STATE_PARSE_DONE;
513 		rrdp_finished(s);
514 		return;
515 	}
516 
517 	/* parse and maybe hash the bytes just read */
518 	if (s->task != NOTIFICATION)
519 		SHA256_Update(&s->ctx, buf, len);
520 	if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 &&
521 	    XML_Parse(p, buf, len, 0) != XML_STATUS_OK) {
522 		warnx("%s: parse error at line %llu: %s", s->local,
523 		    (unsigned long long)XML_GetCurrentLineNumber(p),
524 		    XML_ErrorString(XML_GetErrorCode(p)));
525 		s->state |= RRDP_STATE_PARSE_ERROR;
526 	}
527 }
528 
529 void
proc_rrdp(int fd)530 proc_rrdp(int fd)
531 {
532 	struct pollfd pfds[MAX_SESSIONS + 1];
533 	struct rrdp *s, *ns;
534 	struct ibuf *b;
535 	size_t i;
536 
537 	if (pledge("stdio recvfd", NULL) == -1)
538 		err(1, "pledge");
539 
540 	if ((msgq = msgbuf_new_reader(sizeof(size_t), io_parse_hdr, NULL)) ==
541 	    NULL)
542 		err(1, NULL);
543 
544 	for (;;) {
545 		i = 1;
546 		memset(&pfds, 0, sizeof(pfds));
547 		TAILQ_FOREACH(s, &states, entry) {
548 			if (i >= MAX_SESSIONS + 1) {
549 				/* not enough sessions, wait for better times */
550 				s->pfd = NULL;
551 				continue;
552 			}
553 			/* request new assets when there are free sessions */
554 			if (s->state == RRDP_STATE_REQ) {
555 				const char *uri;
556 				switch (s->task) {
557 				case NOTIFICATION:
558 					rrdp_http_req(s->id, s->notifyuri,
559 					    s->repository->last_mod);
560 					break;
561 				case SNAPSHOT:
562 				case DELTA:
563 					uri = notification_get_next(s->nxml,
564 					    s->hash, sizeof(s->hash),
565 					    s->task);
566 					SHA256_Init(&s->ctx);
567 					rrdp_http_req(s->id, uri, NULL);
568 					break;
569 				}
570 				s->state = RRDP_STATE_WAIT;
571 			}
572 			s->pfd = pfds + i++;
573 			s->pfd->fd = s->infd;
574 			s->pfd->events = POLLIN;
575 		}
576 
577 		/*
578 		 * Update main fd last.
579 		 * The previous loop may have enqueue messages.
580 		 */
581 		pfds[0].fd = fd;
582 		pfds[0].events = POLLIN;
583 		if (msgbuf_queuelen(msgq) > 0)
584 			pfds[0].events |= POLLOUT;
585 
586 		if (poll(pfds, i, INFTIM) == -1) {
587 			if (errno == EINTR)
588 				continue;
589 			err(1, "poll");
590 		}
591 
592 		if (pfds[0].revents & POLLHUP)
593 			break;
594 		if (pfds[0].revents & POLLOUT) {
595 			if (msgbuf_write(fd, msgq) == -1) {
596 				if (errno == EPIPE)
597 					errx(1, "write: connection closed");
598 				else
599 					err(1, "write");
600 			}
601 		}
602 		if (pfds[0].revents & POLLIN) {
603 			switch (msgbuf_read(fd, msgq)) {
604 			case -1:
605 				err(1, "msgbuf_read");
606 			case 0:
607 				errx(1, "msgbuf_read: connection closed");
608 			}
609 			while ((b = io_buf_get(msgq)) != NULL) {
610 				rrdp_input_handler(b);
611 				ibuf_free(b);
612 			}
613 		}
614 
615 		TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
616 			if (s->pfd == NULL)
617 				continue;
618 			if (s->pfd->revents != 0)
619 				rrdp_data_handler(s);
620 		}
621 	}
622 
623 	exit(0);
624 }
625