1 /* $NetBSD: qmgr_transport.c,v 1.1.1.1 2009/06/23 10:08:50 tron Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmgr_transport 3 6 /* SUMMARY 7 /* per-transport data structures 8 /* SYNOPSIS 9 /* #include "qmgr.h" 10 /* 11 /* QMGR_TRANSPORT *qmgr_transport_create(name) 12 /* const char *name; 13 /* 14 /* QMGR_TRANSPORT *qmgr_transport_find(name) 15 /* const char *name; 16 /* 17 /* QMGR_TRANSPORT *qmgr_transport_select() 18 /* 19 /* void qmgr_transport_alloc(transport, notify) 20 /* QMGR_TRANSPORT *transport; 21 /* void (*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp); 22 /* 23 /* void qmgr_transport_throttle(transport, dsn) 24 /* QMGR_TRANSPORT *transport; 25 /* DSN *dsn; 26 /* 27 /* void qmgr_transport_unthrottle(transport) 28 /* QMGR_TRANSPORT *transport; 29 /* DESCRIPTION 30 /* This module organizes the world by message transport type. 31 /* Each transport can have zero or more destination queues 32 /* associated with it. 33 /* 34 /* qmgr_transport_create() instantiates a data structure for the 35 /* named transport type. 36 /* 37 /* qmgr_transport_find() looks up an existing message transport 38 /* data structure. 39 /* 40 /* qmgr_transport_select() attempts to find a transport that 41 /* has messages pending delivery. This routine implements 42 /* round-robin search among transports. 43 /* 44 /* qmgr_transport_alloc() allocates a delivery process for the 45 /* specified transport type. Allocation is performed asynchronously. 46 /* When a process becomes available, the application callback routine 47 /* is invoked with as arguments the transport and a stream that 48 /* is connected to a delivery process. It is an error to call 49 /* qmgr_transport_alloc() while delivery process allocation for 50 /* the same transport is in progress. 51 /* 52 /* qmgr_transport_throttle blocks further allocation of delivery 53 /* processes for the named transport. Attempts to throttle a 54 /* throttled transport are ignored. 55 /* 56 /* qmgr_transport_unthrottle() undoes qmgr_transport_throttle(). 57 /* Attempts to unthrottle a non-throttled transport are ignored. 58 /* DIAGNOSTICS 59 /* Panic: consistency check failure. Fatal: out of memory. 60 /* LICENSE 61 /* .ad 62 /* .fi 63 /* The Secure Mailer license must be distributed with this software. 64 /* AUTHOR(S) 65 /* Wietse Venema 66 /* IBM T.J. Watson Research 67 /* P.O. Box 704 68 /* Yorktown Heights, NY 10598, USA 69 /*--*/ 70 71 /* System library. */ 72 73 #include <sys_defs.h> 74 #include <unistd.h> 75 76 #include <sys/time.h> /* FD_SETSIZE */ 77 #include <sys/types.h> /* FD_SETSIZE */ 78 #include <unistd.h> /* FD_SETSIZE */ 79 80 #ifdef USE_SYS_SELECT_H 81 #include <sys/select.h> /* FD_SETSIZE */ 82 #endif 83 84 /* Utility library. */ 85 86 #include <msg.h> 87 #include <htable.h> 88 #include <events.h> 89 #include <mymalloc.h> 90 #include <vstream.h> 91 #include <iostuff.h> 92 93 /* Global library. */ 94 95 #include <mail_proto.h> 96 #include <recipient_list.h> 97 #include <mail_conf.h> 98 #include <mail_params.h> 99 100 /* Application-specific. */ 101 102 #include "qmgr.h" 103 104 HTABLE *qmgr_transport_byname; /* transport by name */ 105 QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */ 106 107 /* 108 * A local structure to remember a delivery process allocation request. 109 */ 110 typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC; 111 112 struct QMGR_TRANSPORT_ALLOC { 113 QMGR_TRANSPORT *transport; /* transport context */ 114 VSTREAM *stream; /* delivery service stream */ 115 QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */ 116 }; 117 118 /* 119 * Connections to delivery agents are managed asynchronously. Each delivery 120 * agent connection goes through multiple wait states: 121 * 122 * - With Linux/Solaris and old queue manager implementations only, wait for 123 * the server to invoke accept(). 124 * 125 * - Wait for the delivery agent's announcement that it is ready to receive a 126 * delivery request. 127 * 128 * - Wait for the delivery request completion status. 129 * 130 * Older queue manager implementations had only one pending delivery agent 131 * connection per transport. With low-latency destinations, the output rates 132 * were reduced on Linux/Solaris systems that had the extra wait state. 133 * 134 * To maximize delivery agent output rates with low-latency destinations, the 135 * following changes were made to the queue manager by the end of the 2.4 136 * development cycle: 137 * 138 * - The Linux/Solaris accept() wait state was eliminated. 139 * 140 * - A pipeline was implemented for pending delivery agent connections. The 141 * number of pending delivery agent connections was increased from one to 142 * two: the number of before-delivery wait states, plus one extra pipeline 143 * slot to prevent the pipeline from stalling easily. Increasing the 144 * pipeline much further actually hurt performance. 145 * 146 * - To reduce queue manager disk competition with delivery agents, the queue 147 * scanning algorithm was modified to import only one message per interrupt. 148 * The incoming and deferred queue scans now happen on alternate interrupts. 149 * 150 * Simplistically reasoned, a non-zero (incoming + active) queue length is 151 * equivalent to a time shift for mail deliveries; this is undesirable when 152 * delivery agents are not fully utilized. 153 * 154 * On the other hand a non-empty active queue is what allows us to do clever 155 * things such as queue file prefetch, concurrency windows, and connection 156 * caching; the idea is that such "thinking time" is affordable only after 157 * the output channels are maxed out. 158 */ 159 #ifndef QMGR_TRANSPORT_MAX_PEND 160 #define QMGR_TRANSPORT_MAX_PEND 2 161 #endif 162 163 /* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */ 164 165 static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context) 166 { 167 qmgr_transport_unthrottle((QMGR_TRANSPORT *) context); 168 } 169 170 /* qmgr_transport_unthrottle - open the throttle */ 171 172 void qmgr_transport_unthrottle(QMGR_TRANSPORT *transport) 173 { 174 const char *myname = "qmgr_transport_unthrottle"; 175 176 /* 177 * This routine runs after expiration of the timer set by 178 * qmgr_transport_throttle(), or whenever a delivery transport has been 179 * used without malfunction. In either case, we enable delivery again if 180 * the transport was blocked, otherwise the request is ignored. 181 */ 182 if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) { 183 if (msg_verbose) 184 msg_info("%s: transport %s", myname, transport->name); 185 transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD; 186 if (transport->dsn == 0) 187 msg_panic("%s: transport %s: null reason", 188 myname, transport->name); 189 dsn_free(transport->dsn); 190 transport->dsn = 0; 191 event_cancel_timer(qmgr_transport_unthrottle_wrapper, 192 (char *) transport); 193 } 194 } 195 196 /* qmgr_transport_throttle - disable delivery process allocation */ 197 198 void qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn) 199 { 200 const char *myname = "qmgr_transport_throttle"; 201 202 /* 203 * We are unable to connect to a deliver process for this type of message 204 * transport. Instead of hosing the system by retrying in a tight loop, 205 * back off and disable this transport type for a while. 206 */ 207 if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) { 208 if (msg_verbose) 209 msg_info("%s: transport %s: status: %s reason: %s", 210 myname, transport->name, dsn->status, dsn->reason); 211 transport->flags |= QMGR_TRANSPORT_STAT_DEAD; 212 if (transport->dsn) 213 msg_panic("%s: transport %s: spurious reason: %s", 214 myname, transport->name, transport->dsn->reason); 215 transport->dsn = DSN_COPY(dsn); 216 event_request_timer(qmgr_transport_unthrottle_wrapper, 217 (char *) transport, var_transport_retry_time); 218 } 219 } 220 221 /* qmgr_transport_abort - transport connect watchdog */ 222 223 static void qmgr_transport_abort(int unused_event, char *context) 224 { 225 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; 226 227 msg_fatal("timeout connecting to transport: %s", alloc->transport->name); 228 } 229 230 /* qmgr_transport_event - delivery process availability notice */ 231 232 static void qmgr_transport_event(int unused_event, char *context) 233 { 234 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; 235 236 /* 237 * This routine notifies the application when the request given to 238 * qmgr_transport_alloc() completes. 239 */ 240 if (msg_verbose) 241 msg_info("transport_event: %s", alloc->transport->name); 242 243 /* 244 * Connection request completed. Stop the watchdog timer. 245 */ 246 event_cancel_timer(qmgr_transport_abort, context); 247 248 /* 249 * Disable further read events that end up calling this function, and 250 * free up this pending connection pipeline slot. 251 */ 252 if (alloc->stream) { 253 event_disable_readwrite(vstream_fileno(alloc->stream)); 254 non_blocking(vstream_fileno(alloc->stream), BLOCKING); 255 } 256 alloc->transport->pending -= 1; 257 258 /* 259 * Notify the requestor. 260 */ 261 alloc->notify(alloc->transport, alloc->stream); 262 myfree((char *) alloc); 263 } 264 265 /* qmgr_transport_select - select transport for allocation */ 266 267 QMGR_TRANSPORT *qmgr_transport_select(void) 268 { 269 QMGR_TRANSPORT *xport; 270 QMGR_QUEUE *queue; 271 int need; 272 273 /* 274 * If we find a suitable transport, rotate the list of transports to 275 * effectuate round-robin selection. See similar selection code in 276 * qmgr_queue_select(). 277 * 278 * This function is called repeatedly until all transports have maxed out 279 * the number of pending delivery agent connections, until all delivery 280 * agent concurrency windows are maxed out, or until we run out of "todo" 281 * queue entries. 282 */ 283 #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y)) 284 285 for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) { 286 if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0 287 || xport->pending >= QMGR_TRANSPORT_MAX_PEND) 288 continue; 289 need = xport->pending + 1; 290 for (queue = xport->queue_list.next; queue; queue = queue->peers.next) { 291 if (QMGR_QUEUE_READY(queue) == 0) 292 continue; 293 if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount, 294 queue->todo_refcount)) <= 0) { 295 QMGR_LIST_ROTATE(qmgr_transport_list, xport); 296 if (msg_verbose) 297 msg_info("qmgr_transport_select: %s", xport->name); 298 return (xport); 299 } 300 } 301 } 302 return (0); 303 } 304 305 /* qmgr_transport_alloc - allocate delivery process */ 306 307 void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify) 308 { 309 QMGR_TRANSPORT_ALLOC *alloc; 310 311 /* 312 * Sanity checks. 313 */ 314 if (transport->flags & QMGR_TRANSPORT_STAT_DEAD) 315 msg_panic("qmgr_transport: dead transport: %s", transport->name); 316 if (transport->pending >= QMGR_TRANSPORT_MAX_PEND) 317 msg_panic("qmgr_transport: excess allocation: %s", transport->name); 318 319 /* 320 * Connect to the well-known port for this delivery service, and wake up 321 * when a process announces its availability. Allow only a limited number 322 * of delivery process allocation attempts for this transport. In case of 323 * problems, back off. Do not hose the system when it is in trouble 324 * already. 325 * 326 * Use non-blocking connect(), so that Linux won't block the queue manager 327 * until the delivery agent calls accept(). 328 * 329 * When the connection to delivery agent cannot be completed, notify the 330 * event handler so that it can throttle the transport and defer the todo 331 * queues, just like it does when communication fails *after* connection 332 * completion. 333 * 334 * Before Postfix 2.4, the event handler was not invoked after connect() 335 * error, and mail was not deferred. Because of this, mail would be stuck 336 * in the active queue after triggering a "connection refused" condition. 337 */ 338 alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc)); 339 alloc->transport = transport; 340 alloc->notify = notify; 341 transport->pending += 1; 342 if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, 343 NON_BLOCKING)) == 0) { 344 msg_warn("connect to transport %s/%s: %m", 345 MAIL_CLASS_PRIVATE, transport->name); 346 event_request_timer(qmgr_transport_event, (char *) alloc, 0); 347 return; 348 } 349 #if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(VSTREAM_CTL_DUPFD) 350 #ifndef THRESHOLD_FD_WORKAROUND 351 #define THRESHOLD_FD_WORKAROUND 128 352 #endif 353 vstream_control(alloc->stream, 354 VSTREAM_CTL_DUPFD, THRESHOLD_FD_WORKAROUND, 355 VSTREAM_CTL_END); 356 #endif 357 event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event, 358 (char *) alloc); 359 360 /* 361 * Guard against broken systems. 362 */ 363 event_request_timer(qmgr_transport_abort, (char *) alloc, 364 var_daemon_timeout); 365 } 366 367 /* qmgr_transport_create - create transport instance */ 368 369 QMGR_TRANSPORT *qmgr_transport_create(const char *name) 370 { 371 QMGR_TRANSPORT *transport; 372 373 if (htable_find(qmgr_transport_byname, name) != 0) 374 msg_panic("qmgr_transport_create: transport exists: %s", name); 375 transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT)); 376 transport->flags = 0; 377 transport->pending = 0; 378 transport->name = mystrdup(name); 379 380 /* 381 * Use global configuration settings or transport-specific settings. 382 */ 383 transport->dest_concurrency_limit = 384 get_mail_conf_int2(name, _DEST_CON_LIMIT, 385 var_dest_con_limit, 0, 0); 386 transport->recipient_limit = 387 get_mail_conf_int2(name, _DEST_RCPT_LIMIT, 388 var_dest_rcpt_limit, 0, 0); 389 transport->init_dest_concurrency = 390 get_mail_conf_int2(name, _INIT_DEST_CON, 391 var_init_dest_concurrency, 1, 0); 392 transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY, 393 var_dest_rate_delay, 394 's', 0, 0); 395 396 if (transport->rate_delay > 0) 397 transport->dest_concurrency_limit = 1; 398 if (transport->dest_concurrency_limit != 0 399 && transport->dest_concurrency_limit < transport->init_dest_concurrency) 400 transport->init_dest_concurrency = transport->dest_concurrency_limit; 401 402 transport->queue_byname = htable_create(0); 403 QMGR_LIST_INIT(transport->queue_list); 404 transport->dsn = 0; 405 qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK, 406 VAR_CONC_POS_FDBACK, var_conc_pos_feedback); 407 qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK, 408 VAR_CONC_NEG_FDBACK, var_conc_neg_feedback); 409 transport->fail_cohort_limit = 410 get_mail_conf_int2(name, _CONC_COHORT_LIM, 411 var_conc_cohort_limit, 0, 0); 412 if (qmgr_transport_byname == 0) 413 qmgr_transport_byname = htable_create(10); 414 htable_enter(qmgr_transport_byname, name, (char *) transport); 415 QMGR_LIST_APPEND(qmgr_transport_list, transport); 416 if (msg_verbose) 417 msg_info("qmgr_transport_create: %s concurrency %d recipients %d", 418 transport->name, transport->dest_concurrency_limit, 419 transport->recipient_limit); 420 return (transport); 421 } 422 423 /* qmgr_transport_find - find transport instance */ 424 425 QMGR_TRANSPORT *qmgr_transport_find(const char *name) 426 { 427 return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name)); 428 } 429