/* $NetBSD: qmgr_job.c,v 1.2 2017/02/14 01:16:47 christos Exp $ */

/*++
/* NAME
/* qmgr_job 3
/* SUMMARY
/* per-transport jobs
/* SYNOPSIS
/* #include "qmgr.h"
/*
/* QMGR_JOB *qmgr_job_obtain(message, transport)
/* QMGR_MESSAGE *message;
/* QMGR_TRANSPORT *transport;
/*
/* void qmgr_job_free(job)
/* QMGR_JOB *job;
/*
/* void qmgr_job_move_limits(job)
/* QMGR_JOB *job;
/*
/* QMGR_ENTRY *qmgr_job_entry_select(transport)
/* QMGR_TRANSPORT *transport;
/*
/* void qmgr_job_blocker_update(queue)
/* QMGR_QUEUE *queue;
/* DESCRIPTION
/* These routines add/delete/manipulate per-transport jobs.
/* Each job corresponds to a specific transport and message.
/* Each job has a peer list containing all pending delivery
/* requests for that message.
/*
/* qmgr_job_obtain() finds an existing job for the named message and
/* transport combination. A new empty job is created if no existing one
/* can be found. In either case, the job is prepared for assignment of
/* (more) message recipients.
/*
/* qmgr_job_free() disposes of a per-transport job after all
/* its entries have been taken care of. It is an error to dispose
/* of a job that is still in use.
/*
/* qmgr_job_entry_select() attempts to find the next entry suitable
/* for delivery. The job preempting algorithm is also exercised.
/* If necessary, an attempt to read more recipients into core is made.
/* This can result in the creation of more job, queue and entry structures.
/*
/* qmgr_job_blocker_update() updates the status of blocked
/* jobs after a decrease in the queue's concurrency level,
/* after the queue is throttled, or after the queue is resumed
/* from suspension.
/*
/* qmgr_job_move_limits() takes care of proper distribution of the
/* per-transport recipients limit among the per-transport jobs.
/* Should be called whenever a job's recipient slot becomes available.
/* DIAGNOSTICS
/* Panic: consistency check failure.
/* LICENSE
/* .ad
/* .fi
/* The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/* Patrik Rak
/* patrik@raxoft.cz
/*--*/

/* System library. */

#include <sys_defs.h>

/* Utility library. */

#include <msg.h>
#include <htable.h>
#include <mymalloc.h>
#include <sane_time.h>

/* Application-specific. */

#include "qmgr.h"

/* Forward declarations */

static void qmgr_job_pop(QMGR_JOB *);

/* Helper macros */

#define HAS_ENTRIES(job) ((job)->selected_entries < (job)->read_entries)

/*
 * The MIN_ENTRIES macro may underestimate a lot but we can't use message->rcpt_unread
 * because we don't know if all those unread recipients go to our transport yet.
 */

#define MIN_ENTRIES(job) ((job)->read_entries)
#define MAX_ENTRIES(job) ((job)->read_entries + (job)->message->rcpt_unread)
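
/*
 * Illustrative example (hypothetical numbers, not taken from this code): a
 * job that has read 50 recipient entries in-core while its message still has
 * 200 recipients unread gives MIN_ENTRIES(job) == 50 and
 * MAX_ENTRIES(job) == 250. The true final count lies somewhere in between,
 * because some of the unread recipients may belong to other transports.
 */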

#define RESET_CANDIDATE_CACHE(transport) ((transport)->candidate_cache_current = 0)

#define IS_BLOCKER(job,transport) ((job)->blocker_tag == (transport)->blocker_tag)

/* qmgr_job_create - create and initialize message job structure */

static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
    job->message = message;
    QMGR_LIST_APPEND(message->job_list, job, message_peers);
    htable_enter(transport->job_byname, message->queue_id, (void *) job);
    job->transport = transport;
    QMGR_LIST_INIT(job->transport_peers);
    QMGR_LIST_INIT(job->time_peers);
    job->stack_parent = 0;
    QMGR_LIST_INIT(job->stack_children);
    QMGR_LIST_INIT(job->stack_siblings);
    job->stack_level = -1;
    job->blocker_tag = 0;
    job->peer_byname = htable_create(0);
    QMGR_LIST_INIT(job->peer_list);
    job->slots_used = 0;
    job->slots_available = 0;
    job->selected_entries = 0;
    job->read_entries = 0;
    job->rcpt_count = 0;
    job->rcpt_limit = 0;
    return (job);
}

/* qmgr_job_link - append the job to the job lists based on the time it was queued */

static void qmgr_job_link(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *prev, *next, *list_prev, *list_next, *unread, *current;
    int delay;

    /*
     * Sanity checks.
     */
    if (job->stack_level >= 0)
        msg_panic("qmgr_job_link: already on the job lists (%d)", job->stack_level);

    /*
     * Traverse the time list and the scheduler list from the end and stop
     * when we find a job older than the one being linked.
     *
     * During the traversals, keep track of whether we have come across either
     * the current job or the first unread job on the job list. If this is
     * the case, these pointers will be adjusted below as required.
     *
     * Although both lists are exactly the same when only jobs on stack level
     * zero are considered, it's easier to traverse them separately.
     * Otherwise it's impossible to keep track of the current job pointer
     * effectively.
     *
     * This may look inefficient, but under normal operation it is expected
     * that the loops will stop right away, resulting in normal list appends
     * below. However, this code is necessary for reviving retired jobs and
     * for jobs which are created long after the first chunk of recipients
     * was read in-core (either of these can happen only for multi-transport
     * messages).
     *
     * XXX Note that we test stack_parent rather than stack_level below. This
     * subtle difference allows us to enqueue the job in correct time order
     * with respect to orphaned children even after their original parent on
     * level zero is gone. Consequently, the early loop stop in candidate
     * selection works reliably, too. These are the reasons why we care to
     * bother with children adoption at all.
     */
    current = transport->job_current;
    for (next = 0, prev = transport->job_list.prev; prev;
         next = prev, prev = prev->transport_peers.prev) {
        if (prev->stack_parent == 0) {
            delay = message->queued_time - prev->message->queued_time;
            if (delay >= 0)
                break;
        }
        if (current == prev)
            current = 0;
    }
    list_prev = prev;
    list_next = next;

    unread = transport->job_next_unread;
    for (next = 0, prev = transport->job_bytime.prev; prev;
         next = prev, prev = prev->time_peers.prev) {
        delay = message->queued_time - prev->message->queued_time;
        if (delay >= 0)
            break;
        if (unread == prev)
            unread = 0;
    }

    /*
     * Link the job into the proper place on the job lists and mark it so we
     * know it has been linked.
     */
    job->stack_level = 0;
    QMGR_LIST_LINK(transport->job_list, list_prev, job, list_next, transport_peers);
    QMGR_LIST_LINK(transport->job_bytime, prev, job, next, time_peers);

    /*
     * Update the current job pointer if necessary.
     */
    if (current == 0)
        transport->job_current = job;

    /*
     * Update the pointer to the first unread job on the job list and steal
     * the unused recipient slots from the old one.
     */
    if (unread == 0) {
        unread = transport->job_next_unread;
        transport->job_next_unread = job;
        if (unread != 0)
            qmgr_job_move_limits(unread);
    }

    /*
     * Get as many recipient slots as possible. The excess will be returned
     * to the transport pool as soon as the exact amount required is known
     * (which is usually after all recipients have been read in core).
     */
    if (transport->rcpt_unused > 0) {
        job->rcpt_limit += transport->rcpt_unused;
        message->rcpt_limit += transport->rcpt_unused;
        transport->rcpt_unused = 0;
    }
}

/* qmgr_job_find - lookup job associated with named message and transport */

static QMGR_JOB *qmgr_job_find(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{

    /*
     * Instead of traversing the message job list, we use a single
     * per-transport hash table. This is better (at least with respect to
     * memory usage) than having a single hash table (usually almost empty)
     * for each message.
     */
    return ((QMGR_JOB *) htable_find(transport->job_byname, message->queue_id));
}

/* qmgr_job_obtain - find/create the appropriate job and make it ready for new recipients */

QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    /*
     * Try finding an existing job, reviving it if it was already retired.
     * Create a new job for this transport/message combination otherwise. In
     * either case, the job ends up linked on the job lists.
     */
    if ((job = qmgr_job_find(message, transport)) == 0)
        job = qmgr_job_create(message, transport);
    if (job->stack_level < 0)
        qmgr_job_link(job);

    /*
     * Reset the candidate cache because of the new expected recipients. Make
     * sure the job is not marked as a blocker for the same reason. Note that
     * this can result in having a non-blocker followed by more blockers.
     * Consequently, we can't just update the current job pointer, we have to
     * reset it. Fortunately qmgr_job_entry_select() will easily deal with
     * this and will look up the real current job for us.
     */
    RESET_CANDIDATE_CACHE(transport);
    if (IS_BLOCKER(job, transport)) {
        job->blocker_tag = 0;
        transport->job_current = transport->job_list.next;
    }
    return (job);
}

/* qmgr_job_move_limits - move unused recipient slots to the next unread job */

void qmgr_job_move_limits(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *next = transport->job_next_unread;
    int rcpt_unused, msg_rcpt_unused;

    /*
     * Find the next unread job on the job list if necessary. Cache it for
     * later. This makes the amortized efficiency of this routine O(1) per
     * job. Note that we use the time list whose ordering doesn't change over
     * time.
     */
    if (job == next) {
        for (next = next->time_peers.next; next; next = next->time_peers.next)
            if (next->message->rcpt_offset != 0)
                break;
        transport->job_next_unread = next;
    }

    /*
     * Calculate the number of available unused slots.
     */
    rcpt_unused = job->rcpt_limit - job->rcpt_count;
    msg_rcpt_unused = message->rcpt_limit - message->rcpt_count;
    if (msg_rcpt_unused < rcpt_unused)
        rcpt_unused = msg_rcpt_unused;

    /*
     * Transfer the unused recipient slots back to the transport pool and to
     * the next not-fully-read job. The job's and message's limits are
     * adjusted accordingly. Note that the transport pool can be negative if
     * we used some of the rcpt_per_stack slots.
     */
    if (rcpt_unused > 0) {
        job->rcpt_limit -= rcpt_unused;
        message->rcpt_limit -= rcpt_unused;
        transport->rcpt_unused += rcpt_unused;
        if (next != 0 && (rcpt_unused = transport->rcpt_unused) > 0) {
            next->rcpt_limit += rcpt_unused;
            next->message->rcpt_limit += rcpt_unused;
            transport->rcpt_unused = 0;
        }
    }
}

/* qmgr_job_parent_gone - take care of orphaned stack children */

static void qmgr_job_parent_gone(QMGR_JOB *job, QMGR_JOB *parent)
{
    QMGR_JOB *child;

    while ((child = job->stack_children.next) != 0) {
        QMGR_LIST_UNLINK(job->stack_children, QMGR_JOB *, child, stack_siblings);
        if (parent != 0)
            QMGR_LIST_APPEND(parent->stack_children, child, stack_siblings);
        child->stack_parent = parent;
    }
}

/* qmgr_job_unlink - unlink the job from the job lists */

static void qmgr_job_unlink(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_unlink";
    QMGR_TRANSPORT *transport = job->transport;

    /*
     * Sanity checks.
     */
    if (job->stack_level != 0)
        msg_panic("%s: non-zero stack level (%d)", myname, job->stack_level);
    if (job->stack_parent != 0)
        msg_panic("%s: parent present", myname);
    if (job->stack_siblings.next != 0)
        msg_panic("%s: siblings present", myname);

    /*
     * Make sure that the children of a job on stack level zero are informed
     * that their parent is gone, too.
     */
    qmgr_job_parent_gone(job, 0);

    /*
     * Update the current job pointer if necessary.
     */
    if (transport->job_current == job)
        transport->job_current = job->transport_peers.next;

    /*
     * Invalidate the candidate selection cache if necessary.
     */
    if (job == transport->candidate_cache
        || job == transport->candidate_cache_current)
        RESET_CANDIDATE_CACHE(transport);

    /*
     * Remove the job from the job lists and mark it as unlinked.
     */
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_UNLINK(transport->job_bytime, QMGR_JOB *, job, time_peers);
    job->stack_level = -1;
}

/* qmgr_job_retire - remove the job from the job lists while waiting for recipients to deliver */

static void qmgr_job_retire(QMGR_JOB *job)
{
    if (msg_verbose)
        msg_info("qmgr_job_retire: %s", job->message->queue_id);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
        qmgr_job_pop(job);

    /*
     * Make sure this job is not cached as the next unread job for this
     * transport. The qmgr_entry_done() will make sure that the slots donated
     * by this job are moved back to the transport pool as soon as possible.
     */
    qmgr_job_move_limits(job);

    /*
     * Remove the job from the job lists. Note that it remains on the message
     * job list, though, and that it can be revived by using
     * qmgr_job_obtain(). Also note that the available slot counter is left
     * intact.
     */
    qmgr_job_unlink(job);
}

/* qmgr_job_free - release the job structure */

void qmgr_job_free(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_free";
    QMGR_MESSAGE *message = job->message;
    QMGR_TRANSPORT *transport = job->transport;

    if (msg_verbose)
        msg_info("%s: %s %s", myname, message->queue_id, transport->name);

    /*
     * Sanity checks.
     */
    if (job->rcpt_count)
        msg_panic("%s: non-zero recipient count (%d)", myname, job->rcpt_count);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
        qmgr_job_pop(job);

    /*
     * Return any remaining recipient slots back to the recipient slots pool.
     */
    qmgr_job_move_limits(job);
    if (job->rcpt_limit)
        msg_panic("%s: recipient slots leak (%d)", myname, job->rcpt_limit);

    /*
     * Unlink and discard the structure. Check if the job is still linked on
     * the job lists or if it was already retired before unlinking it.
     */
    if (job->stack_level >= 0)
        qmgr_job_unlink(job);
    QMGR_LIST_UNLINK(message->job_list, QMGR_JOB *, job, message_peers);
    htable_delete(transport->job_byname, message->queue_id, (void (*) (void *)) 0);
    htable_free(job->peer_byname, (void (*) (void *)) 0);
    myfree((void *) job);
}

/* qmgr_job_count_slots - maintain the delivery slot counters */

static void qmgr_job_count_slots(QMGR_JOB *job)
{

    /*
     * Count the number of delivery slots used during the delivery of the
     * selected job. Also count the number of delivery slots available for
     * its preemption.
     *
     * Despite its trivial look, this is one of the key parts of the theory
     * behind this preempting scheduler.
     */
    job->slots_available++;
    job->slots_used++;

    /*
     * If the selected job is not the original current job, reset the
     * candidate cache because the change above has slightly increased the
     * chance of this job becoming a candidate next time.
     *
     * Don't expect that the change of the current job this turn will render
     * the candidate cache invalid the next turn - it can happen that the
     * next turn the original current job will be selected again and the
     * cache would be considered valid in that case.
     */
    if (job != job->transport->candidate_cache_current)
        RESET_CANDIDATE_CACHE(job->transport);
}

/* qmgr_job_candidate - find best job candidate for preempting given job */

static QMGR_JOB *qmgr_job_candidate(QMGR_JOB *current)
{
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *best_job = 0;
    double score, best_score = 0.0;
    int max_slots, max_needed_entries, max_total_entries;
    int delay;
    time_t now = sane_time();

    /*
     * Fetch the result directly from the cache if the cache is still valid.
     *
     * Note that we cache negative results too, so the cache must be
     * invalidated by resetting the cached current job pointer, not the
     * candidate pointer itself.
     *
     * In case the cache is valid and contains no candidate, we can ignore the
     * time change, as it affects only which candidate is the best, not if
     * one exists. However, this feature requires that we no longer relax the
     * cache resetting rules, depending on the automatic cache timeout.
     */
    if (transport->candidate_cache_current == current
        && (transport->candidate_cache_time == now
            || transport->candidate_cache == 0))
        return (transport->candidate_cache);

    /*
     * Estimate the minimum amount of delivery slots that can ever be
     * accumulated for the given job. All jobs that won't fit into these
     * slots are excluded from the candidate selection.
     */
    max_slots = (MIN_ENTRIES(current) - current->selected_entries
                 + current->slots_available) / transport->slot_cost;
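
    /*
     * Worked example (hypothetical numbers, for illustration only): with
     * slot_cost = 5, a current job that has 120 read entries of which 20 are
     * already selected, plus 25 slots_available, can accumulate at most
     * (120 - 20 + 25) / 5 = 25 slots, so only candidates needing at most 25
     * more entries are worth considering below.
     */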

    /*
     * Select the candidate with the best time_since_queued/total_recipients
     * score. In addition to jobs which don't meet the max_slots limit, also
     * skip jobs which don't have any selectable entries at the moment.
     *
     * Instead of traversing the whole job list we traverse it just from the
     * current job forward. This has several advantages. First, we skip some
     * of the blocker jobs and the current job itself right away. But the
     * really important advantage is that we are sure that we don't consider
     * any jobs that are already stack children of the current job. Thanks to
     * this we can easily include all encountered jobs which are leaf
     * children of some of the preempting stacks as valid candidates. All we
     * need to do is to make sure we do not include any of the stack parents.
     * And, because the leaf children are not ordered by the time since
     * queued, we have to exclude them from the early loop end test.
     *
     * However, don't bother searching if we can't find anything suitable
     * anyway.
     */
    if (max_slots > 0) {
        for (job = current->transport_peers.next; job; job = job->transport_peers.next) {
            if (job->stack_children.next != 0 || IS_BLOCKER(job, transport))
                continue;
            max_total_entries = MAX_ENTRIES(job);
            max_needed_entries = max_total_entries - job->selected_entries;
            delay = now - job->message->queued_time + 1;
            if (max_needed_entries > 0 && max_needed_entries <= max_slots) {
                score = (double) delay / max_total_entries;
                if (score > best_score) {
                    best_score = score;
                    best_job = job;
                }
            }

            /*
             * Stop early if the best score is as good as it can get.
             */
            if (delay <= best_score && job->stack_level == 0)
                break;
        }
    }
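
    /*
     * Illustrative example of the scoring (hypothetical numbers): a small
     * job queued 300 seconds ago with 10 recipients in total scores
     * 300/10 = 30.0, while a large job queued 900 seconds ago with 1000
     * recipients scores only 900/1000 = 0.9, so the small job wins the
     * candidate selection despite being younger.
     */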

    /*
     * Cache the result for later use.
     */
    transport->candidate_cache = best_job;
    transport->candidate_cache_current = current;
    transport->candidate_cache_time = now;

    return (best_job);
}

/* qmgr_job_preempt - preempt large message with smaller one */

static QMGR_JOB *qmgr_job_preempt(QMGR_JOB *current)
{
    const char *myname = "qmgr_job_preempt";
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *prev;
    int expected_slots;
    int rcpt_slots;

    /*
     * Suppress preempting completely if the current job is not big enough to
     * accumulate even the minimal number of slots required.
     *
     * Also, don't look for a better job candidate if there are no available
     * slots yet (the count can get negative due to the slot loans below).
     */
    if (current->slots_available <= 0
        || MAX_ENTRIES(current) < transport->min_slots * transport->slot_cost)
        return (current);

    /*
     * Find the best candidate for preempting the current job.
     *
     * Note that the function also takes care that the candidate fits within
     * the number of delivery slots which the current job is still able to
     * accumulate.
     */
    if ((job = qmgr_job_candidate(current)) == 0)
        return (current);

    /*
     * Sanity checks.
     */
    if (job == current)
        msg_panic("%s: attempt to preempt itself", myname);
    if (job->stack_children.next != 0)
        msg_panic("%s: already on the job stack (%d)", myname, job->stack_level);
    if (job->stack_level < 0)
        msg_panic("%s: not on the job list (%d)", myname, job->stack_level);

    /*
     * Check if there are enough available delivery slots accumulated to
     * preempt the current job.
     *
     * The slot loaning scheme improves the average message response time. Note
     * that the loan only allows the preemption to happen earlier, though. It
     * doesn't affect how many slots have to be "paid" - in either case the
     * full number of slots required has to be accumulated later before the
     * current job can be preempted again.
     */
    expected_slots = MAX_ENTRIES(job) - job->selected_entries;
    if (current->slots_available / transport->slot_cost + transport->slot_loan
        < expected_slots * transport->slot_loan_factor / 100.0)
        return (current);
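
    /*
     * Worked example of the check above (hypothetical numbers): with
     * slot_cost = 5, slot_loan = 2 and slot_loan_factor = 75, a candidate
     * expected to need 20 more entries may preempt as soon as
     * slots_available / 5 + 2 >= 20 * 0.75, i.e. once the current job has
     * accumulated 65 available slots instead of the full 100.
     */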

    /*
     * Preempt the current job.
     *
     * This involves placing the selected candidate in front of the current
     * job on the job list and updating the stack parent/child/sibling
     * pointers appropriately. But first we need to make sure that the
     * candidate is taken from its previous job stack which it might be top
     * of.
     */
    if (job->stack_level > 0)
        qmgr_job_pop(job);
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    prev = current->transport_peers.prev;
    QMGR_LIST_LINK(transport->job_list, prev, job, current, transport_peers);
    job->stack_parent = current;
    QMGR_LIST_APPEND(current->stack_children, job, stack_siblings);
    job->stack_level = current->stack_level + 1;

    /*
     * Update the current job pointer and explicitly reset the candidate
     * cache.
     */
    transport->job_current = job;
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Since a single job can be preempted by several jobs at the same time,
     * we have to adjust the available slot count now to prevent using the
     * same slots multiple times. To do that we subtract the number of slots
     * the preempting job will supposedly use. This number will be corrected
     * later when that job is popped from the stack to reflect the number of
     * slots really used.
     *
     * As long as we don't need to keep track of how many slots were really
     * used, we can (ab)use the slots_used counter for counting the
     * difference between the real and expected amounts instead of the
     * absolute amount.
     */
    current->slots_available -= expected_slots * transport->slot_cost;
    job->slots_used = -expected_slots;
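
    /*
     * Example of the accounting trick (hypothetical numbers): if the
     * preempting job was expected to need 20 entries, slots_used starts at
     * -20; after it actually delivers 17 entries it ends at -3, and
     * qmgr_job_pop() then returns the 3 * slot_cost unused slots to the
     * parent by subtracting the negative value.
     */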

    /*
     * Add part of the extra recipient slots reserved for preempting jobs to
     * the new current job if necessary.
     *
     * Note that transport->rcpt_unused is within <-rcpt_per_stack, 0> in such
     * a case.
     */
    if (job->message->rcpt_offset != 0) {
        rcpt_slots = (transport->rcpt_per_stack + transport->rcpt_unused + 1) / 2;
        job->rcpt_limit += rcpt_slots;
        job->message->rcpt_limit += rcpt_slots;
        transport->rcpt_unused -= rcpt_slots;
    }
    if (msg_verbose)
        msg_info("%s: %s by %s, level %d", myname, current->message->queue_id,
                 job->message->queue_id, job->stack_level);

    return (job);
}

/* qmgr_job_pop - remove the job from its job preemption stack */

static void qmgr_job_pop(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_pop";
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_JOB *parent;

    if (msg_verbose)
        msg_info("%s: %s", myname, job->message->queue_id);

    /*
     * Sanity checks.
     */
    if (job->stack_level <= 0)
        msg_panic("%s: not on the job stack (%d)", myname, job->stack_level);

    /*
     * Adjust the number of delivery slots available to preempt the job's
     * parent. Note that the -= actually adds back any unused slots, as we
     * have already subtracted the expected amount of slots from both
     * counters when we did the preemption.
     *
     * Note that we intentionally do not adjust slots_used of the parent.
     * Doing so would decrease the maximum per-message inflation factor if
     * the preemption appeared near the end of parent delivery.
     *
     * For the same reason we do not adjust the parent's slots_available if
     * the parent is not the original parent that was preempted by this job
     * (i.e., the original parent job has already completed).
     *
     * This is another key part of the theory behind this preempting
     * scheduler.
     */
    if ((parent = job->stack_parent) != 0
        && job->stack_level == parent->stack_level + 1)
        parent->slots_available -= job->slots_used * transport->slot_cost;
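
    /*
     * Illustration of the check above (hypothetical situation): a job at
     * stack level 2 whose original level-1 parent has already completed is
     * adopted by the level-0 grandparent; since 2 != 0 + 1, the adopted
     * child does not adjust the grandparent's slots_available.
     */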

    /*
     * Remove the job from its parent's children list.
     */
    if (parent != 0) {
        QMGR_LIST_UNLINK(parent->stack_children, QMGR_JOB *, job, stack_siblings);
        job->stack_parent = 0;
    }

    /*
     * If there is a parent, let it adopt all those orphaned children.
     * Otherwise at least notify the children that their parent is gone.
     */
    qmgr_job_parent_gone(job, parent);

    /*
     * Put the job back to stack level zero.
     */
    job->stack_level = 0;

    /*
     * Explicitly reset the candidate cache. It's not worth trying to skip
     * this under some complicated conditions - in most cases the popped job
     * is the current job so we would have to reset it anyway.
     */
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Here we leave the remaining work involving the proper placement on the
     * job list to the caller. The most important reason for this is that it
     * allows us not to look up where exactly to place the job.
     *
     * The caller is also made responsible for invalidating the current job
     * cache if necessary.
     */
#if 0
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_LINK(transport->job_list, some_prev, job, some_next, transport_peers);

    if (transport->job_current == job)
        transport->job_current = job->transport_peers.next;
#endif
}

/* qmgr_job_peer_select - select next peer suitable for delivery */

static QMGR_PEER *qmgr_job_peer_select(QMGR_JOB *job)
{
    QMGR_PEER *peer;
    QMGR_MESSAGE *message = job->message;

    /*
     * Try reading in more recipients. We do that as soon as possible
     * (almost, see below), to make sure there is enough new blood pouring
     * in. Otherwise a single recipient for a slow destination might starve
     * the entire message delivery, leaving a lot of fast-destination
     * recipients sitting idle in the queue file.
     *
     * Ideally we would like to read in recipients whenever there is space,
     * but to prevent excessive I/O, we read them only when enough time has
     * passed or we can read enough of them at once.
     *
     * Note that even if we read the recipients a few at a time, the message
     * loading code tries to put them into existing recipient entries
     * whenever possible, so the per-destination recipient grouping is not
     * grossly affected.
     *
     * XXX Workaround for logic mismatch. The message->refcount test needs
     * explanation. If the refcount is zero, it means that qmgr_active_done()
     * is being completed asynchronously. In such case, we can't read in more
     * recipients as bad things would happen after qmgr_active_done()
     * continues processing. Note that this results in the given job being
     * stalled for some time, but fortunately this particular situation is so
     * rare that it is not critical. Still, we seek a better solution.
     */
    if (message->rcpt_offset != 0
        && message->refcount > 0
        && (message->rcpt_limit - message->rcpt_count >= job->transport->refill_limit
            || (message->rcpt_limit > message->rcpt_count
                && sane_time() - message->refill_time >= job->transport->refill_delay)))
        qmgr_message_realloc(message);
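
    /*
     * For illustration (hypothetical parameter values, not defaults taken
     * from this code): with refill_limit = 100 and refill_delay = 5, more
     * recipients are read in either when at least 100 recipient slots are
     * free, or when at least one slot is free and 5 or more seconds have
     * passed since the last refill.
     */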

    /*
     * Get the next suitable peer, if there is any.
     */
    if (HAS_ENTRIES(job) && (peer = qmgr_peer_select(job)) != 0)
        return (peer);

    /*
     * There is no suitable peer in-core, so try reading in more recipients
     * if possible. This is our last chance to get a suitable peer before
     * giving up on this job for now.
     *
     * XXX For message->refcount, see above.
     */
    if (message->rcpt_offset != 0
        && message->refcount > 0
        && message->rcpt_limit > message->rcpt_count) {
        qmgr_message_realloc(message);
        if (HAS_ENTRIES(job))
            return (qmgr_peer_select(job));
    }
    return (0);
}

/* qmgr_job_entry_select - select next entry suitable for delivery */

QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job, *next;
    QMGR_PEER *peer;
    QMGR_ENTRY *entry;

    /*
     * Get the current job if there is one.
     */
    if ((job = transport->job_current) == 0)
        return (0);

    /*
     * Exercise the preempting algorithm if enabled.
     *
     * A slot_cost equal to 1 causes the algorithm to degenerate and is
     * therefore disabled too.
     */
    if (transport->slot_cost >= 2)
        job = qmgr_job_preempt(job);

    /*
     * Select the next entry suitable for delivery. In case the current job
     * can't provide one because of the per-destination concurrency limits,
     * we mark it as a "blocker" job and continue with the next job on the
     * job list.
     *
     * Note that the loop also takes care of getting the "stalled" jobs (jobs
     * with no entries currently available) out of the way if necessary.
     * Stalled jobs can appear in case of multi-transport messages whose
     * recipients don't fit in-core at once. Some jobs created by such
     * messages may have only a few recipients and would stay on the job list
     * until all other jobs of that message are delivered, blocking precious
     * recipient slots available to this transport. Or it can happen that the
     * job has some more entries but suddenly they all get deferred. Whatever
     * the reason, we retire such jobs below if we happen to come across
     * some.
     */
    for ( /* empty */ ; job; job = next) {
        next = job->transport_peers.next;

        /*
         * Don't bother if the job is known to have no available entries
         * because of the per-destination concurrency limits.
         */
        if (IS_BLOCKER(job, transport))
            continue;

        if ((peer = qmgr_job_peer_select(job)) != 0) {

            /*
             * We have found a suitable peer. Select one of its entries and
             * adjust the delivery slot counters.
             */
            entry = qmgr_entry_select(peer);
            qmgr_job_count_slots(job);

            /*
             * Remember the current job for the next time so we don't have to
             * crawl over all those blockers again. They will be reconsidered
             * when the concurrency limit permits.
             */
            transport->job_current = job;

            /*
             * In case we selected the very last job entry, remove the job
             * from the job lists right now.
             *
             * This action uses the assumption that once the job entry has
             * been selected, it can be unselected only before the message
             * itself is deferred. Thus the job with all entries selected
             * can't re-appear with more entries available for selection
             * again (without reading in more entries from the queue file,
             * which in turn invokes qmgr_job_obtain() which re-links the job
             * back on the lists if necessary).
             *
             * Note that qmgr_job_move_limits() transfers the recipient slots
             * correctly even if the job is unlinked from the job list thanks
             * to the job_next_unread caching.
             */
            if (!HAS_ENTRIES(job) && job->message->rcpt_offset == 0)
                qmgr_job_retire(job);

            /*
             * Finally. Hand back the fruit of our tedious effort.
             */
            return (entry);
        } else if (HAS_ENTRIES(job)) {

            /*
             * The job can't be selected due to the concurrency limits. Mark
             * it together with its queues so we know they are blocking the
             * job list and they get the appropriate treatment. In
             * particular, all blockers will be reconsidered when one of the
             * problematic queues accepts more deliveries. And the job itself
             * will be reconsidered if it is assigned some more entries.
             */
            job->blocker_tag = transport->blocker_tag;
            for (peer = job->peer_list.next; peer; peer = peer->peers.next)
                if (peer->entry_list.next != 0)
                    peer->queue->blocker_tag = transport->blocker_tag;
        } else {

            /*
             * The job is "stalled". Retire it until it either gets freed or
             * gets more entries later.
             */
            qmgr_job_retire(job);
        }
    }

    /*
     * We have not found any entry we could use for delivery. Well, things
     * must have changed since this transport was selected for asynchronous
     * allocation. Never mind. Clear the current job pointer and reluctantly
     * report back that we have failed in our task.
     */
    transport->job_current = 0;
    return (0);
}

/* qmgr_job_blocker_update - update "blocked job" status */

void qmgr_job_blocker_update(QMGR_QUEUE *queue)
{
    QMGR_TRANSPORT *transport = queue->transport;

    /*
     * If the queue was blocking some of the jobs on the job list, check if
     * the concurrency limit has been lifted. If there are still some pending
     * deliveries, give it a try and unmark all transport blockers at once.
     * The qmgr_job_entry_select() will do the rest. In either case, make
     * sure the queue is not marked as a blocker anymore, with extra handling
     * of queues which were declared dead.
     *
     * Note that changing the blocker status also affects the candidate cache.
     * Most of the cases would be automatically recognized by the current job
     * change, but we play safe and reset the cache explicitly below.
     *
     * Keeping the transport blocker tag odd is an easy way to make sure the
     * tag never matches jobs that are not explicitly marked as blockers.
     */
    if (queue->blocker_tag == transport->blocker_tag) {
        if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
            transport->blocker_tag += 2;
            transport->job_current = transport->job_list.next;
            transport->candidate_cache_current = 0;
        }
        if (queue->window > queue->busy_refcount || QMGR_QUEUE_THROTTLED(queue))
            queue->blocker_tag = 0;
    }
}