1 /* $OpenBSD: queue_proc.c,v 1.6 2015/12/05 13:14:21 claudio Exp $ */ 2 3 /* 4 * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/types.h> 20 #include <sys/queue.h> 21 #include <sys/tree.h> 22 #include <sys/socket.h> 23 #include <sys/stat.h> 24 25 #include <ctype.h> 26 #include <errno.h> 27 #include <event.h> 28 #include <fcntl.h> 29 #include <imsg.h> 30 #include <inttypes.h> 31 #include <libgen.h> 32 #include <pwd.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <string.h> 36 #include <time.h> 37 #include <unistd.h> 38 #include <limits.h> 39 40 #include "smtpd.h" 41 #include "log.h" 42 43 static struct imsgbuf ibuf; 44 static struct imsg imsg; 45 static size_t rlen; 46 static char *rdata; 47 48 static void 49 queue_proc_call(void) 50 { 51 ssize_t n; 52 53 if (imsg_flush(&ibuf) == -1) { 54 log_warn("warn: queue-proc: imsg_flush"); 55 fatalx("queue-proc: exiting"); 56 } 57 58 while (1) { 59 if ((n = imsg_get(&ibuf, &imsg)) == -1) { 60 log_warn("warn: queue-proc: imsg_get"); 61 break; 62 } 63 if (n) { 64 rlen = imsg.hdr.len - IMSG_HEADER_SIZE; 65 rdata = imsg.data; 66 67 if (imsg.hdr.type != PROC_QUEUE_OK) { 68 log_warnx("warn: queue-proc: bad response"); 69 break; 70 } 71 return; 72 } 73 74 if ((n = imsg_read(&ibuf)) == -1 && errno != EAGAIN) { 75 log_warn("warn: queue-proc: imsg_read"); 76 break; 77 } 78 79 if (n == 0) { 80 log_warnx("warn: queue-proc: pipe closed"); 81 break; 82 } 83 } 84 85 fatalx("queue-proc: exiting"); 86 } 87 88 static void 89 queue_proc_read(void *dst, size_t len) 90 { 91 if (len > rlen) { 92 log_warnx("warn: queue-proc: bad msg len"); 93 fatalx("queue-proc: exiting"); 94 } 95 96 memmove(dst, rdata, len); 97 rlen -= len; 98 rdata += len; 99 } 100 101 static void 102 queue_proc_end(void) 103 { 104 if (rlen) { 105 log_warnx("warn: queue-proc: bogus data"); 106 fatalx("queue-proc: exiting"); 107 } 108 imsg_free(&imsg); 109 } 110 111 /* 112 * API 113 */ 114 115 static int 116 queue_proc_close(void) 117 { 118 int r; 119 120 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CORRUPT, 0, 0, -1, NULL, 0); 121 122 queue_proc_call(); 123 queue_proc_read(&r, sizeof(r)); 124 queue_proc_end(); 125 126 return (r); 127 } 128 129 static int 130 queue_proc_message_create(uint32_t *msgid) 131 { 132 int r; 133 134 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0); 135 136 queue_proc_call(); 137 queue_proc_read(&r, sizeof(r)); 138 if (r == 1) 139 queue_proc_read(msgid, sizeof(*msgid)); 140 queue_proc_end(); 141 142 return (r); 143 } 144 145 static int 146 queue_proc_message_commit(uint32_t msgid, const char *path) 147 { 148 int r, fd; 149 150 fd = open(path, O_RDONLY); 151 if (fd == -1) { 152 log_warn("queue-proc: open: %s", path); 153 return (0); 154 } 155 156 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid, 157 sizeof(msgid)); 158 159 queue_proc_call(); 160 queue_proc_read(&r, sizeof(r)); 161 queue_proc_end(); 162 163 return (r); 164 } 165 166 static int 167 queue_proc_message_delete(uint32_t msgid) 168 { 169 int r; 170 171 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid, 172 sizeof(msgid)); 173 174 queue_proc_call(); 175 queue_proc_read(&r, sizeof(r)); 176 queue_proc_end(); 177 178 return (r); 179 } 180 181 static int 182 queue_proc_message_fd_r(uint32_t msgid) 183 { 184 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid, 185 sizeof(msgid)); 186 187 queue_proc_call(); 188 queue_proc_end(); 189 190 return (imsg.fd); 191 } 192 193 static int 194 queue_proc_message_corrupt(uint32_t msgid) 195 { 196 int r; 197 198 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CORRUPT, 0, 0, -1, &msgid, 199 sizeof(msgid)); 200 201 queue_proc_call(); 202 queue_proc_read(&r, sizeof(r)); 203 queue_proc_end(); 204 205 return (r); 206 } 207 208 static int 209 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len, 210 uint64_t *evpid) 211 { 212 struct ibuf *b; 213 int r; 214 215 msgid = evpid_to_msgid(*evpid); 216 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0, 217 sizeof(msgid) + len); 218 if (imsg_add(b, &msgid, sizeof(msgid)) == -1 || 219 imsg_add(b, buf, len) == -1) 220 return (0); 221 imsg_close(&ibuf, b); 222 223 queue_proc_call(); 224 queue_proc_read(&r, sizeof(r)); 225 if (r == 1) 226 queue_proc_read(evpid, sizeof(*evpid)); 227 queue_proc_end(); 228 229 return (r); 230 } 231 232 static int 233 queue_proc_envelope_delete(uint64_t evpid) 234 { 235 int r; 236 237 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid, 238 sizeof(evpid)); 239 240 queue_proc_call(); 241 queue_proc_read(&r, sizeof(r)); 242 queue_proc_end(); 243 244 return (r); 245 } 246 247 static int 248 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len) 249 { 250 struct ibuf *b; 251 int r; 252 253 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0, 254 len + sizeof(evpid)); 255 if (imsg_add(b, &evpid, sizeof(evpid)) == -1 || 256 imsg_add(b, buf, len) == -1) 257 return (0); 258 imsg_close(&ibuf, b); 259 260 queue_proc_call(); 261 queue_proc_read(&r, sizeof(r)); 262 queue_proc_end(); 263 264 return (r); 265 } 266 267 static int 268 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len) 269 { 270 int r; 271 272 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid, 273 sizeof(evpid)); 274 275 queue_proc_call(); 276 277 if (rlen > len) { 278 log_warnx("warn: queue-proc: buf too small"); 279 fatalx("queue-proc: exiting"); 280 } 281 282 r = rlen; 283 queue_proc_read(buf, rlen); 284 queue_proc_end(); 285 286 return (r); 287 } 288 289 static int 290 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len) 291 { 292 int r; 293 294 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0); 295 296 queue_proc_call(); 297 queue_proc_read(&r, sizeof(r)); 298 299 if (r > 0) { 300 queue_proc_read(evpid, sizeof(*evpid)); 301 if (rlen > len) { 302 log_warnx("warn: queue-proc: buf too small"); 303 fatalx("queue-proc: exiting"); 304 } 305 if (r != (int)rlen) { 306 log_warnx("warn: queue-proc: len mismatch"); 307 fatalx("queue-proc: exiting"); 308 } 309 queue_proc_read(buf, rlen); 310 } 311 queue_proc_end(); 312 313 return (r); 314 } 315 316 static int 317 queue_proc_init(struct passwd *pw, int server, const char *conf) 318 { 319 uint32_t version; 320 int fd; 321 322 fd = fork_proc_backend("queue", conf, "queue-proc"); 323 if (fd == -1) 324 fatalx("queue-proc: exiting"); 325 326 imsg_init(&ibuf, fd); 327 328 version = PROC_QUEUE_API_VERSION; 329 imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1, 330 &version, sizeof(version)); 331 332 queue_api_on_close(queue_proc_close); 333 queue_api_on_message_create(queue_proc_message_create); 334 queue_api_on_message_commit(queue_proc_message_commit); 335 queue_api_on_message_delete(queue_proc_message_delete); 336 queue_api_on_message_fd_r(queue_proc_message_fd_r); 337 queue_api_on_message_corrupt(queue_proc_message_corrupt); 338 queue_api_on_envelope_create(queue_proc_envelope_create); 339 queue_api_on_envelope_delete(queue_proc_envelope_delete); 340 queue_api_on_envelope_update(queue_proc_envelope_update); 341 queue_api_on_envelope_load(queue_proc_envelope_load); 342 queue_api_on_envelope_walk(queue_proc_envelope_walk); 343 344 queue_proc_call(); 345 queue_proc_end(); 346 347 return (1); 348 } 349 350 struct queue_backend queue_backend_proc = { 351 queue_proc_init, 352 }; 353