1 /*	$NetBSD: qmgr_queue.c,v 1.1.1.1 2009/06/23 10:08:53 tron Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_queue 3
6 /* SUMMARY
7 /*	per-destination queues
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	int	qmgr_queue_count;
12 /*
13 /*	QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
14 /*	QMGR_TRANSPORT *transport;
15 /*	const char *name;
16 /*	const char *nexthop;
17 /*
18 /*	void	qmgr_queue_done(queue)
19 /*	QMGR_QUEUE *queue;
20 /*
21 /*	QMGR_QUEUE *qmgr_queue_find(transport, name)
22 /*	QMGR_TRANSPORT *transport;
23 /*	const char *name;
24 /*
25 /*	void	qmgr_queue_throttle(queue, dsn)
26 /*	QMGR_QUEUE *queue;
27 /*	DSN	*dsn;
28 /*
29 /*	void	qmgr_queue_unthrottle(queue)
30 /*	QMGR_QUEUE *queue;
31 /*
32 /*	void	qmgr_queue_suspend(queue, delay)
33 /*	QMGR_QUEUE *queue;
34 /*	int	delay;
35 /* DESCRIPTION
36 /*	These routines add/delete/manipulate per-destination queues.
37 /*	Each queue corresponds to a specific transport and destination.
38 /*	Each queue has a `todo' list of delivery requests for that
39 /*	destination, and a `busy' list of delivery requests in progress.
40 /*
41 /*	qmgr_queue_count is a global counter for the total number
42 /*	of in-core queue structures.
43 /*
44 /*	qmgr_queue_create() creates an empty named queue for the named
45 /*	transport and destination. The queue is given an initial
46 /*	concurrency limit as specified with the
47 /*	\fIinitial_destination_concurrency\fR configuration parameter,
48 /*	provided that it does not exceed the transport-specific
49 /*	concurrency limit.
50 /*
51 /*	qmgr_queue_done() disposes of a per-destination queue after all
52 /*	its entries have been taken care of. It is an error to dispose
53 /*	of a dead queue.
54 /*
55 /*	qmgr_queue_find() looks up the named queue for the named
56 /*	transport. A null result means that the queue was not found.
57 /*
58 /*	qmgr_queue_throttle() handles a delivery error, and decrements the
59 /*	concurrency limit for the destination, with a lower bound of 1.
60 /*	When the cohort failure bound is reached, qmgr_queue_throttle()
61 /*	sets the concurrency limit to zero and starts a timer
62 /*	to re-enable delivery to the destination after a configurable delay.
63 /*
64 /*	qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects.
65 /*	The concurrency limit for the destination is incremented,
66 /*	provided that it does not exceed the destination concurrency
67 /*	limit specified for the transport. This routine implements
68 /*	"slow open" mode, and eliminates the "thundering herd" problem.
69 /*
70 /*	qmgr_queue_suspend() suspends delivery for this destination
71 /*	briefly. This function invalidates any scheduling decisions
72 /*	that are based on the present queue's concurrency window.
73 /*	To compensate for work skipped by qmgr_entry_done(), the
74 /*	status of blocker jobs is re-evaluated after the queue is
75 /*	resumed.
76 /* DIAGNOSTICS
77 /*	Panic: consistency check failure.
78 /* LICENSE
79 /* .ad
80 /* .fi
81 /*	The Secure Mailer license must be distributed with this software.
82 /* AUTHOR(S)
83 /*	Wietse Venema
84 /*	IBM T.J. Watson Research
85 /*	P.O. Box 704
86 /*	Yorktown Heights, NY 10598, USA
87 /*
88 /*	Pre-emptive scheduler enhancements:
89 /*	Patrik Rak
90 /*	Modra 6
91 /*	155 00, Prague, Czech Republic
92 /*
93 /*	Concurrency scheduler enhancements with:
94 /*	Victor Duchovni
95 /*	Morgan Stanley
96 /*--*/
97 
98 /* System library. */
99 
100 #include <sys_defs.h>
101 #include <time.h>
102 
103 /* Utility library. */
104 
105 #include <msg.h>
106 #include <mymalloc.h>
107 #include <events.h>
108 #include <htable.h>
109 
110 /* Global library. */
111 
112 #include <mail_params.h>
113 #include <recipient_list.h>
114 #include <mail_proto.h>			/* QMGR_LOG_WINDOW */
115 
116 /* Application-specific. */
117 
118 #include "qmgr.h"
119 
 /*
  * Total number of in-core per-destination queue structures. Incremented by
  * qmgr_queue_create() and decremented by qmgr_queue_done().
  */
