1 /* $NetBSD: qmgr_queue.c,v 1.1.1.1 2009/06/23 10:08:53 tron Exp $ */ 2 3 /*++ 4 /* NAME 5 /* qmgr_queue 3 6 /* SUMMARY 7 /* per-destination queues 8 /* SYNOPSIS 9 /* #include "qmgr.h" 10 /* 11 /* int qmgr_queue_count; 12 /* 13 /* QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop) 14 /* QMGR_TRANSPORT *transport; 15 /* const char *name; 16 /* const char *nexthop; 17 /* 18 /* void qmgr_queue_done(queue) 19 /* QMGR_QUEUE *queue; 20 /* 21 /* QMGR_QUEUE *qmgr_queue_find(transport, name) 22 /* QMGR_TRANSPORT *transport; 23 /* const char *name; 24 /* 25 /* void qmgr_queue_throttle(queue, dsn) 26 /* QMGR_QUEUE *queue; 27 /* DSN *dsn; 28 /* 29 /* void qmgr_queue_unthrottle(queue) 30 /* QMGR_QUEUE *queue; 31 /* 32 /* void qmgr_queue_suspend(queue, delay) 33 /* QMGR_QUEUE *queue; 34 /* int delay; 35 /* DESCRIPTION 36 /* These routines add/delete/manipulate per-destination queues. 37 /* Each queue corresponds to a specific transport and destination. 38 /* Each queue has a `todo' list of delivery requests for that 39 /* destination, and a `busy' list of delivery requests in progress. 40 /* 41 /* qmgr_queue_count is a global counter for the total number 42 /* of in-core queue structures. 43 /* 44 /* qmgr_queue_create() creates an empty named queue for the named 45 /* transport and destination. The queue is given an initial 46 /* concurrency limit as specified with the 47 /* \fIinitial_destination_concurrency\fR configuration parameter, 48 /* provided that it does not exceed the transport-specific 49 /* concurrency limit. 50 /* 51 /* qmgr_queue_done() disposes of a per-destination queue after all 52 /* its entries have been taken care of. It is an error to dispose 53 /* of a dead queue. 54 /* 55 /* qmgr_queue_find() looks up the named queue for the named 56 /* transport. A null result means that the queue was not found. 57 /* 58 /* qmgr_queue_throttle() handles a delivery error, and decrements the 59 /* concurrency limit for the destination, with a lower bound of 1. 60 /* When the cohort failure bound is reached, qmgr_queue_throttle() 61 /* sets the concurrency limit to zero and starts a timer 62 /* to re-enable delivery to the destination after a configurable delay. 63 /* 64 /* qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects. 65 /* The concurrency limit for the destination is incremented, 66 /* provided that it does not exceed the destination concurrency 67 /* limit specified for the transport. This routine implements 68 /* "slow open" mode, and eliminates the "thundering herd" problem. 69 /* 70 /* qmgr_queue_suspend() suspends delivery for this destination 71 /* briefly. This function invalidates any scheduling decisions 72 /* that are based on the present queue's concurrency window. 73 /* To compensate for work skipped by qmgr_entry_done(), the 74 /* status of blocker jobs is re-evaluated after the queue is 75 /* resumed. 76 /* DIAGNOSTICS 77 /* Panic: consistency check failure. 78 /* LICENSE 79 /* .ad 80 /* .fi 81 /* The Secure Mailer license must be distributed with this software. 82 /* AUTHOR(S) 83 /* Wietse Venema 84 /* IBM T.J. Watson Research 85 /* P.O. Box 704 86 /* Yorktown Heights, NY 10598, USA 87 /* 88 /* Pre-emptive scheduler enhancements: 89 /* Patrik Rak 90 /* Modra 6 91 /* 155 00, Prague, Czech Republic 92 /* 93 /* Concurrency scheduler enhancements with: 94 /* Victor Duchovni 95 /* Morgan Stanley 96 /*--*/ 97 98 /* System library. */ 99 100 #include <sys_defs.h> 101 #include <time.h> 102 103 /* Utility library. */ 104 105 #include <msg.h> 106 #include <mymalloc.h> 107 #include <events.h> 108 #include <htable.h> 109 110 /* Global library. */ 111 112 #include <mail_params.h> 113 #include <recipient_list.h> 114 #include <mail_proto.h> /* QMGR_LOG_WINDOW */ 115 116 /* Application-specific. */ 117 118 #include "qmgr.h" 119 120 int qmgr_queue_count; 121 122 #define QMGR_ERROR_OR_RETRY_QUEUE(queue) \ 123 (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \ 124 || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0) 125 126 #define QMGR_LOG_FEEDBACK(feedback) \ 127 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \ 128 msg_info("%s: feedback %g", myname, feedback); 129 130 #define QMGR_LOG_WINDOW(queue) \ 131 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \ 132 msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \ 133 myname, queue->name, queue->transport->dest_concurrency_limit, \ 134 queue->window, queue->success, queue->failure, queue->fail_cohorts); 135 136 /* qmgr_queue_resume - resume delivery to destination */ 137 138 static void qmgr_queue_resume(int event, char *context) 139 { 140 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; 141 const char *myname = "qmgr_queue_resume"; 142 143 /* 144 * Sanity checks. 145 */ 146 if (!QMGR_QUEUE_SUSPENDED(queue)) 147 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 148 149 /* 150 * We can't simply force delivery on this queue: the transport's pending 151 * count may already be maxed out, and there may be other constraints 152 * that definitely should be none of our business. The best we can do is 153 * to play by the same rules as everyone else: let qmgr_active_drain() 154 * and round-robin selection take care of message selection. 155 */ 156 queue->window = 1; 157 158 /* 159 * Every event handler that leaves a queue in the "ready" state should 160 * remove the queue when it is empty. 161 * 162 * XXX Do not omit the redundant test below. It is here to simplify code 163 * consistency checks. The check is trivially eliminated by the compiler 164 * optimizer. There is no need to sacrifice code clarity for the sake of 165 * performance. 166 * 167 * XXX Do not expose the blocker job logic here. Rate-limited queues are not 168 * a performance-critical feature. Here, too, there is no need to sacrifice 169 * code clarity for the sake of performance. 170 */ 171 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) 172 qmgr_queue_done(queue); 173 else 174 qmgr_job_blocker_update(queue); 175 } 176 177 /* qmgr_queue_suspend - briefly suspend a destination */ 178 179 void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay) 180 { 181 const char *myname = "qmgr_queue_suspend"; 182 183 /* 184 * Sanity checks. 185 */ 186 if (!QMGR_QUEUE_READY(queue)) 187 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 188 if (queue->busy_refcount > 0) 189 msg_panic("%s: queue is busy", myname); 190 191 /* 192 * Set the queue status to "suspended". No-one is supposed to remove a 193 * queue in suspended state. 194 */ 195 queue->window = QMGR_QUEUE_STAT_SUSPENDED; 196 event_request_timer(qmgr_queue_resume, (char *) queue, delay); 197 } 198 199 /* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */ 200 201 static void qmgr_queue_unthrottle_wrapper(int unused_event, char *context) 202 { 203 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; 204 205 /* 206 * This routine runs when a wakeup timer goes off; it does not run in the 207 * context of some queue manipulation. Therefore, it is safe to discard 208 * this in-core queue when it is empty and when this site is not dead. 209 */ 210 qmgr_queue_unthrottle(queue); 211 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) 212 qmgr_queue_done(queue); 213 } 214 215 /* qmgr_queue_unthrottle - give this destination another chance */ 216 217 void qmgr_queue_unthrottle(QMGR_QUEUE *queue) 218 { 219 const char *myname = "qmgr_queue_unthrottle"; 220 QMGR_TRANSPORT *transport = queue->transport; 221 double feedback; 222 223 if (msg_verbose) 224 msg_info("%s: queue %s", myname, queue->name); 225 226 /* 227 * Sanity checks. 228 */ 229 if (!QMGR_QUEUE_READY(queue) && !QMGR_QUEUE_THROTTLED(queue)) 230 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 231 232 /* 233 * Don't restart the negative feedback hysteresis cycle with every 234 * positive feedback. Restart it only when we make a positive concurrency 235 * adjustment (i.e. at the end of a positive feedback hysteresis cycle). 236 * Otherwise negative feedback would be too aggressive: negative feedback 237 * takes effect immediately at the start of its hysteresis cycle. 238 */ 239 queue->fail_cohorts = 0; 240 241 /* 242 * Special case when this site was dead. 243 */ 244 if (QMGR_QUEUE_THROTTLED(queue)) { 245 event_cancel_timer(qmgr_queue_unthrottle_wrapper, (char *) queue); 246 if (queue->dsn == 0) 247 msg_panic("%s: queue %s: window 0 status 0", myname, queue->name); 248 dsn_free(queue->dsn); 249 queue->dsn = 0; 250 /* Back from the almost grave, best concurrency is anyone's guess. */ 251 if (queue->busy_refcount > 0) 252 queue->window = queue->busy_refcount; 253 else 254 queue->window = transport->init_dest_concurrency; 255 queue->success = queue->failure = 0; 256 QMGR_LOG_WINDOW(queue); 257 return; 258 } 259 260 /* 261 * Increase the destination's concurrency limit until we reach the 262 * transport's concurrency limit. Allow for a margin the size of the 263 * initial destination concurrency, so that we're not too gentle. 264 * 265 * Why is the concurrency increment based on preferred concurrency and not 266 * on the number of outstanding delivery requests? The latter fluctuates 267 * wildly when deliveries complete in bursts (artificial benchmark 268 * measurements), and does not account for cached connections. 269 * 270 * Keep the window within reasonable distance from actual concurrency 271 * otherwise negative feedback will be ineffective. This expression 272 * assumes that busy_refcount changes gradually. This is invalid when 273 * deliveries complete in bursts (artificial benchmark measurements). 274 */ 275 if (transport->dest_concurrency_limit == 0 276 || transport->dest_concurrency_limit > queue->window) 277 if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) { 278 feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window); 279 QMGR_LOG_FEEDBACK(feedback); 280 queue->success += feedback; 281 /* Prepare for overshoot (feedback > hysteresis, rounding error). */ 282 while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) { 283 queue->window += transport->pos_feedback.hysteresis; 284 queue->success -= transport->pos_feedback.hysteresis; 285 queue->failure = 0; 286 } 287 /* Prepare for overshoot. */ 288 if (transport->dest_concurrency_limit > 0 289 && queue->window > transport->dest_concurrency_limit) 290 queue->window = transport->dest_concurrency_limit; 291 } 292 QMGR_LOG_WINDOW(queue); 293 } 294 295 /* qmgr_queue_throttle - handle destination delivery failure */ 296 297 void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn) 298 { 299 const char *myname = "qmgr_queue_throttle"; 300 QMGR_TRANSPORT *transport = queue->transport; 301 double feedback; 302 303 /* 304 * Sanity checks. 305 */ 306 if (!QMGR_QUEUE_READY(queue)) 307 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 308 if (queue->dsn) 309 msg_panic("%s: queue %s: spurious reason %s", 310 myname, queue->name, queue->dsn->reason); 311 if (msg_verbose) 312 msg_info("%s: queue %s: %s %s", 313 myname, queue->name, dsn->status, dsn->reason); 314 315 /* 316 * Don't restart the positive feedback hysteresis cycle with every 317 * negative feedback. Restart it only when we make a negative concurrency 318 * adjustment (i.e. at the start of a negative feedback hysteresis 319 * cycle). Otherwise positive feedback would be too weak (positive 320 * feedback does not take effect until the end of its hysteresis cycle). 321 */ 322 323 /* 324 * This queue is declared dead after a configurable number of 325 * pseudo-cohort failures. 326 */ 327 if (QMGR_QUEUE_READY(queue)) { 328 queue->fail_cohorts += 1.0 / queue->window; 329 if (transport->fail_cohort_limit > 0 330 && queue->fail_cohorts >= transport->fail_cohort_limit) 331 queue->window = QMGR_QUEUE_STAT_THROTTLED; 332 } 333 334 /* 335 * Decrease the destination's concurrency limit until we reach 1. Base 336 * adjustments on the concurrency limit itself, instead of using the 337 * actual concurrency. The latter fluctuates wildly when deliveries 338 * complete in bursts (artificial benchmark measurements). 339 * 340 * Even after reaching 1, we maintain the negative hysteresis cycle so that 341 * negative feedback can cancel out positive feedback. 342 */ 343 if (QMGR_QUEUE_READY(queue)) { 344 feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window); 345 QMGR_LOG_FEEDBACK(feedback); 346 queue->failure -= feedback; 347 /* Prepare for overshoot (feedback > hysteresis, rounding error). */ 348 while (queue->failure - feedback / 2 < 0) { 349 queue->window -= transport->neg_feedback.hysteresis; 350 queue->success = 0; 351 queue->failure += transport->neg_feedback.hysteresis; 352 } 353 /* Prepare for overshoot. */ 354 if (queue->window < 1) 355 queue->window = 1; 356 } 357 358 /* 359 * Special case for a site that just was declared dead. 360 */ 361 if (QMGR_QUEUE_THROTTLED(queue)) { 362 queue->dsn = DSN_COPY(dsn); 363 event_request_timer(qmgr_queue_unthrottle_wrapper, 364 (char *) queue, var_min_backoff_time); 365 queue->dflags = 0; 366 } 367 QMGR_LOG_WINDOW(queue); 368 } 369 370 /* qmgr_queue_done - delete in-core queue for site */ 371 372 void qmgr_queue_done(QMGR_QUEUE *queue) 373 { 374 const char *myname = "qmgr_queue_done"; 375 QMGR_TRANSPORT *transport = queue->transport; 376 377 /* 378 * Sanity checks. It is an error to delete an in-core queue with pending 379 * messages or timers. 380 */ 381 if (queue->busy_refcount != 0 || queue->todo_refcount != 0) 382 msg_panic("%s: refcount: %d", myname, 383 queue->busy_refcount + queue->todo_refcount); 384 if (queue->todo.next || queue->busy.next) 385 msg_panic("%s: queue not empty: %s", myname, queue->name); 386 if (!QMGR_QUEUE_READY(queue)) 387 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 388 if (queue->dsn) 389 msg_panic("%s: queue %s: spurious reason %s", 390 myname, queue->name, queue->dsn->reason); 391 392 /* 393 * Clean up this in-core queue. 394 */ 395 QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue, peers); 396 htable_delete(transport->queue_byname, queue->name, (void (*) (char *)) 0); 397 myfree(queue->name); 398 myfree(queue->nexthop); 399 qmgr_queue_count--; 400 myfree((char *) queue); 401 } 402 403 /* qmgr_queue_create - create in-core queue for site */ 404 405 QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name, 406 const char *nexthop) 407 { 408 QMGR_QUEUE *queue; 409 410 /* 411 * If possible, choose an initial concurrency of > 1 so that one bad 412 * message or one bad network won't slow us down unnecessarily. 413 */ 414 415 queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE)); 416 qmgr_queue_count++; 417 queue->dflags = 0; 418 queue->last_done = 0; 419 queue->name = mystrdup(name); 420 queue->nexthop = mystrdup(nexthop); 421 queue->todo_refcount = 0; 422 queue->busy_refcount = 0; 423 queue->transport = transport; 424 queue->window = transport->init_dest_concurrency; 425 queue->success = queue->failure = queue->fail_cohorts = 0; 426 QMGR_LIST_INIT(queue->todo); 427 QMGR_LIST_INIT(queue->busy); 428 queue->dsn = 0; 429 queue->clog_time_to_warn = 0; 430 queue->blocker_tag = 0; 431 QMGR_LIST_APPEND(transport->queue_list, queue, peers); 432 htable_enter(transport->queue_byname, name, (char *) queue); 433 return (queue); 434 } 435 436 /* qmgr_queue_find - find in-core named queue */ 437 438 QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name) 439 { 440 return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name)); 441 } 442