1 /* $OpenBSD: queue_backend.c,v 1.66 2020/04/22 11:35:34 eric Exp $ */
2
3 /*
4 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 #include "includes.h"
20
21 #include <sys/types.h>
22 #include <sys/queue.h>
23 #include <sys/tree.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26
27 #include <ctype.h>
28 #include <err.h>
29 #include <errno.h>
30 #include <event.h>
31 #include <fcntl.h>
32 #include <grp.h>
33 #include <imsg.h>
34 #include <limits.h>
35 #include <inttypes.h>
36 #include <pwd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <time.h>
41 #include <unistd.h>
42
43 #include "smtpd.h"
44 #include "log.h"
45
46 static const char* envelope_validate(struct envelope *);
47
48 extern struct queue_backend queue_backend_fs;
49 extern struct queue_backend queue_backend_null;
50 extern struct queue_backend queue_backend_proc;
51 extern struct queue_backend queue_backend_ram;
52
53 static void queue_envelope_cache_add(struct envelope *);
54 static void queue_envelope_cache_update(struct envelope *);
55 static void queue_envelope_cache_del(uint64_t evpid);
56
57 TAILQ_HEAD(evplst, envelope);
58
59 static struct tree evpcache_tree;
60 static struct evplst evpcache_list;
61 static struct queue_backend *backend;
62
63 static int (*handler_close)(void);
64 static int (*handler_message_create)(uint32_t *);
65 static int (*handler_message_commit)(uint32_t, const char*);
66 static int (*handler_message_delete)(uint32_t);
67 static int (*handler_message_fd_r)(uint32_t);
68 static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
69 static int (*handler_envelope_delete)(uint64_t);
70 static int (*handler_envelope_update)(uint64_t, const char *, size_t);
71 static int (*handler_envelope_load)(uint64_t, char *, size_t);
72 static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
73 static int (*handler_message_walk)(uint64_t *, char *, size_t,
74 uint32_t, int *, void **);
75
#ifdef QUEUE_PROFILING

/* Label and start time of the backend call currently being timed. */
static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

/*
 * Start timing a backend call labelled `name'.
 * Does nothing unless PROFILE_QUEUE is set in the profiling mask.
 */
static inline void
profile_enter(const char *name)
{
	if (profiling & PROFILE_QUEUE) {
		profile.name = name;
		clock_gettime(CLOCK_MONOTONIC, &profile.t0);
	}
}

/*
 * Log the time elapsed since the matching profile_enter() call.
 * Does nothing unless PROFILE_QUEUE is set in the profiling mask.
 */
