1 /* $OpenBSD: queue_proc.c,v 1.9 2021/06/14 17:58:16 eric Exp $ */ 2 3 /* 4 * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <errno.h> 20 #include <fcntl.h> 21 #include <string.h> 22 23 #include "smtpd.h" 24 #include "log.h" 25 26 static struct imsgbuf ibuf; 27 static struct imsg imsg; 28 static size_t rlen; 29 static char *rdata; 30 31 static void 32 queue_proc_call(void) 33 { 34 ssize_t n; 35 36 if (imsg_flush(&ibuf) == -1) { 37 log_warn("warn: queue-proc: imsg_flush"); 38 fatalx("queue-proc: exiting"); 39 } 40 41 while (1) { 42 if ((n = imsg_get(&ibuf, &imsg)) == -1) { 43 log_warn("warn: queue-proc: imsg_get"); 44 break; 45 } 46 if (n) { 47 rlen = imsg.hdr.len - IMSG_HEADER_SIZE; 48 rdata = imsg.data; 49 50 if (imsg.hdr.type != PROC_QUEUE_OK) { 51 log_warnx("warn: queue-proc: bad response"); 52 break; 53 } 54 return; 55 } 56 57 if ((n = imsg_read(&ibuf)) == -1 && errno != EAGAIN) { 58 log_warn("warn: queue-proc: imsg_read"); 59 break; 60 } 61 62 if (n == 0) { 63 log_warnx("warn: queue-proc: pipe closed"); 64 break; 65 } 66 } 67 68 fatalx("queue-proc: exiting"); 69 } 70 71 static void 72 queue_proc_read(void *dst, size_t len) 73 { 74 if (len > rlen) { 75 log_warnx("warn: queue-proc: bad msg len"); 76 fatalx("queue-proc: exiting"); 77 } 78 79 memmove(dst, rdata, len); 80 rlen -= len; 81 rdata += len; 82 } 83 84 static void 85 queue_proc_end(void) 86 { 87 if (rlen) { 88 log_warnx("warn: queue-proc: bogus data"); 89 fatalx("queue-proc: exiting"); 90 } 91 imsg_free(&imsg); 92 } 93 94 /* 95 * API 96 */ 97 98 static int 99 queue_proc_close(void) 100 { 101 int r; 102 103 imsg_compose(&ibuf, PROC_QUEUE_CLOSE, 0, 0, -1, NULL, 0); 104 105 queue_proc_call(); 106 queue_proc_read(&r, sizeof(r)); 107 queue_proc_end(); 108 109 return (r); 110 } 111 112 static int 113 queue_proc_message_create(uint32_t *msgid) 114 { 115 int r; 116 117 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_CREATE, 0, 0, -1, NULL, 0); 118 119 queue_proc_call(); 120 queue_proc_read(&r, sizeof(r)); 121 if (r == 1) 122 queue_proc_read(msgid, sizeof(*msgid)); 123 queue_proc_end(); 124 125 return (r); 126 } 127 128 static int 129 queue_proc_message_commit(uint32_t msgid, const char *path) 130 { 131 int r, fd; 132 133 fd = open(path, O_RDONLY); 134 if (fd == -1) { 135 log_warn("queue-proc: open: %s", path); 136 return (0); 137 } 138 139 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_COMMIT, 0, 0, fd, &msgid, 140 sizeof(msgid)); 141 142 queue_proc_call(); 143 queue_proc_read(&r, sizeof(r)); 144 queue_proc_end(); 145 146 return (r); 147 } 148 149 static int 150 queue_proc_message_delete(uint32_t msgid) 151 { 152 int r; 153 154 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_DELETE, 0, 0, -1, &msgid, 155 sizeof(msgid)); 156 157 queue_proc_call(); 158 queue_proc_read(&r, sizeof(r)); 159 queue_proc_end(); 160 161 return (r); 162 } 163 164 static int 165 queue_proc_message_fd_r(uint32_t msgid) 166 { 167 imsg_compose(&ibuf, PROC_QUEUE_MESSAGE_FD_R, 0, 0, -1, &msgid, 168 sizeof(msgid)); 169 170 queue_proc_call(); 171 queue_proc_end(); 172 173 return (imsg.fd); 174 } 175 176 static int 177 queue_proc_envelope_create(uint32_t msgid, const char *buf, size_t len, 178 uint64_t *evpid) 179 { 180 struct ibuf *b; 181 int r; 182 183 msgid = evpid_to_msgid(*evpid); 184 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_CREATE, 0, 0, 185 sizeof(msgid) + len); 186 if (imsg_add(b, &msgid, sizeof(msgid)) == -1 || 187 imsg_add(b, buf, len) == -1) 188 return (0); 189 imsg_close(&ibuf, b); 190 191 queue_proc_call(); 192 queue_proc_read(&r, sizeof(r)); 193 if (r == 1) 194 queue_proc_read(evpid, sizeof(*evpid)); 195 queue_proc_end(); 196 197 return (r); 198 } 199 200 static int 201 queue_proc_envelope_delete(uint64_t evpid) 202 { 203 int r; 204 205 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_DELETE, 0, 0, -1, &evpid, 206 sizeof(evpid)); 207 208 queue_proc_call(); 209 queue_proc_read(&r, sizeof(r)); 210 queue_proc_end(); 211 212 return (r); 213 } 214 215 static int 216 queue_proc_envelope_update(uint64_t evpid, const char *buf, size_t len) 217 { 218 struct ibuf *b; 219 int r; 220 221 b = imsg_create(&ibuf, PROC_QUEUE_ENVELOPE_UPDATE, 0, 0, 222 len + sizeof(evpid)); 223 if (imsg_add(b, &evpid, sizeof(evpid)) == -1 || 224 imsg_add(b, buf, len) == -1) 225 return (0); 226 imsg_close(&ibuf, b); 227 228 queue_proc_call(); 229 queue_proc_read(&r, sizeof(r)); 230 queue_proc_end(); 231 232 return (r); 233 } 234 235 static int 236 queue_proc_envelope_load(uint64_t evpid, char *buf, size_t len) 237 { 238 int r; 239 240 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_LOAD, 0, 0, -1, &evpid, 241 sizeof(evpid)); 242 243 queue_proc_call(); 244 245 if (rlen > len) { 246 log_warnx("warn: queue-proc: buf too small"); 247 fatalx("queue-proc: exiting"); 248 } 249 250 r = rlen; 251 queue_proc_read(buf, rlen); 252 queue_proc_end(); 253 254 return (r); 255 } 256 257 static int 258 queue_proc_envelope_walk(uint64_t *evpid, char *buf, size_t len) 259 { 260 int r; 261 262 imsg_compose(&ibuf, PROC_QUEUE_ENVELOPE_WALK, 0, 0, -1, NULL, 0); 263 264 queue_proc_call(); 265 queue_proc_read(&r, sizeof(r)); 266 267 if (r > 0) { 268 queue_proc_read(evpid, sizeof(*evpid)); 269 if (rlen > len) { 270 log_warnx("warn: queue-proc: buf too small"); 271 fatalx("queue-proc: exiting"); 272 } 273 if (r != (int)rlen) { 274 log_warnx("warn: queue-proc: len mismatch"); 275 fatalx("queue-proc: exiting"); 276 } 277 queue_proc_read(buf, rlen); 278 } 279 queue_proc_end(); 280 281 return (r); 282 } 283 284 static int 285 queue_proc_init(struct passwd *pw, int server, const char *conf) 286 { 287 uint32_t version; 288 int fd; 289 290 fd = fork_proc_backend("queue", conf, "queue-proc"); 291 if (fd == -1) 292 fatalx("queue-proc: exiting"); 293 294 imsg_init(&ibuf, fd); 295 296 version = PROC_QUEUE_API_VERSION; 297 imsg_compose(&ibuf, PROC_QUEUE_INIT, 0, 0, -1, 298 &version, sizeof(version)); 299 300 queue_api_on_close(queue_proc_close); 301 queue_api_on_message_create(queue_proc_message_create); 302 queue_api_on_message_commit(queue_proc_message_commit); 303 queue_api_on_message_delete(queue_proc_message_delete); 304 queue_api_on_message_fd_r(queue_proc_message_fd_r); 305 queue_api_on_envelope_create(queue_proc_envelope_create); 306 queue_api_on_envelope_delete(queue_proc_envelope_delete); 307 queue_api_on_envelope_update(queue_proc_envelope_update); 308 queue_api_on_envelope_load(queue_proc_envelope_load); 309 queue_api_on_envelope_walk(queue_proc_envelope_walk); 310 311 queue_proc_call(); 312 queue_proc_end(); 313 314 return (1); 315 } 316 317 struct queue_backend queue_backend_proc = { 318 queue_proc_init, 319 }; 320