xref: /openbsd/usr.sbin/smtpd/queue_proc.c (revision 09467b48)
1 /*	$OpenBSD: queue_proc.c,v 1.8 2018/12/30 23:09:58 guenther Exp $	*/
2 
3 /*
4  * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <errno.h>
27 #include <event.h>
28 #include <fcntl.h>
29 #include <imsg.h>
30 #include <inttypes.h>
31 #include <pwd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <time.h>
36 #include <unistd.h>
37 #include <limits.h>
38 
39 #include "smtpd.h"
40 #include "log.h"
41 
42 static struct imsgbuf	 ibuf;
43 static struct imsg	 imsg;
44 static size_t		 rlen;
45 static char		*rdata;
46 
47 static void
48 queue_proc_call(void)
49 {
50 	ssize_t	n;
51 
52 	if (imsg_flush(&ibuf) == -1) {
53 		log_warn("warn: queue-proc: imsg_flush");
54 		fatalx("queue-proc: exiting");
55 	}
56 
57 	while (1) {
58 		if ((n = imsg_get(&ibuf, &imsg)) == -1) {
59 			log_warn("warn: queue-proc: imsg_get");
60 			break;
61 		}
62 		if (n) {
63 			rlen = imsg.hdr.len - IMSG_HEADER_SIZE;
64 			rdata = imsg.data;
65 
66 			if (imsg.hdr.type != PROC_QUEUE_OK) {
67 				log_warnx("warn: queue-proc: bad response");
68 				break;
69 			}
70 			return;
71 		}
72 
73 		if ((n = imsg_read(&ibuf)) == -1 && errno != EAGAIN) {
74 			log_warn("warn: queue-proc: imsg_read");
75 			break;
76 		}
77 
78 		if (n == 0) {
79 			log_warnx("warn: queue-proc: pipe closed");
80 			break;
81 		}
82 	}
83 
84 	fatalx("queue-proc: exiting");
85 }
86 
87 static void
88 queue_proc_read(void *dst, size_t len)
89 {
90 	if (len > rlen) {
91 		log_warnx("warn: queue-proc: bad msg len");
92 		fatalx("queue-proc: exiting");
93 	}
94 
95 	memmove(dst, rdata, len);
96 	rlen -= len;
97 	rdata += len;
98 }
99 
100 static void
101 queue_proc_end(void)
102 {
103 	if (rlen) {
104 		log_warnx("warn: queue-proc: bogus data");
105 		fatalx("queue-proc: exiting");
106 	}
107 	imsg_free(&imsg);
108 }
109 
110 /*
111  * API
112  */
113 
114 static int
115 queue_proc_close(void)
116 {
117 	int	r;
118 
119 	imsg_compose(&ibuf, PROC_QUEUE_CLOSE, 0, 0, -1, NULL, 0);
120 
121 	queue_proc_call();
122 	queue_proc_read(&r, sizeof(r));
123 	queue_proc_end();
124 
125 	return (r);
126 }
127 
128 static int
129 queue_proc_message_create(uint32_t *msgid)
130 {
131 	int	r;
132 
133 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0);
134 
135 	queue_proc_call();
136 	queue_proc_read(&r, sizeof(r));
137 	if (r == 1)
138 		queue_proc_read(msgid, sizeof(*msgid));
139 	queue_proc_end();
140 
141 	return (r);
142 }
143 
144 static int
145 queue_proc_message_commit(uint32_t msgid, const char *path)
146 {
147 	int	r, fd;
148 
149 	fd = open(path, O_RDONLY);
150 	if (fd == -1) {
151 		log_warn("queue-proc: open: %s", path);
152 		return (0);
153 	}
154 
155 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid,
156 	    sizeof(msgid));
157 
158 	queue_proc_call();
159 	queue_proc_read(&r, sizeof(r));
160 	queue_proc_end();
161 
162 	return (r);
163 }
164 
165 static int
166 queue_proc_message_delete(uint32_t msgid)
167 {
168 	int	r;
169 
170 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid,
171 	    sizeof(msgid));
172 
173 	queue_proc_call();
174 	queue_proc_read(&r, sizeof(r));
175 	queue_proc_end();
176 
177 	return (r);
178 }
179 
180 static int
181 queue_proc_message_fd_r(uint32_t msgid)
182 {
183 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid,
184 	    sizeof(msgid));
185 
186 	queue_proc_call();
187 	queue_proc_end();
188 
189 	return (imsg.fd);
190 }
191 
192 static int
193 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len,
194     uint64_t *evpid)
195 {
196 	struct ibuf	*b;
197 	int		 r;
198 
199 	msgid = evpid_to_msgid(*evpid);
200 	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0,
201 	    sizeof(msgid) + len);
202 	if (imsg_add(b, &msgid, sizeof(msgid)) == -1 ||
203 	    imsg_add(b, buf, len) == -1)
204 		return (0);
205 	imsg_close(&ibuf, b);
206 
207 	queue_proc_call();
208 	queue_proc_read(&r, sizeof(r));
209 	if (r == 1)
210 		queue_proc_read(evpid, sizeof(*evpid));
211 	queue_proc_end();
212 
213 	return (r);
214 }
215 
216 static int
217 queue_proc_envelope_delete(uint64_t evpid)
218 {
219 	int	r;
220 
221 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid,
222 	    sizeof(evpid));
223 
224 	queue_proc_call();
225 	queue_proc_read(&r, sizeof(r));
226 	queue_proc_end();
227 
228 	return (r);
229 }
230 
231 static int
232 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len)
233 {
234 	struct ibuf	*b;
235 	int		 r;
236 
237 	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0,
238 	    len + sizeof(evpid));
239 	if (imsg_add(b, &evpid, sizeof(evpid)) == -1 ||
240 	    imsg_add(b, buf, len) == -1)
241 		return (0);
242 	imsg_close(&ibuf, b);
243 
244 	queue_proc_call();
245 	queue_proc_read(&r, sizeof(r));
246 	queue_proc_end();
247 
248 	return (r);
249 }
250 
251 static int
252 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len)
253 {
254 	int	r;
255 
256 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid,
257 	    sizeof(evpid));
258 
259 	queue_proc_call();
260 
261 	if (rlen > len) {
262 		log_warnx("warn: queue-proc: buf too small");
263 		fatalx("queue-proc: exiting");
264 	}
265 
266 	r = rlen;
267 	queue_proc_read(buf, rlen);
268 	queue_proc_end();
269 
270 	return (r);
271 }
272 
273 static int
274 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len)
275 {
276 	int	r;
277 
278 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0);
279 
280 	queue_proc_call();
281 	queue_proc_read(&r, sizeof(r));
282 
283 	if (r > 0) {
284 		queue_proc_read(evpid, sizeof(*evpid));
285 		if (rlen > len) {
286 			log_warnx("warn: queue-proc: buf too small");
287 			fatalx("queue-proc: exiting");
288 		}
289 		if (r != (int)rlen) {
290 			log_warnx("warn: queue-proc: len mismatch");
291 			fatalx("queue-proc: exiting");
292 		}
293 		queue_proc_read(buf, rlen);
294 	}
295 	queue_proc_end();
296 
297 	return (r);
298 }
299 
300 static int
301 queue_proc_init(struct passwd *pw, int server, const char *conf)
302 {
303 	uint32_t	version;
304 	int		fd;
305 
306 	fd = fork_proc_backend("queue", conf, "queue-proc");
307 	if (fd == -1)
308 		fatalx("queue-proc: exiting");
309 
310 	imsg_init(&ibuf, fd);
311 
312 	version = PROC_QUEUE_API_VERSION;
313 	imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1,
314 	    &version, sizeof(version));
315 
316 	queue_api_on_close(queue_proc_close);
317 	queue_api_on_message_create(queue_proc_message_create);
318 	queue_api_on_message_commit(queue_proc_message_commit);
319 	queue_api_on_message_delete(queue_proc_message_delete);
320 	queue_api_on_message_fd_r(queue_proc_message_fd_r);
321 	queue_api_on_envelope_create(queue_proc_envelope_create);
322 	queue_api_on_envelope_delete(queue_proc_envelope_delete);
323 	queue_api_on_envelope_update(queue_proc_envelope_update);
324 	queue_api_on_envelope_load(queue_proc_envelope_load);
325 	queue_api_on_envelope_walk(queue_proc_envelope_walk);
326 
327 	queue_proc_call();
328 	queue_proc_end();
329 
330 	return (1);
331 }
332 
333 struct queue_backend	queue_backend_proc = {
334 	queue_proc_init,
335 };
336