1 /*	$NetBSD: qmgr_entry.c,v 1.1.1.1 2009/06/23 10:08:52 tron Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_entry 3
6 /* SUMMARY
7 /*	per-site queue entries
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	QMGR_ENTRY *qmgr_entry_create(peer, message)
12 /*      QMGR_PEER *peer;
13 /*	QMGR_MESSAGE *message;
14 /*
15 /*	void	qmgr_entry_done(entry, which)
16 /*	QMGR_ENTRY *entry;
17 /*	int	which;
18 /*
/*	QMGR_ENTRY *qmgr_entry_select(peer)
/*	QMGR_PEER *peer;
/*
/*	void	qmgr_entry_unselect(entry)
/*	QMGR_ENTRY *entry;
25 /*
26 /*	void	qmgr_entry_move_todo(dst, entry)
27 /*	QMGR_QUEUE *dst;
28 /*	QMGR_ENTRY *entry;
29 /* DESCRIPTION
30 /*	These routines add/delete/manipulate per-site message
31 /*	delivery requests.
32 /*
33 /*	qmgr_entry_create() creates an entry for the named peer and message,
34 /*      and appends the entry to the peer's list and its queue's todo list.
35 /*	Filling in and cleaning up the recipients is the responsibility
36 /*	of the caller.
37 /*
38 /*	qmgr_entry_done() discards a per-site queue entry.  The
39 /*	\fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry
40 /*	of the site's `busy' list (i.e. queue entries that have been
41 /*	selected for actual delivery), or QMGR_QUEUE_TODO for an entry
42 /*	of the site's `todo' list (i.e. queue entries awaiting selection
43 /*	for actual delivery).
44 /*
45 /*	qmgr_entry_done() discards its peer structure when the peer
46 /*      is not referenced anymore.
47 /*
48 /*	qmgr_entry_done() triggers cleanup of the per-site queue when
49 /*	the site has no pending deliveries, and the site is either
50 /*	alive, or the site is dead and the number of in-core queues
51 /*	exceeds a configurable limit (see qmgr_queue_done()).
52 /*
53 /*	qmgr_entry_done() triggers special action when the last in-core
54 /*	queue entry for a message is done with: either read more
55 /*	recipients from the queue file, delete the queue file, or move
56 /*	the queue file to the deferred queue; send bounce reports to the
57 /*	message originator (see qmgr_active_done()).
58 /*
/*	qmgr_entry_select() selects the first entry on the named peer's
/*	entry list for actual delivery. The entry is moved from its
/*	queue's `todo' list to that queue's `busy' list: the list of
/*	messages being delivered. The entry is also removed from its
/*	peer list.
/*
/*	qmgr_entry_unselect() takes the named entry off its queue's
/*	`busy' list and moves it back to that queue's `todo' list.
/*	The entry is also prepended to its peer list again.
67 /*
68 /*	qmgr_entry_move_todo() moves the specified "todo" queue entry
69 /*	to the specified "todo" queue.
70 /* DIAGNOSTICS
71 /*	Panic: interface violations, internal inconsistencies.
72 /* LICENSE
73 /* .ad
74 /* .fi
75 /*	The Secure Mailer license must be distributed with this software.
76 /* AUTHOR(S)
77 /*	Wietse Venema
78 /*	IBM T.J. Watson Research
79 /*	P.O. Box 704
80 /*	Yorktown Heights, NY 10598, USA
81 /*
82 /*	Preemptive scheduler enhancements:
83 /*	Patrik Rak
84 /*	Modra 6
85 /*	155 00, Prague, Czech Republic
86 /*--*/
87 
88 /* System library. */
89 
90 #include <sys_defs.h>
91 #include <stdlib.h>
92 #include <time.h>
93 
94 /* Utility library. */
95 
96 #include <msg.h>
97 #include <mymalloc.h>
98 #include <events.h>
99 #include <vstream.h>
100 
101 /* Global library. */
102 
103 #include <mail_params.h>
104 #include <deliver_request.h>		/* opportunistic session caching */
105 
106 /* Application-specific. */
107 
108 #include "qmgr.h"
109 
110 /* qmgr_entry_select - select queue entry for delivery */
111 
QMGR_ENTRY *qmgr_entry_select(QMGR_PEER *peer)
{
    const char *myname = "qmgr_entry_select";
    QMGR_ENTRY *entry;
    QMGR_QUEUE *queue;

    /*
     * Take the first entry on the peer's list, if any. The entry moves from
     * its queue's `todo' list to that queue's `busy' list, and leaves the
     * peer list entirely; the job's selected_entries counter keeps the
     * preemptive scheduler's per-job accounting in balance (see the
     * compensating selected_entries updates in qmgr_entry_unselect() and
     * qmgr_entry_done()).
     */
    if ((entry = peer->entry_list.next) != 0) {
	queue = entry->queue;
	QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers);
	queue->todo_refcount--;
	QMGR_LIST_APPEND(queue->busy, entry, queue_peers);
	queue->busy_refcount++;
	QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers);
	peer->job->selected_entries++;

	/*
	 * With opportunistic session caching, the delivery agent must not
	 * only 1) save a session upon completion, but also 2) reuse a cached
	 * session upon the next delivery request. In order to not miss out
	 * on 2), we have to make caching sticky or else we get silly
	 * behavior when the in-memory queue drains. Specifically, new
	 * connections must not be made as long as cached connections exist.
	 *
	 * Safety: don't enable opportunistic session caching unless the queue
	 * manager is able to schedule concurrent or back-to-back deliveries
	 * (we need to recognize back-to-back deliveries for transports with
	 * concurrency 1).
	 *
	 * If caching has previously been enabled, but is not now, fetch any
	 * existing entries from the cache, but don't add new ones.
	 */

	/*
	 * NOTE: both macros read the `queue' local above by name; they are
	 * usable only inside this function after `queue' is set.
	 * "Back-to-back" means the previous delivery on this queue finished
	 * no more than one second before now.
	 */
