xref: /openbsd/usr.sbin/rpki-client/rrdp.c (revision 73471bf0)
1 /*	$OpenBSD: rrdp.c,v 1.18 2021/11/24 15:24:16 claudio Exp $ */
2 /*
3  * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
4  * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 #include <sys/queue.h>
19 #include <sys/stat.h>
20 
21 #include <assert.h>
22 #include <err.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <limits.h>
26 #include <poll.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <imsg.h>
30 
31 #include <expat.h>
32 #include <openssl/sha.h>
33 
34 #include "extern.h"
35 #include "rrdp.h"
36 
37 #define MAX_SESSIONS	12
38 #define	READ_BUF_SIZE	(32 * 1024)
39 
40 static struct msgbuf	msgq;
41 
42 #define RRDP_STATE_REQ		0x01
43 #define RRDP_STATE_WAIT		0x02
44 #define RRDP_STATE_PARSE	0x04
45 #define RRDP_STATE_PARSE_ERROR	0x08
46 #define RRDP_STATE_PARSE_DONE	0x10
47 #define RRDP_STATE_HTTP_DONE	0x20
48 #define RRDP_STATE_DONE		(RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE)
49 
50 struct rrdp {
51 	TAILQ_ENTRY(rrdp)	 entry;
52 	size_t			 id;
53 	char			*notifyuri;
54 	char			*local;
55 	char			*last_mod;
56 
57 	struct pollfd		*pfd;
58 	int			 infd;
59 	int			 state;
60 	unsigned int		 file_pending;
61 	unsigned int		 file_failed;
62 	enum http_result	 res;
63 	enum rrdp_task		 task;
64 
65 	char			 hash[SHA256_DIGEST_LENGTH];
66 	SHA256_CTX		 ctx;
67 
68 	struct rrdp_session	 repository;
69 	struct rrdp_session	 current;
70 	XML_Parser		 parser;
71 	struct notification_xml	*nxml;
72 	struct snapshot_xml	*sxml;
73 	struct delta_xml	*dxml;
74 };
75 
76 TAILQ_HEAD(,rrdp)	states = TAILQ_HEAD_INITIALIZER(states);
77 
78 char *
79 xstrdup(const char *s)
80 {
81 	char *r;
82 	if ((r = strdup(s)) == NULL)
83 		err(1, "strdup");
84 	return r;
85 }
86 
87 /*
88  * Report back that a RRDP request finished.
89  * ok should only be set to 1 if the cache is now up-to-date.
90  */
91 static void
92 rrdp_done(size_t id, int ok)
93 {
94 	enum rrdp_msg type = RRDP_END;
95 	struct ibuf *b;
96 
97 	b = io_new_buffer();
98 	io_simple_buffer(b, &type, sizeof(type));
99 	io_simple_buffer(b, &id, sizeof(id));
100 	io_simple_buffer(b, &ok, sizeof(ok));
101 	io_close_buffer(&msgq, b);
102 }
103 
104 /*
105  * Request an URI to be fetched via HTTPS.
106  * The main process will respond with a RRDP_HTTP_INI which includes
107  * the file descriptor to read from. RRDP_HTTP_FIN is sent at the
108  * end of the request with the HTTP status code and last modified timestamp.
109  * If the request should not set the If-Modified-Since: header then last_mod
110  * should be set to NULL, else it should point to a proper date string.
111  */
112 static void
113 rrdp_http_req(size_t id, const char *uri, const char *last_mod)
114 {
115 	enum rrdp_msg type = RRDP_HTTP_REQ;
116 	struct ibuf *b;
117 
118 	b = io_new_buffer();
119 	io_simple_buffer(b, &type, sizeof(type));
120 	io_simple_buffer(b, &id, sizeof(id));
121 	io_str_buffer(b, uri);
122 	io_str_buffer(b, last_mod);
123 	io_close_buffer(&msgq, b);
124 }
125 
126 /*
127  * Send the session state to the main process so it gets stored.
128  */
129 static void
130 rrdp_state_send(struct rrdp *s)
131 {
132 	enum rrdp_msg type = RRDP_SESSION;
133 	struct ibuf *b;
134 
135 	b = io_new_buffer();
136 	io_simple_buffer(b, &type, sizeof(type));
137 	io_simple_buffer(b, &s->id, sizeof(s->id));
138 	io_str_buffer(b, s->current.session_id);
139 	io_simple_buffer(b, &s->current.serial, sizeof(s->current.serial));
140 	io_str_buffer(b, s->current.last_mod);
141 	io_close_buffer(&msgq, b);
142 }
143 
144 /*
145  * Send a blob of data to the main process to store it in the repository.
146  */
147 void
148 rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml,
149     unsigned char *data, size_t datasz)
150 {
151 	enum rrdp_msg type = RRDP_FILE;
152 	struct ibuf *b;
153 
154 	/* only send files if the fetch did not fail already */
155 	if (s->file_failed == 0) {
156 		b = io_new_buffer();
157 		io_simple_buffer(b, &type, sizeof(type));
158 		io_simple_buffer(b, &s->id, sizeof(s->id));
159 		io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
160 		if (pxml->type != PUB_ADD)
161 			io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
162 		io_str_buffer(b, pxml->uri);
163 		io_buf_buffer(b, data, datasz);
164 		io_close_buffer(&msgq, b);
165 		s->file_pending++;
166 	}
167 }
168 
169 static struct rrdp *
170 rrdp_new(size_t id, char *local, char *notify, char *session_id,
171     long long serial, char *last_mod)
172 {
173 	struct rrdp *s;
174 
175 	if ((s = calloc(1, sizeof(*s))) == NULL)
176 		err(1, NULL);
177 
178 	s->infd = -1;
179 	s->id = id;
180 	s->local = local;
181 	s->notifyuri = notify;
182 	s->repository.session_id = session_id;
183 	s->repository.serial = serial;
184 	s->repository.last_mod = last_mod;
185 
186 	s->state = RRDP_STATE_REQ;
187 	if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
188 		err(1, "XML_ParserCreate");
189 
190 	s->nxml = new_notification_xml(s->parser, &s->repository, &s->current,
191 	    notify);
192 
193 	TAILQ_INSERT_TAIL(&states, s, entry);
194 
195 	return s;
196 }
197 
198 static void
199 rrdp_free(struct rrdp *s)
200 {
201 	if (s == NULL)
202 		return;
203 
204 	TAILQ_REMOVE(&states, s, entry);
205 
206 	free_notification_xml(s->nxml);
207 	free_snapshot_xml(s->sxml);
208 	free_delta_xml(s->dxml);
209 
210 	if (s->parser)
211 		XML_ParserFree(s->parser);
212 	if (s->infd != -1)
213 		close(s->infd);
214 	free(s->notifyuri);
215 	free(s->local);
216 	free(s->last_mod);
217 	free(s->repository.last_mod);
218 	free(s->repository.session_id);
219 	free(s->current.last_mod);
220 	free(s->current.session_id);
221 
222 	free(s);
223 }
224 
225 static struct rrdp *
226 rrdp_get(size_t id)
227 {
228 	struct rrdp *s;
229 
230 	TAILQ_FOREACH(s, &states, entry)
231 		if (s->id == id)
232 			break;
233 	return s;
234 }
235 
236 static void
237 rrdp_failed(struct rrdp *s)
238 {
239 	size_t id = s->id;
240 
241 	/* reset file state before retrying */
242 	s->file_failed = 0;
243 
244 	/* XXX MUST do some cleanup in the repo here */
245 	if (s->task == DELTA) {
246 		/* fallback to a snapshot as per RFC8182 */
247 		free_delta_xml(s->dxml);
248 		s->dxml = NULL;
249 		s->sxml = new_snapshot_xml(s->parser, &s->current, s);
250 		s->task = SNAPSHOT;
251 		s->state = RRDP_STATE_REQ;
252 		logx("%s: delta sync failed, fallback to snapshot", s->local);
253 	} else {
254 		/*
255 		 * TODO: update state to track recurring failures
256 		 * and fall back to rsync after a while.
257 		 * This should probably happen in the main process.
258 		 */
259 		rrdp_free(s);
260 		rrdp_done(id, 0);
261 	}
262 }
263 
264 static void
265 rrdp_finished(struct rrdp *s)
266 {
267 	size_t id = s->id;
268 
269 	/* check if all parts of the process have finished */
270 	if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE)
271 		return;
272 
273 	/* still some files pending */
274 	if (s->file_pending > 0)
275 		return;
276 
277 	if (s->state & RRDP_STATE_PARSE_ERROR) {
278 		rrdp_failed(s);
279 		return;
280 	}
281 
282 	if (s->res == HTTP_OK) {
283 		XML_Parser p = s->parser;
284 
285 		/*
286 		 * Finalize parsing on success to be sure that
287 		 * all of the XML is correct. Needs to be done here
288 		 * since the call would most probably fail for non
289 		 * successful data fetches.
290 		 */
291 		if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) {
292 			warnx("%s: XML error at line %llu: %s", s->local,
293 			    (unsigned long long)XML_GetCurrentLineNumber(p),
294 			    XML_ErrorString(XML_GetErrorCode(p)));
295 			rrdp_failed(s);
296 			return;
297 		}
298 
299 		/* If a file caused an error fail the update */
300 		if (s->file_failed > 0) {
301 			rrdp_failed(s);
302 			return;
303 		}
304 
305 		switch (s->task) {
306 		case NOTIFICATION:
307 			s->task = notification_done(s->nxml, s->last_mod);
308 			s->last_mod = NULL;
309 			switch (s->task) {
310 			case NOTIFICATION:
311 				logx("%s: repository not modified", s->local);
312 				rrdp_state_send(s);
313 				rrdp_free(s);
314 				rrdp_done(id, 1);
315 				break;
316 			case SNAPSHOT:
317 				logx("%s: downloading snapshot", s->local);
318 				s->sxml = new_snapshot_xml(p, &s->current, s);
319 				s->state = RRDP_STATE_REQ;
320 				break;
321 			case DELTA:
322 				logx("%s: downloading %lld deltas", s->local,
323 				    s->repository.serial - s->current.serial);
324 				s->dxml = new_delta_xml(p, &s->current, s);
325 				s->state = RRDP_STATE_REQ;
326 				break;
327 			}
328 			break;
329 		case SNAPSHOT:
330 			rrdp_state_send(s);
331 			rrdp_free(s);
332 			rrdp_done(id, 1);
333 			break;
334 		case DELTA:
335 			if (notification_delta_done(s->nxml)) {
336 				/* finished */
337 				rrdp_state_send(s);
338 				rrdp_free(s);
339 				rrdp_done(id, 1);
340 			} else {
341 				/* reset delta parser for next delta */
342 				free_delta_xml(s->dxml);
343 				s->dxml = new_delta_xml(p, &s->current, s);
344 				s->state = RRDP_STATE_REQ;
345 			}
346 			break;
347 		}
348 	} else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
349 		logx("%s: notification file not modified", s->local);
350 		/* no need to update state file */
351 		rrdp_free(s);
352 		rrdp_done(id, 1);
353 	} else {
354 		rrdp_failed(s);
355 	}
356 }
357 
358 static void
359 rrdp_input_handler(int fd)
360 {
361 	static struct ibuf *inbuf;
362 	char *local, *notify, *session_id, *last_mod;
363 	struct ibuf *b;
364 	struct rrdp *s;
365 	enum rrdp_msg type;
366 	enum http_result res;
367 	long long serial;
368 	size_t id;
369 	int ok;
370 
371 	b = io_buf_recvfd(fd, &inbuf);
372 	if (b == NULL)
373 		return;
374 
375 	io_read_buf(b, &type, sizeof(type));
376 	io_read_buf(b, &id, sizeof(id));
377 
378 	switch (type) {
379 	case RRDP_START:
380 		io_read_str(b, &local);
381 		io_read_str(b, &notify);
382 		io_read_str(b, &session_id);
383 		io_read_buf(b, &serial, sizeof(serial));
384 		io_read_str(b, &last_mod);
385 		if (b->fd != -1)
386 			errx(1, "received unexpected fd");
387 
388 		s = rrdp_new(id, local, notify, session_id, serial, last_mod);
389 		break;
390 	case RRDP_HTTP_INI:
391 		if (b->fd == -1)
392 			errx(1, "expected fd not received");
393 		s = rrdp_get(id);
394 		if (s == NULL)
395 			errx(1, "rrdp session %zu does not exist", id);
396 		if (s->state != RRDP_STATE_WAIT)
397 			errx(1, "%s: bad internal state", s->local);
398 
399 		s->infd = b->fd;
400 		s->state = RRDP_STATE_PARSE;
401 		break;
402 	case RRDP_HTTP_FIN:
403 		io_read_buf(b, &res, sizeof(res));
404 		io_read_str(b, &last_mod);
405 		if (b->fd != -1)
406 			errx(1, "received unexpected fd");
407 
408 		s = rrdp_get(id);
409 		if (s == NULL)
410 			errx(1, "rrdp session %zu does not exist", id);
411 		if (!(s->state & RRDP_STATE_PARSE))
412 			errx(1, "%s: bad internal state", s->local);
413 
414 		s->res = res;
415 		s->last_mod = last_mod;
416 		s->state |= RRDP_STATE_HTTP_DONE;
417 		rrdp_finished(s);
418 		break;
419 	case RRDP_FILE:
420 		s = rrdp_get(id);
421 		if (s == NULL)
422 			errx(1, "rrdp session %zu does not exist", id);
423 		if (b->fd != -1)
424 			errx(1, "received unexpected fd");
425 		io_read_buf(b, &ok, sizeof(ok));
426 		if (ok != 1)
427 			s->file_failed++;
428 		s->file_pending--;
429 		if (s->file_pending == 0)
430 			rrdp_finished(s);
431 		break;
432 	default:
433 		errx(1, "unexpected message %d", type);
434 	}
435 	ibuf_free(b);
436 }
437 
438 static void
439 rrdp_data_handler(struct rrdp *s)
440 {
441 	char buf[READ_BUF_SIZE];
442 	XML_Parser p = s->parser;
443 	ssize_t len;
444 
445 	len = read(s->infd, buf, sizeof(buf));
446 	if (len == -1) {
447 		s->state |= RRDP_STATE_PARSE_ERROR;
448 		warn("%s: read failure", s->local);
449 		return;
450 	}
451 	if ((s->state & RRDP_STATE_PARSE) == 0)
452 		errx(1, "%s: bad parser state", s->local);
453 	if (len == 0) {
454 		/* parser stage finished */
455 		close(s->infd);
456 		s->infd = -1;
457 
458 		if (s->task != NOTIFICATION) {
459 			char h[SHA256_DIGEST_LENGTH];
460 
461 			SHA256_Final(h, &s->ctx);
462 			if (memcmp(s->hash, h, sizeof(s->hash)) != 0) {
463 				s->state |= RRDP_STATE_PARSE_ERROR;
464 				warnx("%s: bad message digest", s->local);
465 			}
466 		}
467 
468 		s->state |= RRDP_STATE_PARSE_DONE;
469 		rrdp_finished(s);
470 		return;
471 	}
472 
473 	/* parse and maybe hash the bytes just read */
474 	if (s->task != NOTIFICATION)
475 		SHA256_Update(&s->ctx, buf, len);
476 	if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 &&
477 	    XML_Parse(p, buf, len, 0) != XML_STATUS_OK) {
478 		warnx("%s: parse error at line %llu: %s", s->local,
479 		    (unsigned long long)XML_GetCurrentLineNumber(p),
480 		    XML_ErrorString(XML_GetErrorCode(p)));
481 		s->state |= RRDP_STATE_PARSE_ERROR;
482 	}
483 }
484 
485 void
486 proc_rrdp(int fd)
487 {
488 	struct pollfd pfds[MAX_SESSIONS + 1];
489 	struct rrdp *s, *ns;
490 	size_t i;
491 
492 	if (pledge("stdio recvfd", NULL) == -1)
493 		err(1, "pledge");
494 
495 	msgbuf_init(&msgq);
496 	msgq.fd = fd;
497 
498 	for (;;) {
499 		i = 1;
500 		memset(&pfds, 0, sizeof(pfds));
501 		TAILQ_FOREACH(s, &states, entry) {
502 			if (i >= MAX_SESSIONS + 1) {
503 				/* not enough sessions, wait for better times */
504 				s->pfd = NULL;
505 				continue;
506 			}
507 			/* request new assets when there are free sessions */
508 			if (s->state == RRDP_STATE_REQ) {
509 				const char *uri;
510 				switch (s->task) {
511 				case NOTIFICATION:
512 					rrdp_http_req(s->id, s->notifyuri,
513 					    s->repository.last_mod);
514 					break;
515 				case SNAPSHOT:
516 				case DELTA:
517 					uri = notification_get_next(s->nxml,
518 					    s->hash, sizeof(s->hash),
519 					    s->task);
520 					SHA256_Init(&s->ctx);
521 					rrdp_http_req(s->id, uri, NULL);
522 					break;
523 				}
524 				s->state = RRDP_STATE_WAIT;
525 			}
526 			s->pfd = pfds + i++;
527 			s->pfd->fd = s->infd;
528 			s->pfd->events = POLLIN;
529 		}
530 
531 		/*
532 		 * Update main fd last.
533 		 * The previous loop may have enqueue messages.
534 		 */
535 		pfds[0].fd = fd;
536 		pfds[0].events = POLLIN;
537 		if (msgq.queued)
538 			pfds[0].events |= POLLOUT;
539 
540 		if (poll(pfds, i, INFTIM) == -1)
541 			err(1, "poll");
542 
543 		if (pfds[0].revents & POLLHUP)
544 			break;
545 		if (pfds[0].revents & POLLOUT) {
546 			switch (msgbuf_write(&msgq)) {
547 			case 0:
548 				errx(1, "write: connection closed");
549 			case -1:
550 				err(1, "write");
551 			}
552 		}
553 		if (pfds[0].revents & POLLIN)
554 			rrdp_input_handler(fd);
555 
556 		TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
557 			if (s->pfd == NULL)
558 				continue;
559 			if (s->pfd->revents != 0)
560 				rrdp_data_handler(s);
561 		}
562 	}
563 
564 	exit(0);
565 }
566