1 /* $OpenBSD: rrdp.c,v 1.39 2024/11/21 13:32:27 claudio Exp $ */
2 /*
3 * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
4 * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 #include <sys/queue.h>
19 #include <sys/stat.h>
20
21 #include <err.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <poll.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <imsg.h>
29
30 #include <expat.h>
31 #include <openssl/sha.h>
32
33 #include "extern.h"
34 #include "rrdp.h"
35
/* Number of sessions that can get a poll slot per round in proc_rrdp(). */
#define MAX_SESSIONS		32
/* Size of the buffer filled by one read(2) in rrdp_data_handler(). */
#define READ_BUF_SIZE		(32 * 1024)

/* Message buffer for the channel to the main process, see proc_rrdp(). */
static struct msgbuf	*msgq;

/*
 * Bits stored in the state member of struct rrdp:
 *  REQ		a HTTP request still has to be issued by proc_rrdp()
 *  WAIT	request issued, RRDP_HTTP_INI not yet received
 *  PARSE	the input fd was received and is being read
 *  PARSE_ERROR	parsing or the digest check failed, or the fetch was aborted
 *  PARSE_DONE	the input fd hit EOF or was closed by an abort
 *  HTTP_DONE	RRDP_HTTP_FIN was received
 *  DONE	both PARSE_DONE and HTTP_DONE
 */
#define RRDP_STATE_REQ		(1 << 0)
#define RRDP_STATE_WAIT		(1 << 1)
#define RRDP_STATE_PARSE	(1 << 2)
#define RRDP_STATE_PARSE_ERROR	(1 << 3)
#define RRDP_STATE_PARSE_DONE	(1 << 4)
#define RRDP_STATE_HTTP_DONE	(1 << 5)
#define RRDP_STATE_DONE		\
	(RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE)
48
49 struct rrdp {
50 TAILQ_ENTRY(rrdp) entry;
51 unsigned int id;
52 char *notifyuri;
53 char *local;
54 char *last_mod;
55
56 struct pollfd *pfd;
57 int infd;
58 int state;
59 int aborted;
60 unsigned int file_pending;
61 unsigned int file_failed;
62 enum http_result res;
63 enum rrdp_task task;
64
65 char hash[SHA256_DIGEST_LENGTH];
66 SHA256_CTX ctx;
67
68 struct rrdp_session *repository;
69 struct rrdp_session *current;
70 XML_Parser parser;
71 struct notification_xml *nxml;
72 struct snapshot_xml *sxml;
73 struct delta_xml *dxml;
74 };
75
76 static TAILQ_HEAD(, rrdp) states = TAILQ_HEAD_INITIALIZER(states);
77
/*
 * strdup(3) that does not fail: if the copy can not be allocated the
 * process exits via err(3).  The result is owned by the caller.
 */