int     qmgr_queue_count;

 /*
  * True when this queue belongs to the built-in "retry" or "error"
  * pseudo-transport; feedback debug logging is suppressed for those.
  */
#define QMGR_ERROR_OR_RETRY_QUEUE(queue) \
	(strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \
	    || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0)

 /*
  * Log one concurrency feedback increment when feedback debugging is
  * enabled. NOTE: expands to a bare "if" statement and relies on "queue"
  * and "myname" being in scope at the expansion site.
  */
#define QMGR_LOG_FEEDBACK(feedback) \
	if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
	    msg_info("%s: feedback %g", myname, feedback);

 /*
  * Log the current concurrency window and feedback accumulators when
  * feedback debugging is enabled. Same expansion caveats as above.
  */
#define QMGR_LOG_WINDOW(queue) \
	if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
	    msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
		    myname, queue->name, queue->transport->dest_concurrency_limit, \
		    queue->window, queue->success, queue->failure, queue->fail_cohorts);
135 
136 /* qmgr_queue_resume - resume delivery to destination */
137 
138 static void qmgr_queue_resume(int event, char *context)
139 {
140     QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
141     const char *myname = "qmgr_queue_resume";
142 
143     /*
144      * Sanity checks.
145      */
146     if (!QMGR_QUEUE_SUSPENDED(queue))
147 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
148 
149     /*
150      * We can't simply force delivery on this queue: the transport's pending
151      * count may already be maxed out, and there may be other constraints
152      * that definitely should be none of our business. The best we can do is
153      * to play by the same rules as everyone else: let qmgr_active_drain()
154      * and round-robin selection take care of message selection.
155      */
156     queue->window = 1;
157 
158     /*
159      * Every event handler that leaves a queue in the "ready" state should
160      * remove the queue when it is empty.
161      *
162      * XXX Do not omit the redundant test below. It is here to simplify code
163      * consistency checks. The check is trivially eliminated by the compiler
164      * optimizer. There is no need to sacrifice code clarity for the sake of
165      * performance.
166      *
167      * XXX Do not expose the blocker job logic here. Rate-limited queues are not
168      * a performance-critical feature. Here, too, there is no need to sacrifice
169      * code clarity for the sake of performance.
170      */
171     if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
172 	qmgr_queue_done(queue);
173     else
174 	qmgr_job_blocker_update(queue);
175 }
176 
177 /* qmgr_queue_suspend - briefly suspend a destination */
178 
179 void    qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
180 {
181     const char *myname = "qmgr_queue_suspend";
182 
183     /*
184      * Sanity checks.
185      */
186     if (!QMGR_QUEUE_READY(queue))
187 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
188     if (queue->busy_refcount > 0)
189 	msg_panic("%s: queue is busy", myname);
190 
191     /*
192      * Set the queue status to "suspended". No-one is supposed to remove a
193      * queue in suspended state.
194      */
195     queue->window = QMGR_QUEUE_STAT_SUSPENDED;
196     event_request_timer(qmgr_queue_resume, (char *) queue, delay);
197 }
198 
199 /* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */
200 
201 static void qmgr_queue_unthrottle_wrapper(int unused_event, char *context)
202 {
203     QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
204 
205     /*
206      * This routine runs when a wakeup timer goes off; it does not run in the
207      * context of some queue manipulation. Therefore, it is safe to discard
208      * this in-core queue when it is empty and when this site is not dead.
209      */
210     qmgr_queue_unthrottle(queue);
211     if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
212 	qmgr_queue_done(queue);
213 }
214 
/* qmgr_queue_unthrottle - give this destination another chance */

