1 /*++
2 /* NAME
3 /*	qmgr_active 3
4 /* SUMMARY
5 /*	active queue management
6 /* SYNOPSIS
7 /*	#include "qmgr.h"
8 /*
/*	int	qmgr_active_feed(scan_info, queue_id)
10 /*	QMGR_SCAN *scan_info;
11 /*	const char *queue_id;
12 /*
13 /*	void	qmgr_active_drain()
14 /*
15 /*	int	qmgr_active_done(message)
16 /*	QMGR_MESSAGE *message;
17 /* DESCRIPTION
18 /*	These functions maintain the active message queue: the set
19 /*	of messages that the queue manager is actually working on.
20 /*	The active queue is limited in size. Messages are drained
21 /*	from the active queue by allocating a delivery process and
22 /*	by delivering mail via that process.  Messages leak into the
23 /*	active queue only when the active queue is small enough.
24 /*	Damaged message files are saved to the "corrupt" directory.
25 /*
26 /*	qmgr_active_feed() inserts the named message file into
27 /*	the active queue. Message files with the wrong name or
28 /*	with other wrong properties are skipped but not removed.
29 /*	The following queue flags are recognized, other flags being
30 /*	ignored:
31 /* .IP QMGR_SCAN_ALL
32 /*	Examine all queue files. Normally, deferred queue files with
33 /*	future time stamps are ignored, and incoming queue files with
34 /*	future time stamps are frowned upon.
35 /* .PP
36 /*	qmgr_active_drain() allocates one delivery process.
37 /*	Process allocation is asynchronous. Once the delivery
38 /*	process is available, an attempt is made to deliver
39 /*	a message via it. Message delivery is asynchronous, too.
40 /*
41 /*	qmgr_active_done() deals with a message after delivery
42 /*	has been tried for all in-core recipients. If the message
43 /*	was bounced, a bounce message is sent to the sender, or
44 /*	to the Errors-To: address if one was specified.
45 /*	If there are more on-file recipients, a new batch of
46 /*	in-core recipients is read from the queue file. Otherwise,
47 /*	if a delivery agent marked the queue file as corrupt,
48 /*	the queue file is moved to the "corrupt" queue (surprise);
49 /*	if at least one delivery failed, the message is moved
50 /*	to the deferred queue. The time stamps of a deferred queue
51 /*	file are set to the nearest wakeup time of its recipient
52 /*	sites (if delivery failed due to a problem with a next-hop
53 /*	host), are set into the future by the amount of time the
54 /*	message was queued (per-message exponential backoff), or are set
55 /*	into the future by a minimal backoff time, whichever is more.
56 /*	The minimal_backoff_time parameter specifies the minimal
57 /*	amount of time between delivery attempts; maximal_backoff_time
58 /*	specifies an upper limit.
59 /* DIAGNOSTICS
60 /*	Fatal: queue file access failures, out of memory.
61 /*	Panic: interface violations, internal consistency errors.
62 /*	Warnings: corrupt message file. A corrupt message is saved
63 /*	to the "corrupt" queue for further inspection.
64 /* LICENSE
65 /* .ad
66 /* .fi
67 /*	The Secure Mailer license must be distributed with this software.
68 /* AUTHOR(S)
69 /*	Wietse Venema
70 /*	IBM T.J. Watson Research
71 /*	P.O. Box 704
72 /*	Yorktown Heights, NY 10598, USA
73 /*
74 /*	Wietse Venema
75 /*	Google, Inc.
76 /*	111 8th Avenue
77 /*	New York, NY 10011, USA
78 /*--*/
79 
80 /* System library. */
81 
82 #include <sys_defs.h>
83 #include <sys/stat.h>
84 #include <dirent.h>
85 #include <stdlib.h>
86 #include <unistd.h>
87 #include <string.h>
88 #include <utime.h>
89 #include <errno.h>
90 
91 #ifndef S_IRWXU				/* What? no POSIX system? */
92 #define S_IRWXU 0700
93 #endif
94 
95 /* Utility library. */
96 
97 #include <msg.h>
98 #include <events.h>
99 #include <mymalloc.h>
100 #include <vstream.h>
101 #include <warn_stat.h>
102 
103 /* Global library. */
104 
105 #include <mail_params.h>
106 #include <mail_open_ok.h>
107 #include <mail_queue.h>
108 #include <recipient_list.h>
109 #include <bounce.h>
110 #include <defer.h>
111 #include <trace.h>
112 #include <abounce.h>
113 #include <rec_type.h>
114 #include <qmgr_user.h>
115 #include <info_log_addr_form.h>
116 
117 /* Application-specific. */
118 
119 #include "qmgr.h"
120 
121  /*
122   * A bunch of call-back routines.
123   */
124 static void qmgr_active_done_2_bounce_flush(int, void *);
125 static void qmgr_active_done_2_generic(QMGR_MESSAGE *);
126 static void qmgr_active_done_25_trace_flush(int, void *);
127 static void qmgr_active_done_25_generic(QMGR_MESSAGE *);
128 static void qmgr_active_done_3_defer_flush(int, void *);
129 static void qmgr_active_done_3_defer_warn(int, void *);
130 static void qmgr_active_done_3_generic(QMGR_MESSAGE *);
131 
132 /* qmgr_active_corrupt - move corrupted file out of the way */
133 
qmgr_active_corrupt(const char * queue_id)134 static void qmgr_active_corrupt(const char *queue_id)
135 {
136     const char *myname = "qmgr_active_corrupt";
137 
138     if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) {
139 	if (errno != ENOENT)
140 	    msg_fatal("%s: save corrupt file queue %s id %s: %m",
141 		      myname, MAIL_QUEUE_ACTIVE, queue_id);
142     } else {
143 	msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"",
144 		 queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT);
145     }
146 }
147 
148 /* qmgr_active_defer - defer queue file */
149 
qmgr_active_defer(const char * queue_name,const char * queue_id,const char * dest_queue,int delay)150 static void qmgr_active_defer(const char *queue_name, const char *queue_id,
151 			              const char *dest_queue, int delay)
152 {
153     const char *myname = "qmgr_active_defer";
154     const char *path;
155     struct utimbuf tbuf;
156 
157     if (msg_verbose)
158 	msg_info("wakeup %s after %ld secs", queue_id, (long) delay);
159 
160     tbuf.actime = tbuf.modtime = event_time() + delay;
161     path = mail_queue_path((VSTRING *) 0, queue_name, queue_id);
162     if (utime(path, &tbuf) < 0 && errno != ENOENT)
163 	msg_fatal("%s: update %s time stamps: %m", myname, path);
164     if (mail_queue_rename(queue_id, queue_name, dest_queue)) {
165 	if (errno != ENOENT)
166 	    msg_fatal("%s: rename %s from %s to %s: %m", myname,
167 		      queue_id, queue_name, dest_queue);
168 	msg_warn("%s: rename %s from %s to %s: %m", myname,
169 		 queue_id, queue_name, dest_queue);
170     } else if (msg_verbose) {
171 	msg_info("%s: defer %s", myname, queue_id);
172     }
173 }
174 
175 /* qmgr_active_feed - feed one message into active queue */
176 
qmgr_active_feed(QMGR_SCAN * scan_info,const char * queue_id)177 int     qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
178 {
179     const char *myname = "qmgr_active_feed";
180     QMGR_MESSAGE *message;
181     struct stat st;
182     const char *path;
183 
184     if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0)
185 	msg_panic("%s: bad queue %s", myname, scan_info->queue);
186     if (msg_verbose)
187 	msg_info("%s: queue %s", myname, scan_info->queue);
188 
189     /*
190      * Make sure this is something we are willing to open.
191      */
192     if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO)
193 	return (0);
194 
195     if (msg_verbose)
196 	msg_info("%s: %s", myname, path);
197 
198     /*
199      * Skip files that have time stamps into the future. They need to cool
200      * down. Incoming and deferred files can have future time stamps.
201      */
202     if ((scan_info->flags & QMGR_SCAN_ALL) == 0
203 	&& st.st_mtime > time((time_t *) 0) + 1) {
204 	if (msg_verbose)
205 	    msg_info("%s: skip %s (%ld seconds)", myname, queue_id,
206 		     (long) (st.st_mtime - event_time()));
207 	return (0);
208     }
209 
210     /*
211      * Move the message to the active queue. File access errors are fatal.
212      */
213     if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) {
214 	if (errno != ENOENT)
215 	    msg_fatal("%s: %s: rename from %s to %s: %m", myname,
216 		      queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
217 	msg_warn("%s: %s: rename from %s to %s: %m", myname,
218 		 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
219 	return (0);
220     }
221 
222     /*
223      * Extract envelope information: sender and recipients. At this point,
224      * mail addresses have been processed by the cleanup service so they
225      * should be in canonical form. Generate requests to deliver this
226      * message.
227      *
228      * Throwing away queue files seems bad, especially when they made it this
229      * far into the mail system. Therefore we save bad files to a separate
230      * directory for further inspection.
231      *
232      * After queue manager restart it is possible that a queue file is still
233      * being delivered. In that case (the file is locked), defer delivery by
234      * a minimal amount of time.
235      */
236 #define QMGR_FLUSH_AFTER	(QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
237 #define MAYBE_FLUSH_AFTER(mode) \
238 	(((mode) & MAIL_QUEUE_STAT_UNTHROTTLE) ? QMGR_FLUSH_AFTER : 0)
239 #define MAYBE_FORCE_EXPIRE(mode) \
240 	(((mode) & MAIL_QUEUE_STAT_EXPIRE) ? QMGR_FORCE_EXPIRE : 0)
241 #define MAYBE_UPDATE_MODE(mode) \
242 	(((mode) & MAIL_QUEUE_STAT_UNTHROTTLE) ? \
243 	(mode) & ~MAIL_QUEUE_STAT_UNTHROTTLE : 0)
244 
245     if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
246 				      scan_info->flags
247 				      | MAYBE_FLUSH_AFTER(st.st_mode)
248 				      | MAYBE_FORCE_EXPIRE(st.st_mode),
249 				      MAYBE_UPDATE_MODE(st.st_mode))) == 0) {
250 	qmgr_active_corrupt(queue_id);
251 	return (0);
252     } else if (message == QMGR_MESSAGE_LOCKED) {
253 	qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60);
254 	return (0);
255     } else {
256 
257 	/*
258 	 * Special case if all recipients were already delivered. Send any
259 	 * bounces and clean up.
260 	 */
261 	if (message->refcount == 0)
262 	    qmgr_active_done(message);
263 	return (1);
264     }
265 }
266 
267 /* qmgr_active_done - dispose of message after recipients have been tried */
268 
qmgr_active_done(QMGR_MESSAGE * message)269 void    qmgr_active_done(QMGR_MESSAGE *message)
270 {
271     const char *myname = "qmgr_active_done";
272     struct stat st;
273 
274     if (msg_verbose)
275 	msg_info("%s: %s", myname, message->queue_id);
276 
277     /*
278      * During a previous iteration, an attempt to bounce this message may
279      * have failed, so there may still be a bounce log lying around. XXX By
280      * groping around in the bounce queue, we're trespassing on the bounce
281      * service's territory. But doing so is more robust than depending on the
282      * bounce daemon to do the lookup for us, and for us to do the deleting
283      * after we have received a successful status from the bounce service.
284      * The bounce queue directory blocks are most likely in memory anyway. If
285      * these lookups become a performance problem we will have to build an
286      * in-core cache into the bounce daemon.
287      *
288      * Don't bounce when the bounce log is empty. The bounce process obviously
289      * failed, and the delivery agent will have requested that the message be
290      * deferred.
291      *
292      * Bounces are sent asynchronously to avoid stalling while the cleanup
293      * daemon waits for the qmgr to accept the "new mail" trigger.
294      *
295      * See also code in cleanup_bounce.c.
296      */
297     if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) {
298 	if (st.st_size == 0) {
299 	    if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id))
300 		msg_fatal("remove %s %s: %m",
301 			  MAIL_QUEUE_BOUNCE, message->queue_id);
302 	} else {
303 	    if (msg_verbose)
304 		msg_info("%s: bounce %s", myname, message->queue_id);
305 	    if (message->verp_delims == 0 || var_verp_bounce_off)
306 		abounce_flush(BOUNCE_FLAG_KEEP,
307 			      message->queue_name,
308 			      message->queue_id,
309 			      message->encoding,
310 			      message->smtputf8,
311 			      message->sender,
312 			      message->dsn_envid,
313 			      message->dsn_ret,
314 			      qmgr_active_done_2_bounce_flush,
315 			      (void *) message);
316 	    else
317 		abounce_flush_verp(BOUNCE_FLAG_KEEP,
318 				   message->queue_name,
319 				   message->queue_id,
320 				   message->encoding,
321 				   message->smtputf8,
322 				   message->sender,
323 				   message->dsn_envid,
324 				   message->dsn_ret,
325 				   message->verp_delims,
326 				   qmgr_active_done_2_bounce_flush,
327 				   (void *) message);
328 	    return;
329 	}
330     }
331 
332     /*
333      * Asynchronous processing does not reach this point.
334      */
335     qmgr_active_done_2_generic(message);
336 }
337 
338 /* qmgr_active_done_2_bounce_flush - process abounce_flush() status */
339 
qmgr_active_done_2_bounce_flush(int status,void * context)340 static void qmgr_active_done_2_bounce_flush(int status, void *context)
341 {
342     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
343 
344     /*
345      * Process abounce_flush() status and continue processing.
346      */
347     message->flags |= status;
348     qmgr_active_done_2_generic(message);
349 }
350 
351 /* qmgr_active_done_2_generic - continue processing */
352 
qmgr_active_done_2_generic(QMGR_MESSAGE * message)353 static void qmgr_active_done_2_generic(QMGR_MESSAGE *message)
354 {
355     const char *path;
356     struct stat st;
357 
358     /*
359      * A delivery agent marks a queue file as corrupt by changing its
360      * attributes, and by pretending that delivery was deferred.
361      */
362     if (message->flags
363 	&& mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) {
364 	qmgr_active_corrupt(message->queue_id);
365 	qmgr_message_free(message);
366 	return;
367     }
368 
369     /*
370      * If we did not read all recipients from this file, go read some more,
371      * but remember whether some recipients have to be tried again.
372      *
373      * Throwing away queue files seems bad, especially when they made it this
374      * far into the mail system. Therefore we save bad files to a separate
375      * directory for further inspection by a human being.
376      */
377     if (message->rcpt_offset > 0) {
378 	if (qmgr_message_realloc(message) == 0) {
379 	    qmgr_active_corrupt(message->queue_id);
380 	    qmgr_message_free(message);
381 	} else {
382 	    if (message->refcount == 0)
383 		qmgr_active_done(message);	/* recurse for consistency */
384 	}
385 	return;
386     }
387 
388     /*
389      * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS
390      * and others not. Depending on what subset of recipients are delivered,
391      * a trace file may or may not be created. Even when the last partial
392      * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may
393      * still exist from a previous partial delivery attempt. So as long as
394      * any recipient has NOTIFY=SUCCESS we have to always look for the trace
395      * file and be prepared for the file not to exist.
396      *
397      * See also comments in bounce/bounce_notify_util.c.
398      */
399     if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD
400 			    | DEL_REQ_FLAG_REC_DLY_SENT))
401 	|| (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) {
402 	atrace_flush(message->tflags,
403 		     message->queue_name,
404 		     message->queue_id,
405 		     message->encoding,
406 		     message->smtputf8,
407 		     message->sender,
408 		     message->dsn_envid,
409 		     message->dsn_ret,
410 		     qmgr_active_done_25_trace_flush,
411 		     (void *) message);
412 	return;
413     }
414 
415     /*
416      * Asynchronous processing does not reach this point.
417      */
418     qmgr_active_done_25_generic(message);
419 }
420 
421 /* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */
422 
qmgr_active_done_25_trace_flush(int status,void * context)423 static void qmgr_active_done_25_trace_flush(int status, void *context)
424 {
425     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
426 
427     /*
428      * Process atrace_flush() status and continue processing.
429      */
430     if (status == 0 && message->tflags_offset)
431 	qmgr_message_kill_record(message, message->tflags_offset);
432     message->flags |= status;
433     qmgr_active_done_25_generic(message);
434 }
435 
436 /* qmgr_active_done_25_generic - continue processing */
437 
qmgr_active_done_25_generic(QMGR_MESSAGE * message)438 static void qmgr_active_done_25_generic(QMGR_MESSAGE *message)
439 {
440     const char *myname = "qmgr_active_done_25_generic";
441     const char *expire_status = 0;
442 
443     /*
444      * If we get to this point we have tried all recipients for this message.
445      * If the message is too old, try to bounce it.
446      *
447      * Bounces are sent asynchronously to avoid stalling while the cleanup
448      * daemon waits for the qmgr to accept the "new mail" trigger.
449      */
450     if (message->flags) {
451 	if ((message->qflags & QMGR_FORCE_EXPIRE) != 0) {
452 	    expire_status = "force-expired";
453 	} else if (event_time() >= message->create_time +
454 	     (*message->sender ? var_max_queue_time : var_dsn_queue_time)) {
455 	    expire_status = "expired";
456 	} else {
457 	    expire_status = 0;
458 	}
459 	if (expire_status != 0) {
460 	    msg_info("%s: from=<%s>, status=%s, returned to sender",
461 	      message->queue_id, info_log_addr_form_sender(message->sender),
462 		     expire_status);
463 	    if (message->verp_delims == 0 || var_verp_bounce_off)
464 		adefer_flush(BOUNCE_FLAG_KEEP,
465 			     message->queue_name,
466 			     message->queue_id,
467 			     message->encoding,
468 			     message->smtputf8,
469 			     message->sender,
470 			     message->dsn_envid,
471 			     message->dsn_ret,
472 			     qmgr_active_done_3_defer_flush,
473 			     (void *) message);
474 	    else
475 		adefer_flush_verp(BOUNCE_FLAG_KEEP,
476 				  message->queue_name,
477 				  message->queue_id,
478 				  message->encoding,
479 				  message->smtputf8,
480 				  message->sender,
481 				  message->dsn_envid,
482 				  message->dsn_ret,
483 				  message->verp_delims,
484 				  qmgr_active_done_3_defer_flush,
485 				  (void *) message);
486 	    return;
487 	} else if (message->warn_time > 0
488 		   && event_time() >= message->warn_time - 1) {
489 	    if (msg_verbose)
490 		msg_info("%s: sending defer warning for %s", myname, message->queue_id);
491 	    adefer_warn(BOUNCE_FLAG_KEEP,
492 			message->queue_name,
493 			message->queue_id,
494 			message->encoding,
495 			message->smtputf8,
496 			message->sender,
497 			message->dsn_envid,
498 			message->dsn_ret,
499 			qmgr_active_done_3_defer_warn,
500 			(void *) message);
501 	    return;
502 	}
503     }
504 
505     /*
506      * Asynchronous processing does not reach this point.
507      */
508     qmgr_active_done_3_generic(message);
509 }
510 
511 /* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */
512 
qmgr_active_done_3_defer_warn(int status,void * context)513 static void qmgr_active_done_3_defer_warn(int status, void *context)
514 {
515     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
516 
517     /*
518      * Process adefer_warn() completion status and continue processing.
519      */
520     if (status == 0)
521 	qmgr_message_update_warn(message);
522     qmgr_active_done_3_generic(message);
523 }
524 
525 /* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */
526 
qmgr_active_done_3_defer_flush(int status,void * context)527 static void qmgr_active_done_3_defer_flush(int status, void *context)
528 {
529     QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
530 
531     /*
532      * Process adefer_flush() status and continue processing.
533      */
534     message->flags = status;
535     qmgr_active_done_3_generic(message);
536 }
537 
538 /* qmgr_active_done_3_generic - continue processing */
539 
qmgr_active_done_3_generic(QMGR_MESSAGE * message)540 static void qmgr_active_done_3_generic(QMGR_MESSAGE *message)
541 {
542     const char *myname = "qmgr_active_done_3_generic";
543     int     delay;
544 
545     /*
546      * Some recipients need to be tried again. Move the queue file time
547      * stamps into the future by the amount of time that the message is
548      * delayed, and move the message to the deferred queue. Impose minimal
549      * and maximal backoff times.
550      *
551      * Since we look at actual time in queue, not time since last delivery
552      * attempt, backoff times will be distributed. However, we can still see
553      * spikes in delivery activity because the interval between deferred
554      * queue scans is finite.
555      */
556     if (message->flags) {
557 	if (message->create_time > 0) {
558 	    delay = event_time() - message->create_time;
559 	    if (delay > var_max_backoff_time)
560 		delay = var_max_backoff_time;
561 	    if (delay < var_min_backoff_time)
562 		delay = var_min_backoff_time;
563 	} else {
564 	    delay = var_min_backoff_time;
565 	}
566 	qmgr_active_defer(message->queue_name, message->queue_id,
567 			  MAIL_QUEUE_DEFERRED, delay);
568     }
569 
570     /*
571      * All recipients done. Remove the queue file.
572      */
573     else {
574 	if (mail_queue_remove(message->queue_name, message->queue_id)) {
575 	    if (errno != ENOENT)
576 		msg_fatal("%s: remove %s from %s: %m", myname,
577 			  message->queue_id, message->queue_name);
578 	    msg_warn("%s: remove %s from %s: %m", myname,
579 		     message->queue_id, message->queue_name);
580 	} else {
581 	    /* Same format as logged by postsuper. */
582 	    msg_info("%s: removed", message->queue_id);
583 	}
584     }
585 
586     /*
587      * Finally, delete the in-core message structure.
588      */
589     qmgr_message_free(message);
590 }
591 
592 /* qmgr_active_drain - drain active queue by allocating a delivery process */
593 
qmgr_active_drain(void)594 void    qmgr_active_drain(void)
595 {
596     QMGR_TRANSPORT *transport;
597 
598     /*
599      * Allocate one delivery process for every transport with pending mail.
600      * The process allocation completes asynchronously.
601      */
602     while ((transport = qmgr_transport_select()) != 0) {
603 	if (msg_verbose)
604 	    msg_info("qmgr_active_drain: allocate %s", transport->name);
605 	qmgr_transport_alloc(transport, qmgr_deliver);
606     }
607 }
608