static inline void
profile_leave(void)
{
	struct timespec	now, elapsed;

	if (!(profiling & PROFILE_QUEUE))
		return;

	clock_gettime(CLOCK_MONOTONIC, &now);
	timespecsub(&now, &profile.t0, &elapsed);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)elapsed.tv_sec, elapsed.tv_nsec);
}
#else
/* Profiling compiled out: both hooks expand to empty statements. */
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)
#endif
108
109 static int
queue_message_path(uint32_t msgid,char * buf,size_t len)110 queue_message_path(uint32_t msgid, char *buf, size_t len)
111 {
112 return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
113 }
114
115 int
queue_init(const char * name,int server)116 queue_init(const char *name, int server)
117 {
118 struct passwd *pwq;
119 struct group *gr;
120 int r;
121
122 pwq = getpwnam(SMTPD_QUEUE_USER);
123 if (pwq == NULL)
124 errx(1, "unknown user %s", SMTPD_QUEUE_USER);
125
126 gr = getgrnam(SMTPD_QUEUE_GROUP);
127 if (gr == NULL)
128 errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);
129
130 tree_init(&evpcache_tree);
131 TAILQ_INIT(&evpcache_list);
132
133 if (!strcmp(name, "fs"))
134 backend = &queue_backend_fs;
135 else if (!strcmp(name, "null"))
136 backend = &queue_backend_null;
137 else if (!strcmp(name, "ram"))
138 backend = &queue_backend_ram;
139 else
140 backend = &queue_backend_proc;
141
142 if (server) {
143 if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
144 errx(1, "error in spool directory setup");
145 if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
146 errx(1, "error in offline directory setup");
147 if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
148 errx(1, "error in purge directory setup");
149
150 mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
151
152 if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
153 errx(1, "error in purge directory setup");
154 }
155
156 r = backend->init(pwq, server, name);
157
158 log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
159
160 return (r);
161 }
162
163 int
queue_close(void)164 queue_close(void)
165 {
166 if (handler_close)
167 return (handler_close());
168
169 return (1);
170 }
171
172 int
queue_message_create(uint32_t * msgid)173 queue_message_create(uint32_t *msgid)
174 {
175 int r;
176
177 profile_enter("queue_message_create");
178 r = handler_message_create(msgid);
179 profile_leave();
180
181 log_trace(TRACE_QUEUE,
182 "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
183 r, *msgid);
184
185 return (r);
186 }
187
188 int
queue_message_delete(uint32_t msgid)189 queue_message_delete(uint32_t msgid)
190 {
191 char msgpath[PATH_MAX];
192 uint64_t evpid;
193 void *iter;
194 int r;
195
196 profile_enter("queue_message_delete");
197 r = handler_message_delete(msgid);
198 profile_leave();
199
200 /* in case the message is incoming */
201 queue_message_path(msgid, msgpath, sizeof(msgpath));
202 unlink(msgpath);
203
204 /* remove remaining envelopes from the cache if any (on rollback) */
205 evpid = msgid_to_evpid(msgid);
206 for (;;) {
207 iter = NULL;
208 if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
209 break;
210 if (evpid_to_msgid(evpid) != msgid)
211 break;
212 queue_envelope_cache_del(evpid);
213 }
214
215 log_trace(TRACE_QUEUE,
216 "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
217
218 return (r);
219 }
220
221 int
queue_message_commit(uint32_t msgid)222 queue_message_commit(uint32_t msgid)
223 {
224 int r;
225 char msgpath[PATH_MAX];
226 char tmppath[PATH_MAX];
227 FILE *ifp = NULL;
228 FILE *ofp = NULL;
229
230 profile_enter("queue_message_commit");
231
232 queue_message_path(msgid, msgpath, sizeof(msgpath));
233
234 if (env->sc_queue_flags & QUEUE_COMPRESSION) {
235 bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
236 ifp = fopen(msgpath, "r");
237 ofp = fopen(tmppath, "w+");
238 if (ifp == NULL || ofp == NULL)
239 goto err;
240 if (!compress_file(ifp, ofp))
241 goto err;
242 fclose(ifp);
243 fclose(ofp);
244 ifp = NULL;
245 ofp = NULL;
246
247 if (rename(tmppath, msgpath) == -1) {
248 if (errno == ENOSPC)
249 return (0);
250 unlink(tmppath);
251 log_warn("rename");
252 return (0);
253 }
254 }
255
256 if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
257 bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
258 ifp = fopen(msgpath, "r");
259 ofp = fopen(tmppath, "w+");
260 if (ifp == NULL || ofp == NULL)
261 goto err;
262 if (!crypto_encrypt_file(ifp, ofp))
263 goto err;
264 fclose(ifp);
265 fclose(ofp);
266 ifp = NULL;
267 ofp = NULL;
268
269 if (rename(tmppath, msgpath) == -1) {
270 if (errno == ENOSPC)
271 return (0);
272 unlink(tmppath);
273 log_warn("rename");
274 return (0);
275 }
276 }
277
278 r = handler_message_commit(msgid, msgpath);
279 profile_leave();
280
281 /* in case it's not done by the backend */
282 unlink(msgpath);
283
284 log_trace(TRACE_QUEUE,
285 "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
286 msgid, r);
287
288 return (r);
289
290 err:
291 if (ifp)
292 fclose(ifp);
293 if (ofp)
294 fclose(ofp);
295 return 0;
296 }
297
298 int
queue_message_fd_r(uint32_t msgid)299 queue_message_fd_r(uint32_t msgid)
300 {
301 int fdin = -1, fdout = -1, fd = -1;
302 FILE *ifp = NULL;
303 FILE *ofp = NULL;
304
305 profile_enter("queue_message_fd_r");
306 fdin = handler_message_fd_r(msgid);
307 profile_leave();
308
309 log_trace(TRACE_QUEUE,
310 "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
311
312 if (fdin == -1)
313 return (-1);
314
315 if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
316 if ((fdout = mktmpfile()) == -1)
317 goto err;
318 if ((fd = dup(fdout)) == -1)
319 goto err;
320 if ((ifp = fdopen(fdin, "r")) == NULL)
321 goto err;
322 fdin = fd;
323 fd = -1;
324 if ((ofp = fdopen(fdout, "w+")) == NULL)
325 goto err;
326
327 if (!crypto_decrypt_file(ifp, ofp))
328 goto err;
329
330 fclose(ifp);
331 ifp = NULL;
332 fclose(ofp);
333 ofp = NULL;
334 lseek(fdin, SEEK_SET, 0);
335 }
336
337 if (env->sc_queue_flags & QUEUE_COMPRESSION) {
338 if ((fdout = mktmpfile()) == -1)
339 goto err;
340 if ((fd = dup(fdout)) == -1)
341 goto err;
342 if ((ifp = fdopen(fdin, "r")) == NULL)
343 goto err;
344 fdin = fd;
345 fd = -1;
346 if ((ofp = fdopen(fdout, "w+")) == NULL)
347 goto err;
348
349 if (!uncompress_file(ifp, ofp))
350 goto err;
351
352 fclose(ifp);
353 ifp = NULL;
354 fclose(ofp);
355 ofp = NULL;
356 lseek(fdin, SEEK_SET, 0);
357 }
358
359 return (fdin);
360
361 err:
362 if (fd != -1)
363 close(fd);
364 if (fdin != -1)
365 close(fdin);
366 if (fdout != -1)
367 close(fdout);
368 if (ifp)
369 fclose(ifp);
370 if (ofp)
371 fclose(ofp);
372 return -1;
373 }
374
375 int
queue_message_fd_rw(uint32_t msgid)376 queue_message_fd_rw(uint32_t msgid)
377 {
378 char buf[PATH_MAX];
379
380 queue_message_path(msgid, buf, sizeof(buf));
381
382 return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
383 }
384
385 static int
queue_envelope_dump_buffer(struct envelope * ep,char * evpbuf,size_t evpbufsize)386 queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
387 {
388 char *evp;
389 size_t evplen;
390 size_t complen;
391 char compbuf[sizeof(struct envelope)];
392 size_t enclen;
393 char encbuf[sizeof(struct envelope)];
394
395 evp = evpbuf;
396 evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
397 if (evplen == 0)
398 return (0);
399
400 if (env->sc_queue_flags & QUEUE_COMPRESSION) {
401 complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
402 if (complen == 0)
403 return (0);
404 evp = compbuf;
405 evplen = complen;
406 }
407
408 if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
409 enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
410 if (enclen == 0)
411 return (0);
412 evp = encbuf;
413 evplen = enclen;
414 }
415
416 memmove(evpbuf, evp, evplen);
417
418 return (evplen);
419 }
420
421 static int
queue_envelope_load_buffer(struct envelope * ep,char * evpbuf,size_t evpbufsize)422 queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
423 {
424 char *evp;
425 size_t evplen;
426 char compbuf[sizeof(struct envelope)];
427 size_t complen;
428 char encbuf[sizeof(struct envelope)];
429 size_t enclen;
430
431 evp = evpbuf;
432 evplen = evpbufsize;
433
434 if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
435 enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
436 if (enclen == 0)
437 return (0);
438 evp = encbuf;
439 evplen = enclen;
440 }
441
442 if (env->sc_queue_flags & QUEUE_COMPRESSION) {
443 complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
444 if (complen == 0)
445 return (0);
446 evp = compbuf;
447 evplen = complen;
448 }
449
450 return (envelope_load_buffer(ep, evp, evplen));
451 }
452
453 static void
queue_envelope_cache_add(struct envelope * e)454 queue_envelope_cache_add(struct envelope *e)
455 {
456 struct envelope *cached;
457
458 while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
459 queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
460
461 cached = xcalloc(1, sizeof *cached);
462 *cached = *e;
463 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
464 tree_xset(&evpcache_tree, e->id, cached);
465 stat_increment("queue.evpcache.size", 1);
466 }
467
468 static void
queue_envelope_cache_update(struct envelope * e)469 queue_envelope_cache_update(struct envelope *e)
470 {
471 struct envelope *cached;
472
473 if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
474 queue_envelope_cache_add(e);
475 stat_increment("queue.evpcache.update.missed", 1);
476 } else {
477 TAILQ_REMOVE(&evpcache_list, cached, entry);
478 *cached = *e;
479 TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
480 stat_increment("queue.evpcache.update.hit", 1);
481 }
482 }
483
484 static void
queue_envelope_cache_del(uint64_t evpid)485 queue_envelope_cache_del(uint64_t evpid)
486 {
487 struct envelope *cached;
488
489 if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
490 return;
491
492 TAILQ_REMOVE(&evpcache_list, cached, entry);
493 free(cached);
494 stat_decrement("queue.evpcache.size", 1);
495 }
496
497 int
queue_envelope_create(struct envelope * ep)498 queue_envelope_create(struct envelope *ep)
499 {
500 int r;
501 char evpbuf[sizeof(struct envelope)];
502 size_t evplen;
503 uint64_t evpid;
504 uint32_t msgid;
505
506 ep->creation = time(NULL);
507 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
508 if (evplen == 0)
509 return (0);
510
511 evpid = ep->id;
512 msgid = evpid_to_msgid(evpid);
513
514 profile_enter("queue_envelope_create");
515 r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
516 profile_leave();
517
518 log_trace(TRACE_QUEUE,
519 "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
520 evpid, evplen, r, ep->id);
521
522 if (!r) {
523 ep->creation = 0;
524 ep->id = 0;
525 }
526
527 if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
528 queue_envelope_cache_add(ep);
529
530 return (r);
531 }
532
533 int
queue_envelope_delete(uint64_t evpid)534 queue_envelope_delete(uint64_t evpid)
535 {
536 int r;
537
538 if (env->sc_queue_flags & QUEUE_EVPCACHE)
539 queue_envelope_cache_del(evpid);
540
541 profile_enter("queue_envelope_delete");
542 r = handler_envelope_delete(evpid);
543 profile_leave();
544
545 log_trace(TRACE_QUEUE,
546 "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
547 evpid, r);
548
549 return (r);
550 }
551
552 int
queue_envelope_load(uint64_t evpid,struct envelope * ep)553 queue_envelope_load(uint64_t evpid, struct envelope *ep)
554 {
555 const char *e;
556 char evpbuf[sizeof(struct envelope)];
557 size_t evplen;
558 struct envelope *cached;
559
560 if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
561 (cached = tree_get(&evpcache_tree, evpid))) {
562 *ep = *cached;
563 stat_increment("queue.evpcache.load.hit", 1);
564 return (1);
565 }
566
567 ep->id = evpid;
568 profile_enter("queue_envelope_load");
569 evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
570 profile_leave();
571
572 log_trace(TRACE_QUEUE,
573 "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
574 evpid, evplen);
575
576 if (evplen == 0)
577 return (0);
578
579 if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
580 if ((e = envelope_validate(ep)) == NULL) {
581 ep->id = evpid;
582 if (env->sc_queue_flags & QUEUE_EVPCACHE) {
583 queue_envelope_cache_add(ep);
584 stat_increment("queue.evpcache.load.missed", 1);
585 }
586 return (1);
587 }
588 log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
589 evpid, e);
590 }
591 return (0);
592 }
593
594 int
queue_envelope_update(struct envelope * ep)595 queue_envelope_update(struct envelope *ep)
596 {
597 char evpbuf[sizeof(struct envelope)];
598 size_t evplen;
599 int r;
600
601 evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
602 if (evplen == 0)
603 return (0);
604
605 profile_enter("queue_envelope_update");
606 r = handler_envelope_update(ep->id, evpbuf, evplen);
607 profile_leave();
608
609 if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
610 queue_envelope_cache_update(ep);
611
612 log_trace(TRACE_QUEUE,
613 "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
614 ep->id, r);
615
616 return (r);
617 }
618
619 int
queue_message_walk(struct envelope * ep,uint32_t msgid,int * done,void ** data)620 queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
621 {
622 char evpbuf[sizeof(struct envelope)];
623 uint64_t evpid;
624 int r;
625 const char *e;
626
627 profile_enter("queue_message_walk");
628 r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
629 msgid, done, data);
630 profile_leave();
631
632 log_trace(TRACE_QUEUE,
633 "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
634 r, evpid);
635
636 if (r == -1)
637 return (r);
638
639 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
640 if ((e = envelope_validate(ep)) == NULL) {
641 ep->id = evpid;
642 /*
643 * do not cache the envelope here, while discovering
644 * envelopes one could re-run discover on already
645 * scheduled envelopes which leads to triggering of
646 * strict checks in caching. Envelopes could anyway
647 * be loaded from backend if it isn't cached.
648 */
649 return (1);
650 }
651 log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
652 evpid, e);
653 }
654 return (0);
655 }
656
657 int
queue_envelope_walk(struct envelope * ep)658 queue_envelope_walk(struct envelope *ep)
659 {
660 const char *e;
661 uint64_t evpid;
662 char evpbuf[sizeof(struct envelope)];
663 int r;
664
665 profile_enter("queue_envelope_walk");
666 r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
667 profile_leave();
668
669 log_trace(TRACE_QUEUE,
670 "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
671 r, evpid);
672
673 if (r == -1)
674 return (r);
675
676 if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
677 if ((e = envelope_validate(ep)) == NULL) {
678 ep->id = evpid;
679 if (env->sc_queue_flags & QUEUE_EVPCACHE)
680 queue_envelope_cache_add(ep);
681 return (1);
682 }
683 log_warnx("warn: invalid envelope %016" PRIx64 ": %s",
684 evpid, e);
685 }
686 return (0);
687 }
688
/*
 * Return a random message id; 0 is never returned.
 */