void    qmgr_queue_unthrottle(QMGR_QUEUE *queue)
{
    const char *myname = "qmgr_queue_unthrottle";
    QMGR_TRANSPORT *transport = queue->transport;
    double  feedback;

    if (msg_verbose)
	msg_info("%s: queue %s", myname, queue->name);

    /*
     * Sanity checks. The queue must be either "ready" or "throttled";
     * NOTE(review): the suspended state is apparently not expected here.
     */
    if (!QMGR_QUEUE_READY(queue) && !QMGR_QUEUE_THROTTLED(queue))
	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));

    /*
     * Don't restart the negative feedback hysteresis cycle with every
     * positive feedback. Restart it only when we make a positive concurrency
     * adjustment (i.e. at the end of a positive feedback hysteresis cycle).
     * Otherwise negative feedback would be too aggressive: negative feedback
     * takes effect immediately at the start of its hysteresis cycle.
     */
    queue->fail_cohorts = 0;

    /*
     * Special case when this site was dead: cancel the pending wakeup
     * timer, dispose of the saved death reason (dsn), and restore a fresh
     * concurrency window. The window doubles as the queue status, so
     * setting it non-zero makes the queue "ready" again.
     */
    if (QMGR_QUEUE_THROTTLED(queue)) {
	event_cancel_timer(qmgr_queue_unthrottle_wrapper, (char *) queue);
	if (queue->dsn == 0)
	    msg_panic("%s: queue %s: window 0 status 0", myname, queue->name);
	dsn_free(queue->dsn);
	queue->dsn = 0;
	/* Back from the almost grave, best concurrency is anyone's guess. */
	if (queue->busy_refcount > 0)
	    queue->window = queue->busy_refcount;
	else
	    queue->window = transport->init_dest_concurrency;
	queue->success = queue->failure = 0;
	QMGR_LOG_WINDOW(queue);
	return;
    }

    /*
     * Increase the destination's concurrency limit until we reach the
     * transport's concurrency limit. Allow for a margin the size of the
     * initial destination concurrency, so that we're not too gentle.
     *
     * Why is the concurrency increment based on preferred concurrency and not
     * on the number of outstanding delivery requests? The latter fluctuates
     * wildly when deliveries complete in bursts (artificial benchmark
     * measurements), and does not account for cached connections.
     *
     * Keep the window within reasonable distance from actual concurrency
     * otherwise negative feedback will be ineffective. This expression
     * assumes that busy_refcount changes gradually. This is invalid when
     * deliveries complete in bursts (artificial benchmark measurements).
     */
    /* A dest_concurrency_limit of zero means "no transport-specific cap". */
    if (transport->dest_concurrency_limit == 0
	|| transport->dest_concurrency_limit > queue->window)
	if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) {
	    feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window);
	    QMGR_LOG_FEEDBACK(feedback);
	    queue->success += feedback;
	    /* Prepare for overshoot (feedback > hysteresis, rounding error). */
	    /* Each completed hysteresis cycle widens the window by one
	     * hysteresis step and restarts the negative feedback cycle. */
	    while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) {
		queue->window += transport->pos_feedback.hysteresis;
		queue->success -= transport->pos_feedback.hysteresis;
		queue->failure = 0;
	    }
	    /* Prepare for overshoot. */
	    if (transport->dest_concurrency_limit > 0
		&& queue->window > transport->dest_concurrency_limit)
		queue->window = transport->dest_concurrency_limit;
	}
    QMGR_LOG_WINDOW(queue);
}
294 
/* qmgr_queue_throttle - handle destination delivery failure */