char *
xstrdup(const char *s)
{
	char	*copy = strdup(s);

	if (copy == NULL)
		err(1, "strdup");
	return copy;
}
86
87 /*
88 * Report back that a RRDP request finished.
89 * ok should only be set to 1 if the cache is now up-to-date.
90 */
91 static void
rrdp_done(unsigned int id,int ok)92 rrdp_done(unsigned int id, int ok)
93 {
94 enum rrdp_msg type = RRDP_END;
95 struct ibuf *b;
96
97 b = io_new_buffer();
98 io_simple_buffer(b, &type, sizeof(type));
99 io_simple_buffer(b, &id, sizeof(id));
100 io_simple_buffer(b, &ok, sizeof(ok));
101 io_close_buffer(msgq, b);
102 }
103
104 /*
105 * Request an URI to be fetched via HTTPS.
106 * The main process will respond with a RRDP_HTTP_INI which includes
107 * the file descriptor to read from. RRDP_HTTP_FIN is sent at the
108 * end of the request with the HTTP status code and last modified timestamp.
109 * If the request should not set the If-Modified-Since: header then last_mod
110 * should be set to NULL, else it should point to a proper date string.
111 */
112 static void
rrdp_http_req(unsigned int id,const char * uri,const char * last_mod)113 rrdp_http_req(unsigned int id, const char *uri, const char *last_mod)
114 {
115 enum rrdp_msg type = RRDP_HTTP_REQ;
116 struct ibuf *b;
117
118 b = io_new_buffer();
119 io_simple_buffer(b, &type, sizeof(type));
120 io_simple_buffer(b, &id, sizeof(id));
121 io_str_buffer(b, uri);
122 io_str_buffer(b, last_mod);
123 io_close_buffer(msgq, b);
124 }
125
126 /*
127 * Send the session state to the main process so it gets stored.
128 */
129 static void
rrdp_state_send(struct rrdp * s)130 rrdp_state_send(struct rrdp *s)
131 {
132 enum rrdp_msg type = RRDP_SESSION;
133 struct ibuf *b;
134
135 b = io_new_buffer();
136 io_simple_buffer(b, &type, sizeof(type));
137 io_simple_buffer(b, &s->id, sizeof(s->id));
138 rrdp_session_buffer(b, s->current);
139 io_close_buffer(msgq, b);
140 }
141
142 /*
143 * Inform parent to clear the RRDP repository before start of snapshot.
144 */
145 static void
rrdp_clear_repo(struct rrdp * s)146 rrdp_clear_repo(struct rrdp *s)
147 {
148 enum rrdp_msg type = RRDP_CLEAR;
149 struct ibuf *b;
150
151 b = io_new_buffer();
152 io_simple_buffer(b, &type, sizeof(type));
153 io_simple_buffer(b, &s->id, sizeof(s->id));
154 io_close_buffer(msgq, b);
155 }
156
157 /*
158 * Send a blob of data to the main process to store it in the repository.
159 */
160 void
rrdp_publish_file(struct rrdp * s,struct publish_xml * pxml,unsigned char * data,size_t datasz)161 rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml,
162 unsigned char *data, size_t datasz)
163 {
164 enum rrdp_msg type = RRDP_FILE;
165 struct ibuf *b;
166
167 /* only send files if the fetch did not fail already */
168 if (s->file_failed == 0) {
169 b = io_new_buffer();
170 io_simple_buffer(b, &type, sizeof(type));
171 io_simple_buffer(b, &s->id, sizeof(s->id));
172 io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
173 if (pxml->type != PUB_ADD)
174 io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
175 io_str_buffer(b, pxml->uri);
176 io_buf_buffer(b, data, datasz);
177 io_close_buffer(msgq, b);
178 s->file_pending++;
179 }
180 }
181
182 static void
rrdp_new(unsigned int id,char * local,char * notify,struct rrdp_session * state)183 rrdp_new(unsigned int id, char *local, char *notify, struct rrdp_session *state)
184 {
185 struct rrdp *s;
186
187 if ((s = calloc(1, sizeof(*s))) == NULL)
188 err(1, NULL);
189
190 s->infd = -1;
191 s->id = id;
192 s->local = local;
193 s->notifyuri = notify;
194 s->repository = state;
195 if ((s->current = calloc(1, sizeof(*s->current))) == NULL)
196 err(1, NULL);
197
198 s->state = RRDP_STATE_REQ;
199 if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
200 err(1, "XML_ParserCreate");
201
202 s->nxml = new_notification_xml(s->parser, s->repository, s->current,
203 notify);
204
205 TAILQ_INSERT_TAIL(&states, s, entry);
206 }
207
208 static void
rrdp_free(struct rrdp * s)209 rrdp_free(struct rrdp *s)
210 {
211 if (s == NULL)
212 return;
213
214 TAILQ_REMOVE(&states, s, entry);
215
216 free_notification_xml(s->nxml);
217 free_snapshot_xml(s->sxml);
218 free_delta_xml(s->dxml);
219
220 if (s->parser)
221 XML_ParserFree(s->parser);
222 if (s->infd != -1)
223 close(s->infd);
224 free(s->notifyuri);
225 free(s->local);
226 free(s->last_mod);
227 rrdp_session_free(s->repository);
228 rrdp_session_free(s->current);
229
230 free(s);
231 }
232
233 static struct rrdp *
rrdp_get(unsigned int id)234 rrdp_get(unsigned int id)
235 {
236 struct rrdp *s;
237
238 TAILQ_FOREACH(s, &states, entry)
239 if (s->id == id)
240 break;
241 return s;
242 }
243
244 static void
rrdp_failed(struct rrdp * s)245 rrdp_failed(struct rrdp *s)
246 {
247 unsigned int id = s->id;
248
249 /* reset file state before retrying */
250 s->file_failed = 0;
251
252 if (s->task == DELTA && !s->aborted) {
253 /* fallback to a snapshot as per RFC8182 */
254 free_delta_xml(s->dxml);
255 s->dxml = NULL;
256 rrdp_clear_repo(s);
257 s->sxml = new_snapshot_xml(s->parser, s->current, s);
258 s->task = SNAPSHOT;
259 s->state = RRDP_STATE_REQ;
260 logx("%s: delta sync failed, fallback to snapshot", s->local);
261 } else {
262 /*
263 * TODO: update state to track recurring failures
264 * and fall back to rsync after a while.
265 * This should probably happen in the main process.
266 */
267 rrdp_free(s);
268 rrdp_done(id, 0);
269 }
270 }
271
272 static void
rrdp_finished(struct rrdp * s)273 rrdp_finished(struct rrdp *s)
274 {
275 unsigned int id = s->id;
276
277 /* check if all parts of the process have finished */
278 if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE)
279 return;
280
281 /* still some files pending */
282 if (s->file_pending > 0)
283 return;
284
285 if (s->state & RRDP_STATE_PARSE_ERROR || s->aborted) {
286 rrdp_failed(s);
287 return;
288 }
289
290 if (s->res == HTTP_OK) {
291 XML_Parser p = s->parser;
292
293 /*
294 * Finalize parsing on success to be sure that
295 * all of the XML is correct. Needs to be done here
296 * since the call would most probably fail for non
297 * successful data fetches.
298 */
299 if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) {
300 warnx("%s: XML error at line %llu: %s", s->local,
301 (unsigned long long)XML_GetCurrentLineNumber(p),
302 XML_ErrorString(XML_GetErrorCode(p)));
303 rrdp_failed(s);
304 return;
305 }
306
307 /* If a file caused an error fail the update */
308 if (s->file_failed > 0) {
309 rrdp_failed(s);
310 return;
311 }
312
313 switch (s->task) {
314 case NOTIFICATION:
315 s->task = notification_done(s->nxml, s->last_mod);
316 s->last_mod = NULL;
317 switch (s->task) {
318 case NOTIFICATION:
319 logx("%s: repository not modified (%s#%lld)",
320 s->local, s->repository->session_id,
321 s->repository->serial);
322 rrdp_state_send(s);
323 rrdp_free(s);
324 rrdp_done(id, 1);
325 break;
326 case SNAPSHOT:
327 logx("%s: downloading snapshot (%s#%lld)",
328 s->local, s->current->session_id,
329 s->current->serial);
330 rrdp_clear_repo(s);
331 s->sxml = new_snapshot_xml(p, s->current, s);
332 s->state = RRDP_STATE_REQ;
333 break;
334 case DELTA:
335 logx("%s: downloading %lld deltas (%s#%lld)",
336 s->local,
337 s->repository->serial - s->current->serial,
338 s->current->session_id, s->current->serial);
339 s->dxml = new_delta_xml(p, s->current, s);
340 s->state = RRDP_STATE_REQ;
341 break;
342 }
343 break;
344 case SNAPSHOT:
345 rrdp_state_send(s);
346 rrdp_free(s);
347 rrdp_done(id, 1);
348 break;
349 case DELTA:
350 if (notification_delta_done(s->nxml)) {
351 /* finished */
352 rrdp_state_send(s);
353 rrdp_free(s);
354 rrdp_done(id, 1);
355 } else {
356 /* reset delta parser for next delta */
357 free_delta_xml(s->dxml);
358 s->dxml = new_delta_xml(p, s->current, s);
359 s->state = RRDP_STATE_REQ;
360 }
361 break;
362 }
363 } else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
364 logx("%s: notification file not modified (%s#%lld)", s->local,
365 s->repository->session_id, s->repository->serial);
366 /* no need to update state file */
367 rrdp_free(s);
368 rrdp_done(id, 1);
369 } else {
370 rrdp_failed(s);
371 }
372 }
373
374 static void
rrdp_abort_req(struct rrdp * s)375 rrdp_abort_req(struct rrdp *s)
376 {
377 unsigned int id = s->id;
378
379 s->aborted = 1;
380 if (s->state == RRDP_STATE_REQ) {
381 /* nothing is pending, just abort */
382 rrdp_free(s);
383 rrdp_done(id, 1);
384 return;
385 }
386 if (s->state == RRDP_STATE_WAIT)
387 /* wait for HTTP_INI which will progress the state */
388 return;
389
390 /*
391 * RRDP_STATE_PARSE or later, close infd, abort parser but
392 * wait for HTTP_FIN and file_pending to drop to 0.
393 */
394 if (s->infd != -1) {
395 close(s->infd);
396 s->infd = -1;
397 s->state |= RRDP_STATE_PARSE_DONE | RRDP_STATE_PARSE_ERROR;
398 }
399 rrdp_finished(s);
400 }
401
402 static void
rrdp_input_handler(struct ibuf * b)403 rrdp_input_handler(struct ibuf *b)
404 {
405 struct rrdp_session *state;
406 char *local, *notify, *last_mod;
407 struct rrdp *s;
408 enum rrdp_msg type;
409 enum http_result res;
410 unsigned int id;
411 int ok;
412
413 io_read_buf(b, &type, sizeof(type));
414 io_read_buf(b, &id, sizeof(id));
415
416 switch (type) {
417 case RRDP_START:
418 if (ibuf_fd_avail(b))
419 errx(1, "received unexpected fd");
420 io_read_str(b, &local);
421 io_read_str(b, ¬ify);
422 state = rrdp_session_read(b);
423 rrdp_new(id, local, notify, state);
424 break;
425 case RRDP_HTTP_INI:
426 s = rrdp_get(id);
427 if (s == NULL)
428 errx(1, "http ini, rrdp session %u does not exist", id);
429 if (s->state != RRDP_STATE_WAIT)
430 errx(1, "%s: bad internal state", s->local);
431 s->infd = ibuf_fd_get(b);
432 if (s->infd == -1)
433 errx(1, "expected fd not received");
434 s->state = RRDP_STATE_PARSE;
435 if (s->aborted) {
436 rrdp_abort_req(s);
437 break;
438 }
439 break;
440 case RRDP_HTTP_FIN:
441 io_read_buf(b, &res, sizeof(res));
442 io_read_str(b, &last_mod);
443 if (ibuf_fd_avail(b))
444 errx(1, "received unexpected fd");
445
446 s = rrdp_get(id);
447 if (s == NULL)
448 errx(1, "http fin, rrdp session %u does not exist", id);
449 if (!(s->state & RRDP_STATE_PARSE))
450 errx(1, "%s: bad internal state", s->local);
451 s->state |= RRDP_STATE_HTTP_DONE;
452 s->res = res;
453 free(s->last_mod);
454 s->last_mod = last_mod;
455 rrdp_finished(s);
456 break;
457 case RRDP_FILE:
458 s = rrdp_get(id);
459 if (s == NULL)
460 errx(1, "file, rrdp session %u does not exist", id);
461 if (ibuf_fd_avail(b))
462 errx(1, "received unexpected fd");
463 io_read_buf(b, &ok, sizeof(ok));
464 if (ok != 1)
465 s->file_failed++;
466 s->file_pending--;
467 if (s->file_pending == 0)
468 rrdp_finished(s);
469 break;
470 case RRDP_ABORT:
471 if (ibuf_fd_avail(b))
472 errx(1, "received unexpected fd");
473 s = rrdp_get(id);
474 if (s != NULL)
475 rrdp_abort_req(s);
476 break;
477 default:
478 errx(1, "unexpected message %d", type);
479 }
480 }
481
482 static void
rrdp_data_handler(struct rrdp * s)483 rrdp_data_handler(struct rrdp *s)
484 {
485 char buf[READ_BUF_SIZE];
486 XML_Parser p = s->parser;
487 ssize_t len;
488
489 len = read(s->infd, buf, sizeof(buf));
490 if (len == -1) {
491 warn("%s: read failure", s->local);
492 rrdp_abort_req(s);
493 return;
494 }
495 if ((s->state & RRDP_STATE_PARSE) == 0)
496 errx(1, "%s: bad parser state", s->local);
497 if (len == 0) {
498 /* parser stage finished */
499 close(s->infd);
500 s->infd = -1;
501
502 if (s->task != NOTIFICATION) {
503 char h[SHA256_DIGEST_LENGTH];
504
505 SHA256_Final(h, &s->ctx);
506 if (memcmp(s->hash, h, sizeof(s->hash)) != 0) {
507 s->state |= RRDP_STATE_PARSE_ERROR;
508 warnx("%s: bad message digest", s->local);
509 }
510 }
511
512 s->state |= RRDP_STATE_PARSE_DONE;
513 rrdp_finished(s);
514 return;
515 }
516
517 /* parse and maybe hash the bytes just read */
518 if (s->task != NOTIFICATION)
519 SHA256_Update(&s->ctx, buf, len);
520 if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 &&
521 XML_Parse(p, buf, len, 0) != XML_STATUS_OK) {
522 warnx("%s: parse error at line %llu: %s", s->local,
523 (unsigned long long)XML_GetCurrentLineNumber(p),
524 XML_ErrorString(XML_GetErrorCode(p)));
525 s->state |= RRDP_STATE_PARSE_ERROR;
526 }
527 }
528
529 void
proc_rrdp(int fd)530 proc_rrdp(int fd)
531 {
532 struct pollfd pfds[MAX_SESSIONS + 1];
533 struct rrdp *s, *ns;
534 struct ibuf *b;
535 size_t i;
536
537 if (pledge("stdio recvfd", NULL) == -1)
538 err(1, "pledge");
539
540 if ((msgq = msgbuf_new_reader(sizeof(size_t), io_parse_hdr, NULL)) ==
541 NULL)
542 err(1, NULL);
543
544 for (;;) {
545 i = 1;
546 memset(&pfds, 0, sizeof(pfds));
547 TAILQ_FOREACH(s, &states, entry) {
548 if (i >= MAX_SESSIONS + 1) {
549 /* not enough sessions, wait for better times */
550 s->pfd = NULL;
551 continue;
552 }
553 /* request new assets when there are free sessions */
554 if (s->state == RRDP_STATE_REQ) {
555 const char *uri;
556 switch (s->task) {
557 case NOTIFICATION:
558 rrdp_http_req(s->id, s->notifyuri,
559 s->repository->last_mod);
560 break;
561 case SNAPSHOT:
562 case DELTA:
563 uri = notification_get_next(s->nxml,
564 s->hash, sizeof(s->hash),
565 s->task);
566 SHA256_Init(&s->ctx);
567 rrdp_http_req(s->id, uri, NULL);
568 break;
569 }
570 s->state = RRDP_STATE_WAIT;
571 }
572 s->pfd = pfds + i++;
573 s->pfd->fd = s->infd;
574 s->pfd->events = POLLIN;
575 }
576
577 /*
578 * Update main fd last.
579 * The previous loop may have enqueue messages.
580 */
581 pfds[0].fd = fd;
582 pfds[0].events = POLLIN;
583 if (msgbuf_queuelen(msgq) > 0)
584 pfds[0].events |= POLLOUT;
585
586 if (poll(pfds, i, INFTIM) == -1) {
587 if (errno == EINTR)
588 continue;
589 err(1, "poll");
590 }
591
592 if (pfds[0].revents & POLLHUP)
593 break;
594 if (pfds[0].revents & POLLOUT) {
595 if (msgbuf_write(fd, msgq) == -1) {
596 if (errno == EPIPE)
597 errx(1, "write: connection closed");
598 else
599 err(1, "write");
600 }
601 }
602 if (pfds[0].revents & POLLIN) {
603 switch (msgbuf_read(fd, msgq)) {
604 case -1:
605 err(1, "msgbuf_read");
606 case 0:
607 errx(1, "msgbuf_read: connection closed");
608 }
609 while ((b = io_buf_get(msgq)) != NULL) {
610 rrdp_input_handler(b);
611 ibuf_free(b);
612 }
613 }
614
615 TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
616 if (s->pfd == NULL)
617 continue;
618 if (s->pfd->revents != 0)
619 rrdp_data_handler(s);
620 }
621 }
622
623 exit(0);
624 }
625