1 /* $OpenBSD: queue_backend.c,v 1.69 2023/05/31 16:51:46 op Exp $ */
2
3 /*
4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 #include <errno.h>
20 #include <fcntl.h>
21 #include <grp.h>
22 #include <inttypes.h>
23 #include <pwd.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <time.h>
27 #include <unistd.h>
28
29 #include "smtpd.h"
30 #include "log.h"
31
32 static const char* envelope_validate(struct envelope *);
33
34 extern struct queue_backend queue_backend_fs;
35 extern struct queue_backend queue_backend_null;
36 extern struct queue_backend queue_backend_proc;
37 extern struct queue_backend queue_backend_ram;
38
39 static void queue_envelope_cache_add(struct envelope *);
40 static void queue_envelope_cache_update(struct envelope *);
41 static void queue_envelope_cache_del(uint64_t evpid);
42
43 TAILQ_HEAD(evplst, envelope);
44
45 static struct tree evpcache_tree;
46 static struct evplst evpcache_list;
47 static struct queue_backend *backend;
48
49 static int (*handler_close)(void);
50 static int (*handler_message_create)(uint32_t *);
51 static int (*handler_message_commit)(uint32_t, const char*);
52 static int (*handler_message_delete)(uint32_t);
53 static int (*handler_message_fd_r)(uint32_t);
54 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
55 static int (*handler_envelope_delete)(uint64_t);
56 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
57 static int (*handler_envelope_load)(uint64_t, char *, size_t);
58 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
59 static int (*handler_message_walk)(uint64_t *, char *, size_t,
60 uint32_t, int *, void **);
61
#ifdef QUEUE_PROFILING

/* Label and start time of the backend call currently being timed. */
static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

/*
 * Remember `name' and the current monotonic time.  Does nothing unless
 * PROFILE_QUEUE is set in the profiling mask.
 */
static inline void
profile_enter(const char *name)
{
	if (profiling & PROFILE_QUEUE) {
		profile.name = name;
		clock_gettime(CLOCK_MONOTONIC, &profile.t0);
	}
}

/*
 * Log the time elapsed since the matching profile_enter().  Does
 * nothing unless PROFILE_QUEUE is set in the profiling mask.
 */
