/*	$NetBSD: qmgr_job.c,v 1.2 2017/02/14 01:16:47 christos Exp $	*/

/*++
/* NAME
/*	qmgr_job 3
/* SUMMARY
/*	per-transport jobs
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	QMGR_JOB *qmgr_job_obtain(message, transport)
/*	QMGR_MESSAGE *message;
/*	QMGR_TRANSPORT *transport;
/*
/*	void qmgr_job_free(job)
/*	QMGR_JOB *job;
/*
/*	void qmgr_job_move_limits(job)
/*	QMGR_JOB *job;
/*
/*	QMGR_ENTRY *qmgr_job_entry_select(transport)
/*	QMGR_TRANSPORT *transport;
/*
/*	void	qmgr_job_blocker_update(queue)
/*	QMGR_QUEUE *queue;
/* DESCRIPTION
/*	These routines add/delete/manipulate per-transport jobs.
/*	Each job corresponds to a specific transport and message.
/*	Each job has a peer list containing all pending delivery
/*	requests for that message.
/*
/*	qmgr_job_obtain() finds an existing job for the named message
/*	and transport combination. A new empty job is created if none
/*	exists yet. In either case, the job is prepared for the
/*	assignment of (more) message recipients.
/*
/*	qmgr_job_free() disposes of a per-transport job after all
/*	its entries have been taken care of. It is an error to dispose
/*	of a job that is still in use.
/*
/*	qmgr_job_entry_select() attempts to find the next entry suitable
/*	for delivery. The job preempting algorithm is also exercised.
/*	If necessary, an attempt to read more recipients into core is made.
/*	This can result in the creation of more job, queue and entry
/*	structures.
/*
/*	qmgr_job_blocker_update() updates the status of blocked
/*	jobs after a decrease in the queue's concurrency level,
/*	after the queue is throttled, or after the queue is resumed
/*	from suspension.
/*
/*	qmgr_job_move_limits() takes care of the proper distribution of
/*	the per-transport recipient limit among the per-transport jobs.
/*	It should be called whenever a job's recipient slot becomes
/*	available.
/* DIAGNOSTICS
/*	Panic: consistency check failure.
/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Patrik Rak
/*	patrik@raxoft.cz
/*--*/

/* System library. */

#include <sys_defs.h>

/* Utility library. */

#include <msg.h>
#include <htable.h>
#include <mymalloc.h>
#include <sane_time.h>

/* Application-specific. */

#include "qmgr.h"

/* Forward declarations */

static void qmgr_job_pop(QMGR_JOB *);

/* Helper macros */

#define HAS_ENTRIES(job) ((job)->selected_entries < (job)->read_entries)

/*
 * The MIN_ENTRIES macro may underestimate a lot, but we can't use
 * message->rcpt_unread because we don't know yet whether all those unread
 * recipients go to our transport.
 */

#define MIN_ENTRIES(job) ((job)->read_entries)
#define MAX_ENTRIES(job) ((job)->read_entries + (job)->message->rcpt_unread)

#define RESET_CANDIDATE_CACHE(transport) ((transport)->candidate_cache_current = 0)

#define IS_BLOCKER(job,transport) ((job)->blocker_tag == (transport)->blocker_tag)
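
/*
 * A note on IS_BLOCKER(): the transport blocker tag is kept odd (see
 * qmgr_job_blocker_update() below, which advances it by 2), while new jobs
 * start with blocker_tag 0, so a job can match the transport tag only
 * after it was explicitly marked as a blocker with the current tag.
 */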

/* qmgr_job_create - create and initialize message job structure */

static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
    job->message = message;
    QMGR_LIST_APPEND(message->job_list, job, message_peers);
    htable_enter(transport->job_byname, message->queue_id, (void *) job);
    job->transport = transport;
    QMGR_LIST_INIT(job->transport_peers);
    QMGR_LIST_INIT(job->time_peers);
    job->stack_parent = 0;
    QMGR_LIST_INIT(job->stack_children);
    QMGR_LIST_INIT(job->stack_siblings);
    job->stack_level = -1;
    job->blocker_tag = 0;
    job->peer_byname = htable_create(0);
    QMGR_LIST_INIT(job->peer_list);
    job->slots_used = 0;
    job->slots_available = 0;
    job->selected_entries = 0;
    job->read_entries = 0;
    job->rcpt_count = 0;
    job->rcpt_limit = 0;
    return (job);
}

/* qmgr_job_link - append the job to the job lists based on the time it was queued */

static void qmgr_job_link(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *prev, *next, *list_prev, *list_next, *unread, *current;
    int     delay;

    /*
     * Sanity checks.
     */
    if (job->stack_level >= 0)
	msg_panic("qmgr_job_link: already on the job lists (%d)", job->stack_level);

    /*
     * Traverse the time list and the scheduler list from the end and stop
     * when we find a job older than the one being linked.
     *
     * During the traversals keep track of whether we have come across either
     * the current job or the first unread job on the job list. If this is
     * the case, these pointers will be adjusted below as required.
     *
     * Although both lists are exactly the same when only jobs on stack
     * level zero are considered, it's easier to traverse them separately.
     * Otherwise it's impossible to keep track of the current job pointer
     * effectively.
     *
     * This may look inefficient, but under normal operation it is expected
     * that the loops will stop right away, resulting in normal list appends
     * below. However, this code is necessary for reviving retired jobs and
     * for jobs which are created long after the first chunk of recipients
     * was read in-core (either of these can happen only for multi-transport
     * messages).
     *
     * XXX Note that we test stack_parent rather than stack_level below. This
     * subtle difference allows us to enqueue the job in correct time order
     * with respect to orphaned children even after their original parent on
     * level zero is gone. Consequently, the early loop stop in candidate
     * selection works reliably, too. These are the reasons why we care to
     * bother with children adoption at all.
     */
    current = transport->job_current;
    for (next = 0, prev = transport->job_list.prev; prev;
	 next = prev, prev = prev->transport_peers.prev) {
	if (prev->stack_parent == 0) {
	    delay = message->queued_time - prev->message->queued_time;
	    if (delay >= 0)
		break;
	}
	if (current == prev)
	    current = 0;
    }
    list_prev = prev;
    list_next = next;

    unread = transport->job_next_unread;
    for (next = 0, prev = transport->job_bytime.prev; prev;
	 next = prev, prev = prev->time_peers.prev) {
	delay = message->queued_time - prev->message->queued_time;
	if (delay >= 0)
	    break;
	if (unread == prev)
	    unread = 0;
    }

    /*
     * Link the job into the proper place on the job lists and mark it so we
     * know it has been linked.
     */
    job->stack_level = 0;
    QMGR_LIST_LINK(transport->job_list, list_prev, job, list_next, transport_peers);
    QMGR_LIST_LINK(transport->job_bytime, prev, job, next, time_peers);

    /*
     * Update the current job pointer if necessary.
     */
    if (current == 0)
	transport->job_current = job;

    /*
     * Update the pointer to the first unread job on the job list and steal
     * the unused recipient slots from the old one.
     */
    if (unread == 0) {
	unread = transport->job_next_unread;
	transport->job_next_unread = job;
	if (unread != 0)
	    qmgr_job_move_limits(unread);
    }

    /*
     * Get as many recipient slots as possible. The excess will be returned
     * to the transport pool as soon as the exact amount required is known
     * (which is usually after all recipients have been read in-core).
     */
    if (transport->rcpt_unused > 0) {
	job->rcpt_limit += transport->rcpt_unused;
	message->rcpt_limit += transport->rcpt_unused;
	transport->rcpt_unused = 0;
    }
}

/* qmgr_job_find - lookup job associated with named message and transport */

static QMGR_JOB *qmgr_job_find(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{

    /*
     * Instead of traversing the message job list, we use a single
     * per-transport hash table. This is better (at least with respect to
     * memory usage) than having a separate hash table (usually almost
     * empty) for each message.
     */
    return ((QMGR_JOB *) htable_find(transport->job_byname, message->queue_id));
}

/* qmgr_job_obtain - find/create the appropriate job and make it ready for new recipients */

QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    /*
     * Try finding an existing job, reviving it if it was already retired.
     * Create a new job for this transport/message combination otherwise. In
     * either case, the job ends up linked on the job lists.
     */
    if ((job = qmgr_job_find(message, transport)) == 0)
	job = qmgr_job_create(message, transport);
    if (job->stack_level < 0)
	qmgr_job_link(job);

    /*
     * Reset the candidate cache because of the new expected recipients. Make
     * sure the job is not marked as a blocker for the same reason. Note that
     * this can result in having a non-blocker followed by more blockers.
     * Consequently, we can't just update the current job pointer, we have to
     * reset it. Fortunately qmgr_job_entry_select() will easily deal with
     * this and will look up the real current job for us.
     */
    RESET_CANDIDATE_CACHE(transport);
    if (IS_BLOCKER(job, transport)) {
	job->blocker_tag = 0;
	transport->job_current = transport->job_list.next;
    }
    return (job);
}
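
/*
 * Illustrative sketch only (not compiled): a hypothetical caller flow for
 * the functions above, loosely modeled on how recipients are assigned to
 * jobs elsewhere in the queue manager.
 */
#if 0
    QMGR_JOB *job;

    job = qmgr_job_obtain(message, transport);	/* find or create, then link */
    /* ... assign recipients to the job's peers and entries ... */
    qmgr_job_free(job);				/* error if the job is still in use */
#endif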

/* qmgr_job_move_limits - move unused recipient slots to the next unread job */

void    qmgr_job_move_limits(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *next = transport->job_next_unread;
    int     rcpt_unused, msg_rcpt_unused;

    /*
     * Find the next unread job on the job list if necessary. Cache it for
     * later. This makes the amortized efficiency of this routine O(1) per
     * job. Note that we use the time list, whose ordering doesn't change
     * over time.
     */
    if (job == next) {
	for (next = next->time_peers.next; next; next = next->time_peers.next)
	    if (next->message->rcpt_offset != 0)
		break;
	transport->job_next_unread = next;
    }

    /*
     * Calculate the number of available unused slots.
     */
    rcpt_unused = job->rcpt_limit - job->rcpt_count;
    msg_rcpt_unused = message->rcpt_limit - message->rcpt_count;
    if (msg_rcpt_unused < rcpt_unused)
	rcpt_unused = msg_rcpt_unused;
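
    /*
     * That is, rcpt_unused = min(job's unused slots, message's unused
     * slots); the smaller of the two bounds what can safely be returned.
     */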

    /*
     * Transfer the unused recipient slots back to the transport pool and to
     * the next not-fully-read job. The job's message limits are adjusted
     * accordingly. Note that the transport pool can be negative if we used
     * some of the rcpt_per_stack slots.
     */
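
    /*
     * Worked example with made-up numbers: if the job has 800 unused slots
     * but its message only 500, then 500 slots go back to
     * transport->rcpt_unused, and the cached next unread job (if any)
     * immediately receives the entire positive pool balance.
     */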
    if (rcpt_unused > 0) {
	job->rcpt_limit -= rcpt_unused;
	message->rcpt_limit -= rcpt_unused;
	transport->rcpt_unused += rcpt_unused;
	if (next != 0 && (rcpt_unused = transport->rcpt_unused) > 0) {
	    next->rcpt_limit += rcpt_unused;
	    next->message->rcpt_limit += rcpt_unused;
	    transport->rcpt_unused = 0;
	}
    }
}

/* qmgr_job_parent_gone - take care of orphaned stack children */

static void qmgr_job_parent_gone(QMGR_JOB *job, QMGR_JOB *parent)
{
    QMGR_JOB *child;

    while ((child = job->stack_children.next) != 0) {
	QMGR_LIST_UNLINK(job->stack_children, QMGR_JOB *, child, stack_siblings);
	if (parent != 0)
	    QMGR_LIST_APPEND(parent->stack_children, child, stack_siblings);
	child->stack_parent = parent;
    }
}

/* qmgr_job_unlink - unlink the job from the job lists */

static void qmgr_job_unlink(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_unlink";
    QMGR_TRANSPORT *transport = job->transport;

    /*
     * Sanity checks.
     */
    if (job->stack_level != 0)
	msg_panic("%s: non-zero stack level (%d)", myname, job->stack_level);
    if (job->stack_parent != 0)
	msg_panic("%s: parent present", myname);
    if (job->stack_siblings.next != 0)
	msg_panic("%s: siblings present", myname);

    /*
     * Make sure that children of a job on stack level zero are informed
     * that their parent is gone too.
     */
    qmgr_job_parent_gone(job, 0);

    /*
     * Update the current job pointer if necessary.
     */
    if (transport->job_current == job)
	transport->job_current = job->transport_peers.next;

    /*
     * Invalidate the candidate selection cache if necessary.
     */
    if (job == transport->candidate_cache
	|| job == transport->candidate_cache_current)
	RESET_CANDIDATE_CACHE(transport);

    /*
     * Remove the job from the job lists and mark it as unlinked.
     */
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_UNLINK(transport->job_bytime, QMGR_JOB *, job, time_peers);
    job->stack_level = -1;
}

/* qmgr_job_retire - remove the job from the job lists while waiting for recipients to deliver */

static void qmgr_job_retire(QMGR_JOB *job)
{
    if (msg_verbose)
	msg_info("qmgr_job_retire: %s", job->message->queue_id);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);

    /*
     * Make sure this job is not cached as the next unread job for this
     * transport. The qmgr_entry_done() routine will make sure that the
     * slots donated by this job are moved back to the transport pool as
     * soon as possible.
     */
    qmgr_job_move_limits(job);

    /*
     * Remove the job from the job lists. Note that it remains on the message
     * job list, though, and that it can be revived by using
     * qmgr_job_obtain(). Also note that the available slot counter is left
     * intact.
     */
    qmgr_job_unlink(job);
}

/* qmgr_job_free - release the job structure */

void    qmgr_job_free(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_free";
    QMGR_MESSAGE *message = job->message;
    QMGR_TRANSPORT *transport = job->transport;

    if (msg_verbose)
	msg_info("%s: %s %s", myname, message->queue_id, transport->name);

    /*
     * Sanity checks.
     */
    if (job->rcpt_count)
	msg_panic("%s: non-zero recipient count (%d)", myname, job->rcpt_count);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);

    /*
     * Return any remaining recipient slots back to the recipient slots pool.
     */
    qmgr_job_move_limits(job);
    if (job->rcpt_limit)
	msg_panic("%s: recipient slots leak (%d)", myname, job->rcpt_limit);

    /*
     * Unlink and discard the structure. Check if the job is still linked on
     * the job lists or if it was already retired before unlinking it.
     */
    if (job->stack_level >= 0)
	qmgr_job_unlink(job);
    QMGR_LIST_UNLINK(message->job_list, QMGR_JOB *, job, message_peers);
    htable_delete(transport->job_byname, message->queue_id, (void (*) (void *)) 0);
    htable_free(job->peer_byname, (void (*) (void *)) 0);
    myfree((void *) job);
}

/* qmgr_job_count_slots - maintain the delivery slot counters */

static void qmgr_job_count_slots(QMGR_JOB *job)
{

    /*
     * Count the number of delivery slots used during the delivery of the
     * selected job. Also count the number of delivery slots available for
     * its preemption.
     *
     * Despite its trivial look, this is one of the key parts of the theory
     * behind this preempting scheduler.
     */
    job->slots_available++;
    job->slots_used++;

    /*
     * If the selected job is not the original current job, reset the
     * candidate cache because the change above has slightly increased the
     * chance of this job becoming a candidate next time.
     *
     * Don't expect that a change of the current job this turn will render
     * the candidate cache invalid the next turn - it can happen that the
     * original current job will be selected again next turn, in which case
     * the cache would be considered valid.
     */
    if (job != job->transport->candidate_cache_current)
	RESET_CANDIDATE_CACHE(job->transport);
}

/* qmgr_job_candidate - find best job candidate for preempting given job */

static QMGR_JOB *qmgr_job_candidate(QMGR_JOB *current)
{
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *best_job = 0;
    double  score, best_score = 0.0;
    int     max_slots, max_needed_entries, max_total_entries;
    int     delay;
    time_t  now = sane_time();

    /*
     * Fetch the result directly from the cache if the cache is still valid.
     *
     * Note that we cache negative results too, so the cache must be
     * invalidated by resetting the cached current job pointer, not the
     * candidate pointer itself.
     *
     * In case the cache is valid and contains no candidate, we can ignore
     * the time change, as it affects only which candidate is the best, not
     * whether one exists. However, this feature requires that we no longer
     * relax the cache resetting rules, depending on the automatic cache
     * timeout.
     */
    if (transport->candidate_cache_current == current
	&& (transport->candidate_cache_time == now
	    || transport->candidate_cache == 0))
	return (transport->candidate_cache);

    /*
     * Estimate the minimum number of delivery slots that can ever be
     * accumulated for the given job. All jobs that won't fit into these
     * slots are excluded from the candidate selection.
     */
    max_slots = (MIN_ENTRIES(current) - current->selected_entries
		 + current->slots_available) / transport->slot_cost;
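
    /*
     * Worked example with made-up numbers: with 60 read-but-unselected
     * entries, 20 slots already available and a slot_cost of 2, the current
     * job can ever accumulate at most (60 + 20) / 2 = 40 slots, so
     * candidates needing more than 40 entries are not worth considering.
     */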

    /*
     * Select the candidate with the best time_since_queued/total_recipients
     * score. In addition to jobs which don't meet the max_slots limit, also
     * skip jobs which don't have any selectable entries at the moment.
     *
     * Instead of traversing the whole job list we traverse it just from the
     * current job forward. This has several advantages. First, we skip some
     * of the blocker jobs and the current job itself right away. But the
     * really important advantage is that we are sure that we don't consider
     * any jobs that are already stack children of the current job. Thanks to
     * this we can easily include all encountered jobs which are leaf
     * children of some of the preempting stacks as valid candidates. All we
     * need to do is to make sure we do not include any of the stack parents.
     * And, because the leaf children are not ordered by the time since
     * queued, we have to exclude them from the early loop end test.
     *
     * However, don't bother searching if we can't find anything suitable
     * anyway.
     */
    if (max_slots > 0) {
	for (job = current->transport_peers.next; job; job = job->transport_peers.next) {
	    if (job->stack_children.next != 0 || IS_BLOCKER(job, transport))
		continue;
	    max_total_entries = MAX_ENTRIES(job);
	    max_needed_entries = max_total_entries - job->selected_entries;
	    delay = now - job->message->queued_time + 1;
	    if (max_needed_entries > 0 && max_needed_entries <= max_slots) {
		score = (double) delay / max_total_entries;
		if (score > best_score) {
		    best_score = score;
		    best_job = job;
		}
	    }

	    /*
	     * Stop early if the best score is as good as it can get.
	     */
	    if (delay <= best_score && job->stack_level == 0)
		break;
	}
    }
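
    /*
     * Worked example with made-up numbers: a job queued 59 seconds ago with
     * 10 total entries scores (59 + 1) / 10 = 6.0, beating a job queued 199
     * seconds ago with 100 entries, which scores only (199 + 1) / 100 = 2.0.
     * A newer but much smaller job can thus outscore an older large one.
     */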

    /*
     * Cache the result for later use.
     */
    transport->candidate_cache = best_job;
    transport->candidate_cache_current = current;
    transport->candidate_cache_time = now;

    return (best_job);
}

/* qmgr_job_preempt - preempt large message with smaller one */

static QMGR_JOB *qmgr_job_preempt(QMGR_JOB *current)
{
    const char *myname = "qmgr_job_preempt";
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *prev;
    int     expected_slots;
    int     rcpt_slots;

    /*
     * Suppress preempting completely if the current job is not big enough
     * to accumulate even the minimal number of slots required.
     *
     * Also, don't look for a better job candidate if there are no available
     * slots yet (the count can get negative due to the slot loans below).
     */
    if (current->slots_available <= 0
      || MAX_ENTRIES(current) < transport->min_slots * transport->slot_cost)
	return (current);

    /*
     * Find the best candidate for preempting the current job.
     *
     * Note that the function also takes care that the candidate fits within
     * the number of delivery slots which the current job is still able to
     * accumulate.
     */
    if ((job = qmgr_job_candidate(current)) == 0)
	return (current);

    /*
     * Sanity checks.
     */
    if (job == current)
	msg_panic("%s: attempt to preempt itself", myname);
    if (job->stack_children.next != 0)
	msg_panic("%s: already on the job stack (%d)", myname, job->stack_level);
    if (job->stack_level < 0)
	msg_panic("%s: not on the job list (%d)", myname, job->stack_level);

    /*
     * Check if there are enough available delivery slots accumulated to
     * preempt the current job.
     *
     * The slot loaning scheme improves the average message response time.
     * Note that the loan only allows the preemption to happen earlier,
     * though. It doesn't affect how many slots have to be "paid" - in
     * either case the full number of slots required has to be accumulated
     * later before the current job can be preempted again.
     */
    expected_slots = MAX_ENTRIES(job) - job->selected_entries;
    if (current->slots_available / transport->slot_cost + transport->slot_loan
	< expected_slots * transport->slot_loan_factor / 100.0)
	return (current);
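
    /*
     * Worked example with made-up numbers: with slot_cost 5, slot_loan 3
     * and slot_loan_factor 75, a candidate expecting 20 entries may preempt
     * as soon as slots_available / 5 + 3 >= 20 * 0.75 = 15, that is, once
     * 60 slots are accumulated rather than the full 100 that must still be
     * "paid" eventually.
     */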

    /*
     * Preempt the current job.
     *
     * This involves placing the selected candidate in front of the current
     * job on the job list and updating the stack parent/child/sibling
     * pointers appropriately. But first we need to make sure that the
     * candidate is taken from its previous job stack which it might be on
     * top of.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    prev = current->transport_peers.prev;
    QMGR_LIST_LINK(transport->job_list, prev, job, current, transport_peers);
    job->stack_parent = current;
    QMGR_LIST_APPEND(current->stack_children, job, stack_siblings);
    job->stack_level = current->stack_level + 1;

    /*
     * Update the current job pointer and explicitly reset the candidate
     * cache.
     */
    transport->job_current = job;
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Since a single job can be preempted by several jobs at the same time,
     * we have to adjust the available slot count now to prevent using the
     * same slots multiple times. To do that we subtract the number of slots
     * the preempting job will supposedly use. This number will be corrected
     * later, when that job is popped from the stack, to reflect the number
     * of slots really used.
     *
     * As long as we don't need to keep track of how many slots were really
     * used, we can (ab)use the slots_used counter for counting the
     * difference between the real and expected amounts instead of the
     * absolute amount.
     */
    current->slots_available -= expected_slots * transport->slot_cost;
    job->slots_used = -expected_slots;
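
    /*
     * Worked example with made-up numbers: a preempting job expected to
     * deliver 20 entries starts with slots_used = -20. If it delivers only
     * 17 entries before being popped, slots_used ends up at -3, and
     * qmgr_job_pop() then returns the 3 * slot_cost unused slots to the
     * parent via its "parent->slots_available -= job->slots_used *
     * transport->slot_cost" adjustment.
     */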

    /*
     * Add part of the extra recipient slots reserved for preempting jobs to
     * the new current job if necessary.
     *
     * Note that transport->rcpt_unused is within <-rcpt_per_stack,0> in such
     * a case.
     */
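
    /*
     * Worked example with made-up numbers: with rcpt_per_stack 100 and
     * rcpt_unused already down at -30, the new current job receives
     * (100 - 30 + 1) / 2 = 35 extra recipient slots, driving rcpt_unused
     * further down to -65.
     */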
    if (job->message->rcpt_offset != 0) {
	rcpt_slots = (transport->rcpt_per_stack + transport->rcpt_unused + 1) / 2;
	job->rcpt_limit += rcpt_slots;
	job->message->rcpt_limit += rcpt_slots;
	transport->rcpt_unused -= rcpt_slots;
    }
    if (msg_verbose)
	msg_info("%s: %s by %s, level %d", myname, current->message->queue_id,
		 job->message->queue_id, job->stack_level);

    return (job);
}

/* qmgr_job_pop - remove the job from its job preemption stack */

static void qmgr_job_pop(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_pop";
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_JOB *parent;

    if (msg_verbose)
	msg_info("%s: %s", myname, job->message->queue_id);

    /*
     * Sanity checks.
     */
    if (job->stack_level <= 0)
	msg_panic("%s: not on the job stack (%d)", myname, job->stack_level);

    /*
     * Adjust the number of delivery slots available to preempt the job's
     * parent. Note that the -= actually adds back any unused slots, as we
     * have already subtracted the expected amount of slots from both
     * counters when we did the preemption.
     *
     * Note that we intentionally do not adjust slots_used of the parent.
     * Doing so would decrease the maximum per-message inflation factor if
     * the preemption appeared near the end of parent delivery.
     *
     * For the same reason we do not adjust the parent's slots_available if
     * the parent is not the original parent that was preempted by this job
     * (i.e., the original parent job has already completed).
     *
     * This is another key part of the theory behind this preempting
     * scheduler.
     */
    if ((parent = job->stack_parent) != 0
	&& job->stack_level == parent->stack_level + 1)
	parent->slots_available -= job->slots_used * transport->slot_cost;

    /*
     * Remove the job from its parent's children list.
     */
    if (parent != 0) {
	QMGR_LIST_UNLINK(parent->stack_children, QMGR_JOB *, job, stack_siblings);
	job->stack_parent = 0;
    }

    /*
     * If there is a parent, let it adopt all those orphaned children.
     * Otherwise at least notify the children that their parent is gone.
     */
    qmgr_job_parent_gone(job, parent);

    /*
     * Put the job back to stack level zero.
     */
    job->stack_level = 0;

    /*
     * Explicitly reset the candidate cache. It's not worth trying to skip
     * this under some complicated conditions - in most cases the popped job
     * is the current job so we would have to reset it anyway.
     */
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Here we leave the remaining work involving the proper placement on the
     * job list to the caller. The most important reason for this is that it
     * allows us not to look up where exactly to place the job.
     *
     * The caller is also made responsible for invalidating the current job
     * cache if necessary.
     */
#if 0
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_LINK(transport->job_list, some_prev, job, some_next, transport_peers);

    if (transport->job_current == job)
	transport->job_current = job->transport_peers.next;
#endif
}

/* qmgr_job_peer_select - select next peer suitable for delivery */

static QMGR_PEER *qmgr_job_peer_select(QMGR_JOB *job)
{
    QMGR_PEER *peer;
    QMGR_MESSAGE *message = job->message;

    /*
     * Try reading in more recipients. We do that as soon as possible
     * (almost, see below), to make sure there is enough new blood pouring
     * in. Otherwise a single recipient for a slow destination might starve
     * the entire message delivery, leaving lots of fast-destination
     * recipients sitting idle in the queue file.
     *
     * Ideally we would like to read in recipients whenever there is space,
     * but to prevent excessive I/O, we read them only when enough time has
     * passed or we can read enough of them at once.
     *
     * Note that even if we read the recipients a few at a time, the message
     * loading code tries to put them into existing recipient entries
     * whenever possible, so the per-destination recipient grouping is not
     * grossly affected.
     *
     * XXX Workaround for logic mismatch. The message->refcount test needs
     * explanation. If the refcount is zero, it means that qmgr_active_done()
     * is being completed asynchronously. In such a case, we can't read in
     * more recipients as bad things would happen after qmgr_active_done()
     * continues processing. Note that this results in the given job being
     * stalled for some time, but fortunately this particular situation is
     * so rare that it is not critical. Still, we seek a better solution.
     */
    if (message->rcpt_offset != 0
	&& message->refcount > 0
	&& (message->rcpt_limit - message->rcpt_count >= job->transport->refill_limit
	    || (message->rcpt_limit > message->rcpt_count
		&& sane_time() - message->refill_time >= job->transport->refill_delay)))
	qmgr_message_realloc(message);
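
    /*
     * In other words, assuming hypothetical settings refill_limit = 100 and
     * refill_delay = 5, we refill either when at least 100 recipient slots
     * are free again, or when at least one slot is free and 5 or more
     * seconds have passed since the last refill.
     */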

    /*
     * Get the next suitable peer, if there is any.
     */
    if (HAS_ENTRIES(job) && (peer = qmgr_peer_select(job)) != 0)
	return (peer);

    /*
     * There is no suitable peer in-core, so try reading in more recipients
     * if possible. This is our last chance to get a suitable peer before
     * giving up on this job for now.
     *
     * XXX For message->refcount, see above.
     */
    if (message->rcpt_offset != 0
	&& message->refcount > 0
	&& message->rcpt_limit > message->rcpt_count) {
	qmgr_message_realloc(message);
	if (HAS_ENTRIES(job))
	    return (qmgr_peer_select(job));
    }
    return (0);
}

/* qmgr_job_entry_select - select next entry suitable for delivery */

QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job, *next;
    QMGR_PEER *peer;
    QMGR_ENTRY *entry;

    /*
     * Get the current job if there is one.
     */
    if ((job = transport->job_current) == 0)
	return (0);

    /*
     * Exercise the preempting algorithm if enabled.
     *
     * A slot_cost equal to 1 causes the algorithm to degenerate and is
     * therefore disabled too.
     */
    if (transport->slot_cost >= 2)
	job = qmgr_job_preempt(job);

    /*
     * Select the next entry suitable for delivery. In case the current job
     * can't provide one because of the per-destination concurrency limits,
     * we mark it as a "blocker" job and continue with the next job on the
     * job list.
     *
     * Note that the loop also takes care of getting the "stall" jobs (jobs
     * with no entries currently available) out of the way if necessary.
     * Stall jobs can appear in case of multi-transport messages whose
     * recipients don't fit in-core at once. Some jobs created by such a
     * message may have only a few recipients and would stay on the job list
     * until all other jobs of that message are delivered, blocking precious
     * recipient slots available to this transport. Or it can happen that
     * the job has some more entries but suddenly they all get deferred.
     * Whatever the reason, we retire such jobs below if we happen to come
     * across some.
     */
    for ( /* empty */ ; job; job = next) {
	next = job->transport_peers.next;

	/*
	 * Don't bother if the job is known to have no available entries
	 * because of the per-destination concurrency limits.
	 */
	if (IS_BLOCKER(job, transport))
	    continue;

	if ((peer = qmgr_job_peer_select(job)) != 0) {

	    /*
	     * We have found a suitable peer. Select one of its entries and
	     * adjust the delivery slot counters.
	     */
	    entry = qmgr_entry_select(peer);
	    qmgr_job_count_slots(job);

	    /*
	     * Remember the current job for the next time so we don't have
	     * to crawl over all those blockers again. They will be
	     * reconsidered when the concurrency limit permits.
	     */
	    transport->job_current = job;

	    /*
	     * In case we selected the very last job entry, remove the job
	     * from the job lists right now.
	     *
	     * This action uses the assumption that once the job entry has
	     * been selected, it can be unselected only before the message
	     * itself is deferred. Thus a job with all entries selected
	     * can't re-appear with more entries available for selection
	     * again (without reading in more entries from the queue file,
	     * which in turn invokes qmgr_job_obtain() which re-links the
	     * job back on the lists if necessary).
	     *
	     * Note that qmgr_job_move_limits() transfers the recipient
	     * slots correctly even if the job is unlinked from the job
	     * list, thanks to the job_next_unread caching.
	     */
	    if (!HAS_ENTRIES(job) && job->message->rcpt_offset == 0)
		qmgr_job_retire(job);

	    /*
	     * Finally. Hand back the fruit of our tedious effort.
	     */
	    return (entry);
	} else if (HAS_ENTRIES(job)) {

	    /*
	     * The job can't be selected due to the concurrency limits. Mark
	     * it together with its queues so we know they are blocking the
	     * job list and they get the appropriate treatment. In
	     * particular, all blockers will be reconsidered when one of the
	     * problematic queues accepts more deliveries. And the job
	     * itself will be reconsidered if it is assigned some more
	     * entries.
	     */
	    job->blocker_tag = transport->blocker_tag;
	    for (peer = job->peer_list.next; peer; peer = peer->peers.next)
		if (peer->entry_list.next != 0)
		    peer->queue->blocker_tag = transport->blocker_tag;
	} else {

	    /*
	     * The job is "stalled". Retire it until it either gets freed or
	     * gets more entries later.
	     */
	    qmgr_job_retire(job);
	}
    }

    /*
     * We have not found any entry we could use for delivery. Well, things
     * must have changed since this transport was selected for asynchronous
     * allocation. Never mind. Clear the current job pointer and reluctantly
     * report back that we have failed in our task.
     */
    transport->job_current = 0;
    return (0);
}

/* qmgr_job_blocker_update - update "blocked job" status */

void    qmgr_job_blocker_update(QMGR_QUEUE *queue)
{
    QMGR_TRANSPORT *transport = queue->transport;

    /*
     * If the queue was blocking some of the jobs on the job list, check if
     * the concurrency limit has been lifted. If there are still some
     * pending deliveries, give it a try and unmark all transport blockers
     * at once. The qmgr_job_entry_select() routine will do the rest. In
     * either case make sure the queue is not marked as a blocker anymore,
     * with extra handling of queues which were declared dead.
     *
     * Note that changing the blocker status also affects the candidate
     * cache. Most of the cases would be automatically recognized by the
     * current job change, but we play safe and reset the cache explicitly
     * below.
     *
     * Keeping the transport blocker tag odd is an easy way to make sure the
     * tag never matches jobs that are not explicitly marked as blockers.
     */
    if (queue->blocker_tag == transport->blocker_tag) {
	if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
	    transport->blocker_tag += 2;
	    transport->job_current = transport->job_list.next;
	    transport->candidate_cache_current = 0;
	}
	if (queue->window > queue->busy_refcount || QMGR_QUEUE_THROTTLED(queue))
	    queue->blocker_tag = 0;
    }
}