xref: /openbsd/usr.sbin/smtpd/queue_proc.c (revision 264ca280)
1 /*	$OpenBSD: queue_proc.c,v 1.6 2015/12/05 13:14:21 claudio Exp $	*/
2 
3 /*
4  * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/queue.h>
21 #include <sys/tree.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 
25 #include <ctype.h>
26 #include <errno.h>
27 #include <event.h>
28 #include <fcntl.h>
29 #include <imsg.h>
30 #include <inttypes.h>
31 #include <libgen.h>
32 #include <pwd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <time.h>
37 #include <unistd.h>
38 #include <limits.h>
39 
40 #include "smtpd.h"
41 #include "log.h"
42 
43 static struct imsgbuf	 ibuf;	/* imsg channel initialized by queue_proc_init() */
44 static struct imsg	 imsg;	/* most recent reply fetched by queue_proc_call() */
45 static size_t		 rlen;	/* payload bytes of that reply not yet consumed */
46 static char		*rdata;	/* read cursor into the reply payload */
47 
48 static void
49 queue_proc_call(void)
50 {
51 	ssize_t	n;
52 
53 	if (imsg_flush(&ibuf) == -1) {
54 		log_warn("warn: queue-proc: imsg_flush");
55 		fatalx("queue-proc: exiting");
56 	}
57 
58 	while (1) {
59 		if ((n = imsg_get(&ibuf, &imsg)) == -1) {
60 			log_warn("warn: queue-proc: imsg_get");
61 			break;
62 		}
63 		if (n) {
64 			rlen = imsg.hdr.len - IMSG_HEADER_SIZE;
65 			rdata = imsg.data;
66 
67 			if (imsg.hdr.type != PROC_QUEUE_OK) {
68 				log_warnx("warn: queue-proc: bad response");
69 				break;
70 			}
71 			return;
72 		}
73 
74 		if ((n = imsg_read(&ibuf)) == -1 && errno != EAGAIN) {
75 			log_warn("warn: queue-proc: imsg_read");
76 			break;
77 		}
78 
79 		if (n == 0) {
80 			log_warnx("warn: queue-proc: pipe closed");
81 			break;
82 		}
83 	}
84 
85 	fatalx("queue-proc: exiting");
86 }
87 
88 static void
89 queue_proc_read(void *dst, size_t len)
90 {
91 	if (len > rlen) {
92 		log_warnx("warn: queue-proc: bad msg len");
93 		fatalx("queue-proc: exiting");
94 	}
95 
96 	memmove(dst, rdata, len);
97 	rlen -= len;
98 	rdata += len;
99 }
100 
101 static void
102 queue_proc_end(void)
103 {
104 	if (rlen) {
105 		log_warnx("warn: queue-proc: bogus data");
106 		fatalx("queue-proc: exiting");
107 	}
108 	imsg_free(&imsg);
109 }
110 
111 /*
112  * API
113  */
114 
115 static int
116 queue_proc_close(void)
117 {
118 	int	r;
119 
120 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CORRUPT, 0, 0, -1, NULL, 0);
121 
122 	queue_proc_call();
123 	queue_proc_read(&r, sizeof(r));
124 	queue_proc_end();
125 
126 	return (r);
127 }
128 
129 static int
130 queue_proc_message_create(uint32_t *msgid)
131 {
132 	int	r;
133 
134 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0);
135 
136 	queue_proc_call();
137 	queue_proc_read(&r, sizeof(r));
138 	if (r == 1)
139 		queue_proc_read(msgid, sizeof(*msgid));
140 	queue_proc_end();
141 
142 	return (r);
143 }
144 
145 static int
146 queue_proc_message_commit(uint32_t msgid, const char *path)
147 {
148 	int	r, fd;
149 
150 	fd = open(path, O_RDONLY);
151 	if (fd == -1) {
152 		log_warn("queue-proc: open: %s", path);
153 		return (0);
154 	}
155 
156 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid,
157 	    sizeof(msgid));
158 
159 	queue_proc_call();
160 	queue_proc_read(&r, sizeof(r));
161 	queue_proc_end();
162 
163 	return (r);
164 }
165 
166 static int
167 queue_proc_message_delete(uint32_t msgid)
168 {
169 	int	r;
170 
171 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid,
172 	    sizeof(msgid));
173 
174 	queue_proc_call();
175 	queue_proc_read(&r, sizeof(r));
176 	queue_proc_end();
177 
178 	return (r);
179 }
180 
181 static int
182 queue_proc_message_fd_r(uint32_t msgid)
183 {
184 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid,
185 	    sizeof(msgid));
186 
187 	queue_proc_call();
188 	queue_proc_end();
189 
190 	return (imsg.fd);
191 }
192 
193 static int
194 queue_proc_message_corrupt(uint32_t msgid)
195 {
196 	int	r;
197 
198 	imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CORRUPT, 0, 0, -1, &msgid,
199 	    sizeof(msgid));
200 
201 	queue_proc_call();
202 	queue_proc_read(&r, sizeof(r));
203 	queue_proc_end();
204 
205 	return (r);
206 }
207 
208 static int
209 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len,
210     uint64_t *evpid)
211 {
212 	struct ibuf	*b;
213 	int		 r;
214 
215 	msgid = evpid_to_msgid(*evpid);
216 	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0,
217 	    sizeof(msgid) + len);
218 	if (imsg_add(b, &msgid, sizeof(msgid)) == -1 ||
219 	    imsg_add(b, buf, len) == -1)
220 		return (0);
221 	imsg_close(&ibuf, b);
222 
223 	queue_proc_call();
224 	queue_proc_read(&r, sizeof(r));
225 	if (r == 1)
226 		queue_proc_read(evpid, sizeof(*evpid));
227 	queue_proc_end();
228 
229 	return (r);
230 }
231 
232 static int
233 queue_proc_envelope_delete(uint64_t evpid)
234 {
235 	int	r;
236 
237 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid,
238 	    sizeof(evpid));
239 
240 	queue_proc_call();
241 	queue_proc_read(&r, sizeof(r));
242 	queue_proc_end();
243 
244 	return (r);
245 }
246 
247 static int
248 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len)
249 {
250 	struct ibuf	*b;
251 	int		 r;
252 
253 	b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0,
254 	    len + sizeof(evpid));
255 	if (imsg_add(b, &evpid, sizeof(evpid)) == -1 ||
256 	    imsg_add(b, buf, len) == -1)
257 		return (0);
258 	imsg_close(&ibuf, b);
259 
260 	queue_proc_call();
261 	queue_proc_read(&r, sizeof(r));
262 	queue_proc_end();
263 
264 	return (r);
265 }
266 
267 static int
268 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len)
269 {
270 	int	r;
271 
272 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid,
273 	    sizeof(evpid));
274 
275 	queue_proc_call();
276 
277 	if (rlen > len) {
278 		log_warnx("warn: queue-proc: buf too small");
279 		fatalx("queue-proc: exiting");
280 	}
281 
282 	r = rlen;
283 	queue_proc_read(buf, rlen);
284 	queue_proc_end();
285 
286 	return (r);
287 }
288 
289 static int
290 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len)
291 {
292 	int	r;
293 
294 	imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0);
295 
296 	queue_proc_call();
297 	queue_proc_read(&r, sizeof(r));
298 
299 	if (r > 0) {
300 		queue_proc_read(evpid, sizeof(*evpid));
301 		if (rlen > len) {
302 			log_warnx("warn: queue-proc: buf too small");
303 			fatalx("queue-proc: exiting");
304 		}
305 		if (r != (int)rlen) {
306 			log_warnx("warn: queue-proc: len mismatch");
307 			fatalx("queue-proc: exiting");
308 		}
309 		queue_proc_read(buf, rlen);
310 	}
311 	queue_proc_end();
312 
313 	return (r);
314 }
315 
316 static int
317 queue_proc_init(struct passwd *pw, int server, const char *conf)
318 {
319 	uint32_t	version;
320 	int		fd;
321 
322 	fd = fork_proc_backend("queue", conf, "queue-proc");
323 	if (fd == -1)
324 		fatalx("queue-proc: exiting");
325 
326 	imsg_init(&ibuf, fd);
327 
328 	version = PROC_QUEUE_API_VERSION;
329 	imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1,
330 	    &version, sizeof(version));
331 
332 	queue_api_on_close(queue_proc_close);
333 	queue_api_on_message_create(queue_proc_message_create);
334 	queue_api_on_message_commit(queue_proc_message_commit);
335 	queue_api_on_message_delete(queue_proc_message_delete);
336 	queue_api_on_message_fd_r(queue_proc_message_fd_r);
337 	queue_api_on_message_corrupt(queue_proc_message_corrupt);
338 	queue_api_on_envelope_create(queue_proc_envelope_create);
339 	queue_api_on_envelope_delete(queue_proc_envelope_delete);
340 	queue_api_on_envelope_update(queue_proc_envelope_update);
341 	queue_api_on_envelope_load(queue_proc_envelope_load);
342 	queue_api_on_envelope_walk(queue_proc_envelope_walk);
343 
344 	queue_proc_call();
345 	queue_proc_end();
346 
347 	return (1);
348 }
349 
350 struct queue_backend	queue_backend_proc = {	/* non-static queue backend descriptor */
351 	queue_proc_init,	/* first member: this file's init entry point */
352 };
353