static inline void
profile_leave(void)
{
	struct timespec	 now, elapsed;

	if (!(profiling & PROFILE_QUEUE))
		return;

	clock_gettime(CLOCK_MONOTONIC, &now);
	timespecsub(&now, &profile.t0, &elapsed);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)elapsed.tv_sec, elapsed.tv_nsec);
}
#else
/* Profiling compiled out: both hooks expand to empty statements. */
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif
94
95 static int
queue_message_path(uint32_t msgid,char * buf,size_t len)96 queue_message_path(uint32_t msgid, char *buf, size_t len)
97 {
98 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
99 }
100
101 int
queue_init(const char * name,int server)102 queue_init(const char *name, int server)
103 {
104 struct passwd *pwq;
105 struct group *gr;
106 int r;
107
108 pwq = getpwnam(SMTPD_QUEUE_USER);
109 if (pwq == NULL)
110 fatalx("unknown user %s", SMTPD_QUEUE_USER);
111
112 gr = getgrnam(SMTPD_QUEUE_GROUP);
113 if (gr == NULL)
114 fatalx("unknown group %s", SMTPD_QUEUE_GROUP);
115
116 tree_init(&evpcache_tree);
117 TAILQ_INIT(&evpcache_list);
118
119 if (!strcmp(name, "fs"))
120 backend = &queue_backend_fs;
121 else if (!strcmp(name, "null"))
122 backend = &queue_backend_null;
123 else if (!strcmp(name, "ram"))
124 backend = &queue_backend_ram;
125 else
126 backend = &queue_backend_proc;
127
128 if (server) {
129 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
130 fatalx("error in spool directory setup");
131 if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
132 fatalx("error in offline directory setup");
133 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
134 fatalx("error in purge directory setup");
135
136 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
137
138 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
139 fatalx("error in purge directory setup");
140 }
141
142 r = backend->init(pwq, server, name);
143
144 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
145
146 return (r);
147 }
148
149 int
queue_close(void)150 queue_close(void)
151 {
152 if (handler_close)
153 return (handler_close());
154
155 return (1);
156 }
157
158 int
queue_message_create(uint32_t * msgid)159 queue_message_create(uint32_t *msgid)
160 {
161 int r;
162
163 profile_enter("queue_message_create");
164 r = handler_message_create(msgid);
165 profile_leave();
166
167 log_trace(TRACE_QUEUE,
168 "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
169 r, *msgid);
170
171 return (r);
172 }
173
174 int
queue_message_delete(uint32_t msgid)175 queue_message_delete(uint32_t msgid)
176 {
177 char msgpath[PATH_MAX];
178 uint64_t evpid;
179 void *iter;
180 int r;
181
182 profile_enter("queue_message_delete");
183 r = handler_message_delete(msgid);
184 profile_leave();
185
186 /* in case the message is incoming */
187 queue_message_path(msgid, msgpath, sizeof(msgpath));
188 unlink(msgpath);
189
190 /* remove remaining envelopes from the cache if any (on rollback) */
191 evpid = msgid_to_evpid(msgid);
192 for (;;) {
193 iter = NULL;
194 if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
195 break;
196 if (evpid_to_msgid(evpid) != msgid)
197 break;
198 queue_envelope_cache_del(evpid);
199 }
200
201 log_trace(TRACE_QUEUE,
202 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
203
204 return (r);
205 }
206
207 int
queue_message_commit(uint32_t msgid)208 queue_message_commit(uint32_t msgid)
209 {
210 int r;
211 char msgpath[PATH_MAX];
212 char tmppath[PATH_MAX];
213 FILE *ifp = NULL;
214 FILE *ofp = NULL;
215
216 profile_enter("queue_message_commit");
217
218 queue_message_path(msgid, msgpath, sizeof(msgpath));
219
220 if (env->sc_queue_flags & QUEUE_COMPRESSION) {
221 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
222 ifp = fopen(msgpath, "r");
223 ofp = fopen(tmppath, "w+");
224 if (ifp == NULL || ofp == NULL)
225 goto err;
226 if (!compress_file(ifp, ofp))
227 goto err;
228 fclose(ifp);
229 fclose(ofp);
230 ifp = NULL;
231 ofp = NULL;
232
233 if (rename(tmppath, msgpath) == -1) {
234 if (errno == ENOSPC)
235 return (0);
236 unlink(tmppath);
237 log_warn("rename");
238 return (0);
239 }
240 }
241
242 if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
243 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
244 ifp = fopen(msgpath, "r");
245 ofp = fopen(tmppath, "w+");
246 if (ifp == NULL || ofp == NULL)
247 goto err;
248 if (!crypto_encrypt_file(ifp, ofp))
249 goto err;
250 fclose(ifp);
251 fclose(ofp);
252 ifp = NULL;
253 ofp = NULL;
254
255 if (rename(tmppath, msgpath) == -1) {
256 if (errno == ENOSPC)
257 return (0);
258 unlink(tmppath);
259 log_warn("rename");
260 return (0);
261 }
262 }
263
264 r = handler_message_commit(msgid, msgpath);
265 profile_leave();
266
267 /* in case it's not done by the backend */
268 unlink(msgpath);
269
270 log_trace(TRACE_QUEUE,
271 "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
272 msgid, r);
273
274 return (r);
275
276 err:
277 if (ifp)
278 fclose(ifp);
279 if (ofp)
280 fclose(ofp);
281 return 0;
282 }
283
284 int
queue_message_fd_r(uint32_t msgid)285 queue_message_fd_r(uint32_t msgid)
286 {
287 int fdin = -1, fdout = -1, fd = -1;
288 FILE *ifp = NULL;
289 FILE *ofp = NULL;
290
291 profile_enter("queue_message_fd_r");
292 fdin = handler_message_fd_r(msgid);
293 profile_leave();
294
295 log_trace(TRACE_QUEUE,
296 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
297
298 if (fdin == -1)
299 return (-1);
300
301 if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
302 if ((fdout = mktmpfile()) == -1)
303 goto err;
304 if ((fd = dup(fdout)) == -1)
305 goto err;
306 if ((ifp = fdopen(fdin, "r")) == NULL)
307 goto err;
308 fdin = fd;
309 fd = -1;
310 if ((ofp = fdopen(fdout, "w+")) == NULL)
311 goto err;
312
313 if (!crypto_decrypt_file(ifp, ofp))
314 goto err;
315
316 fclose(ifp);
317 ifp = NULL;
318 fclose(ofp);
319 ofp = NULL;
320 lseek(fdin, SEEK_SET, 0);
321 }
322
323 if (env->sc_queue_flags & QUEUE_COMPRESSION) {
324 if ((fdout = mktmpfile()) == -1)
325 goto err;
326 if ((fd = dup(fdout)) == -1)
327 goto err;
328 if ((ifp = fdopen(fdin, "r")) == NULL)
329 goto err;
330 fdin = fd;
331 fd = -1;
332 if ((ofp = fdopen(fdout, "w+")) == NULL)
333 goto err;
334
335 if (!uncompress_file(ifp, ofp))
336 goto err;
337
338 fclose(ifp);
339 ifp = NULL;
340 fclose(ofp);
341 ofp = NULL;
342 lseek(fdin, SEEK_SET, 0);
343 }
344
345 return (fdin);
346
347 err:
348 if (fd != -1)
349 close(fd);
350 if (fdin != -1)
351 close(fdin);
352 if (fdout != -1)
353 close(fdout);
354 if (ifp)
355 fclose(ifp);
356 if (ofp)
357 fclose(ofp);
358 return -1;
359 }
360
361 int
queue_message_fd_rw(uint32_t msgid)362 queue_message_fd_rw(uint32_t msgid)
363 {
364 char buf[PATH_MAX];
365
366 queue_message_path(msgid, buf, sizeof(buf));
367
368 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
369 }
370
371 static int
queue_envelope_dump_buffer(struct envelope * ep,char * evpbuf,size_t evpbufsize)372 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
373 {
374 char *evp;
375 size_t evplen;
376 size_t complen;
377 char compbuf[sizeof(struct envelope)];
378 size_t enclen;
379 char encbuf[sizeof(struct envelope)];
380
381 evp = evpbuf;
382 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
383 if (evplen == 0)
384 return (0);
385
386 if (env->sc_queue_flags & QUEUE_COMPRESSION) {
387 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
388 if (complen == 0)
389 return (0);
390 evp = compbuf;
391 evplen = complen;
392 }
393
394 if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
395 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
396 if (enclen == 0)
397 return (0);
398 evp = encbuf;
399 evplen = enclen;
400 }
401
402 memmove(evpbuf, evp, evplen);
403
404 return (evplen);
405 }
406
407 static int
queue_envelope_load_buffer(struct envelope * ep,char * evpbuf,size_t evpbufsize)408 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
409 {
410 char *evp;
411 size_t evplen;
412 char compbuf[sizeof(struct envelope)];
413 size_t complen;
414 char encbuf[sizeof(struct envelope)];
415 size_t enclen;
416
417 evp = evpbuf;
418 evplen = evpbufsize;
419
420 if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
421 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
422 if (enclen == 0)
423 return (0);
424 evp = encbuf;
425 evplen = enclen;
426 }
427
428 if (env->sc_queue_flags & QUEUE_COMPRESSION) {
429 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
430 if (complen == 0)
431 return (0);
432 evp = compbuf;
433 evplen = complen;
434 }
435
436 return (envelope_load_buffer(ep, evp, evplen));
437 }
438
439 static void
queue_envelope_cache_add(struct envelope * e)440 queue_envelope_cache_add(struct envelope *e)
441 {
442 struct envelope *cached;
443
444 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
445 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
446
447 cached = xcalloc(1, sizeof *cached);
448 *cached = *e;
449 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
450 tree_xset(&evpcache_tree, e->id, cached);
451 stat_increment("queue.evpcache.size", 1);
452 }
453
454 static void
queue_envelope_cache_update(struct envelope * e)455 queue_envelope_cache_update(struct envelope *e)
456 {
457 struct envelope *cached;
458
459 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
460 queue_envelope_cache_add(e);
461 stat_increment("queue.evpcache.update.missed", 1);
462 } else {
463 TAILQ_REMOVE(&evpcache_list, cached, entry);
464 *cached = *e;
465 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
466 stat_increment("queue.evpcache.update.hit", 1);
467 }
468 }
469
470 static void
queue_envelope_cache_del(uint64_t evpid)471 queue_envelope_cache_del(uint64_t evpid)
472 {
473 struct envelope *cached;
474
475 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
476 return;
477
478 TAILQ_REMOVE(&evpcache_list, cached, entry);
479 free(cached);
480 stat_decrement("queue.evpcache.size", 1);
481 }
482
483 int
queue_envelope_create(struct envelope * ep)484 queue_envelope_create(struct envelope *ep)
485 {
486 int r;
487 char evpbuf[sizeof(struct envelope)];
488 size_t evplen;
489 uint64_t evpid;
490 uint32_t msgid;
491
492 ep->creation = time(NULL);
493 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
494 if (evplen == 0)
495 return (0);
496
497 evpid = ep->id;
498 msgid = evpid_to_msgid(evpid);
499
500 profile_enter("queue_envelope_create");
501 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
502 profile_leave();
503
504 log_trace(TRACE_QUEUE,
505 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
506 evpid, evplen, r, ep->id);
507
508 if (!r) {
509 ep->creation = 0;
510 ep->id = 0;
511 }
512
513 if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
514 queue_envelope_cache_add(ep);
515
516 return (r);
517 }
518
519 int
queue_envelope_delete(uint64_t evpid)520 queue_envelope_delete(uint64_t evpid)
521 {
522 int r;
523
524 if (env->sc_queue_flags & QUEUE_EVPCACHE)
525 queue_envelope_cache_del(evpid);
526
527 profile_enter("queue_envelope_delete");
528 r = handler_envelope_delete(evpid);
529 profile_leave();
530
531 log_trace(TRACE_QUEUE,
532 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
533 evpid, r);
534
535 return (r);
536 }
537
538 int
queue_envelope_load(uint64_t evpid,struct envelope * ep)539 queue_envelope_load(uint64_t evpid, struct envelope *ep)
540 {
541 const char *e;
542 char evpbuf[sizeof(struct envelope)];
543 size_t evplen;
544 struct envelope *cached;
545
546 if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
547 (cached = tree_get(&evpcache_tree, evpid))) {
548 *ep = *cached;
549 stat_increment("queue.evpcache.load.hit", 1);
550 return (1);
551 }
552
553 ep->id = evpid;
554 profile_enter("queue_envelope_load");
555 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
556 profile_leave();
557
558 log_trace(TRACE_QUEUE,
559 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
560 evpid, evplen);
561
562 if (evplen == 0)
563 return (0);
564
565 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
566 if ((e = envelope_validate(ep)) == NULL) {
567 ep->id = evpid;
568 if (env->sc_queue_flags & QUEUE_EVPCACHE) {
569 queue_envelope_cache_add(ep);
570 stat_increment("queue.evpcache.load.missed", 1);
571 }
572 return (1);
573 }
574 log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
575 evpid, e);
576 }
577 return (0);
578 }
579
580 int
queue_envelope_update(struct envelope * ep)581 queue_envelope_update(struct envelope *ep)
582 {
583 char evpbuf[sizeof(struct envelope)];
584 size_t evplen;
585 int r;
586
587 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
588 if (evplen == 0)
589 return (0);
590
591 profile_enter("queue_envelope_update");
592 r = handler_envelope_update(ep->id, evpbuf, evplen);
593 profile_leave();
594
595 if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
596 queue_envelope_cache_update(ep);
597
598 log_trace(TRACE_QUEUE,
599 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
600 ep->id, r);
601
602 return (r);
603 }
604
605 int
queue_message_walk(struct envelope * ep,uint32_t msgid,int * done,void ** data)606 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
607 {
608 char evpbuf[sizeof(struct envelope)];
609 uint64_t evpid;
610 int r;
611 const char *e;
612
613 profile_enter("queue_message_walk");
614 r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
615 msgid, done, data);
616 profile_leave();
617
618 log_trace(TRACE_QUEUE,
619 "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
620 r, evpid);
621
622 if (r == -1)
623 return (r);
624
625 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
626 if ((e = envelope_validate(ep)) == NULL) {
627 ep->id = evpid;
628 /*
629 * do not cache the envelope here, while discovering
630 * envelopes one could re-run discover on already
631 * scheduled envelopes which leads to triggering of
632 * strict checks in caching. Envelopes could anyway
633 * be loaded from backend if it isn't cached.
634 */
635 return (1);
636 }
637 log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
638 evpid, e);
639 }
640 return (0);
641 }
642
643 int
queue_envelope_walk(struct envelope * ep)644 queue_envelope_walk(struct envelope *ep)
645 {
646 const char *e;
647 uint64_t evpid;
648 char evpbuf[sizeof(struct envelope)];
649 int r;
650
651 profile_enter("queue_envelope_walk");
652 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
653 profile_leave();
654
655 log_trace(TRACE_QUEUE,
656 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
657 r, evpid);
658
659 if (r == -1)
660 return (r);
661
662 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
663 if ((e = envelope_validate(ep)) == NULL) {
664 ep->id = evpid;
665 if (env->sc_queue_flags & QUEUE_EVPCACHE)
666 queue_envelope_cache_add(ep);
667 return (1);
668 }
669 log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
670 evpid, e);
671 }
672 return (0);
673 }
674
675 uint32_t
queue_generate_msgid(void)676 queue_generate_msgid(void)
677 {
678 uint32_t msgid;
679
680 while ((msgid = arc4random()) == 0)
681 ;
682
683 return msgid;
684 }
685
686 uint64_t
queue_generate_evpid(uint32_t msgid)687 queue_generate_evpid(uint32_t msgid)
688 {
689 uint32_t rnd;
690 uint64_t evpid;
691
692 while ((rnd = arc4random()) == 0)
693 ;
694
695 evpid = msgid;
696 evpid <<= 32;
697 evpid |= rnd;
698
699 return evpid;
700 }
701
702 static const char*
envelope_validate(struct envelope * ep)703 envelope_validate(struct envelope *ep)
704 {
705 if (ep->version != SMTPD_ENVELOPE_VERSION)
706 return "version mismatch";
707
708 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
709 return "invalid helo";
710 if (ep->helo[0] == '\0')
711 return "empty helo";
712
713 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
714 return "invalid hostname";
715 if (ep->hostname[0] == '\0')
716 return "empty hostname";
717
718 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
719 return "invalid error line";
720
721 if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
722 return "unknown dispatcher";
723
724 return NULL;
725 }
726
727 void
queue_api_on_close(int (* cb)(void))728 queue_api_on_close(int(*cb)(void))
729 {
730 handler_close = cb;
731 }
732
733 void
queue_api_on_message_create(int (* cb)(uint32_t *))734 queue_api_on_message_create(int(*cb)(uint32_t *))
735 {
736 handler_message_create = cb;
737 }
738
739 void
queue_api_on_message_commit(int (* cb)(uint32_t,const char *))740 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
741 {
742 handler_message_commit = cb;
743 }
744
745 void
queue_api_on_message_delete(int (* cb)(uint32_t))746 queue_api_on_message_delete(int(*cb)(uint32_t))
747 {
748 handler_message_delete = cb;
749 }
750
751 void
queue_api_on_message_fd_r(int (* cb)(uint32_t))752 queue_api_on_message_fd_r(int(*cb)(uint32_t))
753 {
754 handler_message_fd_r = cb;
755 }
756
757 void
queue_api_on_envelope_create(int (* cb)(uint32_t,const char *,size_t,uint64_t *))758 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
759 {
760 handler_envelope_create = cb;
761 }
762
763 void
queue_api_on_envelope_delete(int (* cb)(uint64_t))764 queue_api_on_envelope_delete(int(*cb)(uint64_t))
765 {
766 handler_envelope_delete = cb;
767 }
768
769 void
queue_api_on_envelope_update(int (* cb)(uint64_t,const char *,size_t))770 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
771 {
772 handler_envelope_update = cb;
773 }
774
775 void
queue_api_on_envelope_load(int (* cb)(uint64_t,char *,size_t))776 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
777 {
778 handler_envelope_load = cb;
779 }
780
781 void
queue_api_on_envelope_walk(int (* cb)(uint64_t *,char *,size_t))782 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
783 {
784 handler_envelope_walk = cb;
785 }
786
787 void
queue_api_on_message_walk(int (* cb)(uint64_t *,char *,size_t,uint32_t,int *,void **))788 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
789 uint32_t, int *, void **))
790 {
791 handler_message_walk = cb;
792 }
793