#define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \
	    (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY())

#define BACK_TO_BACK_DELIVERY() \
		(queue->last_done + 1 >= event_time())

	/*
	 * Turn on session caching after we get up to speed. Don't enable
	 * session caching just because we have concurrent deliveries. This
	 * prevents unnecessary session caching when we have a burst of mail
	 * <= the initial concurrency limit.
	 *
	 * Asymmetry is intentional: we test only the CONN_STORE bit to see
	 * whether caching is on, but set the full CONN_MASK (presumably
	 * store + load flags — confirm against deliver_request.h) so that
	 * cached sessions are both saved and reused.
	 */
	if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) {
	    if (BACK_TO_BACK_DELIVERY()) {
		if (msg_verbose)
		    msg_info("%s: allowing on-demand session caching for %s",
			     myname, queue->name);
		queue->dflags |= DEL_REQ_FLAG_CONN_MASK;
	    }
	}

	/*
	 * Turn off session caching when concurrency drops and we're running
	 * out of steam. This is what prevents from turning off session
	 * caching too early, and from making new connections while old ones
	 * are still cached. Only the store bit is cleared, so existing
	 * cached sessions may still be loaded and drained.
	 */
	else {
	    if (!CONCURRENT_OR_BACK_TO_BACK_DELIVERY()) {
		if (msg_verbose)
		    msg_info("%s: disallowing on-demand session caching for %s",
			     myname, queue->name);
		queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE;
	    }
	}
    }
    /* Null when the peer has no pending todo entries. */
    return (entry);
}
181 
182 /* qmgr_entry_unselect - unselect queue entry for delivery */
183 
184 void    qmgr_entry_unselect(QMGR_ENTRY *entry)
185 {
186     QMGR_PEER *peer = entry->peer;
187     QMGR_QUEUE *queue = entry->queue;
188 
189     /*
190      * Move the entry back to the todo lists. In case of the peer list, put
191      * it back to the beginning, so the select()/unselect() does not reorder
192      * entries. We use this in qmgr_message_assign() to put recipients into
193      * existing entries when possible.
194      */
195     QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers);
196     queue->busy_refcount--;
197     QMGR_LIST_APPEND(queue->todo, entry, queue_peers);
198     queue->todo_refcount++;
199     QMGR_LIST_PREPEND(peer->entry_list, entry, peer_peers);
200     peer->job->selected_entries--;
201 }
202 
203 /* qmgr_entry_move_todo - move entry between todo queues */
204 
205 void    qmgr_entry_move_todo(QMGR_QUEUE *dst_queue, QMGR_ENTRY *entry)
206 {
207     const char *myname = "qmgr_entry_move_todo";
208     QMGR_TRANSPORT *dst_transport = dst_queue->transport;
209     QMGR_MESSAGE *message = entry->message;
210     QMGR_QUEUE *src_queue = entry->queue;
211     QMGR_PEER *dst_peer, *src_peer = entry->peer;
212     QMGR_JOB *dst_job, *src_job = src_peer->job;
213     QMGR_ENTRY *new_entry;
214     int     rcpt_count = entry->rcpt_list.len;
215 
216     if (entry->stream != 0)
217 	msg_panic("%s: queue %s entry is busy", myname, src_queue->name);
218     if (QMGR_QUEUE_THROTTLED(dst_queue))
219 	msg_panic("%s: destination queue %s is throttled", myname, dst_queue->name);
220     if (QMGR_TRANSPORT_THROTTLED(dst_transport))
221 	msg_panic("%s: destination transport %s is throttled",
222 		  myname, dst_transport->name);
223 
224     /*
225      * Create new entry, swap the recipients between the two entries,
226      * adjusting the job counters accordingly, then dispose of the old entry.
227      *
228      * Note that qmgr_entry_done() will also take care of adjusting the
229      * recipient limits of all the message jobs, so we do not have to do that
230      * explicitly for the new job here.
231      *
232      * XXX This does not enforce the per-entry recipient limit, but that is not
233      * a problem as long as qmgr_entry_move_todo() is called only to bounce
234      * or defer mail.
235      */
236     dst_job = qmgr_job_obtain(message, dst_transport);
237     dst_peer = qmgr_peer_obtain(dst_job, dst_queue);
238 
239     new_entry = qmgr_entry_create(dst_peer, message);
240 
241     recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list);
242 
243     src_job->rcpt_count -= rcpt_count;
244     dst_job->rcpt_count += rcpt_count;
245 
246     qmgr_entry_done(entry, QMGR_QUEUE_TODO);
247 }
248 
249 /* qmgr_entry_done - dispose of queue entry */
250 
void    qmgr_entry_done(QMGR_ENTRY *entry, int which)
{
    const char *myname = "qmgr_entry_done";
    QMGR_QUEUE *queue = entry->queue;
    QMGR_MESSAGE *message = entry->message;
    QMGR_PEER *peer = entry->peer;
    QMGR_JOB *sponsor, *job = peer->job;
    QMGR_TRANSPORT *transport = job->transport;

    /*
     * Take this entry off the in-core queue. An entry that still has an
     * open queue-file stream belongs to a delivery in progress and must
     * not be disposed of here.
     */
    if (entry->stream != 0)
	msg_panic("%s: file is open", myname);
    if (which == QMGR_QUEUE_BUSY) {
	QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers);
	queue->busy_refcount--;
    } else if (which == QMGR_QUEUE_TODO) {
	QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers);
	/*
	 * A todo entry never went through qmgr_entry_select(); count it as
	 * selected here so the job's entry accounting stays balanced.
	 */
	job->selected_entries++;
	QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers);
	queue->todo_refcount--;
    } else {
	msg_panic("%s: bad queue spec: %d", myname, which);
    }

    /*
     * Decrease the in-core recipient counts and free the recipient list and
     * the structure itself. From this point on, `entry' is dangling; only
     * the queue, peer, job, message and transport pointers saved above may
     * be used.
     */
    job->rcpt_count -= entry->rcpt_list.len;
    message->rcpt_count -= entry->rcpt_list.len;
    qmgr_recipient_count -= entry->rcpt_list.len;
    recipient_list_free(&entry->rcpt_list);
    myfree((char *) entry);

    /*
     * Make sure that the transport of any retired or finishing job that
     * donated recipient slots to this message gets them back first. Then, if
     * possible, pass the remaining unused recipient slots to the next job on
     * the job list.
     */
    for (sponsor = message->job_list.next; sponsor; sponsor = sponsor->message_peers.next) {
	if (sponsor->rcpt_count >= sponsor->rcpt_limit || sponsor == job)
	    continue;
	if (sponsor->stack_level < 0 || message->rcpt_offset == 0)
	    qmgr_job_move_limits(sponsor);
    }
    if (message->rcpt_offset == 0) {
	qmgr_job_move_limits(job);
    }

    /*
     * We implement a rate-limited queue by emulating a slow delivery
     * channel. We insert the artificial delays with qmgr_queue_suspend().
     *
     * When a queue is suspended, we must postpone any job scheduling decisions
     * until the queue is resumed. Otherwise, we make those decisions now.
     * The job scheduling decisions are made by qmgr_job_blocker_update().
     */
    if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) {
	/* Rate-limited services must deliver one message at a time. */
	if (queue->window > 1)
	    msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
		      myname, transport->name, queue->name, queue->window);
	if (QMGR_QUEUE_THROTTLED(queue))	/* XXX */
	    qmgr_queue_unthrottle(queue);
	if (QMGR_QUEUE_READY(queue))
	    qmgr_queue_suspend(queue, transport->rate_delay);
    }
    if (!QMGR_QUEUE_SUSPENDED(queue)
	&& queue->blocker_tag == transport->blocker_tag)
	qmgr_job_blocker_update(queue);

    /*
     * When there are no more entries for this peer, discard the peer
     * structure.
     */
    peer->refcount--;
    if (peer->refcount == 0)
	qmgr_peer_free(peer);

    /*
     * Maintain back-to-back delivery status. qmgr_entry_select() compares
     * this timestamp against the current time to decide whether to enable
     * on-demand session caching.
     */
    if (which == QMGR_QUEUE_BUSY)
	queue->last_done = event_time();

    /*
     * When the in-core queue for this site is empty and when this site is
     * not dead or suspended, discard the in-core queue. When this site is
     * dead, but the number of in-core queues exceeds some threshold, get rid
     * of this in-core queue anyway, in order to avoid running out of memory.
     *
     * NOTE(review): var_qmgr_rcpt_limit doubles as the in-core queue count
     * threshold here — presumably a deliberate proxy for "too many queues";
     * confirm against the queue manager documentation.
     */
    if (queue->todo.next == 0 && queue->busy.next == 0) {
	if (QMGR_QUEUE_THROTTLED(queue) && qmgr_queue_count > 2 * var_qmgr_rcpt_limit)
	    qmgr_queue_unthrottle(queue);
	if (QMGR_QUEUE_READY(queue))
	    qmgr_queue_done(queue);
    }

    /*
     * Update the in-core message reference count. When the in-core message
     * structure has no more references, dispose of the message. This must
     * come last: qmgr_active_done() may delete or defer the queue file and
     * free the message structure.
     */
    message->refcount--;
    if (message->refcount == 0)
	qmgr_active_done(message);
}
359 
360 /* qmgr_entry_create - create queue todo entry */
361 
QMGR_ENTRY *qmgr_entry_create(QMGR_PEER *peer, QMGR_MESSAGE *message)
{
    QMGR_ENTRY *entry;
    QMGR_QUEUE *queue = peer->queue;

    /*
     * Sanity check.
     */
    if (QMGR_QUEUE_THROTTLED(queue))
	msg_panic("qmgr_entry_create: dead queue: %s", queue->name);

    /*
     * Create the delivery request. The entry takes a reference on the
     * message, on its peer, and on the queue's todo list; qmgr_entry_done()
     * releases them. Filling in the recipient list is the caller's job.
     */
    entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY));
    entry->stream = 0;
    entry->message = message;
    recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE);
    message->refcount++;
    entry->peer = peer;
    QMGR_LIST_APPEND(peer->entry_list, entry, peer_peers);
    peer->refcount++;
    entry->queue = queue;
    QMGR_LIST_APPEND(queue->todo, entry, queue_peers);
    queue->todo_refcount++;
    peer->job->read_entries++;

    /*
     * Warn if a destination is falling behind while the active queue
     * contains a non-trivial amount of single-recipient email. When a
     * destination takes up more and more space in the active queue, then
     * other mail will not get through and delivery performance will suffer.
     *
     * XXX At this point in the code, the busy reference count is still less
     * than the concurrency limit (otherwise this code would not be invoked
     * in the first place) so we have to make some awkward adjustments
     * below.
     *
     * XXX The queue length test below looks at the active queue share of an
     * individual destination. This catches the case where mail for one
     * destination is falling behind because it has to round-robin compete
     * with many other destinations. However, Postfix will also perform
     * poorly when most of the active queue is tied up by a small number of
     * concurrency limited destinations. The queue length test below detects
     * such conditions only indirectly.
     *
     * XXX This code does not detect the case that the active queue is being
     * starved because incoming mail is pounding the disk.
     */
    if (var_helpful_warnings && var_qmgr_clog_warn_time > 0) {
	int     queue_length = queue->todo_refcount + queue->busy_refcount;
	time_t  now;
	QMGR_TRANSPORT *transport;
	double  active_share;

	/*
	 * Warn at most once per clog_warn_time interval, and only when this
	 * destination holds more than a fifth of the active queue limit.
	 */
	if (queue_length > var_qmgr_active_limit / 5
	    && (now = event_time()) >= queue->clog_time_to_warn) {
	    active_share = queue_length / (double) qmgr_message_count;
	    msg_warn("mail for %s is using up %d of %d active queue entries",
		     queue->nexthop, queue_length, qmgr_message_count);
	    if (active_share < 0.9)
		msg_warn("this may slow down other mail deliveries");
	    /*
	     * Suggest the most likely remedy: raise the per-destination
	     * concurrency limit, raise the active queue size, split the
	     * transport, or reduce time wasted on unreachable hosts.
	     */
	    transport = queue->transport;
	    if (transport->dest_concurrency_limit > 0
	    && transport->dest_concurrency_limit <= queue->busy_refcount + 1)
		msg_warn("you may need to increase the main.cf %s%s from %d",
			 transport->name, _DEST_CON_LIMIT,
			 transport->dest_concurrency_limit);
	    else if (queue->window > var_qmgr_active_limit * active_share)
		msg_warn("you may need to increase the main.cf %s from %d",
			 VAR_QMGR_ACT_LIMIT, var_qmgr_active_limit);
	    else if (queue->peers.next != queue->peers.prev)
		msg_warn("you may need a separate master.cf transport for %s",
			 queue->nexthop);
	    else {
		msg_warn("you may need to reduce %s connect and helo timeouts",
			 transport->name);
		msg_warn("so that Postfix quickly skips unavailable hosts");
		msg_warn("you may need to increase the main.cf %s and %s",
			 VAR_MIN_BACKOFF_TIME, VAR_MAX_BACKOFF_TIME);
		msg_warn("so that Postfix wastes less time on undeliverable mail");
		msg_warn("you may need to increase the master.cf %s process limit",
			 transport->name);
	    }
	    msg_warn("please avoid flushing the whole queue when you have");
	    msg_warn("lots of deferred mail, that is bad for performance");
	    msg_warn("to turn off these warnings specify: %s = 0",
		     VAR_QMGR_CLOG_WARN_TIME);
	    queue->clog_time_to_warn = now + var_qmgr_clog_warn_time;
	}
    }
    return (entry);
}
455