1 /*	$NetBSD: qmgr_transport.c,v 1.1.1.1 2009/06/23 10:08:50 tron Exp $	*/
2 
3 /*++
4 /* NAME
5 /*	qmgr_transport 3
6 /* SUMMARY
7 /*	per-transport data structures
8 /* SYNOPSIS
9 /*	#include "qmgr.h"
10 /*
11 /*	QMGR_TRANSPORT *qmgr_transport_create(name)
12 /*	const char *name;
13 /*
14 /*	QMGR_TRANSPORT *qmgr_transport_find(name)
15 /*	const char *name;
16 /*
17 /*	QMGR_TRANSPORT *qmgr_transport_select()
18 /*
19 /*	void	qmgr_transport_alloc(transport, notify)
20 /*	QMGR_TRANSPORT *transport;
21 /*	void	(*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp);
22 /*
23 /*	void	qmgr_transport_throttle(transport, dsn)
24 /*	QMGR_TRANSPORT *transport;
25 /*	DSN	*dsn;
26 /*
27 /*	void	qmgr_transport_unthrottle(transport)
28 /*	QMGR_TRANSPORT *transport;
29 /* DESCRIPTION
30 /*	This module organizes the world by message transport type.
31 /*	Each transport can have zero or more destination queues
32 /*	associated with it.
33 /*
34 /*	qmgr_transport_create() instantiates a data structure for the
35 /*	named transport type.
36 /*
37 /*	qmgr_transport_find() looks up an existing message transport
38 /*	data structure.
39 /*
40 /*	qmgr_transport_select() attempts to find a transport that
41 /*	has messages pending delivery.  This routine implements
42 /*	round-robin search among transports.
43 /*
44 /*	qmgr_transport_alloc() allocates a delivery process for the
45 /*	specified transport type. Allocation is performed asynchronously.
46 /*	When a process becomes available, the application callback routine
47 /*	is invoked with as arguments the transport and a stream that
48 /*	is connected to a delivery process. It is an error to call
49 /*	qmgr_transport_alloc() while delivery process allocation for
50 /*	the same transport is in progress.
51 /*
/*	qmgr_transport_throttle() blocks further allocation of delivery
/*	processes for the named transport. Attempts to throttle a
/*	throttled transport are ignored.
55 /*
56 /*	qmgr_transport_unthrottle() undoes qmgr_transport_throttle().
57 /*	Attempts to unthrottle a non-throttled transport are ignored.
58 /* DIAGNOSTICS
59 /*	Panic: consistency check failure. Fatal: out of memory.
60 /* LICENSE
61 /* .ad
62 /* .fi
63 /*	The Secure Mailer license must be distributed with this software.
64 /* AUTHOR(S)
65 /*	Wietse Venema
66 /*	IBM T.J. Watson Research
67 /*	P.O. Box 704
68 /*	Yorktown Heights, NY 10598, USA
69 /*--*/
70 
71 /* System library. */
72 
73 #include <sys_defs.h>
74 #include <unistd.h>
75 
76 #include <sys/time.h>			/* FD_SETSIZE */
77 #include <sys/types.h>			/* FD_SETSIZE */
78 #include <unistd.h>			/* FD_SETSIZE */
79 
80 #ifdef USE_SYS_SELECT_H
81 #include <sys/select.h>			/* FD_SETSIZE */
82 #endif
83 
84 /* Utility library. */
85 
86 #include <msg.h>
87 #include <htable.h>
88 #include <events.h>
89 #include <mymalloc.h>
90 #include <vstream.h>
91 #include <iostuff.h>
92 
93 /* Global library. */
94 
95 #include <mail_proto.h>
96 #include <recipient_list.h>
97 #include <mail_conf.h>
98 #include <mail_params.h>
99 
100 /* Application-specific. */
101 
102 #include "qmgr.h"
103 
/*
 * Lookup table from transport name to transport instance. It is created on
 * first use by qmgr_transport_create() and searched by
 * qmgr_transport_find().
 */
HTABLE *qmgr_transport_byname;		/* transport by name */

