package scheduler

import (
	"fmt"
	"sort"
	"time"

	log "github.com/hashicorp/go-hclog"

	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/structs"
)

const (
	// batchedFailedAllocWindowSize is the window size used
	// to batch up failed allocations before creating an eval
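	// (allocations whose reschedule times fall within this window share a
	// single follow-up evaluation)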
	batchedFailedAllocWindowSize = 5 * time.Second

	// rescheduleWindowSize is the window size relative to
	// current time within which reschedulable allocations are placed.
	// This helps protect against small clock drifts between servers
	rescheduleWindowSize = 1 * time.Second
)

// allocUpdateType takes an existing allocation and a new job definition and
// returns whether the allocation can ignore the change, requires a destructive
// update, or can be inplace updated. If it can be inplace updated, an updated
// allocation that has the new resources and alloc metrics attached will be
// returned.
type allocUpdateType func(existing *structs.Allocation, newJob *structs.Job,
	newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation)

// allocReconciler is used to determine the set of allocations that require
// placement, inplace updating or stopping given the job specification and
// existing cluster state. The reconciler should only be used for batch and
// service jobs.
type allocReconciler struct {
	// logger is used to log debug information. Logging should be kept to a
	// minimum here
	logger log.Logger

	// allocUpdateFn is used to check whether an allocation can ignore the
	// change, can be updated in place, or requires a destructive update
	allocUpdateFn allocUpdateType

	// batch marks whether the job is a batch job
	batch bool

	// job is the job being operated on; it may be nil if the job is being
	// stopped via a purge
	job *structs.Job

	// jobID is the ID of the job being operated on. The job may be nil if it is
	// being stopped, so we require this separately.
	jobID string

	// oldDeployment is the last deployment for the job
	oldDeployment *structs.Deployment

	// deployment is the current deployment for the job
	deployment *structs.Deployment

	// deploymentPaused marks whether the deployment is paused
	deploymentPaused bool

	// deploymentFailed marks whether the deployment is failed
	deploymentFailed bool

	// taintedNodes contains a map of nodes that are tainted
	taintedNodes map[string]*structs.Node

	// existingAllocs is the set of non-terminal existing allocations
	existingAllocs []*structs.Allocation

	// evalID is the ID of the evaluation that triggered the reconciler
	evalID string

	// now is the time used when determining rescheduling eligibility.
	// It defaults to time.Now() and is overridden in unit tests.
	now time.Time

	// result is the results of the reconcile. During computation it can be
	// used to store intermediate state
	result *reconcileResults
}

// reconcileResults contains the results of the reconciliation and should be
// applied by the scheduler.
type reconcileResults struct {
	// deployment is the deployment that should be created or updated as a
	// result of scheduling
	deployment *structs.Deployment

	// deploymentUpdates contains a set of deployment updates that should be
	// applied as a result of scheduling
	deploymentUpdates []*structs.DeploymentStatusUpdate

	// place is the set of allocations to place by the scheduler
	place []allocPlaceResult

	// destructiveUpdate is the set of allocations to apply a destructive update to
	destructiveUpdate []allocDestructiveResult

	// inplaceUpdate is the set of allocations to apply an inplace update to
	inplaceUpdate []*structs.Allocation

	// stop is the set of allocations to stop
	stop []allocStopResult

	// attributeUpdates are updates to the allocation that are not from a
	// jobspec change.
	attributeUpdates map[string]*structs.Allocation

	// desiredTGUpdates captures the desired set of changes to make for each
	// task group.
	desiredTGUpdates map[string]*structs.DesiredUpdates

	// desiredFollowupEvals is the map of follow up evaluations to create per task group
	// This is used to create a delayed evaluation for rescheduling failed allocations.
	desiredFollowupEvals map[string][]*structs.Evaluation
}

// delayedRescheduleInfo contains the allocation ID and a time when it is eligible to be rescheduled.
// This is used to create follow up evaluations.
type delayedRescheduleInfo struct {

	// allocID is the ID of the allocation eligible to be rescheduled
	allocID string

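	// alloc is the existing allocation that is eligible to be rescheduled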
	alloc *structs.Allocation

	// rescheduleTime is the time to use in the delayed evaluation
	rescheduleTime time.Time
}

func (r *reconcileResults) GoString() string {
	base := fmt.Sprintf("Total changes: (place %d) (destructive %d) (inplace %d) (stop %d)",
		len(r.place), len(r.destructiveUpdate), len(r.inplaceUpdate), len(r.stop))

	if r.deployment != nil {
		base += fmt.Sprintf("\nCreated Deployment: %q", r.deployment.ID)
	}
	for _, u := range r.deploymentUpdates {
		base += fmt.Sprintf("\nDeployment Update for ID %q: Status %q; Description %q",
			u.DeploymentID, u.Status, u.StatusDescription)
	}
	for tg, u := range r.desiredTGUpdates {
		base += fmt.Sprintf("\nDesired Changes for %q: %#v", tg, u)
	}
	return base
}

// Changes returns the total number of changes
func (r *reconcileResults) Changes() int {
	return len(r.place) + len(r.inplaceUpdate) + len(r.stop)
}

// NewAllocReconciler creates a new reconciler that should be used to determine
// the changes required to bring the cluster state in line with the declared jobspec
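//
// A minimal usage sketch (illustrative; the logger, allocUpdateFn, and the
// fetched deployment, allocations, and tainted nodes are assumed to come from
// the surrounding scheduler):
//
//	r := NewAllocReconciler(logger, allocUpdateFn, batch, job.ID, job,
//		deployment, existingAllocs, taintedNodes, eval.ID)
//	results := r.Compute()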
func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch bool,
	jobID string, job *structs.Job, deployment *structs.Deployment,
	existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string) *allocReconciler {
	return &allocReconciler{
		logger:         logger.Named("reconciler"),
		allocUpdateFn:  allocUpdateFn,
		batch:          batch,
		jobID:          jobID,
		job:            job,
		deployment:     deployment.Copy(),
		existingAllocs: existingAllocs,
		taintedNodes:   taintedNodes,
		evalID:         evalID,
		now:            time.Now(),
		result: &reconcileResults{
			desiredTGUpdates:     make(map[string]*structs.DesiredUpdates),
			desiredFollowupEvals: make(map[string][]*structs.Evaluation),
		},
	}
}

// Compute reconciles the existing cluster state and returns the set of changes
// required to converge the job spec and state
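//
// At a high level it cancels unneeded deployments, handles stopped jobs,
// reconciles each task group, and finally rolls the per-group results into
// deployment status updates.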
func (a *allocReconciler) Compute() *reconcileResults {
	// Create the allocation matrix
	m := newAllocMatrix(a.job, a.existingAllocs)

	// Handle stopping unneeded deployments
	a.cancelDeployments()

	// If we are just stopping a job we do not need to do anything more than
	// stopping all running allocs
	if a.job.Stopped() {
		a.handleStop(m)
		return a.result
	}

	// Detect if the deployment is paused
	if a.deployment != nil {
		a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused ||
			a.deployment.Status == structs.DeploymentStatusPending
		a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
	}
	if a.deployment == nil {
		// When we create the deployment later, it will be in a pending
		// state. But we also need to tell Compute we're paused; otherwise we
		// would make placements on the paused deployment.
		if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
			a.deploymentPaused = true
		}
	}

	// Reconcile each group
	complete := true
	for group, as := range m {
		groupComplete := a.computeGroup(group, as)
		complete = complete && groupComplete
	}

	// Mark the deployment as complete if possible
	if a.deployment != nil && complete {
		if a.job.IsMultiregion() {
			// the unblocking/successful states come after blocked, so we
			// need to make sure we don't revert those states
			if a.deployment.Status != structs.DeploymentStatusUnblocking &&
				a.deployment.Status != structs.DeploymentStatusSuccessful {
				a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
					DeploymentID:      a.deployment.ID,
					Status:            structs.DeploymentStatusBlocked,
					StatusDescription: structs.DeploymentStatusDescriptionBlocked,
				})
			}
		} else {
			a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
				DeploymentID:      a.deployment.ID,
				Status:            structs.DeploymentStatusSuccessful,
				StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
			})
		}
	}

	// Set the description of a created deployment
	if d := a.result.deployment; d != nil {
		if d.RequiresPromotion() {
			if d.HasAutoPromote() {
				d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion
			} else {
				d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
			}
		}
	}

	return a.result
}

// cancelDeployments cancels any deployment that is not needed
func (a *allocReconciler) cancelDeployments() {
	// If the job is stopped and there is a non-terminal deployment, cancel it
	if a.job.Stopped() {
		if a.deployment != nil && a.deployment.Active() {
			a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
				DeploymentID:      a.deployment.ID,
				Status:            structs.DeploymentStatusCancelled,
				StatusDescription: structs.DeploymentStatusDescriptionStoppedJob,
			})
		}

		// Nothing else to do
		a.oldDeployment = a.deployment
		a.deployment = nil
		return
	}

	d := a.deployment
	if d == nil {
		return
	}

	// Check if the deployment is active and referencing an older job and cancel it
	if d.JobCreateIndex != a.job.CreateIndex || d.JobVersion != a.job.Version {
		if d.Active() {
			a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
				DeploymentID:      a.deployment.ID,
				Status:            structs.DeploymentStatusCancelled,
				StatusDescription: structs.DeploymentStatusDescriptionNewerJob,
			})
		}

		a.oldDeployment = d
		a.deployment = nil
	}

	// Clear it as the current deployment if it is successful
	if d.Status == structs.DeploymentStatusSuccessful {
		a.oldDeployment = d
		a.deployment = nil
	}
}

// handleStop marks all allocations to be stopped, handling the lost case
func (a *allocReconciler) handleStop(m allocMatrix) {
	for group, as := range m {
		as = filterByTerminal(as)
		untainted, migrate, lost := as.filterByTainted(a.taintedNodes)
		a.markStop(untainted, "", allocNotNeeded)
		a.markStop(migrate, "", allocNotNeeded)
		a.markStop(lost, structs.AllocClientStatusLost, allocLost)
		desiredChanges := new(structs.DesiredUpdates)
		desiredChanges.Stop = uint64(len(as))
		a.result.desiredTGUpdates[group] = desiredChanges
	}
}

// markStop is a helper for marking a set of allocations for stop with a
// particular client status and description.
func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescription string) {
	for _, alloc := range allocs {
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			clientStatus:      clientStatus,
			statusDescription: statusDescription,
		})
	}
}

// markDelayed does markStop, but optionally includes a FollowupEvalID so that we can update
// the stopped alloc with its delayed rescheduling evalID
func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescription string, followupEvals map[string]string) {
	for _, alloc := range allocs {
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			clientStatus:      clientStatus,
			statusDescription: statusDescription,
			followupEvalID:    followupEvals[alloc.ID],
		})
	}
}

// computeGroup reconciles state for a particular task group. It returns whether
// the deployment it is for is complete with regard to the task group.
func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
	// Create the desired update object for the group
	desiredChanges := new(structs.DesiredUpdates)
	a.result.desiredTGUpdates[group] = desiredChanges

	// Get the task group. The task group may be nil if the job was updated such
	// that the task group no longer exists
	tg := a.job.LookupTaskGroup(group)

	// If the task group is nil, then the task group has been removed so all we
	// need to do is stop everything
	if tg == nil {
		untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
		a.markStop(untainted, "", allocNotNeeded)
		a.markStop(migrate, "", allocNotNeeded)
		a.markStop(lost, structs.AllocClientStatusLost, allocLost)
		desiredChanges.Stop = uint64(len(untainted) + len(migrate) + len(lost))
		return true
	}

	// Get the deployment state for the group
	var dstate *structs.DeploymentState
	existingDeployment := false
	if a.deployment != nil {
		dstate, existingDeployment = a.deployment.TaskGroups[group]
	}
	if !existingDeployment {
		dstate = &structs.DeploymentState{}
		if !tg.Update.IsEmpty() {
			dstate.AutoRevert = tg.Update.AutoRevert
			dstate.AutoPromote = tg.Update.AutoPromote
			dstate.ProgressDeadline = tg.Update.ProgressDeadline
		}
	}

	// Filter allocations that do not need to be considered because they are
	// from an older job version and are terminal.
	all, ignore := a.filterOldTerminalAllocs(all)
	desiredChanges.Ignore += uint64(len(ignore))

	// canaries is the set of canaries for the current deployment and all is all
	// allocs including the canaries
	canaries, all := a.handleGroupCanaries(all, desiredChanges)

	// Determine what set of allocations are on tainted nodes
	untainted, migrate, lost := all.filterByTainted(a.taintedNodes)

	// Determine what set of terminal allocations need to be rescheduled
	untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment)

	// Find delays for any lost allocs that have stop_after_client_disconnect
	lostLater := lost.delayByStopAfterClientDisconnect()
	lostLaterEvals := a.handleDelayedLost(lostLater, all, tg.Name)

	// Create batched follow up evaluations for allocations that are
	// reschedulable later and mark the allocations for in place updating
	a.handleDelayedReschedules(rescheduleLater, all, tg.Name)

	// Create a structure for choosing names. Seed with the taken names
	// which is the union of untainted, rescheduled, allocs on migrating
	// nodes, and allocs on down nodes (includes canaries)
	nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow, lost))

	// Stop any unneeded allocations and update the untainted set to not
	// include stopped allocations.
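	// canaryState is true while the deployment still has unpromoted canaries
	// outstanding for this group.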
	canaryState := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
	stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState, lostLaterEvals)
	desiredChanges.Stop += uint64(len(stop))
	untainted = untainted.difference(stop)

	// Do inplace upgrades where possible and capture the set of upgrades that
	// need to be done destructively.
	ignore, inplace, destructive := a.computeUpdates(tg, untainted)
	desiredChanges.Ignore += uint64(len(ignore))
	desiredChanges.InPlaceUpdate += uint64(len(inplace))
	if !existingDeployment {
		dstate.DesiredTotal += len(destructive) + len(inplace)
	}

	// Remove the canaries now that we have handled rescheduling so that we do
	// not consider them when making placement decisions.
	if canaryState {
		untainted = untainted.difference(canaries)
	}

	// The fact that we have destructive updates and fewer canaries than desired
	// means we need to create canaries
	strategy := tg.Update
	canariesPromoted := dstate != nil && dstate.Promoted
	requireCanary := len(destructive) != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
	if requireCanary {
		dstate.DesiredCanaries = strategy.Canary
	}
	if requireCanary && !a.deploymentPaused && !a.deploymentFailed {
		number := strategy.Canary - len(canaries)
		desiredChanges.Canary += uint64(number)

		for _, name := range nameIndex.NextCanaries(uint(number), canaries, destructive) {
			a.result.place = append(a.result.place, allocPlaceResult{
				name:      name,
				canary:    true,
				taskGroup: tg,
			})
		}
	}

	// Determine how many we can place
	canaryState = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
	limit := a.computeLimit(tg, untainted, destructive, migrate, canaryState)

	// Place if:
	// * The deployment is not paused or failed
	// * Not placing any canaries
	// * If there are any canaries, they have been promoted
	// * There is no delayed stop_after_client_disconnect alloc, which delays
	//   scheduling for the whole group
	// * An alloc was lost
	var place []allocPlaceResult
	if len(lostLater) == 0 {
		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState, lost)
		if !existingDeployment {
			dstate.DesiredTotal += len(place)
		}
	}

	// deploymentPlaceReady tracks whether the deployment is in a state where
	// placements can be made without any other consideration.
	deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !canaryState

	if deploymentPlaceReady {
		desiredChanges.Place += uint64(len(place))
		a.result.place = append(a.result.place, place...)
		a.markStop(rescheduleNow, "", allocRescheduled)
		desiredChanges.Stop += uint64(len(rescheduleNow))

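		// Consume placement capacity so the destructive updates below still
		// respect the group's MaxParallel for this pass.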
		min := helper.IntMin(len(place), limit)
		limit -= min
	} else if !deploymentPlaceReady {
		// We do not want to place additional allocations but in the case we
		// have lost allocations or allocations that require rescheduling now,
		// we do so regardless to avoid odd user experiences.
		if len(lost) != 0 {
			allowed := helper.IntMin(len(lost), len(place))
			desiredChanges.Place += uint64(allowed)
			a.result.place = append(a.result.place, place[:allowed]...)
		}

		// Handle rescheduling of failed allocations even if the deployment is
		// failed. We do not reschedule if the allocation is part of the failed
		// deployment.
		if now := len(rescheduleNow); now != 0 {
			for _, p := range place {
				prev := p.PreviousAllocation()
				if p.IsRescheduling() && !(a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID) {
					a.result.place = append(a.result.place, p)
					desiredChanges.Place++

					a.result.stop = append(a.result.stop, allocStopResult{
						alloc:             prev,
						statusDescription: allocRescheduled,
					})
					desiredChanges.Stop++
				}
			}
		}
	}

	if deploymentPlaceReady {
		// Do all destructive updates
		min := helper.IntMin(len(destructive), limit)
		desiredChanges.DestructiveUpdate += uint64(min)
		desiredChanges.Ignore += uint64(len(destructive) - min)
		for _, alloc := range destructive.nameOrder()[:min] {
			a.result.destructiveUpdate = append(a.result.destructiveUpdate, allocDestructiveResult{
				placeName:             alloc.Name,
				placeTaskGroup:        tg,
				stopAlloc:             alloc,
				stopStatusDescription: allocUpdating,
			})
		}
	} else {
		desiredChanges.Ignore += uint64(len(destructive))
	}

	// Migrate all the allocations
	desiredChanges.Migrate += uint64(len(migrate))
	for _, alloc := range migrate.nameOrder() {
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			statusDescription: allocMigrating,
		})
		a.result.place = append(a.result.place, allocPlaceResult{
			name:          alloc.Name,
			canary:        alloc.DeploymentStatus.IsCanary(),
			taskGroup:     tg,
			previousAlloc: alloc,

			downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
			minJobVersion:      alloc.Job.Version,
		})
	}

	// Create a new deployment if:
	// 1. Updating a job specification
	// 2. No running allocations (first time running a job)
	updatingSpec := len(destructive) != 0 || len(a.result.inplaceUpdate) != 0
	hadRunning := false
	for _, alloc := range all {
		if alloc.Job.Version == a.job.Version && alloc.Job.CreateIndex == a.job.CreateIndex {
			hadRunning = true
			break
		}
	}

	// Create a new deployment if necessary
	if !existingDeployment && !strategy.IsEmpty() && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
		// A previous group may have made the deployment already
		if a.deployment == nil {
			a.deployment = structs.NewDeployment(a.job)
			// in multiregion jobs, most deployments start in a pending state
			if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) {
				a.deployment.Status = structs.DeploymentStatusPending
				a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
			}
			a.result.deployment = a.deployment
		}

		// Attach the group's deployment state to the deployment
		a.deployment.TaskGroups[group] = dstate
	}

	// deploymentComplete is whether the deployment is complete which largely
	// means that no placements were made or desired to be made
	deploymentComplete := len(destructive)+len(inplace)+len(place)+len(migrate)+len(rescheduleNow)+len(rescheduleLater) == 0 && !requireCanary

	// The final check to see if the deployment is complete is to ensure
	// everything is healthy
	if deploymentComplete && a.deployment != nil {
		if dstate, ok := a.deployment.TaskGroups[group]; ok {
			if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
				(dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
				deploymentComplete = false
			}
		}
	}

	return deploymentComplete
}

// filterOldTerminalAllocs filters allocations that should be ignored since they
// are allocations that are terminal from a previous job version.
func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignore allocSet) {
	if !a.batch {
		return all, nil
	}

	filtered = filtered.union(all)
	ignored := make(map[string]*structs.Allocation)

	// Ignore terminal batch jobs from older versions
	for id, alloc := range filtered {
		older := alloc.Job.Version < a.job.Version || alloc.Job.CreateIndex < a.job.CreateIndex
		if older && alloc.TerminalStatus() {
			delete(filtered, id)
			ignored[id] = alloc
		}
	}

	return filtered, ignored
}

// handleGroupCanaries handles the canaries for the group by stopping the
// unneeded ones and returning the current set of canaries and the updated total
// set of allocs for the group
func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) {
	// Stop any canary from an older deployment or from a failed one
	var stop []string

	// Cancel any non-promoted canaries from the older deployment
	if a.oldDeployment != nil {
		for _, dstate := range a.oldDeployment.TaskGroups {
			if !dstate.Promoted {
				stop = append(stop, dstate.PlacedCanaries...)
			}
		}
	}

	// Cancel any non-promoted canaries from a failed deployment
	if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusFailed {
		for _, dstate := range a.deployment.TaskGroups {
			if !dstate.Promoted {
				stop = append(stop, dstate.PlacedCanaries...)
			}
		}
	}

	// stopSet is the allocSet that contains the canaries we desire to stop from
	// above.
	stopSet := all.fromKeys(stop)
	a.markStop(stopSet, "", allocNotNeeded)
	desiredChanges.Stop += uint64(len(stopSet))
	all = all.difference(stopSet)

	// Capture our current set of canaries and handle any migrations that are
	// needed by just stopping them.
	if a.deployment != nil {
		var canaryIDs []string
		for _, dstate := range a.deployment.TaskGroups {
			canaryIDs = append(canaryIDs, dstate.PlacedCanaries...)
		}

		canaries = all.fromKeys(canaryIDs)
		untainted, migrate, lost := canaries.filterByTainted(a.taintedNodes)
		a.markStop(migrate, "", allocMigrating)
		a.markStop(lost, structs.AllocClientStatusLost, allocLost)

		canaries = untainted
		all = all.difference(migrate, lost)
	}

	return canaries, all
}

// computeLimit returns the placement limit for a particular group. The inputs
// are the group definition, the untainted, destructive, and migrate allocation
// sets, and whether we are in a canary state.
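//
// For example (illustrative): with MaxParallel set to 2 and one allocation in
// the current deployment placed but not yet marked healthy, the returned limit
// would be 1.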
func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive, migrate allocSet, canaryState bool) int {
	// If there is no update strategy or deployment for the group we can deploy
	// as many as the group has
	if group.Update.IsEmpty() || len(destructive)+len(migrate) == 0 {
		return group.Count
	} else if a.deploymentPaused || a.deploymentFailed {
		// If the deployment is paused or failed, do not create anything else
		return 0
	}

	// If we have canaries and they have not been promoted the limit is 0
	if canaryState {
		return 0
	}

	// If we have been promoted or there are no canaries, the limit is the
	// configured MaxParallel minus any outstanding non-healthy alloc for the
	// deployment
	limit := group.Update.MaxParallel
	if a.deployment != nil {
		partOf, _ := untainted.filterByDeployment(a.deployment.ID)
		for _, alloc := range partOf {
			// An unhealthy allocation means nothing else should happen.
			if alloc.DeploymentStatus.IsUnhealthy() {
				return 0
			}

			if !alloc.DeploymentStatus.IsHealthy() {
				limit--
			}
		}
	}

	// The limit can be less than zero in the case that the job was changed such
	// that it required destructive changes and the count was scaled up.
	if limit < 0 {
		return 0
	}

	return limit
}

// computePlacements returns the set of allocations to place given the group
// definition, the set of untainted, migrating and reschedule allocations for the group.
//
// Placements will meet or exceed group count.
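//
// Placement results are built in order: rescheduled allocations first, then
// replacements for lost allocations up to the group count, and finally any
// remaining placements needed to reach the group count.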
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
	nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet,
	canaryState bool, lost allocSet) []allocPlaceResult {

	// Add rescheduled placement results
	var place []allocPlaceResult
	for _, alloc := range reschedule {
		place = append(place, allocPlaceResult{
			name:          alloc.Name,
			taskGroup:     group,
			previousAlloc: alloc,
			reschedule:    true,
			canary:        alloc.DeploymentStatus.IsCanary(),

			downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
			minJobVersion:      alloc.Job.Version,
			lost:               false,
		})
	}

	// Add replacements for lost allocs up to group.Count
	existing := len(untainted) + len(migrate) + len(reschedule)

	for _, alloc := range lost {
		if existing >= group.Count {
			// Reached desired count, do not replace remaining lost
			// allocs
			break
		}

		existing++
		place = append(place, allocPlaceResult{
			name:               alloc.Name,
			taskGroup:          group,
			previousAlloc:      alloc,
			reschedule:         false,
			canary:             alloc.DeploymentStatus.IsCanary(),
			downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
			minJobVersion:      alloc.Job.Version,
			lost:               true,
		})
	}

	// Add remaining placement results
	if existing < group.Count {
		for _, name := range nameIndex.Next(uint(group.Count - existing)) {
			place = append(place, allocPlaceResult{
				name:               name,
				taskGroup:          group,
				downgradeNonCanary: canaryState,
			})
		}
	}

	return place
}

// computeStop returns the set of allocations that are marked for stopping given
// the group definition, the set of allocations in various states and whether we
// are canarying.
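//
// For example (illustrative): with a group count of 3, five untainted
// allocations, and nothing migrating or lost, two allocations are selected to
// stop, preferring the highest name indexes.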
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
	untainted, migrate, lost, canaries allocSet, canaryState bool, followupEvals map[string]string) allocSet {

	// Mark all lost allocations for stop.
	var stop allocSet
	stop = stop.union(lost)
	a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)

	// If we are still deploying or creating canaries, don't stop them
	if canaryState {
		untainted = untainted.difference(canaries)
	}

	// Hot path the nothing to do case
	remove := len(untainted) + len(migrate) - group.Count
	if remove <= 0 {
		return stop
	}

	// Filter out any terminal allocations from the untainted set
	// This is so that we don't try to mark them as stopped redundantly
	untainted = filterByTerminal(untainted)

	// Prefer stopping any alloc that has the same name as the canaries if we
	// are promoted
	if !canaryState && len(canaries) != 0 {
		canaryNames := canaries.nameSet()
		for id, alloc := range untainted.difference(canaries) {
			if _, match := canaryNames[alloc.Name]; match {
				stop[id] = alloc
				a.result.stop = append(a.result.stop, allocStopResult{
					alloc:             alloc,
					statusDescription: allocNotNeeded,
				})
				delete(untainted, id)

				remove--
				if remove == 0 {
					return stop
				}
			}
		}
	}

	// Prefer selecting from the migrating set before stopping existing allocs
	if len(migrate) != 0 {
		mNames := newAllocNameIndex(a.jobID, group.Name, group.Count, migrate)
		removeNames := mNames.Highest(uint(remove))
		for id, alloc := range migrate {
			if _, match := removeNames[alloc.Name]; !match {
				continue
			}
			a.result.stop = append(a.result.stop, allocStopResult{
				alloc:             alloc,
				statusDescription: allocNotNeeded,
			})
			delete(migrate, id)
			stop[id] = alloc
			nameIndex.UnsetIndex(alloc.Index())

			remove--
			if remove == 0 {
				return stop
			}
		}
	}

	// Select the allocs with the highest name index to remove
	removeNames := nameIndex.Highest(uint(remove))
	for id, alloc := range untainted {
		if _, ok := removeNames[alloc.Name]; ok {
			stop[id] = alloc
			a.result.stop = append(a.result.stop, allocStopResult{
				alloc:             alloc,
				statusDescription: allocNotNeeded,
			})
			delete(untainted, id)

			remove--
			if remove == 0 {
				return stop
			}
		}
	}

	// It is possible that we didn't stop as many as we should have if there
	// were allocations with duplicate names.
	for id, alloc := range untainted {
		stop[id] = alloc
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			statusDescription: allocNotNeeded,
		})
		delete(untainted, id)

		remove--
		if remove == 0 {
			return stop
		}
	}

	return stop
}

// computeUpdates determines which allocations for the passed group require
// updates. Three groups are returned:
// 1. Those that require no upgrades
// 2. Those that can be upgraded in-place. These are added to the results
// automatically since the function contains the correct state to do so.
// 3. Those that require destructive updates
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) {
	// Determine the set of allocations that need to be updated
	ignore = make(map[string]*structs.Allocation)
	inplace = make(map[string]*structs.Allocation)
	destructive = make(map[string]*structs.Allocation)

	for _, alloc := range untainted {
		ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group)
		if ignoreChange {
			ignore[alloc.ID] = alloc
		} else if destructiveChange {
			destructive[alloc.ID] = alloc
		} else {
			inplace[alloc.ID] = alloc
			a.result.inplaceUpdate = append(a.result.inplaceUpdate, inplaceAlloc)
		}
	}

	return
}

// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field
// set for allocations that are eligible to be rescheduled later, and marks the allocs with
// the followupEvalID
func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
	// followupEvals are created in the same way as for delayed lost allocs
	allocIDToFollowupEvalID := a.handleDelayedLost(rescheduleLater, all, tgName)

	// Initialize the annotations
	if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
		a.result.attributeUpdates = make(map[string]*structs.Allocation)
	}

	// Create updates that will be applied to the allocs to mark the FollowupEvalID
	for allocID, evalID := range allocIDToFollowupEvalID {
		existingAlloc := all[allocID]
		updatedAlloc := existingAlloc.Copy()
		updatedAlloc.FollowupEvalID = evalID
		a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
	}
}

// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for
// lost allocations. The followupEvals are appended to a.result as a side effect, and a map
// of alloc IDs to their followupEval IDs is returned.
func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
	if len(rescheduleLater) == 0 {
		return map[string]string{}
	}

	// Sort by time
	sort.Slice(rescheduleLater, func(i, j int) bool {
		return rescheduleLater[i].rescheduleTime.Before(rescheduleLater[j].rescheduleTime)
	})

	var evals []*structs.Evaluation
	nextReschedTime := rescheduleLater[0].rescheduleTime
	allocIDToFollowupEvalID := make(map[string]string, len(rescheduleLater))

	// Create a new eval for the first batch
	eval := &structs.Evaluation{
		ID:                uuid.Generate(),
		Namespace:         a.job.Namespace,
		Priority:          a.job.Priority,
		Type:              a.job.Type,
		TriggeredBy:       structs.EvalTriggerRetryFailedAlloc,
		JobID:             a.job.ID,
		JobModifyIndex:    a.job.ModifyIndex,
		Status:            structs.EvalStatusPending,
		StatusDescription: reschedulingFollowupEvalDesc,
		WaitUntil:         nextReschedTime,
	}
	evals = append(evals, eval)

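	// Walk the allocations in reschedule-time order, assigning each to the
	// current batch's eval while its time falls within the batching window of
	// the batch's start; otherwise start a new batch with a fresh eval.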
	for _, allocReschedInfo := range rescheduleLater {
		if allocReschedInfo.rescheduleTime.Sub(nextReschedTime) < batchedFailedAllocWindowSize {
			allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID
		} else {
			// Start a new batch
			nextReschedTime = allocReschedInfo.rescheduleTime
			// Create a new eval for the new batch
			eval = &structs.Evaluation{
				ID:             uuid.Generate(),
				Namespace:      a.job.Namespace,
				Priority:       a.job.Priority,
				Type:           a.job.Type,
				TriggeredBy:    structs.EvalTriggerRetryFailedAlloc,
				JobID:          a.job.ID,
				JobModifyIndex: a.job.ModifyIndex,
				Status:         structs.EvalStatusPending,
				WaitUntil:      nextReschedTime,
			}
			evals = append(evals, eval)
			// Set the evalID for the first alloc in this new batch
			allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID
		}
	}

	a.result.desiredFollowupEvals[tgName] = evals

	return allocIDToFollowupEvalID
}