package scheduler

import (
	"fmt"
	"sort"
	"time"

	log "github.com/hashicorp/go-hclog"

	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/structs"
)

const (
	// batchedFailedAllocWindowSize is the window size used
	// to batch up failed allocations before creating an eval
	batchedFailedAllocWindowSize = 5 * time.Second

	// rescheduleWindowSize is the window size relative to
	// current time within which reschedulable allocations are placed.
	// This helps protect against small clock drifts between servers
	rescheduleWindowSize = 1 * time.Second
)

// allocUpdateType takes an existing allocation and a new job definition and
// returns whether the allocation can ignore the change, requires a destructive
// update, or can be updated in place. If it can be updated in place, an updated
// allocation that has the new resources and alloc metrics attached will be
// returned.
type allocUpdateType func(existing *structs.Allocation, newJob *structs.Job,
	newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation)

// allocReconciler is used to determine the set of allocations that require
// placement, inplace updating or stopping given the job specification and
// existing cluster state. The reconciler should only be used for batch and
// service jobs.
type allocReconciler struct {
	// logger is used to log debug information. Logging should be kept to a
	// minimum here
	logger log.Logger

	// allocUpdateFn is used to check whether an allocation can be updated in
	// place, must be updated destructively, or can ignore the change
	allocUpdateFn allocUpdateType

	// batch marks whether the job is a batch job
	batch bool

	// job is the job being operated on; it may be nil if the job is being
	// stopped via a purge
	job *structs.Job

	// jobID is the ID of the job being operated on. The job may be nil if it is
	// being stopped so we require this separately.
	jobID string

	// oldDeployment is the last deployment for the job
	oldDeployment *structs.Deployment

	// deployment is the current deployment for the job
	deployment *structs.Deployment

	// deploymentPaused marks whether the deployment is paused
	deploymentPaused bool

	// deploymentFailed marks whether the deployment is failed
	deploymentFailed bool

	// taintedNodes contains a map of nodes that are tainted
	taintedNodes map[string]*structs.Node

	// existingAllocs is the set of non-terminal existing allocations
	existingAllocs []*structs.Allocation

	// evalID is the ID of the evaluation that triggered the reconciler
	evalID string

	// now is the time used when determining rescheduling eligibility.
	// It defaults to time.Now and is overridden in unit tests.
	now time.Time

	// result holds the results of the reconcile. During computation it can be
	// used to store intermediate state.
	result *reconcileResults
}
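
// Illustrative sketch (an assumption, not one of the scheduler's real update
// functions): the simplest possible allocUpdateType treats every change as
// destructive and never updates in place.
//
//	var destructiveOnly allocUpdateType = func(existing *structs.Allocation,
//		newJob *structs.Job, newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation) {
//		// Hypothetical: always replace the allocation rather than inspecting
//		// the diff between the old and new task group.
//		return false, true, nil
//	}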

// reconcileResults contains the results of the reconciliation and should be
// applied by the scheduler.
type reconcileResults struct {
	// deployment is the deployment that should be created or updated as a
	// result of scheduling
	deployment *structs.Deployment

	// deploymentUpdates contains a set of deployment updates that should be
	// applied as a result of scheduling
	deploymentUpdates []*structs.DeploymentStatusUpdate

	// place is the set of allocations to place by the scheduler
	place []allocPlaceResult

	// destructiveUpdate is the set of allocations to apply a destructive update to
	destructiveUpdate []allocDestructiveResult

	// inplaceUpdate is the set of allocations to apply an inplace update to
	inplaceUpdate []*structs.Allocation

	// stop is the set of allocations to stop
	stop []allocStopResult

	// attributeUpdates are updates to allocations that do not stem from a
	// jobspec change.
	attributeUpdates map[string]*structs.Allocation

	// desiredTGUpdates captures the desired set of changes to make for each
	// task group.
	desiredTGUpdates map[string]*structs.DesiredUpdates

	// desiredFollowupEvals is the map of follow-up evaluations to create, per
	// task group. This is used to create delayed evaluations for rescheduling
	// failed allocations.
	desiredFollowupEvals map[string][]*structs.Evaluation
}

// delayedRescheduleInfo contains the allocation ID and the time at which it is
// eligible to be rescheduled. This is used to create follow-up evaluations.
type delayedRescheduleInfo struct {
	// allocID is the ID of the allocation eligible to be rescheduled
	allocID string

	// alloc is the allocation that is eligible to be rescheduled
	alloc *structs.Allocation

	// rescheduleTime is the time to use in the delayed evaluation
	rescheduleTime time.Time
}

func (r *reconcileResults) GoString() string {
	base := fmt.Sprintf("Total changes: (place %d) (destructive %d) (inplace %d) (stop %d)",
		len(r.place), len(r.destructiveUpdate), len(r.inplaceUpdate), len(r.stop))

	if r.deployment != nil {
		base += fmt.Sprintf("\nCreated Deployment: %q", r.deployment.ID)
	}
	for _, u := range r.deploymentUpdates {
		base += fmt.Sprintf("\nDeployment Update for ID %q: Status %q; Description %q",
			u.DeploymentID, u.Status, u.StatusDescription)
	}
	for tg, u := range r.desiredTGUpdates {
		base += fmt.Sprintf("\nDesired Changes for %q: %#v", tg, u)
	}
	return base
}

// Changes returns the number of total changes
func (r *reconcileResults) Changes() int {
	return len(r.place) + len(r.inplaceUpdate) + len(r.stop)
}

// NewAllocReconciler creates a new reconciler that should be used to determine
// the changes required to bring the cluster state in line with the declared jobspec
func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch bool,
	jobID string, job *structs.Job, deployment *structs.Deployment,
	existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string) *allocReconciler {
	return &allocReconciler{
		logger:         logger.Named("reconciler"),
		allocUpdateFn:  allocUpdateFn,
		batch:          batch,
		jobID:          jobID,
		job:            job,
		deployment:     deployment.Copy(),
		existingAllocs: existingAllocs,
		taintedNodes:   taintedNodes,
		evalID:         evalID,
		now:            time.Now(),
		result: &reconcileResults{
			desiredTGUpdates:     make(map[string]*structs.DesiredUpdates),
			desiredFollowupEvals: make(map[string][]*structs.Evaluation),
		},
	}
}
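
// Illustrative usage (a sketch under assumed surrounding code, not lifted from
// the schedulers themselves): a caller constructs the reconciler, runs Compute,
// and applies the results to its plan.
//
//	reconciler := NewAllocReconciler(logger, allocUpdateFn, false, /* not batch */
//		job.ID, job, deployment, existingAllocs, taintedNodes, eval.ID)
//	results := reconciler.Compute()
//	for _, stop := range results.stop {
//		// translate each allocStopResult into a plan update
//	}
//	for _, p := range results.place {
//		// feed each allocPlaceResult into the placement stack
//	}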

// Compute reconciles the existing cluster state and returns the set of changes
// required to converge the job spec and state
func (a *allocReconciler) Compute() *reconcileResults {
	// Create the allocation matrix
	m := newAllocMatrix(a.job, a.existingAllocs)

	// Handle stopping unneeded deployments
	a.cancelDeployments()

	// If we are just stopping a job we do not need to do anything more than
	// stopping all running allocs
	if a.job.Stopped() {
		a.handleStop(m)
		return a.result
	}

	// Detect if the deployment is paused or failed
	if a.deployment != nil {
		a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused ||
			a.deployment.Status == structs.DeploymentStatusPending
		a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
	}
	if a.deployment == nil {
		// When we create the deployment later, it will be in a pending
		// state. But we also need to tell Compute we're paused, otherwise we
		// make placements on the paused deployment.
		if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
			a.deploymentPaused = true
		}
	}

	// Reconcile each group
	complete := true
	for group, as := range m {
		groupComplete := a.computeGroup(group, as)
		complete = complete && groupComplete
	}

	// Mark the deployment as complete if possible
	if a.deployment != nil && complete {
		if a.job.IsMultiregion() {
			// the unblocking/successful states come after blocked, so we
			// need to make sure we don't revert those states
			if a.deployment.Status != structs.DeploymentStatusUnblocking &&
				a.deployment.Status != structs.DeploymentStatusSuccessful {
				a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
					DeploymentID:      a.deployment.ID,
					Status:            structs.DeploymentStatusBlocked,
					StatusDescription: structs.DeploymentStatusDescriptionBlocked,
				})
			}
		} else {
			a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
				DeploymentID:      a.deployment.ID,
				Status:            structs.DeploymentStatusSuccessful,
				StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
			})
		}
	}

	// Set the description of a created deployment
	if d := a.result.deployment; d != nil {
		if d.RequiresPromotion() {
			if d.HasAutoPromote() {
				d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion
			} else {
				d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
			}
		}
	}

	return a.result
}
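
// Worked example (illustrative): job version 3 is submitted while a deployment
// for version 2 is still active. cancelDeployments below emits a
// DeploymentStatusCancelled update with the "newer job" description, records
// the version 2 deployment in a.oldDeployment, and clears a.deployment so that
// computeGroup may create a fresh deployment for version 3.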

// cancelDeployments cancels any deployment that is not needed
func (a *allocReconciler) cancelDeployments() {
	// If the job is stopped and there is a non-terminal deployment, cancel it
	if a.job.Stopped() {
		if a.deployment != nil && a.deployment.Active() {
			a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
				DeploymentID:      a.deployment.ID,
				Status:            structs.DeploymentStatusCancelled,
				StatusDescription: structs.DeploymentStatusDescriptionStoppedJob,
			})
		}

		// Nothing else to do
		a.oldDeployment = a.deployment
		a.deployment = nil
		return
	}

	d := a.deployment
	if d == nil {
		return
	}

	// Check if the deployment is active and referencing an older job and cancel it
	if d.JobCreateIndex != a.job.CreateIndex || d.JobVersion != a.job.Version {
		if d.Active() {
			a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
				DeploymentID:      a.deployment.ID,
				Status:            structs.DeploymentStatusCancelled,
				StatusDescription: structs.DeploymentStatusDescriptionNewerJob,
			})
		}

		a.oldDeployment = d
		a.deployment = nil
	}

	// Clear it as the current deployment if it is successful
	if d.Status == structs.DeploymentStatusSuccessful {
		a.oldDeployment = d
		a.deployment = nil
	}
}

// handleStop marks all allocations to be stopped, handling the lost case
func (a *allocReconciler) handleStop(m allocMatrix) {
	for group, as := range m {
		as = filterByTerminal(as)
		untainted, migrate, lost := as.filterByTainted(a.taintedNodes)
		a.markStop(untainted, "", allocNotNeeded)
		a.markStop(migrate, "", allocNotNeeded)
		a.markStop(lost, structs.AllocClientStatusLost, allocLost)
		desiredChanges := new(structs.DesiredUpdates)
		desiredChanges.Stop = uint64(len(as))
		a.result.desiredTGUpdates[group] = desiredChanges
	}
}

// markStop is a helper for marking a set of allocations for stop with a
// particular client status and description.
func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescription string) {
	for _, alloc := range allocs {
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			clientStatus:      clientStatus,
			statusDescription: statusDescription,
		})
	}
}

// markDelayed does markStop, but optionally includes a FollowupEvalID so that we can update
// the stopped alloc with its delayed rescheduling evalID
func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescription string, followupEvals map[string]string) {
	for _, alloc := range allocs {
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			clientStatus:      clientStatus,
			statusDescription: statusDescription,
			followupEvalID:    followupEvals[alloc.ID],
		})
	}
}
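
// Example (illustrative, with hypothetical IDs): calling
// markDelayed(lost, structs.AllocClientStatusLost, allocLost,
// map[string]string{"alloc-1": "eval-9"}) records a stop for alloc-1 whose
// allocStopResult carries followupEvalID "eval-9", keeping the stopped alloc
// linked to the delayed evaluation that will reschedule it.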

// computeGroup reconciles state for a particular task group. It returns whether
// the deployment it is for is complete with regard to the task group.
func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
	// Create the desired update object for the group
	desiredChanges := new(structs.DesiredUpdates)
	a.result.desiredTGUpdates[group] = desiredChanges

	// Get the task group. The task group may be nil if the job was updated
	// such that the task group no longer exists
	tg := a.job.LookupTaskGroup(group)

	// If the task group is nil, then the task group has been removed so all we
	// need to do is stop everything
	if tg == nil {
		untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
		a.markStop(untainted, "", allocNotNeeded)
		a.markStop(migrate, "", allocNotNeeded)
		a.markStop(lost, structs.AllocClientStatusLost, allocLost)
		desiredChanges.Stop = uint64(len(untainted) + len(migrate) + len(lost))
		return true
	}

	// Get the deployment state for the group
	var dstate *structs.DeploymentState
	existingDeployment := false
	if a.deployment != nil {
		dstate, existingDeployment = a.deployment.TaskGroups[group]
	}
	if !existingDeployment {
		dstate = &structs.DeploymentState{}
		if !tg.Update.IsEmpty() {
			dstate.AutoRevert = tg.Update.AutoRevert
			dstate.AutoPromote = tg.Update.AutoPromote
			dstate.ProgressDeadline = tg.Update.ProgressDeadline
		}
	}

	// Filter allocations that do not need to be considered because they are
	// from an older job version and are terminal.
	all, ignore := a.filterOldTerminalAllocs(all)
	desiredChanges.Ignore += uint64(len(ignore))

	// canaries is the set of canaries for the current deployment, and all is
	// the set of all allocs including the canaries
	canaries, all := a.handleGroupCanaries(all, desiredChanges)

	// Determine what set of allocations are on tainted nodes
	untainted, migrate, lost := all.filterByTainted(a.taintedNodes)

	// Determine what set of terminal allocations need to be rescheduled
	untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment)

	// Find delays for any lost allocs that have stop_after_client_disconnect
	lostLater := lost.delayByStopAfterClientDisconnect()
	lostLaterEvals := a.handleDelayedLost(lostLater, all, tg.Name)

	// Create batched follow-up evaluations for allocations that are
	// reschedulable later and mark the allocations for in-place updating
	a.handleDelayedReschedules(rescheduleLater, all, tg.Name)

	// Create a structure for choosing names. Seed with the taken names
	// which is the union of untainted, rescheduled, allocs on migrating
	// nodes, and allocs on down nodes (includes canaries)
	nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow, lost))

	// Stop any unneeded allocations and update the untainted set to not
	// include stopped allocations.
	canaryState := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
	stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState, lostLaterEvals)
	desiredChanges.Stop += uint64(len(stop))
	untainted = untainted.difference(stop)

	// Do inplace upgrades where possible and capture the set of upgrades that
	// need to be done destructively.
	ignore, inplace, destructive := a.computeUpdates(tg, untainted)
	desiredChanges.Ignore += uint64(len(ignore))
	desiredChanges.InPlaceUpdate += uint64(len(inplace))
	if !existingDeployment {
		dstate.DesiredTotal += len(destructive) + len(inplace)
	}
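
	// Worked example (illustrative): with strategy.Canary = 2, three
	// destructive updates pending, no existing canaries, and no promotion
	// yet, requireCanary below is true, DesiredCanaries becomes 2, and two
	// canary placements are produced via nameIndex.NextCanaries, provided
	// the deployment is neither paused nor failed.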

	// Remove the canaries now that we have handled rescheduling so that we do
	// not consider them when making placement decisions.
	if canaryState {
		untainted = untainted.difference(canaries)
	}

	// Having destructive updates and fewer canaries than desired means we
	// need to create canaries
	strategy := tg.Update
	canariesPromoted := dstate != nil && dstate.Promoted
	requireCanary := len(destructive) != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
	if requireCanary {
		dstate.DesiredCanaries = strategy.Canary
	}
	if requireCanary && !a.deploymentPaused && !a.deploymentFailed {
		number := strategy.Canary - len(canaries)
		desiredChanges.Canary += uint64(number)

		for _, name := range nameIndex.NextCanaries(uint(number), canaries, destructive) {
			a.result.place = append(a.result.place, allocPlaceResult{
				name:      name,
				canary:    true,
				taskGroup: tg,
			})
		}
	}

	// Determine how many we can place
	canaryState = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
	limit := a.computeLimit(tg, untainted, destructive, migrate, canaryState)

	// Place if:
	// * The deployment is not paused or failed
	// * Not placing any canaries
	// * If there are any canaries, they have been promoted
	// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
	// * An alloc was lost
	var place []allocPlaceResult
	if len(lostLater) == 0 {
		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState, lost)
		if !existingDeployment {
			dstate.DesiredTotal += len(place)
		}
	}

	// deploymentPlaceReady tracks whether the deployment is in a state where
	// placements can be made without any other consideration.
	deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !canaryState

	if deploymentPlaceReady {
		desiredChanges.Place += uint64(len(place))
		a.result.place = append(a.result.place, place...)
		a.markStop(rescheduleNow, "", allocRescheduled)
		desiredChanges.Stop += uint64(len(rescheduleNow))

		min := helper.IntMin(len(place), limit)
		limit -= min
	} else {
		// We do not want to place additional allocations but in the case we
		// have lost allocations or allocations that require rescheduling now,
		// we do so regardless to avoid odd user experiences.
		if len(lost) != 0 {
			allowed := helper.IntMin(len(lost), len(place))
			desiredChanges.Place += uint64(allowed)
			a.result.place = append(a.result.place, place[:allowed]...)
		}
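
		// Worked example (illustrative): with a paused deployment, two lost
		// allocs, and three computed placements above, only
		// helper.IntMin(2, 3) = 2 placements are emitted; replacing lost
		// capacity is allowed even though the deployment is not accepting
		// ordinary placements.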

		// Handle rescheduling of failed allocations even if the deployment is
		// failed. We do not reschedule if the allocation is part of the failed
		// deployment.
		if len(rescheduleNow) != 0 {
			for _, p := range place {
				prev := p.PreviousAllocation()
				if p.IsRescheduling() && !(a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID) {
					a.result.place = append(a.result.place, p)
					desiredChanges.Place++

					a.result.stop = append(a.result.stop, allocStopResult{
						alloc:             prev,
						statusDescription: allocRescheduled,
					})
					desiredChanges.Stop++
				}
			}
		}
	}

	if deploymentPlaceReady {
		// Do all destructive updates
		min := helper.IntMin(len(destructive), limit)
		desiredChanges.DestructiveUpdate += uint64(min)
		desiredChanges.Ignore += uint64(len(destructive) - min)
		for _, alloc := range destructive.nameOrder()[:min] {
			a.result.destructiveUpdate = append(a.result.destructiveUpdate, allocDestructiveResult{
				placeName:             alloc.Name,
				placeTaskGroup:        tg,
				stopAlloc:             alloc,
				stopStatusDescription: allocUpdating,
			})
		}
	} else {
		desiredChanges.Ignore += uint64(len(destructive))
	}

	// Migrate all the allocations
	desiredChanges.Migrate += uint64(len(migrate))
	for _, alloc := range migrate.nameOrder() {
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			statusDescription: allocMigrating,
		})
		a.result.place = append(a.result.place, allocPlaceResult{
			name:          alloc.Name,
			canary:        alloc.DeploymentStatus.IsCanary(),
			taskGroup:     tg,
			previousAlloc: alloc,

			downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
			minJobVersion:      alloc.Job.Version,
		})
	}

	// Create a new deployment if:
	// 1. Updating a job specification
	// 2. No running allocations (first time running a job)
	updatingSpec := len(destructive) != 0 || len(a.result.inplaceUpdate) != 0
	hadRunning := false
	for _, alloc := range all {
		if alloc.Job.Version == a.job.Version && alloc.Job.CreateIndex == a.job.CreateIndex {
			hadRunning = true
			break
		}
	}

	// Create a new deployment if necessary
	if !existingDeployment && !strategy.IsEmpty() && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
		// A previous group may have made the deployment already
		if a.deployment == nil {
			a.deployment = structs.NewDeployment(a.job)
			// in multiregion jobs, most deployments start in a pending state
			if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
				a.deployment.Status = structs.DeploymentStatusPending
				a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
			}
			a.result.deployment = a.deployment
		}

		// Attach the group's deployment state to the deployment
		a.deployment.TaskGroups[group] = dstate
	}

	// deploymentComplete is whether the deployment is complete which largely
	// means that no placements were made or desired to be made
	deploymentComplete := len(destructive)+len(inplace)+len(place)+len(migrate)+len(rescheduleNow)+len(rescheduleLater) == 0 && !requireCanary

	// Final check to see if the deployment is complete is to ensure everything
	// is healthy
	if deploymentComplete && a.deployment != nil {
		if dstate, ok := a.deployment.TaskGroups[group]; ok {
			if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
				(dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
				deploymentComplete = false
			}
		}
	}

	return deploymentComplete
}

// filterOldTerminalAllocs filters allocations that should be ignored since they
// are allocations that are terminal from a previous job version.
func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignore allocSet) {
	if !a.batch {
		return all, nil
	}

	filtered = filtered.union(all)
	ignored := make(map[string]*structs.Allocation)

	// Ignore terminal batch jobs from older versions
	for id, alloc := range filtered {
		older := alloc.Job.Version < a.job.Version || alloc.Job.CreateIndex < a.job.CreateIndex
		if older && alloc.TerminalStatus() {
			delete(filtered, id)
			ignored[id] = alloc
		}
	}

	return filtered, ignored
}

// handleGroupCanaries handles the canaries for the group by stopping the
// unneeded ones and returning the current set of canaries and the updated total
// set of allocs for the group
func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) {
	// Stop any canary from an older deployment or from a failed one
	var stop []string

	// Cancel any non-promoted canaries from the older deployment
	if a.oldDeployment != nil {
		for _, dstate := range a.oldDeployment.TaskGroups {
			if !dstate.Promoted {
				stop = append(stop, dstate.PlacedCanaries...)
			}
		}
	}

	// Cancel any non-promoted canaries from a failed deployment
	if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusFailed {
		for _, dstate := range a.deployment.TaskGroups {
			if !dstate.Promoted {
				stop = append(stop, dstate.PlacedCanaries...)
			}
		}
	}

	// stopSet is the allocSet that contains the canaries we desire to stop from
	// above.
	stopSet := all.fromKeys(stop)
	a.markStop(stopSet, "", allocNotNeeded)
	desiredChanges.Stop += uint64(len(stopSet))
	all = all.difference(stopSet)

	// Capture our current set of canaries and handle any migrations that are
	// needed by just stopping them.
	if a.deployment != nil {
		var canaryIDs []string
		for _, dstate := range a.deployment.TaskGroups {
			canaryIDs = append(canaryIDs, dstate.PlacedCanaries...)
		}

		canaries = all.fromKeys(canaryIDs)
		untainted, migrate, lost := canaries.filterByTainted(a.taintedNodes)
		a.markStop(migrate, "", allocMigrating)
		a.markStop(lost, structs.AllocClientStatusLost, allocLost)

		canaries = untainted
		all = all.difference(migrate, lost)
	}

	return canaries, all
}
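
// Worked example (illustrative): group.Count = 5 and MaxParallel = 2, with one
// allocation from the current deployment placed but not yet healthy.
// computeLimit below starts from MaxParallel and decrements once for the
// non-healthy alloc, returning 1; if that alloc were explicitly unhealthy, the
// limit would collapse to 0.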

// computeLimit returns the placement limit for a particular group. The inputs
// are the group definition, the untainted, destructive, and migrate allocation
// sets, and whether we are in a canary state.
func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive, migrate allocSet, canaryState bool) int {
	// If there is no update strategy or deployment for the group we can deploy
	// as many as the group has
	if group.Update.IsEmpty() || len(destructive)+len(migrate) == 0 {
		return group.Count
	} else if a.deploymentPaused || a.deploymentFailed {
		// If the deployment is paused or failed, do not create anything else
		return 0
	}

	// If we have canaries and they have not been promoted the limit is 0
	if canaryState {
		return 0
	}

	// If we have been promoted or there are no canaries, the limit is the
	// configured MaxParallel minus any outstanding non-healthy allocs for the
	// deployment
	limit := group.Update.MaxParallel
	if a.deployment != nil {
		partOf, _ := untainted.filterByDeployment(a.deployment.ID)
		for _, alloc := range partOf {
			// An unhealthy allocation means nothing else should happen.
			if alloc.DeploymentStatus.IsUnhealthy() {
				return 0
			}

			if !alloc.DeploymentStatus.IsHealthy() {
				limit--
			}
		}
	}

	// The limit can be less than zero in the case that the job was changed such
	// that it required destructive changes and the count was scaled up.
	if limit < 0 {
		return 0
	}

	return limit
}

// computePlacements returns the set of allocations to place given the group
// definition and the sets of untainted, migrating, and rescheduling allocations
// for the group.
//
// Placements will meet or exceed group count.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
	nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet,
	canaryState bool, lost allocSet) []allocPlaceResult {

	// Add rescheduled placement results
	var place []allocPlaceResult
	for _, alloc := range reschedule {
		place = append(place, allocPlaceResult{
			name:          alloc.Name,
			taskGroup:     group,
			previousAlloc: alloc,
			reschedule:    true,
			canary:        alloc.DeploymentStatus.IsCanary(),

			downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
			minJobVersion:      alloc.Job.Version,
			lost:               false,
		})
	}

	// Add replacements for lost allocs up to group.Count
	existing := len(untainted) + len(migrate) + len(reschedule)

	for _, alloc := range lost {
		if existing >= group.Count {
			// Reached desired count, do not replace remaining lost
			// allocs
			break
		}

		existing++
		place = append(place, allocPlaceResult{
			name:               alloc.Name,
			taskGroup:          group,
			previousAlloc:      alloc,
			reschedule:         false,
			canary:             alloc.DeploymentStatus.IsCanary(),
			downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
			minJobVersion:      alloc.Job.Version,
			lost:               true,
		})
	}

	// Add remaining placement results
	if existing < group.Count {
		for _, name := range nameIndex.Next(uint(group.Count - existing)) {
			place = append(place, allocPlaceResult{
				name:               name,
				taskGroup:          group,
				downgradeNonCanary: canaryState,
			})
		}
	}

	return place
}
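
// Worked example (illustrative): with group.Count = 5, two untainted allocs,
// one migrating alloc, one reschedule-now alloc, and two lost allocs,
// computePlacements above emits one rescheduled placement, replaces exactly
// one of the two lost allocs (existing reaches 5), and adds no further
// name-index placements because the count is already met.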

// computeStop returns the set of allocations that are marked for stopping given
// the group definition, the set of allocations in various states and whether we
// are canarying.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
	untainted, migrate, lost, canaries allocSet, canaryState bool, followupEvals map[string]string) allocSet {

	// Mark all lost allocations for stop.
	var stop allocSet
	stop = stop.union(lost)
	a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)

	// If we are still deploying or creating canaries, don't stop them
	if canaryState {
		untainted = untainted.difference(canaries)
	}

	// Hot path the nothing to do case
	remove := len(untainted) + len(migrate) - group.Count
	if remove <= 0 {
		return stop
	}

	// Filter out any terminal allocations from the untainted set
	// This is so that we don't try to mark them as stopped redundantly
	untainted = filterByTerminal(untainted)

	// Prefer stopping any alloc that has the same name as the canaries if we
	// are promoted
	if !canaryState && len(canaries) != 0 {
		canaryNames := canaries.nameSet()
		for id, alloc := range untainted.difference(canaries) {
			if _, match := canaryNames[alloc.Name]; match {
				stop[id] = alloc
				a.result.stop = append(a.result.stop, allocStopResult{
					alloc:             alloc,
					statusDescription: allocNotNeeded,
				})
				delete(untainted, id)

				remove--
				if remove == 0 {
					return stop
				}
			}
		}
	}

	// Prefer selecting from the migrating set before stopping existing allocs
	if len(migrate) != 0 {
		mNames := newAllocNameIndex(a.jobID, group.Name, group.Count, migrate)
		removeNames := mNames.Highest(uint(remove))
		for id, alloc := range migrate {
			if _, match := removeNames[alloc.Name]; !match {
				continue
			}
			a.result.stop = append(a.result.stop, allocStopResult{
				alloc:             alloc,
				statusDescription: allocNotNeeded,
			})
			delete(migrate, id)
			stop[id] = alloc
			nameIndex.UnsetIndex(alloc.Index())

			remove--
			if remove == 0 {
				return stop
			}
		}
	}

	// Select the allocs with the highest count to remove
	removeNames := nameIndex.Highest(uint(remove))
	for id, alloc := range untainted {
		if _, ok := removeNames[alloc.Name]; ok {
			stop[id] = alloc
			a.result.stop = append(a.result.stop, allocStopResult{
				alloc:             alloc,
				statusDescription: allocNotNeeded,
			})
			delete(untainted, id)

			remove--
			if remove == 0 {
				return stop
			}
		}
	}

	// It is possible that we didn't stop as many as we should have if there
	// were allocations with duplicate names.
	for id, alloc := range untainted {
		stop[id] = alloc
		a.result.stop = append(a.result.stop, allocStopResult{
			alloc:             alloc,
			statusDescription: allocNotNeeded,
		})
		delete(untainted, id)

		remove--
		if remove == 0 {
			return stop
		}
	}

	return stop
}
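
// Worked example (illustrative): group.Count = 3 with four untainted allocs
// and one migrating alloc gives remove = 2 in computeStop above. The migrating
// alloc is stopped first (the migrate set is preferred), then the untainted
// alloc whose name carries the highest index, at which point remove reaches 0
// and the stop set is returned.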

// computeUpdates determines which allocations for the passed group require
// updates. Three groups are returned:
// 1. Those that require no upgrades
// 2. Those that can be upgraded in-place. These are added to the results
// automatically since the function contains the correct state to do so.
// 3. Those that require destructive updates
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) {
	// Determine the set of allocations that need to be updated
	ignore = make(map[string]*structs.Allocation)
	inplace = make(map[string]*structs.Allocation)
	destructive = make(map[string]*structs.Allocation)

	for _, alloc := range untainted {
		ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group)
		if ignoreChange {
			ignore[alloc.ID] = alloc
		} else if destructiveChange {
			destructive[alloc.ID] = alloc
		} else {
			inplace[alloc.ID] = alloc
			a.result.inplaceUpdate = append(a.result.inplaceUpdate, inplaceAlloc)
		}
	}

	return
}

// handleDelayedReschedules creates batched follow-up evaluations with the WaitUntil field
// set for allocations that are eligible to be rescheduled later, and marks each alloc with
// its followupEvalID
func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
	// followupEvals are created in the same way as for delayed lost allocs
	allocIDToFollowupEvalID := a.handleDelayedLost(rescheduleLater, all, tgName)

	// Initialize the annotations
	if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
		a.result.attributeUpdates = make(map[string]*structs.Allocation)
	}

	// Create updates that will be applied to the allocs to mark the FollowupEvalID
	for allocID, evalID := range allocIDToFollowupEvalID {
		existingAlloc := all[allocID]
		updatedAlloc := existingAlloc.Copy()
		updatedAlloc.FollowupEvalID = evalID
		a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
	}
}
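
// Worked example (illustrative): with batchedFailedAllocWindowSize = 5s and
// allocs becoming reschedulable at t+0s, t+3s, and t+7s, handleDelayedLost
// below creates two follow-up evals: one with WaitUntil = t+0s covering the
// first two allocs (3s - 0s < 5s) and one with WaitUntil = t+7s for the third
// (7s - 0s >= 5s starts a new batch).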

// handleDelayedLost creates batched follow-up evaluations with the WaitUntil field set for
// lost allocations. The followupEvals are appended to a.result as a side effect; it returns
// a map of alloc IDs to their followupEval IDs.
func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
	if len(rescheduleLater) == 0 {
		return map[string]string{}
	}

	// Sort by time
	sort.Slice(rescheduleLater, func(i, j int) bool {
		return rescheduleLater[i].rescheduleTime.Before(rescheduleLater[j].rescheduleTime)
	})

	var evals []*structs.Evaluation
	nextReschedTime := rescheduleLater[0].rescheduleTime
	allocIDToFollowupEvalID := make(map[string]string, len(rescheduleLater))

	// Create a new eval for the first batch
	eval := &structs.Evaluation{
		ID:                uuid.Generate(),
		Namespace:         a.job.Namespace,
		Priority:          a.job.Priority,
		Type:              a.job.Type,
		TriggeredBy:       structs.EvalTriggerRetryFailedAlloc,
		JobID:             a.job.ID,
		JobModifyIndex:    a.job.ModifyIndex,
		Status:            structs.EvalStatusPending,
		StatusDescription: reschedulingFollowupEvalDesc,
		WaitUntil:         nextReschedTime,
	}
	evals = append(evals, eval)

	for _, allocReschedInfo := range rescheduleLater {
		if allocReschedInfo.rescheduleTime.Sub(nextReschedTime) < batchedFailedAllocWindowSize {
			allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID
		} else {
			// Start a new batch
			nextReschedTime = allocReschedInfo.rescheduleTime
			// Create a new eval for the new batch
			eval = &structs.Evaluation{
				ID:                uuid.Generate(),
				Namespace:         a.job.Namespace,
				Priority:          a.job.Priority,
				Type:              a.job.Type,
				TriggeredBy:       structs.EvalTriggerRetryFailedAlloc,
				JobID:             a.job.ID,
				JobModifyIndex:    a.job.ModifyIndex,
				Status:            structs.EvalStatusPending,
				StatusDescription: reschedulingFollowupEvalDesc,
				WaitUntil:         nextReschedTime,
			}
			evals = append(evals, eval)
			// Set the evalID for the first alloc in this new batch
			allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID
		}
	}

	a.result.desiredFollowupEvals[tgName] = evals

	return allocIDToFollowupEvalID
}