xref: /openbsd/usr.sbin/smtpd/queue_proc.c (revision bf921b2a)
1 /*	$OpenBSD: queue_proc.c,v 1.14 2024/11/21 13:42:22 claudio Exp $	*/
2 
3 /*
4  * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <errno.h>
20 #include <fcntl.h>
21 #include <string.h>
22 
23 #include "smtpd.h"
24 #include "log.h"
25 
26 static struct imsgbuf	 ibuf;
27 static struct imsg	 imsg;
28 static size_t		 rlen;
29 static char		*rdata;
30 
31 static void
queue_proc_call(void)32 queue_proc_call(void)
33 {
34 	ssize_t	n;
35 
36 	if (imsgbuf_flush(&ibuf) == -1) {
37 		log_warn("warn: queue-proc: imsgbuf_flush");
38 		fatalx("queue-proc: exiting");
39 	}
40 
41 	while (1) {
42 		if ((n = imsg_get(&ibuf, &imsg)) == -1) {
43 			log_warn("warn: queue-proc: imsg_get");
44 			break;
45 		}
46 		if (n) {
47 			rlen = imsg.hdr.len - IMSG_HEADER_SIZE;
48 			rdata = imsg.data;
49 
50 			if (imsg.hdr.type != PROC_QUEUE_OK) {
51 				log_warnx("warn: queue-proc: bad response");
52 				break;
53 			}
54 			return;
55 		}
56 
57 		if ((n = imsgbuf_read(&ibuf)) == -1) {
58 			log_warn("warn: queue-proc: imsgbuf_read");
59 			break;
60 		}
61 
62 		if (n == 0) {
63 			log_warnx("warn: queue-proc: pipe closed");
64 			break;
65 		}
66 	}
67 
68 	fatalx("queue-proc: exiting");
69 }
70 
71 static void
queue_proc_read(void * dst,size_t len)72 queue_proc_read(void *dst, size_t len)
73 {
74 	if (len > rlen) {
75 		log_warnx("warn: queue-proc: bad msg len");
76 		fatalx("queue-proc: exiting");
77 	}
78 
79 	memmove(dst, rdata, len);
80 	rlen -= len;
81 	rdata += len;
82 }
83 
84 static void
queue_proc_end(void)85 queue_proc_end(void)
86 {
87 	if (rlen) {
88 		log_warnx("warn: queue-proc: bogus data");
89 		fatalx("queue-proc: exiting");
90 	}
91 	imsg_free(&imsg);
92 }
93 
94 /*
95  * API
96  */
97 
98 static int
queue_proc_close(void)99 queue_proc_close(void)
100 {
101 	int	r;
102 
103 	imsg_compose(&ibuf, PROC_QUEUE_CLOSE, 0, 0, -1, NULL, 0);
104 
105 	queue_proc_call();
106 	queue_proc_read(&r, sizeof(r));
107 	queue_proc_end();
108 
109 	return (r);
110 }
111 
112 static int
queue_proc_message_create(uint32_t * msgid)113 queue_proc_message_create(uint32_t *msgid)
114 {
115 	int	r;
116 
117 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0);
118 
119 	queue_proc_call();
120 	queue_proc_read(&r, sizeof(r));
121 	if (r == 1)
122 		queue_proc_read(msgid, sizeof(*msgid));
123 	queue_proc_end();
124 
125 	return (r);
126 }
127 
128 static int
queue_proc_message_commit(uint32_t msgid,const char * path)129 queue_proc_message_commit(uint32_t msgid, const char *path)
130 {
131 	int	r, fd;
132 
133 	fd = open(path, O_RDONLY);
134 	if (fd == -1) {
135 		log_warn("queue-proc: open: %s", path);
136 		return (0);
137 	}
138 
139 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid,
140 	    sizeof(msgid));
141 
142 	queue_proc_call();
143 	queue_proc_read(&r, sizeof(r));
144 	queue_proc_end();
145 
146 	return (r);
147 }
148 
149 static int
queue_proc_message_delete(uint32_t msgid)150 queue_proc_message_delete(uint32_t msgid)
151 {
152 	int	r;
153 
154 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid,
155 	    sizeof(msgid));
156 
157 	queue_proc_call();
158 	queue_proc_read(&r, sizeof(r));
159 	queue_proc_end();
160 
161 	return (r);
162 }
163 
164 static int
queue_proc_message_fd_r(uint32_t msgid)165 queue_proc_message_fd_r(uint32_t msgid)
166 {
167 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid,
168 	    sizeof(msgid));
169 
170 	queue_proc_call();
171 	queue_proc_end();
172 
173 	return (imsg_get_fd(&imsg));
174 }
175 
176 static int
queue_proc_envelope_create(uint32_t msgid,const char * buf,size_t len,uint64_t * evpid)177 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len,
178     uint64_t *evpid)
179 {
180 	struct ibuf	*b;
181 	int		 r;
182 
183 	msgid = evpid_to_msgid(*evpid);
184 	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0,
185 	    sizeof(msgid) + len);
186 	if (imsg_add(b, &msgid, sizeof(msgid)) == -1 ||
187 	    imsg_add(b, buf, len) == -1)
188 		return (0);
189 	imsg_close(&ibuf, b);
190 
191 	queue_proc_call();
192 	queue_proc_read(&r, sizeof(r));
193 	if (r == 1)
194 		queue_proc_read(evpid, sizeof(*evpid));
195 	queue_proc_end();
196 
197 	return (r);
198 }
199 
200 static int
queue_proc_envelope_delete(uint64_t evpid)201 queue_proc_envelope_delete(uint64_t evpid)
202 {
203 	int	r;
204 
205 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid,
206 	    sizeof(evpid));
207 
208 	queue_proc_call();
209 	queue_proc_read(&r, sizeof(r));
210 	queue_proc_end();
211 
212 	return (r);
213 }
214 
215 static int
queue_proc_envelope_update(uint64_t evpid,const char * buf,size_t len)216 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len)
217 {
218 	struct ibuf	*b;
219 	int		 r;
220 
221 	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0,
222 	    len + sizeof(evpid));
223 	if (imsg_add(b, &evpid, sizeof(evpid)) == -1 ||
224 	    imsg_add(b, buf, len) == -1)
225 		return (0);
226 	imsg_close(&ibuf, b);
227 
228 	queue_proc_call();
229 	queue_proc_read(&r, sizeof(r));
230 	queue_proc_end();
231 
232 	return (r);
233 }
234 
235 static int
queue_proc_envelope_load(uint64_t evpid,char * buf,size_t len)236 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len)
237 {
238 	int	r;
239 
240 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid,
241 	    sizeof(evpid));
242 
243 	queue_proc_call();
244 
245 	if (rlen > len) {
246 		log_warnx("warn: queue-proc: buf too small");
247 		fatalx("queue-proc: exiting");
248 	}
249 
250 	r = rlen;
251 	queue_proc_read(buf, rlen);
252 	queue_proc_end();
253 
254 	return (r);
255 }
256 
257 static int
queue_proc_envelope_walk(uint64_t * evpid,char * buf,size_t len)258 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len)
259 {
260 	int	r;
261 
262 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0);
263 
264 	queue_proc_call();
265 	queue_proc_read(&r, sizeof(r));
266 
267 	if (r > 0) {
268 		queue_proc_read(evpid, sizeof(*evpid));
269 		if (rlen > len) {
270 			log_warnx("warn: queue-proc: buf too small");
271 			fatalx("queue-proc: exiting");
272 		}
273 		if (r != (int)rlen) {
274 			log_warnx("warn: queue-proc: len mismatch");
275 			fatalx("queue-proc: exiting");
276 		}
277 		queue_proc_read(buf, rlen);
278 	}
279 	queue_proc_end();
280 
281 	return (r);
282 }
283 
284 static int
queue_proc_init(struct passwd * pw,int server,const char * conf)285 queue_proc_init(struct passwd *pw, int server, const char *conf)
286 {
287 	uint32_t	version;
288 	int		fd;
289 
290 	fd = fork_proc_backend("queue", conf, "queue-proc", 0);
291 	if (fd == -1)
292 		fatalx("queue-proc: exiting");
293 
294 	if (imsgbuf_init(&ibuf, fd) == -1)
295 		fatal("queue-proc: exiting");
296 	imsgbuf_allow_fdpass(&ibuf);
297 
298 	version = PROC_QUEUE_API_VERSION;
299 	imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1,
300 	    &version, sizeof(version));
301 
302 	queue_api_on_close(queue_proc_close);
303 	queue_api_on_message_create(queue_proc_message_create);
304 	queue_api_on_message_commit(queue_proc_message_commit);
305 	queue_api_on_message_delete(queue_proc_message_delete);
306 	queue_api_on_message_fd_r(queue_proc_message_fd_r);
307 	queue_api_on_envelope_create(queue_proc_envelope_create);
308 	queue_api_on_envelope_delete(queue_proc_envelope_delete);
309 	queue_api_on_envelope_update(queue_proc_envelope_update);
310 	queue_api_on_envelope_load(queue_proc_envelope_load);
311 	queue_api_on_envelope_walk(queue_proc_envelope_walk);
312 
313 	queue_proc_call();
314 	queue_proc_end();
315 
316 	return (1);
317 }
318 
319 struct queue_backend	queue_backend_proc = {
320 	queue_proc_init,
321 };
322