1 /* $OpenBSD: rrdp.c,v 1.32 2023/06/23 11:36:24 claudio Exp $ */ 2 /* 3 * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com> 4 * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 #include <sys/queue.h> 19 #include <sys/stat.h> 20 21 #include <err.h> 22 #include <errno.h> 23 #include <fcntl.h> 24 #include <limits.h> 25 #include <poll.h> 26 #include <string.h> 27 #include <unistd.h> 28 #include <imsg.h> 29 30 #include <expat.h> 31 #include <openssl/sha.h> 32 33 #include "extern.h" 34 #include "rrdp.h" 35 36 #define MAX_SESSIONS 12 37 #define READ_BUF_SIZE (32 * 1024) 38 39 static struct msgbuf msgq; 40 41 #define RRDP_STATE_REQ 0x01 42 #define RRDP_STATE_WAIT 0x02 43 #define RRDP_STATE_PARSE 0x04 44 #define RRDP_STATE_PARSE_ERROR 0x08 45 #define RRDP_STATE_PARSE_DONE 0x10 46 #define RRDP_STATE_HTTP_DONE 0x20 47 #define RRDP_STATE_DONE (RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE) 48 49 struct rrdp { 50 TAILQ_ENTRY(rrdp) entry; 51 unsigned int id; 52 char *notifyuri; 53 char *local; 54 char *last_mod; 55 56 struct pollfd *pfd; 57 int infd; 58 int state; 59 int aborted; 60 unsigned int file_pending; 61 unsigned int file_failed; 62 enum http_result res; 63 enum rrdp_task task; 64 65 char hash[SHA256_DIGEST_LENGTH]; 66 SHA256_CTX ctx; 67 68 struct rrdp_session *repository; 69 struct rrdp_session *current; 70 XML_Parser parser; 71 struct notification_xml *nxml; 72 struct snapshot_xml *sxml; 73 struct delta_xml *dxml; 74 }; 75 76 static TAILQ_HEAD(, rrdp) states = TAILQ_HEAD_INITIALIZER(states); 77 78 char * 79 xstrdup(const char *s) 80 { 81 char *r; 82 if ((r = strdup(s)) == NULL) 83 err(1, "strdup"); 84 return r; 85 } 86 87 /* 88 * Report back that a RRDP request finished. 89 * ok should only be set to 1 if the cache is now up-to-date. 90 */ 91 static void 92 rrdp_done(unsigned int id, int ok) 93 { 94 enum rrdp_msg type = RRDP_END; 95 struct ibuf *b; 96 97 b = io_new_buffer(); 98 io_simple_buffer(b, &type, sizeof(type)); 99 io_simple_buffer(b, &id, sizeof(id)); 100 io_simple_buffer(b, &ok, sizeof(ok)); 101 io_close_buffer(&msgq, b); 102 } 103 104 /* 105 * Request an URI to be fetched via HTTPS. 106 * The main process will respond with a RRDP_HTTP_INI which includes 107 * the file descriptor to read from. RRDP_HTTP_FIN is sent at the 108 * end of the request with the HTTP status code and last modified timestamp. 109 * If the request should not set the If-Modified-Since: header then last_mod 110 * should be set to NULL, else it should point to a proper date string. 111 */ 112 static void 113 rrdp_http_req(unsigned int id, const char *uri, const char *last_mod) 114 { 115 enum rrdp_msg type = RRDP_HTTP_REQ; 116 struct ibuf *b; 117 118 b = io_new_buffer(); 119 io_simple_buffer(b, &type, sizeof(type)); 120 io_simple_buffer(b, &id, sizeof(id)); 121 io_str_buffer(b, uri); 122 io_str_buffer(b, last_mod); 123 io_close_buffer(&msgq, b); 124 } 125 126 /* 127 * Send the session state to the main process so it gets stored. 128 */ 129 static void 130 rrdp_state_send(struct rrdp *s) 131 { 132 enum rrdp_msg type = RRDP_SESSION; 133 struct ibuf *b; 134 135 b = io_new_buffer(); 136 io_simple_buffer(b, &type, sizeof(type)); 137 io_simple_buffer(b, &s->id, sizeof(s->id)); 138 rrdp_session_buffer(b, s->current); 139 io_close_buffer(&msgq, b); 140 } 141 142 /* 143 * Inform parent to clear the RRDP repository before start of snapshot. 144 */ 145 static void 146 rrdp_clear_repo(struct rrdp *s) 147 { 148 enum rrdp_msg type = RRDP_CLEAR; 149 struct ibuf *b; 150 151 b = io_new_buffer(); 152 io_simple_buffer(b, &type, sizeof(type)); 153 io_simple_buffer(b, &s->id, sizeof(s->id)); 154 io_close_buffer(&msgq, b); 155 } 156 157 /* 158 * Send a blob of data to the main process to store it in the repository. 159 */ 160 void 161 rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml, 162 unsigned char *data, size_t datasz) 163 { 164 enum rrdp_msg type = RRDP_FILE; 165 struct ibuf *b; 166 167 /* only send files if the fetch did not fail already */ 168 if (s->file_failed == 0) { 169 b = io_new_buffer(); 170 io_simple_buffer(b, &type, sizeof(type)); 171 io_simple_buffer(b, &s->id, sizeof(s->id)); 172 io_simple_buffer(b, &pxml->type, sizeof(pxml->type)); 173 if (pxml->type != PUB_ADD) 174 io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash)); 175 io_str_buffer(b, pxml->uri); 176 io_buf_buffer(b, data, datasz); 177 io_close_buffer(&msgq, b); 178 s->file_pending++; 179 } 180 } 181 182 static void 183 rrdp_new(unsigned int id, char *local, char *notify, struct rrdp_session *state) 184 { 185 struct rrdp *s; 186 187 if ((s = calloc(1, sizeof(*s))) == NULL) 188 err(1, NULL); 189 190 s->infd = -1; 191 s->id = id; 192 s->local = local; 193 s->notifyuri = notify; 194 s->repository = state; 195 if ((s->current = calloc(1, sizeof(*s->current))) == NULL) 196 err(1, NULL); 197 198 s->state = RRDP_STATE_REQ; 199 if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL) 200 err(1, "XML_ParserCreate"); 201 202 s->nxml = new_notification_xml(s->parser, s->repository, s->current, 203 notify); 204 205 TAILQ_INSERT_TAIL(&states, s, entry); 206 } 207 208 static void 209 rrdp_free(struct rrdp *s) 210 { 211 if (s == NULL) 212 return; 213 214 TAILQ_REMOVE(&states, s, entry); 215 216 free_notification_xml(s->nxml); 217 free_snapshot_xml(s->sxml); 218 free_delta_xml(s->dxml); 219 220 if (s->parser) 221 XML_ParserFree(s->parser); 222 if (s->infd != -1) 223 close(s->infd); 224 free(s->notifyuri); 225 free(s->local); 226 free(s->last_mod); 227 rrdp_session_free(s->repository); 228 rrdp_session_free(s->current); 229 230 free(s); 231 } 232 233 static struct rrdp * 234 rrdp_get(unsigned int id) 235 { 236 struct rrdp *s; 237 238 TAILQ_FOREACH(s, &states, entry) 239 if (s->id == id) 240 break; 241 return s; 242 } 243 244 static void 245 rrdp_failed(struct rrdp *s) 246 { 247 unsigned int id = s->id; 248 249 /* reset file state before retrying */ 250 s->file_failed = 0; 251 252 if (s->task == DELTA && !s->aborted) { 253 /* fallback to a snapshot as per RFC8182 */ 254 free_delta_xml(s->dxml); 255 s->dxml = NULL; 256 rrdp_clear_repo(s); 257 s->sxml = new_snapshot_xml(s->parser, s->current, s); 258 s->task = SNAPSHOT; 259 s->state = RRDP_STATE_REQ; 260 logx("%s: delta sync failed, fallback to snapshot", s->local); 261 } else { 262 /* 263 * TODO: update state to track recurring failures 264 * and fall back to rsync after a while. 265 * This should probably happen in the main process. 266 */ 267 rrdp_free(s); 268 rrdp_done(id, 0); 269 } 270 } 271 272 static void 273 rrdp_finished(struct rrdp *s) 274 { 275 unsigned int id = s->id; 276 277 /* check if all parts of the process have finished */ 278 if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE) 279 return; 280 281 /* still some files pending */ 282 if (s->file_pending > 0) 283 return; 284 285 if (s->state & RRDP_STATE_PARSE_ERROR || s->aborted) { 286 rrdp_failed(s); 287 return; 288 } 289 290 if (s->res == HTTP_OK) { 291 XML_Parser p = s->parser; 292 293 /* 294 * Finalize parsing on success to be sure that 295 * all of the XML is correct. Needs to be done here 296 * since the call would most probably fail for non 297 * successful data fetches. 298 */ 299 if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) { 300 warnx("%s: XML error at line %llu: %s", s->local, 301 (unsigned long long)XML_GetCurrentLineNumber(p), 302 XML_ErrorString(XML_GetErrorCode(p))); 303 rrdp_failed(s); 304 return; 305 } 306 307 /* If a file caused an error fail the update */ 308 if (s->file_failed > 0) { 309 rrdp_failed(s); 310 return; 311 } 312 313 switch (s->task) { 314 case NOTIFICATION: 315 s->task = notification_done(s->nxml, s->last_mod); 316 s->last_mod = NULL; 317 switch (s->task) { 318 case NOTIFICATION: 319 logx("%s: repository not modified (%s#%lld)", 320 s->local, s->repository->session_id, 321 s->repository->serial); 322 rrdp_state_send(s); 323 rrdp_free(s); 324 rrdp_done(id, 1); 325 break; 326 case SNAPSHOT: 327 logx("%s: downloading snapshot (%s#%lld)", 328 s->local, s->current->session_id, 329 s->current->serial); 330 rrdp_clear_repo(s); 331 s->sxml = new_snapshot_xml(p, s->current, s); 332 s->state = RRDP_STATE_REQ; 333 break; 334 case DELTA: 335 logx("%s: downloading %lld deltas (%s#%lld)", 336 s->local, 337 s->repository->serial - s->current->serial, 338 s->current->session_id, s->current->serial); 339 s->dxml = new_delta_xml(p, s->current, s); 340 s->state = RRDP_STATE_REQ; 341 break; 342 } 343 break; 344 case SNAPSHOT: 345 rrdp_state_send(s); 346 rrdp_free(s); 347 rrdp_done(id, 1); 348 break; 349 case DELTA: 350 if (notification_delta_done(s->nxml)) { 351 /* finished */ 352 rrdp_state_send(s); 353 rrdp_free(s); 354 rrdp_done(id, 1); 355 } else { 356 /* reset delta parser for next delta */ 357 free_delta_xml(s->dxml); 358 s->dxml = new_delta_xml(p, s->current, s); 359 s->state = RRDP_STATE_REQ; 360 } 361 break; 362 } 363 } else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) { 364 logx("%s: notification file not modified (%s#%lld)", s->local, 365 s->repository->session_id, s->repository->serial); 366 /* no need to update state file */ 367 rrdp_free(s); 368 rrdp_done(id, 1); 369 } else { 370 rrdp_failed(s); 371 } 372 } 373 374 static void 375 rrdp_abort_req(struct rrdp *s) 376 { 377 unsigned int id = s->id; 378 379 s->aborted = 1; 380 if (s->state == RRDP_STATE_REQ) { 381 /* nothing is pending, just abort */ 382 rrdp_free(s); 383 rrdp_done(id, 1); 384 return; 385 } 386 if (s->state == RRDP_STATE_WAIT) 387 /* wait for HTTP_INI which will progress the state */ 388 return; 389 390 /* 391 * RRDP_STATE_PARSE or later, close infd, abort parser but 392 * wait for HTTP_FIN and file_pending to drop to 0. 393 */ 394 if (s->infd != -1) { 395 close(s->infd); 396 s->infd = -1; 397 s->state |= RRDP_STATE_PARSE_DONE | RRDP_STATE_PARSE_ERROR; 398 } 399 rrdp_finished(s); 400 } 401 402 static void 403 rrdp_input_handler(int fd) 404 { 405 static struct ibuf *inbuf; 406 struct rrdp_session *state; 407 char *local, *notify, *last_mod; 408 struct ibuf *b; 409 struct rrdp *s; 410 enum rrdp_msg type; 411 enum http_result res; 412 unsigned int id; 413 int ok; 414 415 b = io_buf_recvfd(fd, &inbuf); 416 if (b == NULL) 417 return; 418 419 io_read_buf(b, &type, sizeof(type)); 420 io_read_buf(b, &id, sizeof(id)); 421 422 switch (type) { 423 case RRDP_START: 424 if (ibuf_fd_avail(b)) 425 errx(1, "received unexpected fd"); 426 io_read_str(b, &local); 427 io_read_str(b, ¬ify); 428 state = rrdp_session_read(b); 429 rrdp_new(id, local, notify, state); 430 break; 431 case RRDP_HTTP_INI: 432 s = rrdp_get(id); 433 if (s == NULL) 434 errx(1, "http ini, rrdp session %u does not exist", id); 435 if (s->state != RRDP_STATE_WAIT) 436 errx(1, "%s: bad internal state", s->local); 437 s->infd = ibuf_fd_get(b); 438 if (s->infd == -1) 439 errx(1, "expected fd not received"); 440 s->state = RRDP_STATE_PARSE; 441 if (s->aborted) { 442 rrdp_abort_req(s); 443 break; 444 } 445 break; 446 case RRDP_HTTP_FIN: 447 io_read_buf(b, &res, sizeof(res)); 448 io_read_str(b, &last_mod); 449 if (ibuf_fd_avail(b)) 450 errx(1, "received unexpected fd"); 451 452 s = rrdp_get(id); 453 if (s == NULL) 454 errx(1, "http fin, rrdp session %u does not exist", id); 455 if (!(s->state & RRDP_STATE_PARSE)) 456 errx(1, "%s: bad internal state", s->local); 457 s->state |= RRDP_STATE_HTTP_DONE; 458 s->res = res; 459 free(s->last_mod); 460 s->last_mod = last_mod; 461 rrdp_finished(s); 462 break; 463 case RRDP_FILE: 464 s = rrdp_get(id); 465 if (s == NULL) 466 errx(1, "file, rrdp session %u does not exist", id);; 467 if (ibuf_fd_avail(b)) 468 errx(1, "received unexpected fd"); 469 io_read_buf(b, &ok, sizeof(ok)); 470 if (ok != 1) 471 s->file_failed++; 472 s->file_pending--; 473 if (s->file_pending == 0) 474 rrdp_finished(s); 475 break; 476 case RRDP_ABORT: 477 if (ibuf_fd_avail(b)) 478 errx(1, "received unexpected fd"); 479 s = rrdp_get(id); 480 if (s != NULL) 481 rrdp_abort_req(s); 482 break; 483 default: 484 errx(1, "unexpected message %d", type); 485 } 486 ibuf_free(b); 487 } 488 489 static void 490 rrdp_data_handler(struct rrdp *s) 491 { 492 char buf[READ_BUF_SIZE]; 493 XML_Parser p = s->parser; 494 ssize_t len; 495 496 len = read(s->infd, buf, sizeof(buf)); 497 if (len == -1) { 498 warn("%s: read failure", s->local); 499 rrdp_abort_req(s); 500 return; 501 } 502 if ((s->state & RRDP_STATE_PARSE) == 0) 503 errx(1, "%s: bad parser state", s->local); 504 if (len == 0) { 505 /* parser stage finished */ 506 close(s->infd); 507 s->infd = -1; 508 509 if (s->task != NOTIFICATION) { 510 char h[SHA256_DIGEST_LENGTH]; 511 512 SHA256_Final(h, &s->ctx); 513 if (memcmp(s->hash, h, sizeof(s->hash)) != 0) { 514 s->state |= RRDP_STATE_PARSE_ERROR; 515 warnx("%s: bad message digest", s->local); 516 } 517 } 518 519 s->state |= RRDP_STATE_PARSE_DONE; 520 rrdp_finished(s); 521 return; 522 } 523 524 /* parse and maybe hash the bytes just read */ 525 if (s->task != NOTIFICATION) 526 SHA256_Update(&s->ctx, buf, len); 527 if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 && 528 XML_Parse(p, buf, len, 0) != XML_STATUS_OK) { 529 warnx("%s: parse error at line %llu: %s", s->local, 530 (unsigned long long)XML_GetCurrentLineNumber(p), 531 XML_ErrorString(XML_GetErrorCode(p))); 532 s->state |= RRDP_STATE_PARSE_ERROR; 533 } 534 } 535 536 void 537 proc_rrdp(int fd) 538 { 539 struct pollfd pfds[MAX_SESSIONS + 1]; 540 struct rrdp *s, *ns; 541 size_t i; 542 543 if (pledge("stdio recvfd", NULL) == -1) 544 err(1, "pledge"); 545 546 msgbuf_init(&msgq); 547 msgq.fd = fd; 548 549 for (;;) { 550 i = 1; 551 memset(&pfds, 0, sizeof(pfds)); 552 TAILQ_FOREACH(s, &states, entry) { 553 if (i >= MAX_SESSIONS + 1) { 554 /* not enough sessions, wait for better times */ 555 s->pfd = NULL; 556 continue; 557 } 558 /* request new assets when there are free sessions */ 559 if (s->state == RRDP_STATE_REQ) { 560 const char *uri; 561 switch (s->task) { 562 case NOTIFICATION: 563 rrdp_http_req(s->id, s->notifyuri, 564 s->repository->last_mod); 565 break; 566 case SNAPSHOT: 567 case DELTA: 568 uri = notification_get_next(s->nxml, 569 s->hash, sizeof(s->hash), 570 s->task); 571 SHA256_Init(&s->ctx); 572 rrdp_http_req(s->id, uri, NULL); 573 break; 574 } 575 s->state = RRDP_STATE_WAIT; 576 } 577 s->pfd = pfds + i++; 578 s->pfd->fd = s->infd; 579 s->pfd->events = POLLIN; 580 } 581 582 /* 583 * Update main fd last. 584 * The previous loop may have enqueue messages. 585 */ 586 pfds[0].fd = fd; 587 pfds[0].events = POLLIN; 588 if (msgq.queued) 589 pfds[0].events |= POLLOUT; 590 591 if (poll(pfds, i, INFTIM) == -1) { 592 if (errno == EINTR) 593 continue; 594 err(1, "poll"); 595 } 596 597 if (pfds[0].revents & POLLHUP) 598 break; 599 if (pfds[0].revents & POLLOUT) { 600 switch (msgbuf_write(&msgq)) { 601 case 0: 602 errx(1, "write: connection closed"); 603 case -1: 604 err(1, "write"); 605 } 606 } 607 if (pfds[0].revents & POLLIN) 608 rrdp_input_handler(fd); 609 610 TAILQ_FOREACH_SAFE(s, &states, entry, ns) { 611 if (s->pfd == NULL) 612 continue; 613 if (s->pfd->revents != 0) 614 rrdp_data_handler(s); 615 } 616 } 617 618 exit(0); 619 } 620