1 /* $OpenBSD: queue_backend.c,v 1.69 2023/05/31 16:51:46 op Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <errno.h> 20 #include <fcntl.h> 21 #include <grp.h> 22 #include <inttypes.h> 23 #include <pwd.h> 24 #include <stdlib.h> 25 #include <string.h> 26 #include <time.h> 27 #include <unistd.h> 28 29 #include "smtpd.h" 30 #include "log.h" 31 32 static const char* envelope_validate(struct envelope *); 33 34 extern struct queue_backend queue_backend_fs; 35 extern struct queue_backend queue_backend_null; 36 extern struct queue_backend queue_backend_proc; 37 extern struct queue_backend queue_backend_ram; 38 39 static void queue_envelope_cache_add(struct envelope *); 40 static void queue_envelope_cache_update(struct envelope *); 41 static void queue_envelope_cache_del(uint64_t evpid); 42 43 TAILQ_HEAD(evplst, envelope); 44 45 static struct tree evpcache_tree; 46 static struct evplst evpcache_list; 47 static struct queue_backend *backend; 48 49 static int (*handler_close)(void); 50 static int (*handler_message_create)(uint32_t *); 51 static int (*handler_message_commit)(uint32_t, const char*); 52 static int (*handler_message_delete)(uint32_t); 53 static int (*handler_message_fd_r)(uint32_t); 54 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 55 static int (*handler_envelope_delete)(uint64_t); 56 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 57 static int (*handler_envelope_load)(uint64_t, char *, size_t); 58 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 59 static int (*handler_message_walk)(uint64_t *, char *, size_t, 60 uint32_t, int *, void **); 61 62 #ifdef QUEUE_PROFILING 63 64 static struct { 65 struct timespec t0; 66 const char *name; 67 } profile; 68 69 static inline void profile_enter(const char *name) 70 { 71 if ((profiling & PROFILE_QUEUE) == 0) 72 return; 73 74 profile.name = name; 75 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 76 } 77 78 static inline void profile_leave(void) 79 { 80 struct timespec t1, dt; 81 82 if ((profiling & PROFILE_QUEUE) == 0) 83 return; 84 85 clock_gettime(CLOCK_MONOTONIC, &t1); 86 timespecsub(&t1, &profile.t0, &dt); 87 log_debug("profile-queue: %s %lld.%09ld", profile.name, 88 (long long)dt.tv_sec, dt.tv_nsec); 89 } 90 #else 91 #define profile_enter(x) do {} while (0) 92 #define profile_leave() do {} while (0) 93 #endif 94 95 static int 96 queue_message_path(uint32_t msgid, char *buf, size_t len) 97 { 98 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 99 } 100 101 int 102 queue_init(const char *name, int server) 103 { 104 struct passwd *pwq; 105 struct group *gr; 106 int r; 107 108 pwq = getpwnam(SMTPD_QUEUE_USER); 109 if (pwq == NULL) 110 fatalx("unknown user %s", SMTPD_QUEUE_USER); 111 112 gr = getgrnam(SMTPD_QUEUE_GROUP); 113 if (gr == NULL) 114 fatalx("unknown group %s", SMTPD_QUEUE_GROUP); 115 116 tree_init(&evpcache_tree); 117 TAILQ_INIT(&evpcache_list); 118 119 if (!strcmp(name, "fs")) 120 backend = &queue_backend_fs; 121 else if (!strcmp(name, "null")) 122 backend = &queue_backend_null; 123 else if (!strcmp(name, "ram")) 124 backend = &queue_backend_ram; 125 else 126 backend = &queue_backend_proc; 127 128 if (server) { 129 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 130 fatalx("error in spool directory setup"); 131 if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0) 132 fatalx("error in offline directory setup"); 133 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 134 fatalx("error in purge directory setup"); 135 136 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 137 138 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 139 fatalx("error in purge directory setup"); 140 } 141 142 r = backend->init(pwq, server, name); 143 144 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 145 146 return (r); 147 } 148 149 int 150 queue_close(void) 151 { 152 if (handler_close) 153 return (handler_close()); 154 155 return (1); 156 } 157 158 int 159 queue_message_create(uint32_t *msgid) 160 { 161 int r; 162 163 profile_enter("queue_message_create"); 164 r = handler_message_create(msgid); 165 profile_leave(); 166 167 log_trace(TRACE_QUEUE, 168 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 169 r, *msgid); 170 171 return (r); 172 } 173 174 int 175 queue_message_delete(uint32_t msgid) 176 { 177 char msgpath[PATH_MAX]; 178 uint64_t evpid; 179 void *iter; 180 int r; 181 182 profile_enter("queue_message_delete"); 183 r = handler_message_delete(msgid); 184 profile_leave(); 185 186 /* in case the message is incoming */ 187 queue_message_path(msgid, msgpath, sizeof(msgpath)); 188 unlink(msgpath); 189 190 /* remove remaining envelopes from the cache if any (on rollback) */ 191 evpid = msgid_to_evpid(msgid); 192 for (;;) { 193 iter = NULL; 194 if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL)) 195 break; 196 if (evpid_to_msgid(evpid) != msgid) 197 break; 198 queue_envelope_cache_del(evpid); 199 } 200 201 log_trace(TRACE_QUEUE, 202 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 203 204 return (r); 205 } 206 207 int 208 queue_message_commit(uint32_t msgid) 209 { 210 int r; 211 char msgpath[PATH_MAX]; 212 char tmppath[PATH_MAX]; 213 FILE *ifp = NULL; 214 FILE *ofp = NULL; 215 216 profile_enter("queue_message_commit"); 217 218 queue_message_path(msgid, msgpath, sizeof(msgpath)); 219 220 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 221 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 222 ifp = fopen(msgpath, "r"); 223 ofp = fopen(tmppath, "w+"); 224 if (ifp == NULL || ofp == NULL) 225 goto err; 226 if (!compress_file(ifp, ofp)) 227 goto err; 228 fclose(ifp); 229 fclose(ofp); 230 ifp = NULL; 231 ofp = NULL; 232 233 if (rename(tmppath, msgpath) == -1) { 234 if (errno == ENOSPC) 235 return (0); 236 unlink(tmppath); 237 log_warn("rename"); 238 return (0); 239 } 240 } 241 242 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 243 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 244 ifp = fopen(msgpath, "r"); 245 ofp = fopen(tmppath, "w+"); 246 if (ifp == NULL || ofp == NULL) 247 goto err; 248 if (!crypto_encrypt_file(ifp, ofp)) 249 goto err; 250 fclose(ifp); 251 fclose(ofp); 252 ifp = NULL; 253 ofp = NULL; 254 255 if (rename(tmppath, msgpath) == -1) { 256 if (errno == ENOSPC) 257 return (0); 258 unlink(tmppath); 259 log_warn("rename"); 260 return (0); 261 } 262 } 263 264 r = handler_message_commit(msgid, msgpath); 265 profile_leave(); 266 267 /* in case it's not done by the backend */ 268 unlink(msgpath); 269 270 log_trace(TRACE_QUEUE, 271 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 272 msgid, r); 273 274 return (r); 275 276 err: 277 if (ifp) 278 fclose(ifp); 279 if (ofp) 280 fclose(ofp); 281 return 0; 282 } 283 284 int 285 queue_message_fd_r(uint32_t msgid) 286 { 287 int fdin = -1, fdout = -1, fd = -1; 288 FILE *ifp = NULL; 289 FILE *ofp = NULL; 290 291 profile_enter("queue_message_fd_r"); 292 fdin = handler_message_fd_r(msgid); 293 profile_leave(); 294 295 log_trace(TRACE_QUEUE, 296 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 297 298 if (fdin == -1) 299 return (-1); 300 301 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 302 if ((fdout = mktmpfile()) == -1) 303 goto err; 304 if ((fd = dup(fdout)) == -1) 305 goto err; 306 if ((ifp = fdopen(fdin, "r")) == NULL) 307 goto err; 308 fdin = fd; 309 fd = -1; 310 if ((ofp = fdopen(fdout, "w+")) == NULL) 311 goto err; 312 313 if (!crypto_decrypt_file(ifp, ofp)) 314 goto err; 315 316 fclose(ifp); 317 ifp = NULL; 318 fclose(ofp); 319 ofp = NULL; 320 lseek(fdin, SEEK_SET, 0); 321 } 322 323 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 324 if ((fdout = mktmpfile()) == -1) 325 goto err; 326 if ((fd = dup(fdout)) == -1) 327 goto err; 328 if ((ifp = fdopen(fdin, "r")) == NULL) 329 goto err; 330 fdin = fd; 331 fd = -1; 332 if ((ofp = fdopen(fdout, "w+")) == NULL) 333 goto err; 334 335 if (!uncompress_file(ifp, ofp)) 336 goto err; 337 338 fclose(ifp); 339 ifp = NULL; 340 fclose(ofp); 341 ofp = NULL; 342 lseek(fdin, SEEK_SET, 0); 343 } 344 345 return (fdin); 346 347 err: 348 if (fd != -1) 349 close(fd); 350 if (fdin != -1) 351 close(fdin); 352 if (fdout != -1) 353 close(fdout); 354 if (ifp) 355 fclose(ifp); 356 if (ofp) 357 fclose(ofp); 358 return -1; 359 } 360 361 int 362 queue_message_fd_rw(uint32_t msgid) 363 { 364 char buf[PATH_MAX]; 365 366 queue_message_path(msgid, buf, sizeof(buf)); 367 368 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 369 } 370 371 static int 372 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 373 { 374 char *evp; 375 size_t evplen; 376 size_t complen; 377 char compbuf[sizeof(struct envelope)]; 378 size_t enclen; 379 char encbuf[sizeof(struct envelope)]; 380 381 evp = evpbuf; 382 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 383 if (evplen == 0) 384 return (0); 385 386 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 387 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 388 if (complen == 0) 389 return (0); 390 evp = compbuf; 391 evplen = complen; 392 } 393 394 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 395 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 396 if (enclen == 0) 397 return (0); 398 evp = encbuf; 399 evplen = enclen; 400 } 401 402 memmove(evpbuf, evp, evplen); 403 404 return (evplen); 405 } 406 407 static int 408 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 409 { 410 char *evp; 411 size_t evplen; 412 char compbuf[sizeof(struct envelope)]; 413 size_t complen; 414 char encbuf[sizeof(struct envelope)]; 415 size_t enclen; 416 417 evp = evpbuf; 418 evplen = evpbufsize; 419 420 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 421 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 422 if (enclen == 0) 423 return (0); 424 evp = encbuf; 425 evplen = enclen; 426 } 427 428 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 429 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 430 if (complen == 0) 431 return (0); 432 evp = compbuf; 433 evplen = complen; 434 } 435 436 return (envelope_load_buffer(ep, evp, evplen)); 437 } 438 439 static void 440 queue_envelope_cache_add(struct envelope *e) 441 { 442 struct envelope *cached; 443 444 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 445 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 446 447 cached = xcalloc(1, sizeof *cached); 448 *cached = *e; 449 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 450 tree_xset(&evpcache_tree, e->id, cached); 451 stat_increment("queue.evpcache.size", 1); 452 } 453 454 static void 455 queue_envelope_cache_update(struct envelope *e) 456 { 457 struct envelope *cached; 458 459 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 460 queue_envelope_cache_add(e); 461 stat_increment("queue.evpcache.update.missed", 1); 462 } else { 463 TAILQ_REMOVE(&evpcache_list, cached, entry); 464 *cached = *e; 465 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 466 stat_increment("queue.evpcache.update.hit", 1); 467 } 468 } 469 470 static void 471 queue_envelope_cache_del(uint64_t evpid) 472 { 473 struct envelope *cached; 474 475 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 476 return; 477 478 TAILQ_REMOVE(&evpcache_list, cached, entry); 479 free(cached); 480 stat_decrement("queue.evpcache.size", 1); 481 } 482 483 int 484 queue_envelope_create(struct envelope *ep) 485 { 486 int r; 487 char evpbuf[sizeof(struct envelope)]; 488 size_t evplen; 489 uint64_t evpid; 490 uint32_t msgid; 491 492 ep->creation = time(NULL); 493 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 494 if (evplen == 0) 495 return (0); 496 497 evpid = ep->id; 498 msgid = evpid_to_msgid(evpid); 499 500 profile_enter("queue_envelope_create"); 501 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 502 profile_leave(); 503 504 log_trace(TRACE_QUEUE, 505 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 506 evpid, evplen, r, ep->id); 507 508 if (!r) { 509 ep->creation = 0; 510 ep->id = 0; 511 } 512 513 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 514 queue_envelope_cache_add(ep); 515 516 return (r); 517 } 518 519 int 520 queue_envelope_delete(uint64_t evpid) 521 { 522 int r; 523 524 if (env->sc_queue_flags & QUEUE_EVPCACHE) 525 queue_envelope_cache_del(evpid); 526 527 profile_enter("queue_envelope_delete"); 528 r = handler_envelope_delete(evpid); 529 profile_leave(); 530 531 log_trace(TRACE_QUEUE, 532 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 533 evpid, r); 534 535 return (r); 536 } 537 538 int 539 queue_envelope_load(uint64_t evpid, struct envelope *ep) 540 { 541 const char *e; 542 char evpbuf[sizeof(struct envelope)]; 543 size_t evplen; 544 struct envelope *cached; 545 546 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 547 (cached = tree_get(&evpcache_tree, evpid))) { 548 *ep = *cached; 549 stat_increment("queue.evpcache.load.hit", 1); 550 return (1); 551 } 552 553 ep->id = evpid; 554 profile_enter("queue_envelope_load"); 555 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 556 profile_leave(); 557 558 log_trace(TRACE_QUEUE, 559 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 560 evpid, evplen); 561 562 if (evplen == 0) 563 return (0); 564 565 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 566 if ((e = envelope_validate(ep)) == NULL) { 567 ep->id = evpid; 568 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 569 queue_envelope_cache_add(ep); 570 stat_increment("queue.evpcache.load.missed", 1); 571 } 572 return (1); 573 } 574 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 575 evpid, e); 576 } 577 return (0); 578 } 579 580 int 581 queue_envelope_update(struct envelope *ep) 582 { 583 char evpbuf[sizeof(struct envelope)]; 584 size_t evplen; 585 int r; 586 587 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 588 if (evplen == 0) 589 return (0); 590 591 profile_enter("queue_envelope_update"); 592 r = handler_envelope_update(ep->id, evpbuf, evplen); 593 profile_leave(); 594 595 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 596 queue_envelope_cache_update(ep); 597 598 log_trace(TRACE_QUEUE, 599 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 600 ep->id, r); 601 602 return (r); 603 } 604 605 int 606 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data) 607 { 608 char evpbuf[sizeof(struct envelope)]; 609 uint64_t evpid; 610 int r; 611 const char *e; 612 613 profile_enter("queue_message_walk"); 614 r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf, 615 msgid, done, data); 616 profile_leave(); 617 618 log_trace(TRACE_QUEUE, 619 "queue-backend: queue_message_walk() -> %d (%016"PRIx64")", 620 r, evpid); 621 622 if (r == -1) 623 return (r); 624 625 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 626 if ((e = envelope_validate(ep)) == NULL) { 627 ep->id = evpid; 628 /* 629 * do not cache the envelope here, while discovering 630 * envelopes one could re-run discover on already 631 * scheduled envelopes which leads to triggering of 632 * strict checks in caching. Envelopes could anyway 633 * be loaded from backend if it isn't cached. 634 */ 635 return (1); 636 } 637 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 638 evpid, e); 639 } 640 return (0); 641 } 642 643 int 644 queue_envelope_walk(struct envelope *ep) 645 { 646 const char *e; 647 uint64_t evpid; 648 char evpbuf[sizeof(struct envelope)]; 649 int r; 650 651 profile_enter("queue_envelope_walk"); 652 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 653 profile_leave(); 654 655 log_trace(TRACE_QUEUE, 656 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 657 r, evpid); 658 659 if (r == -1) 660 return (r); 661 662 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 663 if ((e = envelope_validate(ep)) == NULL) { 664 ep->id = evpid; 665 if (env->sc_queue_flags & QUEUE_EVPCACHE) 666 queue_envelope_cache_add(ep); 667 return (1); 668 } 669 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 670 evpid, e); 671 } 672 return (0); 673 } 674 675 uint32_t 676 queue_generate_msgid(void) 677 { 678 uint32_t msgid; 679 680 while ((msgid = arc4random()) == 0) 681 ; 682 683 return msgid; 684 } 685 686 uint64_t 687 queue_generate_evpid(uint32_t msgid) 688 { 689 uint32_t rnd; 690 uint64_t evpid; 691 692 while ((rnd = arc4random()) == 0) 693 ; 694 695 evpid = msgid; 696 evpid <<= 32; 697 evpid |= rnd; 698 699 return evpid; 700 } 701 702 static const char* 703 envelope_validate(struct envelope *ep) 704 { 705 if (ep->version != SMTPD_ENVELOPE_VERSION) 706 return "version mismatch"; 707 708 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 709 return "invalid helo"; 710 if (ep->helo[0] == '\0') 711 return "empty helo"; 712 713 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 714 return "invalid hostname"; 715 if (ep->hostname[0] == '\0') 716 return "empty hostname"; 717 718 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 719 return "invalid error line"; 720 721 if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL) 722 return "unknown dispatcher"; 723 724 return NULL; 725 } 726 727 void 728 queue_api_on_close(int(*cb)(void)) 729 { 730 handler_close = cb; 731 } 732 733 void 734 queue_api_on_message_create(int(*cb)(uint32_t *)) 735 { 736 handler_message_create = cb; 737 } 738 739 void 740 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 741 { 742 handler_message_commit = cb; 743 } 744 745 void 746 queue_api_on_message_delete(int(*cb)(uint32_t)) 747 { 748 handler_message_delete = cb; 749 } 750 751 void 752 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 753 { 754 handler_message_fd_r = cb; 755 } 756 757 void 758 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 759 { 760 handler_envelope_create = cb; 761 } 762 763 void 764 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 765 { 766 handler_envelope_delete = cb; 767 } 768 769 void 770 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 771 { 772 handler_envelope_update = cb; 773 } 774 775 void 776 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 777 { 778 handler_envelope_load = cb; 779 } 780 781 void 782 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 783 { 784 handler_envelope_walk = cb; 785 } 786 787 void 788 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t, 789 uint32_t, int *, void **)) 790 { 791 handler_message_walk = cb; 792 } 793