uint32_t
queue_generate_msgid(void)
{
	uint32_t	id;

	do {
		id = arc4random();
	} while (id == 0);

	return id;
}
699
/*
 * Build an envelope id for message `msgid': the message id occupies the
 * upper 32 bits and a non-zero random value the lower 32 bits.
 */
uint64_t
queue_generate_evpid(uint32_t msgid)
{
	uint32_t	low;

	do {
		low = arc4random();
	} while (low == 0);

	return ((uint64_t)msgid << 32) | low;
}
715
716 static const char*
envelope_validate(struct envelope * ep)717 envelope_validate(struct envelope *ep)
718 {
719 if (ep->version != SMTPD_ENVELOPE_VERSION)
720 return "version mismatch";
721
722 if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
723 return "invalid helo";
724 if (ep->helo[0] == '\0')
725 return "empty helo";
726
727 if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
728 return "invalid hostname";
729 if (ep->hostname[0] == '\0')
730 return "empty hostname";
731
732 if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
733 return "invalid error line";
734
735 if (dict_get(env->sc_dispatchers, ep->dispatcher) == NULL)
736 return "unknown dispatcher";
737
738 return NULL;
739 }
740
741 void
queue_api_on_close(int (* cb)(void))742 queue_api_on_close(int(*cb)(void))
743 {
744 handler_close = cb;
745 }
746
747 void
queue_api_on_message_create(int (* cb)(uint32_t *))748 queue_api_on_message_create(int(*cb)(uint32_t *))
749 {
750 handler_message_create = cb;
751 }
752
753 void
queue_api_on_message_commit(int (* cb)(uint32_t,const char *))754 queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
755 {
756 handler_message_commit = cb;
757 }
758
759 void
queue_api_on_message_delete(int (* cb)(uint32_t))760 queue_api_on_message_delete(int(*cb)(uint32_t))
761 {
762 handler_message_delete = cb;
763 }
764
765 void
queue_api_on_message_fd_r(int (* cb)(uint32_t))766 queue_api_on_message_fd_r(int(*cb)(uint32_t))
767 {
768 handler_message_fd_r = cb;
769 }
770
771 void
queue_api_on_envelope_create(int (* cb)(uint32_t,const char *,size_t,uint64_t *))772 queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
773 {
774 handler_envelope_create = cb;
775 }
776
777 void
queue_api_on_envelope_delete(int (* cb)(uint64_t))778 queue_api_on_envelope_delete(int(*cb)(uint64_t))
779 {
780 handler_envelope_delete = cb;
781 }
782
783 void
queue_api_on_envelope_update(int (* cb)(uint64_t,const char *,size_t))784 queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
785 {
786 handler_envelope_update = cb;
787 }
788
789 void
queue_api_on_envelope_load(int (* cb)(uint64_t,char *,size_t))790 queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
791 {
792 handler_envelope_load = cb;
793 }
794
795 void
queue_api_on_envelope_walk(int (* cb)(uint64_t *,char *,size_t))796 queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
797 {
798 handler_envelope_walk = cb;
799 }
800
801 void
queue_api_on_message_walk(int (* cb)(uint64_t *,char *,size_t,uint32_t,int *,void **))802 queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
803 uint32_t, int *, void **))
804 {
805 handler_message_walk = cb;
806 }
807