1 /* $OpenBSD: queue_proc.c,v 1.14 2024/11/21 13:42:22 claudio Exp $ */
2
3 /*
4 * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 #include <errno.h>
20 #include <fcntl.h>
21 #include <string.h>
22
23 #include "smtpd.h"
24 #include "log.h"
25
26 static struct imsgbuf ibuf;
27 static struct imsg imsg;
28 static size_t rlen;
29 static char *rdata;
30
31 static void
queue_proc_call(void)32 queue_proc_call(void)
33 {
34 ssize_t n;
35
36 if (imsgbuf_flush(&ibuf) == -1) {
37 log_warn("warn: queue-proc: imsgbuf_flush");
38 fatalx("queue-proc: exiting");
39 }
40
41 while (1) {
42 if ((n = imsg_get(&ibuf, &imsg)) == -1) {
43 log_warn("warn: queue-proc: imsg_get");
44 break;
45 }
46 if (n) {
47 rlen = imsg.hdr.len - IMSG_HEADER_SIZE;
48 rdata = imsg.data;
49
50 if (imsg.hdr.type != PROC_QUEUE_OK) {
51 log_warnx("warn: queue-proc: bad response");
52 break;
53 }
54 return;
55 }
56
57 if ((n = imsgbuf_read(&ibuf)) == -1) {
58 log_warn("warn: queue-proc: imsgbuf_read");
59 break;
60 }
61
62 if (n == 0) {
63 log_warnx("warn: queue-proc: pipe closed");
64 break;
65 }
66 }
67
68 fatalx("queue-proc: exiting");
69 }
70
71 static void
queue_proc_read(void * dst,size_t len)72 queue_proc_read(void *dst, size_t len)
73 {
74 if (len > rlen) {
75 log_warnx("warn: queue-proc: bad msg len");
76 fatalx("queue-proc: exiting");
77 }
78
79 memmove(dst, rdata, len);
80 rlen -= len;
81 rdata += len;
82 }
83
84 static void
queue_proc_end(void)85 queue_proc_end(void)
86 {
87 if (rlen) {
88 log_warnx("warn: queue-proc: bogus data");
89 fatalx("queue-proc: exiting");
90 }
91 imsg_free(&imsg);
92 }
93
94 /*
95 * API
96 */
97
98 static int
queue_proc_close(void)99 queue_proc_close(void)
100 {
101 int r;
102
103 imsg_compose(&ibuf, PROC_QUEUE_CLOSE, 0, 0, -1, NULL, 0);
104
105 queue_proc_call();
106 queue_proc_read(&r, sizeof(r));
107 queue_proc_end();
108
109 return (r);
110 }
111
112 static int
queue_proc_message_create(uint32_t * msgid)113 queue_proc_message_create(uint32_t *msgid)
114 {
115 int r;
116
117 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0);
118
119 queue_proc_call();
120 queue_proc_read(&r, sizeof(r));
121 if (r == 1)
122 queue_proc_read(msgid, sizeof(*msgid));
123 queue_proc_end();
124
125 return (r);
126 }
127
128 static int
queue_proc_message_commit(uint32_t msgid,const char * path)129 queue_proc_message_commit(uint32_t msgid, const char *path)
130 {
131 int r, fd;
132
133 fd = open(path, O_RDONLY);
134 if (fd == -1) {
135 log_warn("queue-proc: open: %s", path);
136 return (0);
137 }
138
139 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid,
140 sizeof(msgid));
141
142 queue_proc_call();
143 queue_proc_read(&r, sizeof(r));
144 queue_proc_end();
145
146 return (r);
147 }
148
149 static int
queue_proc_message_delete(uint32_t msgid)150 queue_proc_message_delete(uint32_t msgid)
151 {
152 int r;
153
154 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid,
155 sizeof(msgid));
156
157 queue_proc_call();
158 queue_proc_read(&r, sizeof(r));
159 queue_proc_end();
160
161 return (r);
162 }
163
164 static int
queue_proc_message_fd_r(uint32_t msgid)165 queue_proc_message_fd_r(uint32_t msgid)
166 {
167 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid,
168 sizeof(msgid));
169
170 queue_proc_call();
171 queue_proc_end();
172
173 return (imsg_get_fd(&imsg));
174 }
175
176 static int
queue_proc_envelope_create(uint32_t msgid,const char * buf,size_t len,uint64_t * evpid)177 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len,
178 uint64_t *evpid)
179 {
180 struct ibuf *b;
181 int r;
182
183 msgid = evpid_to_msgid(*evpid);
184 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0,
185 sizeof(msgid) + len);
186 if (imsg_add(b, &msgid, sizeof(msgid)) == -1 ||
187 imsg_add(b, buf, len) == -1)
188 return (0);
189 imsg_close(&ibuf, b);
190
191 queue_proc_call();
192 queue_proc_read(&r, sizeof(r));
193 if (r == 1)
194 queue_proc_read(evpid, sizeof(*evpid));
195 queue_proc_end();
196
197 return (r);
198 }
199
200 static int
queue_proc_envelope_delete(uint64_t evpid)201 queue_proc_envelope_delete(uint64_t evpid)
202 {
203 int r;
204
205 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid,
206 sizeof(evpid));
207
208 queue_proc_call();
209 queue_proc_read(&r, sizeof(r));
210 queue_proc_end();
211
212 return (r);
213 }
214
215 static int
queue_proc_envelope_update(uint64_t evpid,const char * buf,size_t len)216 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len)
217 {
218 struct ibuf *b;
219 int r;
220
221 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0,
222 len + sizeof(evpid));
223 if (imsg_add(b, &evpid, sizeof(evpid)) == -1 ||
224 imsg_add(b, buf, len) == -1)
225 return (0);
226 imsg_close(&ibuf, b);
227
228 queue_proc_call();
229 queue_proc_read(&r, sizeof(r));
230 queue_proc_end();
231
232 return (r);
233 }
234
235 static int
queue_proc_envelope_load(uint64_t evpid,char * buf,size_t len)236 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len)
237 {
238 int r;
239
240 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid,
241 sizeof(evpid));
242
243 queue_proc_call();
244
245 if (rlen > len) {
246 log_warnx("warn: queue-proc: buf too small");
247 fatalx("queue-proc: exiting");
248 }
249
250 r = rlen;
251 queue_proc_read(buf, rlen);
252 queue_proc_end();
253
254 return (r);
255 }
256
257 static int
queue_proc_envelope_walk(uint64_t * evpid,char * buf,size_t len)258 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len)
259 {
260 int r;
261
262 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0);
263
264 queue_proc_call();
265 queue_proc_read(&r, sizeof(r));
266
267 if (r > 0) {
268 queue_proc_read(evpid, sizeof(*evpid));
269 if (rlen > len) {
270 log_warnx("warn: queue-proc: buf too small");
271 fatalx("queue-proc: exiting");
272 }
273 if (r != (int)rlen) {
274 log_warnx("warn: queue-proc: len mismatch");
275 fatalx("queue-proc: exiting");
276 }
277 queue_proc_read(buf, rlen);
278 }
279 queue_proc_end();
280
281 return (r);
282 }
283
284 static int
queue_proc_init(struct passwd * pw,int server,const char * conf)285 queue_proc_init(struct passwd *pw, int server, const char *conf)
286 {
287 uint32_t version;
288 int fd;
289
290 fd = fork_proc_backend("queue", conf, "queue-proc", 0);
291 if (fd == -1)
292 fatalx("queue-proc: exiting");
293
294 if (imsgbuf_init(&ibuf, fd) == -1)
295 fatal("queue-proc: exiting");
296 imsgbuf_allow_fdpass(&ibuf);
297
298 version = PROC_QUEUE_API_VERSION;
299 imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1,
300 &version, sizeof(version));
301
302 queue_api_on_close(queue_proc_close);
303 queue_api_on_message_create(queue_proc_message_create);
304 queue_api_on_message_commit(queue_proc_message_commit);
305 queue_api_on_message_delete(queue_proc_message_delete);
306 queue_api_on_message_fd_r(queue_proc_message_fd_r);
307 queue_api_on_envelope_create(queue_proc_envelope_create);
308 queue_api_on_envelope_delete(queue_proc_envelope_delete);
309 queue_api_on_envelope_update(queue_proc_envelope_update);
310 queue_api_on_envelope_load(queue_proc_envelope_load);
311 queue_api_on_envelope_walk(queue_proc_envelope_walk);
312
313 queue_proc_call();
314 queue_proc_end();
315
316 return (1);
317 }
318
319 struct queue_backend queue_backend_proc = {
320 queue_proc_init,
321 };
322