/*
 * List of all transport instances. qmgr_transport_create() appends to it,
 * and qmgr_transport_select() rotates it after each successful selection.
 */
QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */
106 
107  /*
108   * A local structure to remember a delivery process allocation request.
109   */
110 typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC;
111 
112 struct QMGR_TRANSPORT_ALLOC {
113     QMGR_TRANSPORT *transport;		/* transport context */
114     VSTREAM *stream;			/* delivery service stream */
115     QMGR_TRANSPORT_ALLOC_NOTIFY notify;	/* application call-back routine */
116 };
117 
118  /*
119   * Connections to delivery agents are managed asynchronously. Each delivery
120   * agent connection goes through multiple wait states:
121   *
122   * - With Linux/Solaris and old queue manager implementations only, wait for
123   * the server to invoke accept().
124   *
125   * - Wait for the delivery agent's announcement that it is ready to receive a
126   * delivery request.
127   *
128   * - Wait for the delivery request completion status.
129   *
130   * Older queue manager implementations had only one pending delivery agent
131   * connection per transport. With low-latency destinations, the output rates
132   * were reduced on Linux/Solaris systems that had the extra wait state.
133   *
134   * To maximize delivery agent output rates with low-latency destinations, the
135   * following changes were made to the queue manager by the end of the 2.4
136   * development cycle:
137   *
138   * - The Linux/Solaris accept() wait state was eliminated.
139   *
140   * - A pipeline was implemented for pending delivery agent connections. The
141   * number of pending delivery agent connections was increased from one to
142   * two: the number of before-delivery wait states, plus one extra pipeline
143   * slot to prevent the pipeline from stalling easily. Increasing the
144   * pipeline much further actually hurt performance.
145   *
146   * - To reduce queue manager disk competition with delivery agents, the queue
147   * scanning algorithm was modified to import only one message per interrupt.
148   * The incoming and deferred queue scans now happen on alternate interrupts.
149   *
150   * Simplistically reasoned, a non-zero (incoming + active) queue length is
151   * equivalent to a time shift for mail deliveries; this is undesirable when
152   * delivery agents are not fully utilized.
153   *
154   * On the other hand a non-empty active queue is what allows us to do clever
155   * things such as queue file prefetch, concurrency windows, and connection
156   * caching; the idea is that such "thinking time" is affordable only after
157   * the output channels are maxed out.
158   */
/* Upper bound on pending delivery agent connections per transport. */
#if !defined(QMGR_TRANSPORT_MAX_PEND)
#define QMGR_TRANSPORT_MAX_PEND	2
#endif
162 
163 /* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
164 
165 static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
166 {
167     qmgr_transport_unthrottle((QMGR_TRANSPORT *) context);
168 }
169 
170 /* qmgr_transport_unthrottle - open the throttle */
171 
172 void    qmgr_transport_unthrottle(QMGR_TRANSPORT *transport)
173 {
174     const char *myname = "qmgr_transport_unthrottle";
175 
176     /*
177      * This routine runs after expiration of the timer set by
178      * qmgr_transport_throttle(), or whenever a delivery transport has been
179      * used without malfunction. In either case, we enable delivery again if
180      * the transport was blocked, otherwise the request is ignored.
181      */
182     if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) {
183 	if (msg_verbose)
184 	    msg_info("%s: transport %s", myname, transport->name);
185 	transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD;
186 	if (transport->dsn == 0)
187 	    msg_panic("%s: transport %s: null reason",
188 		      myname, transport->name);
189 	dsn_free(transport->dsn);
190 	transport->dsn = 0;
191 	event_cancel_timer(qmgr_transport_unthrottle_wrapper,
192 			   (char *) transport);
193     }
194 }
195 
196 /* qmgr_transport_throttle - disable delivery process allocation */
197 
198 void    qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn)
199 {
200     const char *myname = "qmgr_transport_throttle";
201 
202     /*
203      * We are unable to connect to a deliver process for this type of message
204      * transport. Instead of hosing the system by retrying in a tight loop,
205      * back off and disable this transport type for a while.
206      */
207     if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) {
208 	if (msg_verbose)
209 	    msg_info("%s: transport %s: status: %s reason: %s",
210 		     myname, transport->name, dsn->status, dsn->reason);
211 	transport->flags |= QMGR_TRANSPORT_STAT_DEAD;
212 	if (transport->dsn)
213 	    msg_panic("%s: transport %s: spurious reason: %s",
214 		      myname, transport->name, transport->dsn->reason);
215 	transport->dsn = DSN_COPY(dsn);
216 	event_request_timer(qmgr_transport_unthrottle_wrapper,
217 			    (char *) transport, var_transport_retry_time);
218     }
219 }
220 
221 /* qmgr_transport_abort - transport connect watchdog */
222 
223 static void qmgr_transport_abort(int unused_event, char *context)
224 {
225     QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
226 
227     msg_fatal("timeout connecting to transport: %s", alloc->transport->name);
228 }
229 
230 /* qmgr_transport_event - delivery process availability notice */
231 
232 static void qmgr_transport_event(int unused_event, char *context)
233 {
234     QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
235 
236     /*
237      * This routine notifies the application when the request given to
238      * qmgr_transport_alloc() completes.
239      */
240     if (msg_verbose)
241 	msg_info("transport_event: %s", alloc->transport->name);
242 
243     /*
244      * Connection request completed. Stop the watchdog timer.
245      */
246     event_cancel_timer(qmgr_transport_abort, context);
247 
248     /*
249      * Disable further read events that end up calling this function, and
250      * free up this pending connection pipeline slot.
251      */
252     if (alloc->stream) {
253 	event_disable_readwrite(vstream_fileno(alloc->stream));
254 	non_blocking(vstream_fileno(alloc->stream), BLOCKING);
255     }
256     alloc->transport->pending -= 1;
257 
258     /*
259      * Notify the requestor.
260      */
261     alloc->notify(alloc->transport, alloc->stream);
262     myfree((char *) alloc);
263 }
264 
265 /* qmgr_transport_select - select transport for allocation */
266 
267 QMGR_TRANSPORT *qmgr_transport_select(void)
268 {
269     QMGR_TRANSPORT *xport;
270     QMGR_QUEUE *queue;
271     int     need;
272 
273     /*
274      * If we find a suitable transport, rotate the list of transports to
275      * effectuate round-robin selection. See similar selection code in
276      * qmgr_queue_select().
277      *
278      * This function is called repeatedly until all transports have maxed out
279      * the number of pending delivery agent connections, until all delivery
280      * agent concurrency windows are maxed out, or until we run out of "todo"
281      * queue entries.
282      */
283 #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
284 
285     for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
286 	if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
287 	    || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
288 	    continue;
289 	need = xport->pending + 1;
290 	for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
291 	    if (QMGR_QUEUE_READY(queue) == 0)
292 		continue;
293 	    if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
294 					  queue->todo_refcount)) <= 0) {
295 		QMGR_LIST_ROTATE(qmgr_transport_list, xport);
296 		if (msg_verbose)
297 		    msg_info("qmgr_transport_select: %s", xport->name);
298 		return (xport);
299 	    }
300 	}
301     }
302     return (0);
303 }
304 
305 /* qmgr_transport_alloc - allocate delivery process */
306 
307 void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
308 {
309     QMGR_TRANSPORT_ALLOC *alloc;
310 
311     /*
312      * Sanity checks.
313      */
314     if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
315 	msg_panic("qmgr_transport: dead transport: %s", transport->name);
316     if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
317 	msg_panic("qmgr_transport: excess allocation: %s", transport->name);
318 
319     /*
320      * Connect to the well-known port for this delivery service, and wake up
321      * when a process announces its availability. Allow only a limited number
322      * of delivery process allocation attempts for this transport. In case of
323      * problems, back off. Do not hose the system when it is in trouble
324      * already.
325      *
326      * Use non-blocking connect(), so that Linux won't block the queue manager
327      * until the delivery agent calls accept().
328      *
329      * When the connection to delivery agent cannot be completed, notify the
330      * event handler so that it can throttle the transport and defer the todo
331      * queues, just like it does when communication fails *after* connection
332      * completion.
333      *
334      * Before Postfix 2.4, the event handler was not invoked after connect()
335      * error, and mail was not deferred. Because of this, mail would be stuck
336      * in the active queue after triggering a "connection refused" condition.
337      */
338     alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
339     alloc->transport = transport;
340     alloc->notify = notify;
341     transport->pending += 1;
342     if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
343 				      NON_BLOCKING)) == 0) {
344 	msg_warn("connect to transport %s/%s: %m",
345 		 MAIL_CLASS_PRIVATE, transport->name);
346 	event_request_timer(qmgr_transport_event, (char *) alloc, 0);
347 	return;
348     }
349 #if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(VSTREAM_CTL_DUPFD)
350 #ifndef THRESHOLD_FD_WORKAROUND
351 #define THRESHOLD_FD_WORKAROUND 128
352 #endif
353     vstream_control(alloc->stream,
354 		    VSTREAM_CTL_DUPFD, THRESHOLD_FD_WORKAROUND,
355 		    VSTREAM_CTL_END);
356 #endif
357     event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
358 		      (char *) alloc);
359 
360     /*
361      * Guard against broken systems.
362      */
363     event_request_timer(qmgr_transport_abort, (char *) alloc,
364 			var_daemon_timeout);
365 }
366 
367 /* qmgr_transport_create - create transport instance */
368 
369 QMGR_TRANSPORT *qmgr_transport_create(const char *name)
370 {
371     QMGR_TRANSPORT *transport;
372 
373     if (htable_find(qmgr_transport_byname, name) != 0)
374 	msg_panic("qmgr_transport_create: transport exists: %s", name);
375     transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
376     transport->flags = 0;
377     transport->pending = 0;
378     transport->name = mystrdup(name);
379 
380     /*
381      * Use global configuration settings or transport-specific settings.
382      */
383     transport->dest_concurrency_limit =
384 	get_mail_conf_int2(name, _DEST_CON_LIMIT,
385 			   var_dest_con_limit, 0, 0);
386     transport->recipient_limit =
387 	get_mail_conf_int2(name, _DEST_RCPT_LIMIT,
388 			   var_dest_rcpt_limit, 0, 0);
389     transport->init_dest_concurrency =
390 	get_mail_conf_int2(name, _INIT_DEST_CON,
391 			   var_init_dest_concurrency, 1, 0);
392     transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
393 						var_dest_rate_delay,
394 						's', 0, 0);
395 
396     if (transport->rate_delay > 0)
397 	transport->dest_concurrency_limit = 1;
398     if (transport->dest_concurrency_limit != 0
399     && transport->dest_concurrency_limit < transport->init_dest_concurrency)
400 	transport->init_dest_concurrency = transport->dest_concurrency_limit;
401 
402     transport->queue_byname = htable_create(0);
403     QMGR_LIST_INIT(transport->queue_list);
404     transport->dsn = 0;
405     qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK,
406 		       VAR_CONC_POS_FDBACK, var_conc_pos_feedback);
407     qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK,
408 		       VAR_CONC_NEG_FDBACK, var_conc_neg_feedback);
409     transport->fail_cohort_limit =
410 	get_mail_conf_int2(name, _CONC_COHORT_LIM,
411 			   var_conc_cohort_limit, 0, 0);
412     if (qmgr_transport_byname == 0)
413 	qmgr_transport_byname = htable_create(10);
414     htable_enter(qmgr_transport_byname, name, (char *) transport);
415     QMGR_LIST_APPEND(qmgr_transport_list, transport);
416     if (msg_verbose)
417 	msg_info("qmgr_transport_create: %s concurrency %d recipients %d",
418 		 transport->name, transport->dest_concurrency_limit,
419 		 transport->recipient_limit);
420     return (transport);
421 }
422 
423 /* qmgr_transport_find - find transport instance */
424 
425 QMGR_TRANSPORT *qmgr_transport_find(const char *name)
426 {
427     return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name));
428 }
429