package scheduler

import (
	"fmt"
	"sort"
	"time"

	log "github.com/hashicorp/go-hclog"

	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/structs"
)

const (
	// batchedFailedAllocWindowSize is the window size used
	// to batch up failed allocations before creating an eval
	batchedFailedAllocWindowSize = 5 * time.Second

	// rescheduleWindowSize is the window size relative to
	// current time within which reschedulable allocations are placed.
	// This helps protect against small clock drifts between servers
	rescheduleWindowSize = 1 * time.Second
)

// allocUpdateType takes an existing allocation and a new job definition and
// returns whether the allocation can ignore the change, requires a destructive
// update, or can be inplace updated. If it can be inplace updated, an updated
// allocation that has the new resources and alloc metrics attached will be
// returned.
type allocUpdateType func(existing *structs.Allocation, newJob *structs.Job,
	newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation)

// allocReconciler is used to determine the set of allocations that require
// placement, inplace updating or stopping given the job specification and
// existing cluster state. The reconciler should only be used for batch and
// service jobs.
type allocReconciler struct {
	// logger is used to log debug information. Logging should be kept to a
	// minimum here
	logger log.Logger

	// allocUpdateFn is used to check if an allocation can be updated in place
	allocUpdateFn allocUpdateType

	// batch marks whether the job is a batch job
	batch bool

	// job is the job being operated on; it may be nil if the job is being
	// stopped via a purge
	job *structs.Job

	// jobID is the ID of the job being operated on. The job may be nil if it
	// is being stopped, so we track the ID separately.
	jobID string

	// oldDeployment is the last deployment for the job
	oldDeployment *structs.Deployment

	// deployment is the current deployment for the job
	deployment *structs.Deployment

	// deploymentPaused marks whether the deployment is paused
	deploymentPaused bool

	// deploymentFailed marks whether the deployment is failed
	deploymentFailed bool

	// taintedNodes contains a map of nodes that are tainted
	taintedNodes map[string]*structs.Node

	// existingAllocs is the set of non-terminal existing allocations
	existingAllocs []*structs.Allocation

	// evalID is the ID of the evaluation that triggered the reconciler
	evalID string

	// now is the time used when determining rescheduling eligibility. It
	// defaults to time.Now and is overridden in unit tests.
	now time.Time

	// result holds the results of the reconcile. During computation it can be
	// used to store intermediate state
	result *reconcileResults
}

// reconcileResults contains the results of the reconciliation and should be
// applied by the scheduler.
type reconcileResults struct {
	// deployment is the deployment that should be created or updated as a
	// result of scheduling
	deployment *structs.Deployment

	// deploymentUpdates contains a set of deployment updates that should be
	// applied as a result of scheduling
	deploymentUpdates []*structs.DeploymentStatusUpdate

	// place is the set of allocations to place by the scheduler
	place []allocPlaceResult

	// destructiveUpdate is the set of allocations to apply a destructive update to
	destructiveUpdate []allocDestructiveResult

	// inplaceUpdate is the set of allocations to apply an inplace update to
	inplaceUpdate []*structs.Allocation

	// stop is the set of allocations to stop
	stop []allocStopResult

	// attributeUpdates are updates to allocations that are not from a
	// jobspec change.
	attributeUpdates map[string]*structs.Allocation

	// desiredTGUpdates captures the desired set of changes to make for each
	// task group.
	desiredTGUpdates map[string]*structs.DesiredUpdates

	// desiredFollowupEvals is the map of follow-up evaluations to create per
	// task group. This is used to create delayed evaluations for rescheduling
	// failed allocations.
	desiredFollowupEvals map[string][]*structs.Evaluation
}

// delayedRescheduleInfo contains the allocation ID and the time when it is
// eligible to be rescheduled. This is used to create follow-up evaluations.
type delayedRescheduleInfo struct {
	// allocID is the ID of the allocation eligible to be rescheduled
	allocID string

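	// alloc is the allocation that is eligible to be rescheduled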
	alloc *structs.Allocation

	// rescheduleTime is the time to use in the delayed evaluation
	rescheduleTime time.Time
}

func (r *reconcileResults) GoString() string {
	base := fmt.Sprintf("Total changes: (place %d) (destructive %d) (inplace %d) (stop %d)",
		len(r.place), len(r.destructiveUpdate), len(r.inplaceUpdate), len(r.stop))

	if r.deployment != nil {
		base += fmt.Sprintf("\nCreated Deployment: %q", r.deployment.ID)
	}
	for _, u := range r.deploymentUpdates {
		base += fmt.Sprintf("\nDeployment Update for ID %q: Status %q; Description %q",
			u.DeploymentID, u.Status, u.StatusDescription)
	}
	for tg, u := range r.desiredTGUpdates {
		base += fmt.Sprintf("\nDesired Changes for %q: %#v", tg, u)
	}
	return base
}

// Changes returns the number of total changes
func (r *reconcileResults) Changes() int {
	return len(r.place) + len(r.inplaceUpdate) + len(r.stop)
}

// NewAllocReconciler creates a new reconciler that should be used to determine
// the changes required to bring the cluster state in line with the declared jobspec
func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch bool,
	jobID string, job *structs.Job, deployment *structs.Deployment,
	existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string) *allocReconciler {
	return &allocReconciler{
		logger:         logger.Named("reconciler"),
		allocUpdateFn:  allocUpdateFn,
		batch:          batch,
		jobID:          jobID,
		job:            job,
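		// Copy the deployment so the reconciler can mutate it freely without
		// touching the object held in the state store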
		deployment:     deployment.Copy(),
		existingAllocs: existingAllocs,
		taintedNodes:   taintedNodes,
		evalID:         evalID,
		now:            time.Now(),
		result: &reconcileResults{
			desiredTGUpdates:     make(map[string]*structs.DesiredUpdates),
			desiredFollowupEvals: make(map[string][]*structs.Evaluation),
		},
	}
}

// Compute reconciles the existing cluster state and returns the set of changes
// required to converge the job spec and state
func (a *allocReconciler) Compute() *reconcileResults {
	// Create the allocation matrix
	m := newAllocMatrix(a.job, a.existingAllocs)

	// Handle stopping unneeded deployments
	a.cancelDeployments()

	// If we are just stopping a job, all we need to do is stop all running
	// allocs
	if a.job.Stopped() {
		a.handleStop(m)
		return a.result
	}

	// Detect if the deployment is paused or failed
	if a.deployment != nil {
		a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused
		a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
	}

	// Reconcile each group
	complete := true
	for group, as := range m {
		groupComplete := a.computeGroup(group, as)
		complete = complete && groupComplete
	}

	// Mark the deployment as complete if possible
	if a.deployment != nil && complete {
		a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
			DeploymentID:      a.deployment.ID,
			Status:            structs.DeploymentStatusSuccessful,
			StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
		})
	}

	// Set the description of a created deployment
	if d := a.result.deployment; d != nil {
		if d.RequiresPromotion() {
			if d.HasAutoPromote() {
				d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion
			} else {
				d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
			}
		}
	}

	return a.result
}

// cancelDeployments cancels any deployment that is not needed
func (a *allocReconciler) cancelDeployments() {
	// If the job is stopped and there is a non-terminal deployment, cancel it
	if a.job.Stopped() {
		if a.deployment != nil && a.deployment.Active() {
			a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
				DeploymentID:      a.deployment.ID,
				Status:            structs.DeploymentStatusCancelled,
				StatusDescription: structs.DeploymentStatusDescriptionStoppedJob,
			})
		}

		// Nothing else to do
		a.oldDeployment = a.deployment
		a.deployment = nil
		return
	}

	d := a.deployment
	if d == nil {
		return
	}

	// If the deployment references an older job, cancel it if it is still
	// active
	if d.JobCreateIndex != a.job.CreateIndex || d.JobVersion != a.job.Version {
		if d.Active() {
			a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
				DeploymentID:      a.deployment.ID,
				Status:            structs.DeploymentStatusCancelled,
				StatusDescription: structs.DeploymentStatusDescriptionNewerJob,
			})
		}

		a.oldDeployment = d
		a.deployment = nil
	}

	// Clear it as the current deployment if it is successful
	if d.Status == structs.DeploymentStatusSuccessful {
		a.oldDeployment = d
		a.deployment = nil
	}
}

// handleStop marks all allocations to be stopped, handling the lost case
func (a *allocReconciler) handleStop(m allocMatrix) {
	for group, as := range m {
		as = filterByTerminal(as)
		untainted, migrate, lost := as.filterByTainted(a.taintedNodes)
		a.markStop(untainted, "", allocNotNeeded)
		a.markStop(migrate, "", allocNotNeeded)
		a.markStop(lost, structs.AllocClientStatusLost, allocLost)
		desiredChanges := new(structs.DesiredUpdates)
		desiredChanges.Stop = uint64(len(as))
		a.result.desiredTGUpdates[group] = desiredChanges
	}
}

// markStop is a helper for marking a set of allocations for stop with a
// particular client status and description.
func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescription string) {
	for _, alloc := range allocs {
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			clientStatus:      clientStatus,
			statusDescription: statusDescription,
		})
	}
}

// computeGroup reconciles state for a particular task group. It returns whether
// the deployment for the task group is complete.
func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
	// Create the desired update object for the group
	desiredChanges := new(structs.DesiredUpdates)
	a.result.desiredTGUpdates[group] = desiredChanges

	// Get the task group. The task group may be nil if the job was updated
	// such that the task group no longer exists
	tg := a.job.LookupTaskGroup(group)

	// If the task group is nil, then the task group has been removed so all we
	// need to do is stop everything
	if tg == nil {
		untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
		a.markStop(untainted, "", allocNotNeeded)
		a.markStop(migrate, "", allocNotNeeded)
		a.markStop(lost, structs.AllocClientStatusLost, allocLost)
		desiredChanges.Stop = uint64(len(untainted) + len(migrate) + len(lost))
		return true
	}

	// Get the deployment state for the group
	var dstate *structs.DeploymentState
	existingDeployment := false
	if a.deployment != nil {
		dstate, existingDeployment = a.deployment.TaskGroups[group]
	}
	if !existingDeployment {
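		// No deployment state exists for this group yet, so seed one from the
		// group's update strategy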
		dstate = &structs.DeploymentState{}
		if !tg.Update.IsEmpty() {
			dstate.AutoRevert = tg.Update.AutoRevert
			dstate.AutoPromote = tg.Update.AutoPromote
			dstate.ProgressDeadline = tg.Update.ProgressDeadline
		}
	}

	// Filter allocations that do not need to be considered because they are
	// from an older job version and are terminal.
	all, ignore := a.filterOldTerminalAllocs(all)
	desiredChanges.Ignore += uint64(len(ignore))

	// canaries is the set of canaries for the current deployment, and all is
	// the set of all allocs including the canaries
	canaries, all := a.handleGroupCanaries(all, desiredChanges)

	// Determine what set of allocations are on tainted nodes
	untainted, migrate, lost := all.filterByTainted(a.taintedNodes)

	// Determine what set of terminal allocations need to be rescheduled
	untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment)

	// Create batched follow-up evaluations for allocations that are
	// reschedulable later and mark the allocations for in-place updating
	a.handleDelayedReschedules(rescheduleLater, all, tg.Name)

	// Create a structure for choosing names. Seed it with the taken names,
	// which are the union of the untainted, migrating, and rescheduling
	// allocations (canaries included)
	nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow))

	// Stop any unneeded allocations and update the untainted set so it does
	// not include stopped allocations.
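	// canaryState is true while the deployment has desired canaries that have
	// not yet been promoted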
	canaryState := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
	stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState)
	desiredChanges.Stop += uint64(len(stop))
	untainted = untainted.difference(stop)

	// Do inplace upgrades where possible and capture the set of upgrades that
	// need to be done destructively.
	ignore, inplace, destructive := a.computeUpdates(tg, untainted)
	desiredChanges.Ignore += uint64(len(ignore))
	desiredChanges.InPlaceUpdate += uint64(len(inplace))
	if !existingDeployment {
		dstate.DesiredTotal += len(destructive) + len(inplace)
	}

	// Remove the canaries now that we have handled rescheduling so that we do
	// not consider them when making placement decisions.
	if canaryState {
		untainted = untainted.difference(canaries)
	}

	// Having destructive updates and fewer canaries than desired means we need
	// to create canaries
	numDestructive := len(destructive)
	strategy := tg.Update
	canariesPromoted := dstate != nil && dstate.Promoted
	requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
	if requireCanary && !a.deploymentPaused && !a.deploymentFailed {
		number := strategy.Canary - len(canaries)
		desiredChanges.Canary += uint64(number)
		if !existingDeployment {
			dstate.DesiredCanaries = strategy.Canary
		}

		for _, name := range nameIndex.NextCanaries(uint(number), canaries, destructive) {
			a.result.place = append(a.result.place, allocPlaceResult{
				name:      name,
				canary:    true,
				taskGroup: tg,
			})
		}
	}

	// Determine how many we can place
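	// canaryState must be recomputed here because the canary block above may
	// have set the desired canaries on the deployment state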
	canaryState = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
	limit := a.computeLimit(tg, untainted, destructive, migrate, canaryState)

	// Place if:
	// * The deployment is not paused or failed
	// * Not placing any canaries
	// * If there are any canaries, they have been promoted
	place := a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
	if !existingDeployment {
		dstate.DesiredTotal += len(place)
	}

	// deploymentPlaceReady tracks whether the deployment is in a state where
	// placements can be made without any other consideration.
	deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !canaryState

	if deploymentPlaceReady {
		desiredChanges.Place += uint64(len(place))
		for _, p := range place {
			a.result.place = append(a.result.place, p)
		}
		a.markStop(rescheduleNow, "", allocRescheduled)
		desiredChanges.Stop += uint64(len(rescheduleNow))

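		// Placements consume the computed limit so that fewer destructive
		// updates are performed below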
		min := helper.IntMin(len(place), limit)
		limit -= min
	} else {
		// We do not want to place additional allocations, but if we have lost
		// allocations or allocations that require rescheduling now, we place
		// them regardless to avoid an odd user experience.
		if len(lost) != 0 {
			allowed := helper.IntMin(len(lost), len(place))
			desiredChanges.Place += uint64(allowed)
			for _, p := range place[:allowed] {
				a.result.place = append(a.result.place, p)
			}
		}

		// Handle rescheduling of failed allocations even if the deployment is
		// failed. We do not reschedule if the allocation is part of the failed
		// deployment.
		if len(rescheduleNow) != 0 {
			for _, p := range place {
				prev := p.PreviousAllocation()
				if p.IsRescheduling() && !(a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID) {
					a.result.place = append(a.result.place, p)
					desiredChanges.Place++

					a.result.stop = append(a.result.stop, allocStopResult{
						alloc:             prev,
						statusDescription: allocRescheduled,
					})
					desiredChanges.Stop++
				}
			}
		}
	}

	if deploymentPlaceReady {
		// Do all destructive updates
		min := helper.IntMin(len(destructive), limit)
		desiredChanges.DestructiveUpdate += uint64(min)
		desiredChanges.Ignore += uint64(len(destructive) - min)
		for _, alloc := range destructive.nameOrder()[:min] {
			a.result.destructiveUpdate = append(a.result.destructiveUpdate, allocDestructiveResult{
				placeName:             alloc.Name,
				placeTaskGroup:        tg,
				stopAlloc:             alloc,
				stopStatusDescription: allocUpdating,
			})
		}
	} else {
		desiredChanges.Ignore += uint64(len(destructive))
	}

	// Migrate all the allocations
	desiredChanges.Migrate += uint64(len(migrate))
	for _, alloc := range migrate.nameOrder() {
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			statusDescription: allocMigrating,
		})
		a.result.place = append(a.result.place, allocPlaceResult{
			name:          alloc.Name,
			canary:        false,
			taskGroup:     tg,
			previousAlloc: alloc,
		})
	}

	// Create a new deployment if:
	// 1. The job specification is being updated
	// 2. There are no running allocations (first time running a job)
	updatingSpec := len(destructive) != 0 || len(a.result.inplaceUpdate) != 0
	hadRunning := false
	for _, alloc := range all {
		if alloc.Job.Version == a.job.Version && alloc.Job.CreateIndex == a.job.CreateIndex {
			hadRunning = true
			break
		}
	}

	// Create a new deployment if necessary
	if !existingDeployment && !strategy.IsEmpty() && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
		// A previous group may have made the deployment already
		if a.deployment == nil {
			a.deployment = structs.NewDeployment(a.job)
			a.result.deployment = a.deployment
		}

		// Attach the group's deployment state to the deployment
		a.deployment.TaskGroups[group] = dstate
	}

	// deploymentComplete is whether the deployment is complete, which largely
	// means that no placements were made or desired to be made
	deploymentComplete := len(destructive)+len(inplace)+len(place)+len(migrate)+len(rescheduleNow)+len(rescheduleLater) == 0 && !requireCanary

	// The final check for deployment completeness is to ensure everything is
	// healthy
	if deploymentComplete && a.deployment != nil {
		if dstate, ok := a.deployment.TaskGroups[group]; ok {
			if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
				(dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
				deploymentComplete = false
			}
		}
	}

	return deploymentComplete
}

// filterOldTerminalAllocs filters out allocations that should be ignored
// because they are terminal allocations from an older version of the job.
func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignore allocSet) {
	if !a.batch {
		return all, nil
	}

	filtered = filtered.union(all)
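	// union builds a new set, so the deletes below do not mutate the set the
	// caller passed in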
	ignored := make(map[string]*structs.Allocation)

	// Ignore terminal allocs from older job versions
	for id, alloc := range filtered {
		older := alloc.Job.Version < a.job.Version || alloc.Job.CreateIndex < a.job.CreateIndex
		if older && alloc.TerminalStatus() {
			delete(filtered, id)
			ignored[id] = alloc
		}
	}

	return filtered, ignored
}

// handleGroupCanaries handles the canaries for the group by stopping the
// unneeded ones and returning the current set of canaries and the updated
// total set of allocs for the group
func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) {
	// Stop any canary from an older deployment or from a failed one
	var stop []string

	// Cancel any non-promoted canaries from the older deployment
	if a.oldDeployment != nil {
		for _, s := range a.oldDeployment.TaskGroups {
			if !s.Promoted {
				stop = append(stop, s.PlacedCanaries...)
			}
		}
	}

	// Cancel any non-promoted canaries from a failed deployment
	if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusFailed {
		for _, s := range a.deployment.TaskGroups {
			if !s.Promoted {
				stop = append(stop, s.PlacedCanaries...)
			}
		}
	}

	// stopSet is the allocSet that contains the canaries we desire to stop
	// from above.
	stopSet := all.fromKeys(stop)
	a.markStop(stopSet, "", allocNotNeeded)
	desiredChanges.Stop += uint64(len(stopSet))
	all = all.difference(stopSet)

	// Capture our current set of canaries and handle any migrations that are
	// needed by just stopping them.
	if a.deployment != nil {
		var canaryIDs []string
		for _, s := range a.deployment.TaskGroups {
			canaryIDs = append(canaryIDs, s.PlacedCanaries...)
		}

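		// canaryIDs covers every task group in the deployment; fromKeys
		// narrows it to the allocs present in this group's set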
		canaries = all.fromKeys(canaryIDs)
		untainted, migrate, lost := canaries.filterByTainted(a.taintedNodes)
		a.markStop(migrate, "", allocMigrating)
		a.markStop(lost, structs.AllocClientStatusLost, allocLost)

		canaries = untainted
		all = all.difference(migrate, lost)
	}

	return canaries, all
}

// computeLimit returns the placement limit for a particular group. The inputs
// are the group definition, the untainted, destructive, and migrate allocation
// sets, and whether we are in a canary state.
func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive, migrate allocSet, canaryState bool) int {
	// If there is no update strategy, or nothing is being destructively
	// updated or migrated, we can deploy as many as the group has
	if group.Update.IsEmpty() || len(destructive)+len(migrate) == 0 {
		return group.Count
	} else if a.deploymentPaused || a.deploymentFailed {
		// If the deployment is paused or failed, do not create anything else
		return 0
	}

	// If we have canaries and they have not been promoted the limit is 0
	if canaryState {
		return 0
	}

	// If we have been promoted or there are no canaries, the limit is the
	// configured MaxParallel minus any outstanding non-healthy allocs for the
	// deployment
	limit := group.Update.MaxParallel
	if a.deployment != nil {
		partOf, _ := untainted.filterByDeployment(a.deployment.ID)
		for _, alloc := range partOf {
			// An unhealthy allocation means nothing else should happen.
			if alloc.DeploymentStatus.IsUnhealthy() {
				return 0
			}

			if !alloc.DeploymentStatus.IsHealthy() {
				limit--
			}
		}
	}

	// The limit can be less than zero in the case that the job was changed
	// such that it required destructive changes and the count was scaled up.
	if limit < 0 {
		return 0
	}

	return limit
}

// computePlacements returns the set of allocations to place given the group
// definition and the sets of untainted, migrating and rescheduling allocations
// for the group.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
	nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet) []allocPlaceResult {

	// Add rescheduled placement results
	var place []allocPlaceResult
	for _, alloc := range reschedule {
		place = append(place, allocPlaceResult{
			name:          alloc.Name,
			taskGroup:     group,
			previousAlloc: alloc,
			reschedule:    true,
			canary:        alloc.DeploymentStatus.IsCanary(),
		})
	}

	// Hot path the nothing to do case
	existing := len(untainted) + len(migrate) + len(reschedule)
	if existing >= group.Count {
		return place
	}

	// Add remaining placement results
	for _, name := range nameIndex.Next(uint(group.Count - existing)) {
		place = append(place, allocPlaceResult{
			name:      name,
			taskGroup: group,
		})
	}

	return place
}

// computeStop returns the set of allocations that are marked for stopping given
// the group definition, the set of allocations in various states and whether we
// are canarying.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
	untainted, migrate, lost, canaries allocSet, canaryState bool) allocSet {

	// Mark all lost allocations for stop. Previous allocation doesn't matter
	// here since it is on a lost node
	var stop allocSet
	stop = stop.union(lost)
	a.markStop(lost, structs.AllocClientStatusLost, allocLost)

	// If we are still deploying or creating canaries, don't stop them
	if canaryState {
		untainted = untainted.difference(canaries)
	}

	// Hot path the nothing to do case
	remove := len(untainted) + len(migrate) - group.Count
	if remove <= 0 {
		return stop
	}

	// Filter out any terminal allocations from the untainted set
	// This is so that we don't try to mark them as stopped redundantly
	untainted = filterByTerminal(untainted)

	// Prefer stopping any alloc that has the same name as the canaries if we
	// are promoted
	if !canaryState && len(canaries) != 0 {
		canaryNames := canaries.nameSet()
		for id, alloc := range untainted.difference(canaries) {
			if _, match := canaryNames[alloc.Name]; match {
				stop[id] = alloc
				a.result.stop = append(a.result.stop, allocStopResult{
					alloc:             alloc,
					statusDescription: allocNotNeeded,
				})
				delete(untainted, id)

				remove--
				if remove == 0 {
					return stop
				}
			}
		}
	}

	// Prefer selecting from the migrating set before stopping existing allocs
	if len(migrate) != 0 {
		mNames := newAllocNameIndex(a.jobID, group.Name, group.Count, migrate)
		removeNames := mNames.Highest(uint(remove))
		for id, alloc := range migrate {
			if _, match := removeNames[alloc.Name]; !match {
				continue
			}
			a.result.stop = append(a.result.stop, allocStopResult{
				alloc:             alloc,
				statusDescription: allocNotNeeded,
			})
			delete(migrate, id)
			stop[id] = alloc
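			// Free the name index entry so a replacement placement can reuse
			// this alloc's name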
			nameIndex.UnsetIndex(alloc.Index())

			remove--
			if remove == 0 {
				return stop
			}
		}
	}

	// Select the allocs with the highest count to remove
	removeNames := nameIndex.Highest(uint(remove))
	for id, alloc := range untainted {
		if _, ok := removeNames[alloc.Name]; ok {
			stop[id] = alloc
			a.result.stop = append(a.result.stop, allocStopResult{
				alloc:             alloc,
				statusDescription: allocNotNeeded,
			})
			delete(untainted, id)

			remove--
			if remove == 0 {
				return stop
			}
		}
	}

	// It is possible that we didn't stop as many as we should have if there
	// were allocations with duplicate names.
	for id, alloc := range untainted {
		stop[id] = alloc
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			statusDescription: allocNotNeeded,
		})
		delete(untainted, id)

		remove--
		if remove == 0 {
			return stop
		}
	}

	return stop
}

// computeUpdates determines which allocations for the passed group require
// updates. Three groups are returned:
// 1. Those that require no upgrades
// 2. Those that can be upgraded in-place. These are added to the results
// automatically since the function contains the correct state to do so.
// 3. Those that require destructive updates
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) {
	// Determine the set of allocations that need to be updated
	ignore = make(map[string]*structs.Allocation)
	inplace = make(map[string]*structs.Allocation)
	destructive = make(map[string]*structs.Allocation)

	for _, alloc := range untainted {
		ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group)
		if ignoreChange {
			ignore[alloc.ID] = alloc
		} else if destructiveChange {
			destructive[alloc.ID] = alloc
		} else {
			inplace[alloc.ID] = alloc
			a.result.inplaceUpdate = append(a.result.inplaceUpdate, inplaceAlloc)
		}
	}

	return
}

// handleDelayedReschedules creates batched follow-up evaluations, with the
// WaitUntil field set, for allocations that are eligible to be rescheduled later
func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
	if len(rescheduleLater) == 0 {
		return
	}

	// Sort by time
	sort.Slice(rescheduleLater, func(i, j int) bool {
		return rescheduleLater[i].rescheduleTime.Before(rescheduleLater[j].rescheduleTime)
	})

	var evals []*structs.Evaluation
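	// nextReschedTime marks the start of the current batch's time window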
	nextReschedTime := rescheduleLater[0].rescheduleTime
	allocIDToFollowupEvalID := make(map[string]string, len(rescheduleLater))

	// Create a new eval for the first batch
	eval := &structs.Evaluation{
		ID:                uuid.Generate(),
		Namespace:         a.job.Namespace,
		Priority:          a.job.Priority,
		Type:              a.job.Type,
		TriggeredBy:       structs.EvalTriggerRetryFailedAlloc,
		JobID:             a.job.ID,
		JobModifyIndex:    a.job.ModifyIndex,
		Status:            structs.EvalStatusPending,
		StatusDescription: reschedulingFollowupEvalDesc,
		WaitUntil:         nextReschedTime,
	}
	evals = append(evals, eval)

	for _, allocReschedInfo := range rescheduleLater {
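		// Allocs whose reschedule time falls within the current batch window
		// share the current eval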
		if allocReschedInfo.rescheduleTime.Sub(nextReschedTime) < batchedFailedAllocWindowSize {
			allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID
		} else {
			// Start a new batch
			nextReschedTime = allocReschedInfo.rescheduleTime
			// Create a new eval for the new batch
			eval = &structs.Evaluation{
				ID:                uuid.Generate(),
				Namespace:         a.job.Namespace,
				Priority:          a.job.Priority,
				Type:              a.job.Type,
				TriggeredBy:       structs.EvalTriggerRetryFailedAlloc,
				JobID:             a.job.ID,
				JobModifyIndex:    a.job.ModifyIndex,
				Status:            structs.EvalStatusPending,
				StatusDescription: reschedulingFollowupEvalDesc,
				WaitUntil:         nextReschedTime,
			}
			evals = append(evals, eval)
			// Set the eval ID for the first alloc in this new batch
			allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID
		}
	}

	a.result.desiredFollowupEvals[tgName] = evals

	// Initialize the annotations
	if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
		a.result.attributeUpdates = make(map[string]*structs.Allocation)
	}

	// Create in-place updates for every alloc ID that needs to be updated with
	// its follow-up eval ID
	for allocID, evalID := range allocIDToFollowupEvalID {
		existingAlloc := all[allocID]
		updatedAlloc := existingAlloc.Copy()
		updatedAlloc.FollowupEvalID = evalID
		a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
	}
}