1 /* $OpenBSD: queue_proc.c,v 1.8 2018/12/30 23:09:58 guenther Exp $ */ 2 3 /* 4 * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <errno.h> 27 #include <event.h> 28 #include <fcntl.h> 29 #include <imsg.h> 30 #include <inttypes.h> 31 #include <pwd.h> 32 #include <stdio.h> 33 #include <stdlib.h> 34 #include <string.h> 35 #include <time.h> 36 #include <unistd.h> 37 #include <limits.h> 38 39 #include "smtpd.h" 40 #include "log.h" 41 42 static struct imsgbuf ibuf; 43 static struct imsg imsg; 44 static size_t rlen; 45 static char *rdata; 46 47 static void 48 queue_proc_call(void) 49 { 50 ssize_t n; 51 52 if (imsg_flush(&ibuf) == -1) { 53 log_warn("warn: queue-proc: imsg_flush"); 54 fatalx("queue-proc: exiting"); 55 } 56 57 while (1) { 58 if ((n = imsg_get(&ibuf, &imsg)) == -1) { 59 log_warn("warn: queue-proc: imsg_get"); 60 break; 61 } 62 if (n) { 63 rlen = imsg.hdr.len - IMSG_HEADER_SIZE; 64 rdata = imsg.data; 65 66 if (imsg.hdr.type != PROC_QUEUE_OK) { 67 log_warnx("warn: queue-proc: bad response"); 68 break; 69 } 70 return; 71 } 72 73 if ((n = imsg_read(&ibuf)) == -1 && errno != EAGAIN) { 74 log_warn("warn: queue-proc: imsg_read"); 75 break; 76 } 77 78 if (n == 0) { 79 log_warnx("warn: queue-proc: pipe closed"); 80 break; 81 } 82 } 83 84 fatalx("queue-proc: exiting"); 85 } 86 87 static void 88 queue_proc_read(void *dst, size_t len) 89 { 90 if (len > rlen) { 91 log_warnx("warn: queue-proc: bad msg len"); 92 fatalx("queue-proc: exiting"); 93 } 94 95 memmove(dst, rdata, len); 96 rlen -= len; 97 rdata += len; 98 } 99 100 static void 101 queue_proc_end(void) 102 { 103 if (rlen) { 104 log_warnx("warn: queue-proc: bogus data"); 105 fatalx("queue-proc: exiting"); 106 } 107 imsg_free(&imsg); 108 } 109 110 /* 111 * API 112 */ 113 114 static int 115 queue_proc_close(void) 116 { 117 int r; 118 119 imsg_compose(&ibuf, PROC_QUEUE_CLOSE, 0, 0, -1, NULL, 0); 120 121 queue_proc_call(); 122 queue_proc_read(&r, sizeof(r)); 123 queue_proc_end(); 124 125 return (r); 126 } 127 128 static int 129 queue_proc_message_create(uint32_t *msgid) 130 { 131 int r; 132 133 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0); 134 135 queue_proc_call(); 136 queue_proc_read(&r, sizeof(r)); 137 if (r == 1) 138 queue_proc_read(msgid, sizeof(*msgid)); 139 queue_proc_end(); 140 141 return (r); 142 } 143 144 static int 145 queue_proc_message_commit(uint32_t msgid, const char *path) 146 { 147 int r, fd; 148 149 fd = open(path, O_RDONLY); 150 if (fd == -1) { 151 log_warn("queue-proc: open: %s", path); 152 return (0); 153 } 154 155 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid, 156 sizeof(msgid)); 157 158 queue_proc_call(); 159 queue_proc_read(&r, sizeof(r)); 160 queue_proc_end(); 161 162 return (r); 163 } 164 165 static int 166 queue_proc_message_delete(uint32_t msgid) 167 { 168 int r; 169 170 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid, 171 sizeof(msgid)); 172 173 queue_proc_call(); 174 queue_proc_read(&r, sizeof(r)); 175 queue_proc_end(); 176 177 return (r); 178 } 179 180 static int 181 queue_proc_message_fd_r(uint32_t msgid) 182 { 183 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid, 184 sizeof(msgid)); 185 186 queue_proc_call(); 187 queue_proc_end(); 188 189 return (imsg.fd); 190 } 191 192 static int 193 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len, 194 uint64_t *evpid) 195 { 196 struct ibuf *b; 197 int r; 198 199 msgid = evpid_to_msgid(*evpid); 200 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0, 201 sizeof(msgid) + len); 202 if (imsg_add(b, &msgid, sizeof(msgid)) == -1 || 203 imsg_add(b, buf, len) == -1) 204 return (0); 205 imsg_close(&ibuf, b); 206 207 queue_proc_call(); 208 queue_proc_read(&r, sizeof(r)); 209 if (r == 1) 210 queue_proc_read(evpid, sizeof(*evpid)); 211 queue_proc_end(); 212 213 return (r); 214 } 215 216 static int 217 queue_proc_envelope_delete(uint64_t evpid) 218 { 219 int r; 220 221 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid, 222 sizeof(evpid)); 223 224 queue_proc_call(); 225 queue_proc_read(&r, sizeof(r)); 226 queue_proc_end(); 227 228 return (r); 229 } 230 231 static int 232 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len) 233 { 234 struct ibuf *b; 235 int r; 236 237 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0, 238 len + sizeof(evpid)); 239 if (imsg_add(b, &evpid, sizeof(evpid)) == -1 || 240 imsg_add(b, buf, len) == -1) 241 return (0); 242 imsg_close(&ibuf, b); 243 244 queue_proc_call(); 245 queue_proc_read(&r, sizeof(r)); 246 queue_proc_end(); 247 248 return (r); 249 } 250 251 static int 252 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len) 253 { 254 int r; 255 256 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid, 257 sizeof(evpid)); 258 259 queue_proc_call(); 260 261 if (rlen > len) { 262 log_warnx("warn: queue-proc: buf too small"); 263 fatalx("queue-proc: exiting"); 264 } 265 266 r = rlen; 267 queue_proc_read(buf, rlen); 268 queue_proc_end(); 269 270 return (r); 271 } 272 273 static int 274 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len) 275 { 276 int r; 277 278 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0); 279 280 queue_proc_call(); 281 queue_proc_read(&r, sizeof(r)); 282 283 if (r > 0) { 284 queue_proc_read(evpid, sizeof(*evpid)); 285 if (rlen > len) { 286 log_warnx("warn: queue-proc: buf too small"); 287 fatalx("queue-proc: exiting"); 288 } 289 if (r != (int)rlen) { 290 log_warnx("warn: queue-proc: len mismatch"); 291 fatalx("queue-proc: exiting"); 292 } 293 queue_proc_read(buf, rlen); 294 } 295 queue_proc_end(); 296 297 return (r); 298 } 299 300 static int 301 queue_proc_init(struct passwd *pw, int server, const char *conf) 302 { 303 uint32_t version; 304 int fd; 305 306 fd = fork_proc_backend("queue", conf, "queue-proc"); 307 if (fd == -1) 308 fatalx("queue-proc: exiting"); 309 310 imsg_init(&ibuf, fd); 311 312 version = PROC_QUEUE_API_VERSION; 313 imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1, 314 &version, sizeof(version)); 315 316 queue_api_on_close(queue_proc_close); 317 queue_api_on_message_create(queue_proc_message_create); 318 queue_api_on_message_commit(queue_proc_message_commit); 319 queue_api_on_message_delete(queue_proc_message_delete); 320 queue_api_on_message_fd_r(queue_proc_message_fd_r); 321 queue_api_on_envelope_create(queue_proc_envelope_create); 322 queue_api_on_envelope_delete(queue_proc_envelope_delete); 323 queue_api_on_envelope_update(queue_proc_envelope_update); 324 queue_api_on_envelope_load(queue_proc_envelope_load); 325 queue_api_on_envelope_walk(queue_proc_envelope_walk); 326 327 queue_proc_call(); 328 queue_proc_end(); 329 330 return (1); 331 } 332 333 struct queue_backend queue_backend_proc = { 334 queue_proc_init, 335 }; 336