void    qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn)
{
    const char *myname = "qmgr_queue_throttle";
    QMGR_TRANSPORT *transport = queue->transport;
    double  feedback;

    /*
     * Sanity checks. Only a "ready" queue with no pending death reason may
     * receive negative feedback.
     */
    if (!QMGR_QUEUE_READY(queue))
	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
    if (queue->dsn)
	msg_panic("%s: queue %s: spurious reason %s",
		  myname, queue->name, queue->dsn->reason);
    if (msg_verbose)
	msg_info("%s: queue %s: %s %s",
		 myname, queue->name, dsn->status, dsn->reason);

    /*
     * Don't restart the positive feedback hysteresis cycle with every
     * negative feedback. Restart it only when we make a negative concurrency
     * adjustment (i.e. at the start of a negative feedback hysteresis
     * cycle). Otherwise positive feedback would be too weak (positive
     * feedback does not take effect until the end of its hysteresis cycle).
     */

    /*
     * This queue is declared dead after a configurable number of
     * pseudo-cohort failures. One pseudo-cohort corresponds to queue->window
     * deliveries, hence the fractional increment below. Setting the window
     * to QMGR_QUEUE_STAT_THROTTLED marks the queue as throttled: the window
     * field doubles as the queue status.
     */
    if (QMGR_QUEUE_READY(queue)) {
	queue->fail_cohorts += 1.0 / queue->window;
	if (transport->fail_cohort_limit > 0
	    && queue->fail_cohorts >= transport->fail_cohort_limit)
	    queue->window = QMGR_QUEUE_STAT_THROTTLED;
    }

    /*
     * Decrease the destination's concurrency limit until we reach 1. Base
     * adjustments on the concurrency limit itself, instead of using the
     * actual concurrency. The latter fluctuates wildly when deliveries
     * complete in bursts (artificial benchmark measurements).
     *
     * Even after reaching 1, we maintain the negative hysteresis cycle so that
     * negative feedback can cancel out positive feedback.
     *
     * NOTE: this block is skipped when the cohort check above just declared
     * the queue dead (the queue is then no longer "ready").
     */
    if (QMGR_QUEUE_READY(queue)) {
	feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window);
	QMGR_LOG_FEEDBACK(feedback);
	queue->failure -= feedback;
	/* Prepare for overshoot (feedback > hysteresis, rounding error). */
	while (queue->failure - feedback / 2 < 0) {
	    queue->window -= transport->neg_feedback.hysteresis;
	    queue->success = 0;
	    queue->failure += transport->neg_feedback.hysteresis;
	}
	/* Prepare for overshoot. */
	if (queue->window < 1)
	    queue->window = 1;
    }

    /*
     * Special case for a site that just was declared dead: remember why
     * (the queue owns the DSN copy until qmgr_queue_unthrottle() frees it),
     * and schedule a wakeup that will give the destination another chance.
     */
    if (QMGR_QUEUE_THROTTLED(queue)) {
	queue->dsn = DSN_COPY(dsn);
	event_request_timer(qmgr_queue_unthrottle_wrapper,
			    (char *) queue, var_min_backoff_time);
	queue->dflags = 0;
    }
    QMGR_LOG_WINDOW(queue);
}
369 
370 /* qmgr_queue_done - delete in-core queue for site */
371 
372 void    qmgr_queue_done(QMGR_QUEUE *queue)
373 {
374     const char *myname = "qmgr_queue_done";
375     QMGR_TRANSPORT *transport = queue->transport;
376 
377     /*
378      * Sanity checks. It is an error to delete an in-core queue with pending
379      * messages or timers.
380      */
381     if (queue->busy_refcount != 0 || queue->todo_refcount != 0)
382 	msg_panic("%s: refcount: %d", myname,
383 		  queue->busy_refcount + queue->todo_refcount);
384     if (queue->todo.next || queue->busy.next)
385 	msg_panic("%s: queue not empty: %s", myname, queue->name);
386     if (!QMGR_QUEUE_READY(queue))
387 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
388     if (queue->dsn)
389 	msg_panic("%s: queue %s: spurious reason %s",
390 		  myname, queue->name, queue->dsn->reason);
391 
392     /*
393      * Clean up this in-core queue.
394      */
395     QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue, peers);
396     htable_delete(transport->queue_byname, queue->name, (void (*) (char *)) 0);
397     myfree(queue->name);
398     myfree(queue->nexthop);
399     qmgr_queue_count--;
400     myfree((char *) queue);
401 }
402 
403 /* qmgr_queue_create - create in-core queue for site */
404 
405 QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name,
406 			              const char *nexthop)
407 {
408     QMGR_QUEUE *queue;
409 
410     /*
411      * If possible, choose an initial concurrency of > 1 so that one bad
412      * message or one bad network won't slow us down unnecessarily.
413      */
414 
415     queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE));
416     qmgr_queue_count++;
417     queue->dflags = 0;
418     queue->last_done = 0;
419     queue->name = mystrdup(name);
420     queue->nexthop = mystrdup(nexthop);
421     queue->todo_refcount = 0;
422     queue->busy_refcount = 0;
423     queue->transport = transport;
424     queue->window = transport->init_dest_concurrency;
425     queue->success = queue->failure = queue->fail_cohorts = 0;
426     QMGR_LIST_INIT(queue->todo);
427     QMGR_LIST_INIT(queue->busy);
428     queue->dsn = 0;
429     queue->clog_time_to_warn = 0;
430     queue->blocker_tag = 0;
431     QMGR_LIST_APPEND(transport->queue_list, queue, peers);
432     htable_enter(transport->queue_byname, name, (char *) queue);
433     return (queue);
434 }
435 
436 /* qmgr_queue_find - find in-core named queue */
437 
438 QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
439 {
440     return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name));
441 }
442