1 /* $OpenBSD: rrdp.c,v 1.18 2021/11/24 15:24:16 claudio Exp $ */ 2 /* 3 * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com> 4 * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 #include <sys/queue.h> 19 #include <sys/stat.h> 20 21 #include <assert.h> 22 #include <err.h> 23 #include <errno.h> 24 #include <fcntl.h> 25 #include <limits.h> 26 #include <poll.h> 27 #include <string.h> 28 #include <unistd.h> 29 #include <imsg.h> 30 31 #include <expat.h> 32 #include <openssl/sha.h> 33 34 #include "extern.h" 35 #include "rrdp.h" 36 37 #define MAX_SESSIONS 12 38 #define READ_BUF_SIZE (32 * 1024) 39 40 static struct msgbuf msgq; 41 42 #define RRDP_STATE_REQ 0x01 43 #define RRDP_STATE_WAIT 0x02 44 #define RRDP_STATE_PARSE 0x04 45 #define RRDP_STATE_PARSE_ERROR 0x08 46 #define RRDP_STATE_PARSE_DONE 0x10 47 #define RRDP_STATE_HTTP_DONE 0x20 48 #define RRDP_STATE_DONE (RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE) 49 50 struct rrdp { 51 TAILQ_ENTRY(rrdp) entry; 52 size_t id; 53 char *notifyuri; 54 char *local; 55 char *last_mod; 56 57 struct pollfd *pfd; 58 int infd; 59 int state; 60 unsigned int file_pending; 61 unsigned int file_failed; 62 enum http_result res; 63 enum rrdp_task task; 64 65 char hash[SHA256_DIGEST_LENGTH]; 66 SHA256_CTX ctx; 67 68 struct rrdp_session repository; 69 struct rrdp_session current; 70 XML_Parser parser; 71 struct notification_xml *nxml; 72 struct snapshot_xml *sxml; 73 struct delta_xml *dxml; 74 }; 75 76 TAILQ_HEAD(,rrdp) states = TAILQ_HEAD_INITIALIZER(states); 77 78 char * 79 xstrdup(const char *s) 80 { 81 char *r; 82 if ((r = strdup(s)) == NULL) 83 err(1, "strdup"); 84 return r; 85 } 86 87 /* 88 * Report back that a RRDP request finished. 89 * ok should only be set to 1 if the cache is now up-to-date. 90 */ 91 static void 92 rrdp_done(size_t id, int ok) 93 { 94 enum rrdp_msg type = RRDP_END; 95 struct ibuf *b; 96 97 b = io_new_buffer(); 98 io_simple_buffer(b, &type, sizeof(type)); 99 io_simple_buffer(b, &id, sizeof(id)); 100 io_simple_buffer(b, &ok, sizeof(ok)); 101 io_close_buffer(&msgq, b); 102 } 103 104 /* 105 * Request an URI to be fetched via HTTPS. 106 * The main process will respond with a RRDP_HTTP_INI which includes 107 * the file descriptor to read from. RRDP_HTTP_FIN is sent at the 108 * end of the request with the HTTP status code and last modified timestamp. 109 * If the request should not set the If-Modified-Since: header then last_mod 110 * should be set to NULL, else it should point to a proper date string. 111 */ 112 static void 113 rrdp_http_req(size_t id, const char *uri, const char *last_mod) 114 { 115 enum rrdp_msg type = RRDP_HTTP_REQ; 116 struct ibuf *b; 117 118 b = io_new_buffer(); 119 io_simple_buffer(b, &type, sizeof(type)); 120 io_simple_buffer(b, &id, sizeof(id)); 121 io_str_buffer(b, uri); 122 io_str_buffer(b, last_mod); 123 io_close_buffer(&msgq, b); 124 } 125 126 /* 127 * Send the session state to the main process so it gets stored. 128 */ 129 static void 130 rrdp_state_send(struct rrdp *s) 131 { 132 enum rrdp_msg type = RRDP_SESSION; 133 struct ibuf *b; 134 135 b = io_new_buffer(); 136 io_simple_buffer(b, &type, sizeof(type)); 137 io_simple_buffer(b, &s->id, sizeof(s->id)); 138 io_str_buffer(b, s->current.session_id); 139 io_simple_buffer(b, &s->current.serial, sizeof(s->current.serial)); 140 io_str_buffer(b, s->current.last_mod); 141 io_close_buffer(&msgq, b); 142 } 143 144 /* 145 * Send a blob of data to the main process to store it in the repository. 146 */ 147 void 148 rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml, 149 unsigned char *data, size_t datasz) 150 { 151 enum rrdp_msg type = RRDP_FILE; 152 struct ibuf *b; 153 154 /* only send files if the fetch did not fail already */ 155 if (s->file_failed == 0) { 156 b = io_new_buffer(); 157 io_simple_buffer(b, &type, sizeof(type)); 158 io_simple_buffer(b, &s->id, sizeof(s->id)); 159 io_simple_buffer(b, &pxml->type, sizeof(pxml->type)); 160 if (pxml->type != PUB_ADD) 161 io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash)); 162 io_str_buffer(b, pxml->uri); 163 io_buf_buffer(b, data, datasz); 164 io_close_buffer(&msgq, b); 165 s->file_pending++; 166 } 167 } 168 169 static struct rrdp * 170 rrdp_new(size_t id, char *local, char *notify, char *session_id, 171 long long serial, char *last_mod) 172 { 173 struct rrdp *s; 174 175 if ((s = calloc(1, sizeof(*s))) == NULL) 176 err(1, NULL); 177 178 s->infd = -1; 179 s->id = id; 180 s->local = local; 181 s->notifyuri = notify; 182 s->repository.session_id = session_id; 183 s->repository.serial = serial; 184 s->repository.last_mod = last_mod; 185 186 s->state = RRDP_STATE_REQ; 187 if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL) 188 err(1, "XML_ParserCreate"); 189 190 s->nxml = new_notification_xml(s->parser, &s->repository, &s->current, 191 notify); 192 193 TAILQ_INSERT_TAIL(&states, s, entry); 194 195 return s; 196 } 197 198 static void 199 rrdp_free(struct rrdp *s) 200 { 201 if (s == NULL) 202 return; 203 204 TAILQ_REMOVE(&states, s, entry); 205 206 free_notification_xml(s->nxml); 207 free_snapshot_xml(s->sxml); 208 free_delta_xml(s->dxml); 209 210 if (s->parser) 211 XML_ParserFree(s->parser); 212 if (s->infd != -1) 213 close(s->infd); 214 free(s->notifyuri); 215 free(s->local); 216 free(s->last_mod); 217 free(s->repository.last_mod); 218 free(s->repository.session_id); 219 free(s->current.last_mod); 220 free(s->current.session_id); 221 222 free(s); 223 } 224 225 static struct rrdp * 226 rrdp_get(size_t id) 227 { 228 struct rrdp *s; 229 230 TAILQ_FOREACH(s, &states, entry) 231 if (s->id == id) 232 break; 233 return s; 234 } 235 236 static void 237 rrdp_failed(struct rrdp *s) 238 { 239 size_t id = s->id; 240 241 /* reset file state before retrying */ 242 s->file_failed = 0; 243 244 /* XXX MUST do some cleanup in the repo here */ 245 if (s->task == DELTA) { 246 /* fallback to a snapshot as per RFC8182 */ 247 free_delta_xml(s->dxml); 248 s->dxml = NULL; 249 s->sxml = new_snapshot_xml(s->parser, &s->current, s); 250 s->task = SNAPSHOT; 251 s->state = RRDP_STATE_REQ; 252 logx("%s: delta sync failed, fallback to snapshot", s->local); 253 } else { 254 /* 255 * TODO: update state to track recurring failures 256 * and fall back to rsync after a while. 257 * This should probably happen in the main process. 258 */ 259 rrdp_free(s); 260 rrdp_done(id, 0); 261 } 262 } 263 264 static void 265 rrdp_finished(struct rrdp *s) 266 { 267 size_t id = s->id; 268 269 /* check if all parts of the process have finished */ 270 if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE) 271 return; 272 273 /* still some files pending */ 274 if (s->file_pending > 0) 275 return; 276 277 if (s->state & RRDP_STATE_PARSE_ERROR) { 278 rrdp_failed(s); 279 return; 280 } 281 282 if (s->res == HTTP_OK) { 283 XML_Parser p = s->parser; 284 285 /* 286 * Finalize parsing on success to be sure that 287 * all of the XML is correct. Needs to be done here 288 * since the call would most probably fail for non 289 * successful data fetches. 290 */ 291 if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) { 292 warnx("%s: XML error at line %llu: %s", s->local, 293 (unsigned long long)XML_GetCurrentLineNumber(p), 294 XML_ErrorString(XML_GetErrorCode(p))); 295 rrdp_failed(s); 296 return; 297 } 298 299 /* If a file caused an error fail the update */ 300 if (s->file_failed > 0) { 301 rrdp_failed(s); 302 return; 303 } 304 305 switch (s->task) { 306 case NOTIFICATION: 307 s->task = notification_done(s->nxml, s->last_mod); 308 s->last_mod = NULL; 309 switch (s->task) { 310 case NOTIFICATION: 311 logx("%s: repository not modified", s->local); 312 rrdp_state_send(s); 313 rrdp_free(s); 314 rrdp_done(id, 1); 315 break; 316 case SNAPSHOT: 317 logx("%s: downloading snapshot", s->local); 318 s->sxml = new_snapshot_xml(p, &s->current, s); 319 s->state = RRDP_STATE_REQ; 320 break; 321 case DELTA: 322 logx("%s: downloading %lld deltas", s->local, 323 s->repository.serial - s->current.serial); 324 s->dxml = new_delta_xml(p, &s->current, s); 325 s->state = RRDP_STATE_REQ; 326 break; 327 } 328 break; 329 case SNAPSHOT: 330 rrdp_state_send(s); 331 rrdp_free(s); 332 rrdp_done(id, 1); 333 break; 334 case DELTA: 335 if (notification_delta_done(s->nxml)) { 336 /* finished */ 337 rrdp_state_send(s); 338 rrdp_free(s); 339 rrdp_done(id, 1); 340 } else { 341 /* reset delta parser for next delta */ 342 free_delta_xml(s->dxml); 343 s->dxml = new_delta_xml(p, &s->current, s); 344 s->state = RRDP_STATE_REQ; 345 } 346 break; 347 } 348 } else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) { 349 logx("%s: notification file not modified", s->local); 350 /* no need to update state file */ 351 rrdp_free(s); 352 rrdp_done(id, 1); 353 } else { 354 rrdp_failed(s); 355 } 356 } 357 358 static void 359 rrdp_input_handler(int fd) 360 { 361 static struct ibuf *inbuf; 362 char *local, *notify, *session_id, *last_mod; 363 struct ibuf *b; 364 struct rrdp *s; 365 enum rrdp_msg type; 366 enum http_result res; 367 long long serial; 368 size_t id; 369 int ok; 370 371 b = io_buf_recvfd(fd, &inbuf); 372 if (b == NULL) 373 return; 374 375 io_read_buf(b, &type, sizeof(type)); 376 io_read_buf(b, &id, sizeof(id)); 377 378 switch (type) { 379 case RRDP_START: 380 io_read_str(b, &local); 381 io_read_str(b, ¬ify); 382 io_read_str(b, &session_id); 383 io_read_buf(b, &serial, sizeof(serial)); 384 io_read_str(b, &last_mod); 385 if (b->fd != -1) 386 errx(1, "received unexpected fd"); 387 388 s = rrdp_new(id, local, notify, session_id, serial, last_mod); 389 break; 390 case RRDP_HTTP_INI: 391 if (b->fd == -1) 392 errx(1, "expected fd not received"); 393 s = rrdp_get(id); 394 if (s == NULL) 395 errx(1, "rrdp session %zu does not exist", id); 396 if (s->state != RRDP_STATE_WAIT) 397 errx(1, "%s: bad internal state", s->local); 398 399 s->infd = b->fd; 400 s->state = RRDP_STATE_PARSE; 401 break; 402 case RRDP_HTTP_FIN: 403 io_read_buf(b, &res, sizeof(res)); 404 io_read_str(b, &last_mod); 405 if (b->fd != -1) 406 errx(1, "received unexpected fd"); 407 408 s = rrdp_get(id); 409 if (s == NULL) 410 errx(1, "rrdp session %zu does not exist", id); 411 if (!(s->state & RRDP_STATE_PARSE)) 412 errx(1, "%s: bad internal state", s->local); 413 414 s->res = res; 415 s->last_mod = last_mod; 416 s->state |= RRDP_STATE_HTTP_DONE; 417 rrdp_finished(s); 418 break; 419 case RRDP_FILE: 420 s = rrdp_get(id); 421 if (s == NULL) 422 errx(1, "rrdp session %zu does not exist", id); 423 if (b->fd != -1) 424 errx(1, "received unexpected fd"); 425 io_read_buf(b, &ok, sizeof(ok)); 426 if (ok != 1) 427 s->file_failed++; 428 s->file_pending--; 429 if (s->file_pending == 0) 430 rrdp_finished(s); 431 break; 432 default: 433 errx(1, "unexpected message %d", type); 434 } 435 ibuf_free(b); 436 } 437 438 static void 439 rrdp_data_handler(struct rrdp *s) 440 { 441 char buf[READ_BUF_SIZE]; 442 XML_Parser p = s->parser; 443 ssize_t len; 444 445 len = read(s->infd, buf, sizeof(buf)); 446 if (len == -1) { 447 s->state |= RRDP_STATE_PARSE_ERROR; 448 warn("%s: read failure", s->local); 449 return; 450 } 451 if ((s->state & RRDP_STATE_PARSE) == 0) 452 errx(1, "%s: bad parser state", s->local); 453 if (len == 0) { 454 /* parser stage finished */ 455 close(s->infd); 456 s->infd = -1; 457 458 if (s->task != NOTIFICATION) { 459 char h[SHA256_DIGEST_LENGTH]; 460 461 SHA256_Final(h, &s->ctx); 462 if (memcmp(s->hash, h, sizeof(s->hash)) != 0) { 463 s->state |= RRDP_STATE_PARSE_ERROR; 464 warnx("%s: bad message digest", s->local); 465 } 466 } 467 468 s->state |= RRDP_STATE_PARSE_DONE; 469 rrdp_finished(s); 470 return; 471 } 472 473 /* parse and maybe hash the bytes just read */ 474 if (s->task != NOTIFICATION) 475 SHA256_Update(&s->ctx, buf, len); 476 if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 && 477 XML_Parse(p, buf, len, 0) != XML_STATUS_OK) { 478 warnx("%s: parse error at line %llu: %s", s->local, 479 (unsigned long long)XML_GetCurrentLineNumber(p), 480 XML_ErrorString(XML_GetErrorCode(p))); 481 s->state |= RRDP_STATE_PARSE_ERROR; 482 } 483 } 484 485 void 486 proc_rrdp(int fd) 487 { 488 struct pollfd pfds[MAX_SESSIONS + 1]; 489 struct rrdp *s, *ns; 490 size_t i; 491 492 if (pledge("stdio recvfd", NULL) == -1) 493 err(1, "pledge"); 494 495 msgbuf_init(&msgq); 496 msgq.fd = fd; 497 498 for (;;) { 499 i = 1; 500 memset(&pfds, 0, sizeof(pfds)); 501 TAILQ_FOREACH(s, &states, entry) { 502 if (i >= MAX_SESSIONS + 1) { 503 /* not enough sessions, wait for better times */ 504 s->pfd = NULL; 505 continue; 506 } 507 /* request new assets when there are free sessions */ 508 if (s->state == RRDP_STATE_REQ) { 509 const char *uri; 510 switch (s->task) { 511 case NOTIFICATION: 512 rrdp_http_req(s->id, s->notifyuri, 513 s->repository.last_mod); 514 break; 515 case SNAPSHOT: 516 case DELTA: 517 uri = notification_get_next(s->nxml, 518 s->hash, sizeof(s->hash), 519 s->task); 520 SHA256_Init(&s->ctx); 521 rrdp_http_req(s->id, uri, NULL); 522 break; 523 } 524 s->state = RRDP_STATE_WAIT; 525 } 526 s->pfd = pfds + i++; 527 s->pfd->fd = s->infd; 528 s->pfd->events = POLLIN; 529 } 530 531 /* 532 * Update main fd last. 533 * The previous loop may have enqueue messages. 534 */ 535 pfds[0].fd = fd; 536 pfds[0].events = POLLIN; 537 if (msgq.queued) 538 pfds[0].events |= POLLOUT; 539 540 if (poll(pfds, i, INFTIM) == -1) 541 err(1, "poll"); 542 543 if (pfds[0].revents & POLLHUP) 544 break; 545 if (pfds[0].revents & POLLOUT) { 546 switch (msgbuf_write(&msgq)) { 547 case 0: 548 errx(1, "write: connection closed"); 549 case -1: 550 err(1, "write"); 551 } 552 } 553 if (pfds[0].revents & POLLIN) 554 rrdp_input_handler(fd); 555 556 TAILQ_FOREACH_SAFE(s, &states, entry, ns) { 557 if (s->pfd == NULL) 558 continue; 559 if (s->pfd->revents != 0) 560 rrdp_data_handler(s); 561 } 562 } 563 564 exit(0); 565 } 566