1 /* $NetBSD: qmgr_entry.c,v 1.1.1.1 2009/06/23 10:08:52 tron Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmgr_entry 3 6 /* SUMMARY 7 /* per-site queue entries 8 /* SYNOPSIS 9 /* #include "qmgr.h" 10 /* 11 /* QMGR_ENTRY *qmgr_entry_create(peer, message) 12 /* QMGR_PEER *peer; 13 /* QMGR_MESSAGE *message; 14 /* 15 /* void qmgr_entry_done(entry, which) 16 /* QMGR_ENTRY *entry; 17 /* int which; 18 /* 19 /* QMGR_ENTRY *qmgr_entry_select(queue) 20 /* QMGR_QUEUE *queue; 21 /* 22 /* void qmgr_entry_unselect(queue, entry) 23 /* QMGR_QUEUE *queue; 24 /* QMGR_ENTRY *entry; 25 /* 26 /* void qmgr_entry_move_todo(dst, entry) 27 /* QMGR_QUEUE *dst; 28 /* QMGR_ENTRY *entry; 29 /* DESCRIPTION 30 /* These routines add/delete/manipulate per-site message 31 /* delivery requests. 32 /* 33 /* qmgr_entry_create() creates an entry for the named peer and message, 34 /* and appends the entry to the peer's list and its queue's todo list. 35 /* Filling in and cleaning up the recipients is the responsibility 36 /* of the caller. 37 /* 38 /* qmgr_entry_done() discards a per-site queue entry. The 39 /* \fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry 40 /* of the site's `busy' list (i.e. queue entries that have been 41 /* selected for actual delivery), or QMGR_QUEUE_TODO for an entry 42 /* of the site's `todo' list (i.e. queue entries awaiting selection 43 /* for actual delivery). 44 /* 45 /* qmgr_entry_done() discards its peer structure when the peer 46 /* is not referenced anymore. 47 /* 48 /* qmgr_entry_done() triggers cleanup of the per-site queue when 49 /* the site has no pending deliveries, and the site is either 50 /* alive, or the site is dead and the number of in-core queues 51 /* exceeds a configurable limit (see qmgr_queue_done()). 52 /* 53 /* qmgr_entry_done() triggers special action when the last in-core 54 /* queue entry for a message is done with: either read more 55 /* recipients from the queue file, delete the queue file, or move 56 /* the queue file to the deferred queue; send bounce reports to the 57 /* message originator (see qmgr_active_done()). 58 /* 59 /* qmgr_entry_select() selects first entry from the named 60 /* per-site queue's `todo' list for actual delivery. The entry is 61 /* moved to the queue's `busy' list: the list of messages being 62 /* delivered. The entry is also removed from its peer list. 63 /* 64 /* qmgr_entry_unselect() takes the named entry off the named 65 /* per-site queue's `busy' list and moves it to the queue's 66 /* `todo' list. The entry is also prepended to its peer list again. 67 /* 68 /* qmgr_entry_move_todo() moves the specified "todo" queue entry 69 /* to the specified "todo" queue. 70 /* DIAGNOSTICS 71 /* Panic: interface violations, internal inconsistencies. 72 /* LICENSE 73 /* .ad 74 /* .fi 75 /* The Secure Mailer license must be distributed with this software. 76 /* AUTHOR(S) 77 /* Wietse Venema 78 /* IBM T.J. Watson Research 79 /* P.O. Box 704 80 /* Yorktown Heights, NY 10598, USA 81 /* 82 /* Preemptive scheduler enhancements: 83 /* Patrik Rak 84 /* Modra 6 85 /* 155 00, Prague, Czech Republic 86 /*--*/ 87 88 /* System library. */ 89 90 #include <sys_defs.h> 91 #include <stdlib.h> 92 #include <time.h> 93 94 /* Utility library. */ 95 96 #include <msg.h> 97 #include <mymalloc.h> 98 #include <events.h> 99 #include <vstream.h> 100 101 /* Global library. */ 102 103 #include <mail_params.h> 104 #include <deliver_request.h> /* opportunistic session caching */ 105 106 /* Application-specific. */ 107 108 #include "qmgr.h" 109 110 /* qmgr_entry_select - select queue entry for delivery */ 111 112 QMGR_ENTRY *qmgr_entry_select(QMGR_PEER *peer) 113 { 114 const char *myname = "qmgr_entry_select"; 115 QMGR_ENTRY *entry; 116 QMGR_QUEUE *queue; 117 118 if ((entry = peer->entry_list.next) != 0) { 119 queue = entry->queue; 120 QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers); 121 queue->todo_refcount--; 122 QMGR_LIST_APPEND(queue->busy, entry, queue_peers); 123 queue->busy_refcount++; 124 QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers); 125 peer->job->selected_entries++; 126 127 /* 128 * With opportunistic session caching, the delivery agent must not 129 * only 1) save a session upon completion, but also 2) reuse a cached 130 * session upon the next delivery request. In order to not miss out 131 * on 2), we have to make caching sticky or else we get silly 132 * behavior when the in-memory queue drains. Specifically, new 133 * connections must not be made as long as cached connections exist. 134 * 135 * Safety: don't enable opportunistic session caching unless the queue 136 * manager is able to schedule concurrent or back-to-back deliveries 137 * (we need to recognize back-to-back deliveries for transports with 138 * concurrency 1). 139 * 140 * If caching has previously been enabled, but is not now, fetch any 141 * existing entries from the cache, but don't add new ones. 142 */ 143 #define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \ 144 (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY()) 145 146 #define BACK_TO_BACK_DELIVERY() \ 147 (queue->last_done + 1 >= event_time()) 148 149 /* 150 * Turn on session caching after we get up to speed. Don't enable 151 * session caching just because we have concurrent deliveries. This 152 * prevents unnecessary session caching when we have a burst of mail 153 * <= the initial concurrency limit. 154 */ 155 if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) { 156 if (BACK_TO_BACK_DELIVERY()) { 157 if (msg_verbose) 158 msg_info("%s: allowing on-demand session caching for %s", 159 myname, queue->name); 160 queue->dflags |= DEL_REQ_FLAG_CONN_MASK; 161 } 162 } 163 164 /* 165 * Turn off session caching when concurrency drops and we're running 166 * out of steam. This is what prevents from turning off session 167 * caching too early, and from making new connections while old ones 168 * are still cached. 169 */ 170 else { 171 if (!CONCURRENT_OR_BACK_TO_BACK_DELIVERY()) { 172 if (msg_verbose) 173 msg_info("%s: disallowing on-demand session caching for %s", 174 myname, queue->name); 175 queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE; 176 } 177 } 178 } 179 return (entry); 180 } 181 182 /* qmgr_entry_unselect - unselect queue entry for delivery */ 183 184 void qmgr_entry_unselect(QMGR_ENTRY *entry) 185 { 186 QMGR_PEER *peer = entry->peer; 187 QMGR_QUEUE *queue = entry->queue; 188 189 /* 190 * Move the entry back to the todo lists. In case of the peer list, put 191 * it back to the beginning, so the select()/unselect() does not reorder 192 * entries. We use this in qmgr_message_assign() to put recipients into 193 * existing entries when possible. 194 */ 195 QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers); 196 queue->busy_refcount--; 197 QMGR_LIST_APPEND(queue->todo, entry, queue_peers); 198 queue->todo_refcount++; 199 QMGR_LIST_PREPEND(peer->entry_list, entry, peer_peers); 200 peer->job->selected_entries--; 201 } 202 203 /* qmgr_entry_move_todo - move entry between todo queues */ 204 205 void qmgr_entry_move_todo(QMGR_QUEUE *dst_queue, QMGR_ENTRY *entry) 206 { 207 const char *myname = "qmgr_entry_move_todo"; 208 QMGR_TRANSPORT *dst_transport = dst_queue->transport; 209 QMGR_MESSAGE *message = entry->message; 210 QMGR_QUEUE *src_queue = entry->queue; 211 QMGR_PEER *dst_peer, *src_peer = entry->peer; 212 QMGR_JOB *dst_job, *src_job = src_peer->job; 213 QMGR_ENTRY *new_entry; 214 int rcpt_count = entry->rcpt_list.len; 215 216 if (entry->stream != 0) 217 msg_panic("%s: queue %s entry is busy", myname, src_queue->name); 218 if (QMGR_QUEUE_THROTTLED(dst_queue)) 219 msg_panic("%s: destination queue %s is throttled", myname, dst_queue->name); 220 if (QMGR_TRANSPORT_THROTTLED(dst_transport)) 221 msg_panic("%s: destination transport %s is throttled", 222 myname, dst_transport->name); 223 224 /* 225 * Create new entry, swap the recipients between the two entries, 226 * adjusting the job counters accordingly, then dispose of the old entry. 227 * 228 * Note that qmgr_entry_done() will also take care of adjusting the 229 * recipient limits of all the message jobs, so we do not have to do that 230 * explicitly for the new job here. 231 * 232 * XXX This does not enforce the per-entry recipient limit, but that is not 233 * a problem as long as qmgr_entry_move_todo() is called only to bounce 234 * or defer mail. 235 */ 236 dst_job = qmgr_job_obtain(message, dst_transport); 237 dst_peer = qmgr_peer_obtain(dst_job, dst_queue); 238 239 new_entry = qmgr_entry_create(dst_peer, message); 240 241 recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list); 242 243 src_job->rcpt_count -= rcpt_count; 244 dst_job->rcpt_count += rcpt_count; 245 246 qmgr_entry_done(entry, QMGR_QUEUE_TODO); 247 } 248 249 /* qmgr_entry_done - dispose of queue entry */ 250 251 void qmgr_entry_done(QMGR_ENTRY *entry, int which) 252 { 253 const char *myname = "qmgr_entry_done"; 254 QMGR_QUEUE *queue = entry->queue; 255 QMGR_MESSAGE *message = entry->message; 256 QMGR_PEER *peer = entry->peer; 257 QMGR_JOB *sponsor, *job = peer->job; 258 QMGR_TRANSPORT *transport = job->transport; 259 260 /* 261 * Take this entry off the in-core queue. 262 */ 263 if (entry->stream != 0) 264 msg_panic("%s: file is open", myname); 265 if (which == QMGR_QUEUE_BUSY) { 266 QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers); 267 queue->busy_refcount--; 268 } else if (which == QMGR_QUEUE_TODO) { 269 QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers); 270 job->selected_entries++; 271 QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers); 272 queue->todo_refcount--; 273 } else { 274 msg_panic("%s: bad queue spec: %d", myname, which); 275 } 276 277 /* 278 * Decrease the in-core recipient counts and free the recipient list and 279 * the structure itself. 280 */ 281 job->rcpt_count -= entry->rcpt_list.len; 282 message->rcpt_count -= entry->rcpt_list.len; 283 qmgr_recipient_count -= entry->rcpt_list.len; 284 recipient_list_free(&entry->rcpt_list); 285 myfree((char *) entry); 286 287 /* 288 * Make sure that the transport of any retired or finishing job that 289 * donated recipient slots to this message gets them back first. Then, if 290 * possible, pass the remaining unused recipient slots to the next job on 291 * the job list. 292 */ 293 for (sponsor = message->job_list.next; sponsor; sponsor = sponsor->message_peers.next) { 294 if (sponsor->rcpt_count >= sponsor->rcpt_limit || sponsor == job) 295 continue; 296 if (sponsor->stack_level < 0 || message->rcpt_offset == 0) 297 qmgr_job_move_limits(sponsor); 298 } 299 if (message->rcpt_offset == 0) { 300 qmgr_job_move_limits(job); 301 } 302 303 /* 304 * We implement a rate-limited queue by emulating a slow delivery 305 * channel. We insert the artificial delays with qmgr_queue_suspend(). 306 * 307 * When a queue is suspended, we must postpone any job scheduling decisions 308 * until the queue is resumed. Otherwise, we make those decisions now. 309 * The job scheduling decisions are made by qmgr_job_blocker_update(). 310 */ 311 if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) { 312 if (queue->window > 1) 313 msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service", 314 myname, transport->name, queue->name, queue->window); 315 if (QMGR_QUEUE_THROTTLED(queue)) /* XXX */ 316 qmgr_queue_unthrottle(queue); 317 if (QMGR_QUEUE_READY(queue)) 318 qmgr_queue_suspend(queue, transport->rate_delay); 319 } 320 if (!QMGR_QUEUE_SUSPENDED(queue) 321 && queue->blocker_tag == transport->blocker_tag) 322 qmgr_job_blocker_update(queue); 323 324 /* 325 * When there are no more entries for this peer, discard the peer 326 * structure. 327 */ 328 peer->refcount--; 329 if (peer->refcount == 0) 330 qmgr_peer_free(peer); 331 332 /* 333 * Maintain back-to-back delivery status. 334 */ 335 if (which == QMGR_QUEUE_BUSY) 336 queue->last_done = event_time(); 337 338 /* 339 * When the in-core queue for this site is empty and when this site is 340 * not dead or suspended, discard the in-core queue. When this site is 341 * dead, but the number of in-core queues exceeds some threshold, get rid 342 * of this in-core queue anyway, in order to avoid running out of memory. 343 */ 344 if (queue->todo.next == 0 && queue->busy.next == 0) { 345 if (QMGR_QUEUE_THROTTLED(queue) && qmgr_queue_count > 2 * var_qmgr_rcpt_limit) 346 qmgr_queue_unthrottle(queue); 347 if (QMGR_QUEUE_READY(queue)) 348 qmgr_queue_done(queue); 349 } 350 351 /* 352 * Update the in-core message reference count. When the in-core message 353 * structure has no more references, dispose of the message. 354 */ 355 message->refcount--; 356 if (message->refcount == 0) 357 qmgr_active_done(message); 358 } 359 360 /* qmgr_entry_create - create queue todo entry */ 361 362 QMGR_ENTRY *qmgr_entry_create(QMGR_PEER *peer, QMGR_MESSAGE *message) 363 { 364 QMGR_ENTRY *entry; 365 QMGR_QUEUE *queue = peer->queue; 366 367 /* 368 * Sanity check. 369 */ 370 if (QMGR_QUEUE_THROTTLED(queue)) 371 msg_panic("qmgr_entry_create: dead queue: %s", queue->name); 372 373 /* 374 * Create the delivery request. 375 */ 376 entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY)); 377 entry->stream = 0; 378 entry->message = message; 379 recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE); 380 message->refcount++; 381 entry->peer = peer; 382 QMGR_LIST_APPEND(peer->entry_list, entry, peer_peers); 383 peer->refcount++; 384 entry->queue = queue; 385 QMGR_LIST_APPEND(queue->todo, entry, queue_peers); 386 queue->todo_refcount++; 387 peer->job->read_entries++; 388 389 /* 390 * Warn if a destination is falling behind while the active queue 391 * contains a non-trivial amount of single-recipient email. When a 392 * destination takes up more and more space in the active queue, then 393 * other mail will not get through and delivery performance will suffer. 394 * 395 * XXX At this point in the code, the busy reference count is still less 396 * than the concurrency limit (otherwise this code would not be invoked 397 * in the first place) so we have to make make some awkward adjustments 398 * below. 399 * 400 * XXX The queue length test below looks at the active queue share of an 401 * individual destination. This catches the case where mail for one 402 * destination is falling behind because it has to round-robin compete 403 * with many other destinations. However, Postfix will also perform 404 * poorly when most of the active queue is tied up by a small number of 405 * concurrency limited destinations. The queue length test below detects 406 * such conditions only indirectly. 407 * 408 * XXX This code does not detect the case that the active queue is being 409 * starved because incoming mail is pounding the disk. 410 */ 411 if (var_helpful_warnings && var_qmgr_clog_warn_time > 0) { 412 int queue_length = queue->todo_refcount + queue->busy_refcount; 413 time_t now; 414 QMGR_TRANSPORT *transport; 415 double active_share; 416 417 if (queue_length > var_qmgr_active_limit / 5 418 && (now = event_time()) >= queue->clog_time_to_warn) { 419 active_share = queue_length / (double) qmgr_message_count; 420 msg_warn("mail for %s is using up %d of %d active queue entries", 421 queue->nexthop, queue_length, qmgr_message_count); 422 if (active_share < 0.9) 423 msg_warn("this may slow down other mail deliveries"); 424 transport = queue->transport; 425 if (transport->dest_concurrency_limit > 0 426 && transport->dest_concurrency_limit <= queue->busy_refcount + 1) 427 msg_warn("you may need to increase the main.cf %s%s from %d", 428 transport->name, _DEST_CON_LIMIT, 429 transport->dest_concurrency_limit); 430 else if (queue->window > var_qmgr_active_limit * active_share) 431 msg_warn("you may need to increase the main.cf %s from %d", 432 VAR_QMGR_ACT_LIMIT, var_qmgr_active_limit); 433 else if (queue->peers.next != queue->peers.prev) 434 msg_warn("you may need a separate master.cf transport for %s", 435 queue->nexthop); 436 else { 437 msg_warn("you may need to reduce %s connect and helo timeouts", 438 transport->name); 439 msg_warn("so that Postfix quickly skips unavailable hosts"); 440 msg_warn("you may need to increase the main.cf %s and %s", 441 VAR_MIN_BACKOFF_TIME, VAR_MAX_BACKOFF_TIME); 442 msg_warn("so that Postfix wastes less time on undeliverable mail"); 443 msg_warn("you may need to increase the master.cf %s process limit", 444 transport->name); 445 } 446 msg_warn("please avoid flushing the whole queue when you have"); 447 msg_warn("lots of deferred mail, that is bad for performance"); 448 msg_warn("to turn off these warnings specify: %s = 0", 449 VAR_QMGR_CLOG_WARN_TIME); 450 queue->clog_time_to_warn = now + var_qmgr_clog_warn_time; 451 } 452 } 453 return (entry); 454 } 455