1 /* $OpenBSD: queue_backend.c,v 1.66 2020/04/22 11:35:34 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <err.h> 27 #include <errno.h> 28 #include <event.h> 29 #include <fcntl.h> 30 #include <grp.h> 31 #include <imsg.h> 32 #include <limits.h> 33 #include <inttypes.h> 34 #include <pwd.h> 35 #include <stdio.h> 36 #include <stdlib.h> 37 #include <string.h> 38 #include <time.h> 39 #include <unistd.h> 40 41 #include "smtpd.h" 42 #include "log.h" 43 44 static const char* envelope_validate(struct envelope *); 45 46 extern struct queue_backend queue_backend_fs; 47 extern struct queue_backend queue_backend_null; 48 extern struct queue_backend queue_backend_proc; 49 extern struct queue_backend queue_backend_ram; 50 51 static void queue_envelope_cache_add(struct envelope *); 52 static void queue_envelope_cache_update(struct envelope *); 53 static void queue_envelope_cache_del(uint64_t evpid); 54 55 TAILQ_HEAD(evplst, envelope); 56 57 static struct tree evpcache_tree; 58 static struct evplst evpcache_list; 59 static struct queue_backend *backend; 60 61 static int (*handler_close)(void); 62 static int (*handler_message_create)(uint32_t *); 63 static int (*handler_message_commit)(uint32_t, const char*); 64 static int (*handler_message_delete)(uint32_t); 65 static int (*handler_message_fd_r)(uint32_t); 66 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); 67 static int (*handler_envelope_delete)(uint64_t); 68 static int (*handler_envelope_update)(uint64_t, const char *, size_t); 69 static int (*handler_envelope_load)(uint64_t, char *, size_t); 70 static int (*handler_envelope_walk)(uint64_t *, char *, size_t); 71 static int (*handler_message_walk)(uint64_t *, char *, size_t, 72 uint32_t, int *, void **); 73 74 #ifdef QUEUE_PROFILING 75 76 static struct { 77 struct timespec t0; 78 const char *name; 79 } profile; 80 81 static inline void profile_enter(const char *name) 82 { 83 if ((profiling & PROFILE_QUEUE) == 0) 84 return; 85 86 profile.name = name; 87 clock_gettime(CLOCK_MONOTONIC, &profile.t0); 88 } 89 90 static inline void profile_leave(void) 91 { 92 struct timespec t1, dt; 93 94 if ((profiling & PROFILE_QUEUE) == 0) 95 return; 96 97 clock_gettime(CLOCK_MONOTONIC, &t1); 98 timespecsub(&t1, &profile.t0, &dt); 99 log_debug("profile-queue: %s %lld.%09ld", profile.name, 100 (long long)dt.tv_sec, dt.tv_nsec); 101 } 102 #else 103 #define profile_enter(x) do {} while (0) 104 #define profile_leave() do {} while (0) 105 #endif 106 107 static int 108 queue_message_path(uint32_t msgid, char *buf, size_t len) 109 { 110 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); 111 } 112 113 int 114 queue_init(const char *name, int server) 115 { 116 struct passwd *pwq; 117 struct group *gr; 118 int r; 119 120 pwq = getpwnam(SMTPD_QUEUE_USER); 121 if (pwq == NULL) 122 errx(1, "unknown user %s", SMTPD_QUEUE_USER); 123 124 gr = getgrnam(SMTPD_QUEUE_GROUP); 125 if (gr == NULL) 126 errx(1, "unknown group %s", SMTPD_QUEUE_GROUP); 127 128 tree_init(&evpcache_tree); 129 TAILQ_INIT(&evpcache_list); 130 131 if (!strcmp(name, "fs")) 132 backend = &queue_backend_fs; 133 else if (!strcmp(name, "null")) 134 backend = &queue_backend_null; 135 else if (!strcmp(name, "ram")) 136 backend = &queue_backend_ram; 137 else 138 backend = &queue_backend_proc; 139 140 if (server) { 141 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) 142 errx(1, "error in spool directory setup"); 143 if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0) 144 errx(1, "error in offline directory setup"); 145 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) 146 errx(1, "error in purge directory setup"); 147 148 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); 149 150 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) 151 errx(1, "error in purge directory setup"); 152 } 153 154 r = backend->init(pwq, server, name); 155 156 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); 157 158 return (r); 159 } 160 161 int 162 queue_close(void) 163 { 164 if (handler_close) 165 return (handler_close()); 166 167 return (1); 168 } 169 170 int 171 queue_message_create(uint32_t *msgid) 172 { 173 int r; 174 175 profile_enter("queue_message_create"); 176 r = handler_message_create(msgid); 177 profile_leave(); 178 179 log_trace(TRACE_QUEUE, 180 "queue-backend: queue_message_create() -> %d (%08"PRIx32")", 181 r, *msgid); 182 183 return (r); 184 } 185 186 int 187 queue_message_delete(uint32_t msgid) 188 { 189 char msgpath[PATH_MAX]; 190 uint64_t evpid; 191 void *iter; 192 int r; 193 194 profile_enter("queue_message_delete"); 195 r = handler_message_delete(msgid); 196 profile_leave(); 197 198 /* in case the message is incoming */ 199 queue_message_path(msgid, msgpath, sizeof(msgpath)); 200 unlink(msgpath); 201 202 /* remove remaining envelopes from the cache if any (on rollback) */ 203 evpid = msgid_to_evpid(msgid); 204 for (;;) { 205 iter = NULL; 206 if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL)) 207 break; 208 if (evpid_to_msgid(evpid) != msgid) 209 break; 210 queue_envelope_cache_del(evpid); 211 } 212 213 log_trace(TRACE_QUEUE, 214 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); 215 216 return (r); 217 } 218 219 int 220 queue_message_commit(uint32_t msgid) 221 { 222 int r; 223 char msgpath[PATH_MAX]; 224 char tmppath[PATH_MAX]; 225 FILE *ifp = NULL; 226 FILE *ofp = NULL; 227 228 profile_enter("queue_message_commit"); 229 230 queue_message_path(msgid, msgpath, sizeof(msgpath)); 231 232 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 233 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); 234 ifp = fopen(msgpath, "r"); 235 ofp = fopen(tmppath, "w+"); 236 if (ifp == NULL || ofp == NULL) 237 goto err; 238 if (!compress_file(ifp, ofp)) 239 goto err; 240 fclose(ifp); 241 fclose(ofp); 242 ifp = NULL; 243 ofp = NULL; 244 245 if (rename(tmppath, msgpath) == -1) { 246 if (errno == ENOSPC) 247 return (0); 248 unlink(tmppath); 249 log_warn("rename"); 250 return (0); 251 } 252 } 253 254 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 255 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); 256 ifp = fopen(msgpath, "r"); 257 ofp = fopen(tmppath, "w+"); 258 if (ifp == NULL || ofp == NULL) 259 goto err; 260 if (!crypto_encrypt_file(ifp, ofp)) 261 goto err; 262 fclose(ifp); 263 fclose(ofp); 264 ifp = NULL; 265 ofp = NULL; 266 267 if (rename(tmppath, msgpath) == -1) { 268 if (errno == ENOSPC) 269 return (0); 270 unlink(tmppath); 271 log_warn("rename"); 272 return (0); 273 } 274 } 275 276 r = handler_message_commit(msgid, msgpath); 277 profile_leave(); 278 279 /* in case it's not done by the backend */ 280 unlink(msgpath); 281 282 log_trace(TRACE_QUEUE, 283 "queue-backend: queue_message_commit(%08"PRIx32") -> %d", 284 msgid, r); 285 286 return (r); 287 288 err: 289 if (ifp) 290 fclose(ifp); 291 if (ofp) 292 fclose(ofp); 293 return 0; 294 } 295 296 int 297 queue_message_fd_r(uint32_t msgid) 298 { 299 int fdin = -1, fdout = -1, fd = -1; 300 FILE *ifp = NULL; 301 FILE *ofp = NULL; 302 303 profile_enter("queue_message_fd_r"); 304 fdin = handler_message_fd_r(msgid); 305 profile_leave(); 306 307 log_trace(TRACE_QUEUE, 308 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); 309 310 if (fdin == -1) 311 return (-1); 312 313 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 314 if ((fdout = mktmpfile()) == -1) 315 goto err; 316 if ((fd = dup(fdout)) == -1) 317 goto err; 318 if ((ifp = fdopen(fdin, "r")) == NULL) 319 goto err; 320 fdin = fd; 321 fd = -1; 322 if ((ofp = fdopen(fdout, "w+")) == NULL) 323 goto err; 324 325 if (!crypto_decrypt_file(ifp, ofp)) 326 goto err; 327 328 fclose(ifp); 329 ifp = NULL; 330 fclose(ofp); 331 ofp = NULL; 332 lseek(fdin, SEEK_SET, 0); 333 } 334 335 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 336 if ((fdout = mktmpfile()) == -1) 337 goto err; 338 if ((fd = dup(fdout)) == -1) 339 goto err; 340 if ((ifp = fdopen(fdin, "r")) == NULL) 341 goto err; 342 fdin = fd; 343 fd = -1; 344 if ((ofp = fdopen(fdout, "w+")) == NULL) 345 goto err; 346 347 if (!uncompress_file(ifp, ofp)) 348 goto err; 349 350 fclose(ifp); 351 ifp = NULL; 352 fclose(ofp); 353 ofp = NULL; 354 lseek(fdin, SEEK_SET, 0); 355 } 356 357 return (fdin); 358 359 err: 360 if (fd != -1) 361 close(fd); 362 if (fdin != -1) 363 close(fdin); 364 if (fdout != -1) 365 close(fdout); 366 if (ifp) 367 fclose(ifp); 368 if (ofp) 369 fclose(ofp); 370 return -1; 371 } 372 373 int 374 queue_message_fd_rw(uint32_t msgid) 375 { 376 char buf[PATH_MAX]; 377 378 queue_message_path(msgid, buf, sizeof(buf)); 379 380 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); 381 } 382 383 static int 384 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 385 { 386 char *evp; 387 size_t evplen; 388 size_t complen; 389 char compbuf[sizeof(struct envelope)]; 390 size_t enclen; 391 char encbuf[sizeof(struct envelope)]; 392 393 evp = evpbuf; 394 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); 395 if (evplen == 0) 396 return (0); 397 398 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 399 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); 400 if (complen == 0) 401 return (0); 402 evp = compbuf; 403 evplen = complen; 404 } 405 406 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 407 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 408 if (enclen == 0) 409 return (0); 410 evp = encbuf; 411 evplen = enclen; 412 } 413 414 memmove(evpbuf, evp, evplen); 415 416 return (evplen); 417 } 418 419 static int 420 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) 421 { 422 char *evp; 423 size_t evplen; 424 char compbuf[sizeof(struct envelope)]; 425 size_t complen; 426 char encbuf[sizeof(struct envelope)]; 427 size_t enclen; 428 429 evp = evpbuf; 430 evplen = evpbufsize; 431 432 if (env->sc_queue_flags & QUEUE_ENCRYPTION) { 433 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); 434 if (enclen == 0) 435 return (0); 436 evp = encbuf; 437 evplen = enclen; 438 } 439 440 if (env->sc_queue_flags & QUEUE_COMPRESSION) { 441 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); 442 if (complen == 0) 443 return (0); 444 evp = compbuf; 445 evplen = complen; 446 } 447 448 return (envelope_load_buffer(ep, evp, evplen)); 449 } 450 451 static void 452 queue_envelope_cache_add(struct envelope *e) 453 { 454 struct envelope *cached; 455 456 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) 457 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); 458 459 cached = xcalloc(1, sizeof *cached); 460 *cached = *e; 461 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 462 tree_xset(&evpcache_tree, e->id, cached); 463 stat_increment("queue.evpcache.size", 1); 464 } 465 466 static void 467 queue_envelope_cache_update(struct envelope *e) 468 { 469 struct envelope *cached; 470 471 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { 472 queue_envelope_cache_add(e); 473 stat_increment("queue.evpcache.update.missed", 1); 474 } else { 475 TAILQ_REMOVE(&evpcache_list, cached, entry); 476 *cached = *e; 477 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); 478 stat_increment("queue.evpcache.update.hit", 1); 479 } 480 } 481 482 static void 483 queue_envelope_cache_del(uint64_t evpid) 484 { 485 struct envelope *cached; 486 487 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) 488 return; 489 490 TAILQ_REMOVE(&evpcache_list, cached, entry); 491 free(cached); 492 stat_decrement("queue.evpcache.size", 1); 493 } 494 495 int 496 queue_envelope_create(struct envelope *ep) 497 { 498 int r; 499 char evpbuf[sizeof(struct envelope)]; 500 size_t evplen; 501 uint64_t evpid; 502 uint32_t msgid; 503 504 ep->creation = time(NULL); 505 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 506 if (evplen == 0) 507 return (0); 508 509 evpid = ep->id; 510 msgid = evpid_to_msgid(evpid); 511 512 profile_enter("queue_envelope_create"); 513 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); 514 profile_leave(); 515 516 log_trace(TRACE_QUEUE, 517 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", 518 evpid, evplen, r, ep->id); 519 520 if (!r) { 521 ep->creation = 0; 522 ep->id = 0; 523 } 524 525 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 526 queue_envelope_cache_add(ep); 527 528 return (r); 529 } 530 531 int 532 queue_envelope_delete(uint64_t evpid) 533 { 534 int r; 535 536 if (env->sc_queue_flags & QUEUE_EVPCACHE) 537 queue_envelope_cache_del(evpid); 538 539 profile_enter("queue_envelope_delete"); 540 r = handler_envelope_delete(evpid); 541 profile_leave(); 542 543 log_trace(TRACE_QUEUE, 544 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", 545 evpid, r); 546 547 return (r); 548 } 549 550 int 551 queue_envelope_load(uint64_t evpid, struct envelope *ep) 552 { 553 const char *e; 554 char evpbuf[sizeof(struct envelope)]; 555 size_t evplen; 556 struct envelope *cached; 557 558 if ((env->sc_queue_flags & QUEUE_EVPCACHE) && 559 (cached = tree_get(&evpcache_tree, evpid))) { 560 *ep = *cached; 561 stat_increment("queue.evpcache.load.hit", 1); 562 return (1); 563 } 564 565 ep->id = evpid; 566 profile_enter("queue_envelope_load"); 567 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); 568 profile_leave(); 569 570 log_trace(TRACE_QUEUE, 571 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", 572 evpid, evplen); 573 574 if (evplen == 0) 575 return (0); 576 577 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { 578 if ((e = envelope_validate(ep)) == NULL) { 579 ep->id = evpid; 580 if (env->sc_queue_flags & QUEUE_EVPCACHE) { 581 queue_envelope_cache_add(ep); 582 stat_increment("queue.evpcache.load.missed", 1); 583 } 584 return (1); 585 } 586 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 587 evpid, e); 588 } 589 return (0); 590 } 591 592 int 593 queue_envelope_update(struct envelope *ep) 594 { 595 char evpbuf[sizeof(struct envelope)]; 596 size_t evplen; 597 int r; 598 599 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); 600 if (evplen == 0) 601 return (0); 602 603 profile_enter("queue_envelope_update"); 604 r = handler_envelope_update(ep->id, evpbuf, evplen); 605 profile_leave(); 606 607 if (r && env->sc_queue_flags & QUEUE_EVPCACHE) 608 queue_envelope_cache_update(ep); 609 610 log_trace(TRACE_QUEUE, 611 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", 612 ep->id, r); 613 614 return (r); 615 } 616 617 int 618 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data) 619 { 620 char evpbuf[sizeof(struct envelope)]; 621 uint64_t evpid; 622 int r; 623 const char *e; 624 625 profile_enter("queue_message_walk"); 626 r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf, 627 msgid, done, data); 628 profile_leave(); 629 630 log_trace(TRACE_QUEUE, 631 "queue-backend: queue_message_walk() -> %d (%016"PRIx64")", 632 r, evpid); 633 634 if (r == -1) 635 return (r); 636 637 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 638 if ((e = envelope_validate(ep)) == NULL) { 639 ep->id = evpid; 640 /* 641 * do not cache the envelope here, while discovering 642 * envelopes one could re-run discover on already 643 * scheduled envelopes which leads to triggering of 644 * strict checks in caching. Envelopes could anyway 645 * be loaded from backend if it isn't cached. 646 */ 647 return (1); 648 } 649 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 650 evpid, e); 651 } 652 return (0); 653 } 654 655 int 656 queue_envelope_walk(struct envelope *ep) 657 { 658 const char *e; 659 uint64_t evpid; 660 char evpbuf[sizeof(struct envelope)]; 661 int r; 662 663 profile_enter("queue_envelope_walk"); 664 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); 665 profile_leave(); 666 667 log_trace(TRACE_QUEUE, 668 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", 669 r, evpid); 670 671 if (r == -1) 672 return (r); 673 674 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { 675 if ((e = envelope_validate(ep)) == NULL) { 676 ep->id = evpid; 677 if (env->sc_queue_flags & QUEUE_EVPCACHE) 678 queue_envelope_cache_add(ep); 679 return (1); 680 } 681 log_warnx("warn: invalid envelope %016" PRIx64 ": %s", 682 evpid, e); 683 } 684 return (0); 685 } 686 687 uint32_t 688 queue_generate_msgid(void) 689 { 690 uint32_t msgid; 691 692 while ((msgid = arc4random()) == 0) 693 ; 694 695 return msgid; 696 } 697 698 uint64_t 699 queue_generate_evpid(uint32_t msgid) 700 { 701 uint32_t rnd; 702 uint64_t evpid; 703 704 while ((rnd = arc4random()) == 0) 705 ; 706 707 evpid = msgid; 708 evpid <<= 32; 709 evpid |= rnd; 710 711 return evpid; 712 } 713 714 static const char* 715 envelope_validate(struct envelope *ep) 716 { 717 if (ep->version != SMTPD_ENVELOPE_VERSION) 718 return "version mismatch"; 719 720 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) 721 return "invalid helo"; 722 if (ep->helo[0] == '\0') 723 return "empty helo"; 724 725 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) 726 return "invalid hostname"; 727 if (ep->hostname[0] == '\0') 728 return "empty hostname"; 729 730 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) 731 return "invalid error line"; 732 733 if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL) 734 return "unknown dispatcher"; 735 736 return NULL; 737 } 738 739 void 740 queue_api_on_close(int(*cb)(void)) 741 { 742 handler_close = cb; 743 } 744 745 void 746 queue_api_on_message_create(int(*cb)(uint32_t *)) 747 { 748 handler_message_create = cb; 749 } 750 751 void 752 queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) 753 { 754 handler_message_commit = cb; 755 } 756 757 void 758 queue_api_on_message_delete(int(*cb)(uint32_t)) 759 { 760 handler_message_delete = cb; 761 } 762 763 void 764 queue_api_on_message_fd_r(int(*cb)(uint32_t)) 765 { 766 handler_message_fd_r = cb; 767 } 768 769 void 770 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) 771 { 772 handler_envelope_create = cb; 773 } 774 775 void 776 queue_api_on_envelope_delete(int(*cb)(uint64_t)) 777 { 778 handler_envelope_delete = cb; 779 } 780 781 void 782 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) 783 { 784 handler_envelope_update = cb; 785 } 786 787 void 788 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) 789 { 790 handler_envelope_load = cb; 791 } 792 793 void 794 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) 795 { 796 handler_envelope_walk = cb; 797 } 798 799 void 800 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t, 801 uint32_t, int *, void **)) 802 { 803 handler_message_walk = cb; 804 } 805