package taskrunner

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/hashicorp/nomad/client/lib/cgutil"

	metrics "github.com/armon/go-metrics"
	log "github.com/hashicorp/go-hclog"
	multierror "github.com/hashicorp/go-multierror"
	"github.com/hashicorp/hcl/v2/hcldec"
	"github.com/hashicorp/nomad/client/allocdir"
	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/consul"
	"github.com/hashicorp/nomad/client/devicemanager"
	"github.com/hashicorp/nomad/client/dynamicplugins"
	cinterfaces "github.com/hashicorp/nomad/client/interfaces"
	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
	"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
	cstate "github.com/hashicorp/nomad/client/state"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/client/vaultclient"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/helper/pluginutils/hclspecutils"
	"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/structs"
	bstructs "github.com/hashicorp/nomad/plugins/base/structs"
	"github.com/hashicorp/nomad/plugins/drivers"
)

const (
	// defaultMaxEvents is the default max capacity for task events on the
	// task state. Overrideable for testing.
	defaultMaxEvents = 10

	// killBackoffBaseline is the baseline time for exponential backoff while
	// killing a task.
	killBackoffBaseline = 5 * time.Second

	// killBackoffLimit is the limit of the exponential backoff for killing
	// the task.
	killBackoffLimit = 2 * time.Minute

	// killFailureLimit is how many times we will attempt to kill a task before
	// giving up and potentially leaking resources.
	killFailureLimit = 5

	// triggerUpdateChCap is the capacity for the triggerUpdateCh used for
	// triggering updates. It should be exactly 1 as even if multiple
	// updates have come in since the last one was handled, we only need to
	// handle the last one.
	triggerUpdateChCap = 1
)

type TaskRunner struct {
	// allocID, taskName, taskLeader, and taskResources are immutable, so
	// these fields may be accessed without locks.
	allocID       string
	taskName      string
	taskLeader    bool
	taskResources *structs.AllocatedTaskResources

	alloc     *structs.Allocation
	allocLock sync.Mutex

	clientConfig *config.Config

	// stateUpdater is used to emit updated task state
	stateUpdater interfaces.TaskStateHandler

	// state captures the state of the task for updating the allocation.
	// Must acquire stateLock to access.
	state *structs.TaskState

	// localState captures the node-local state of the task for when the
	// Nomad agent restarts.
	// Must acquire stateLock to access.
	localState *state.LocalState

	// stateLock must be acquired when accessing state or localState.
	stateLock sync.RWMutex

	// stateDB is for persisting localState and taskState
	stateDB cstate.StateDB

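	// Note: the runner has two distinct cancellation paths. shutdownCtx
	// (below) exits the TaskRunner itself, e.g. on agent shutdown, without
	// touching the task, while killCtx ends the task's own lifecycle.
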
	// shutdownCtx is used to exit the TaskRunner *without* affecting task
	// state.
	shutdownCtx context.Context

	// shutdownCtxCancel causes the TaskRunner to exit immediately without
	// affecting task state. Useful for testing or graceful agent shutdown.
	shutdownCtxCancel context.CancelFunc

	// killCtx is the task runner's context representing the task's lifecycle.
	// The context is canceled when the task is killed.
	killCtx context.Context

	// killCtxCancel is called when killing a task.
	killCtxCancel context.CancelFunc

	// killErr is populated when killing a task. Access should be done through
	// the getter/setter.
	killErr     error
	killErrLock sync.Mutex

	// logger is the logger for the task runner.
	logger log.Logger

	// triggerUpdateCh is ticked whenever update hooks need to be run and
	// must be created with cap=1 to signal a pending update and prevent
	// callers from deadlocking if the receiver has exited.
	triggerUpdateCh chan struct{}

	// waitCh is closed when the task runner has transitioned to a terminal
	// state.
	waitCh chan struct{}

	// driver is the driver for the task.
	driver drivers.DriverPlugin

	// driverCapabilities is the set of capabilities the driver supports.
	driverCapabilities *drivers.Capabilities

	// taskSchema is the HCL spec for the task driver configuration.
	taskSchema hcldec.Spec

	// handleLock guards access to handle and handleResult.
	handleLock sync.Mutex

	// handle to the running driver
	handle *DriverHandle

	// task is the task being run
	task     *structs.Task
	taskLock sync.RWMutex

	// taskDir is the directory structure for this task.
	taskDir *allocdir.TaskDir

	// envBuilder is used to build the task's environment
	envBuilder *taskenv.Builder

	// restartTracker is used to decide if the task should be restarted.
	restartTracker *restarts.RestartTracker

	// runnerHooks are task runner lifecycle hooks that should be run on state
	// transitions.
	runnerHooks []interfaces.TaskHook

	// hookResources captures the resources provided by hooks
	hookResources *hookResources

	// consulServiceClient is the client used by the consul service hook for
	// registering services and checks
	consulServiceClient consul.ConsulServiceAPI

	// consulProxiesClient is the client used by the envoy version hook for
	// asking consul what version of envoy nomad should inject into the connect
	// sidecar or gateway task.
	consulProxiesClient consul.SupportedProxiesAPI

	// siClient is the client used by the service identity hook for managing
	// service identity tokens
	siClient consul.ServiceIdentityAPI

	// vaultClient is the client to use to derive and renew Vault tokens
	vaultClient vaultclient.VaultClient

	// vaultToken is the current Vault token. It should be accessed with the
	// getter.
	vaultToken     string
	vaultTokenLock sync.Mutex

	// baseLabels are used when emitting tagged metrics. All task runner metrics
	// will have these tags, and optionally more.
	baseLabels []metrics.Label

	// logmonHookConfig is used to get the paths to the stdout and stderr fifos
	// to be passed to the driver for task logging
	logmonHookConfig *logmonHookConfig

	// resourceUsage is written via UpdateStats and read via
	// LatestResourceUsage. May be nil.
	resourceUsage     *cstructs.TaskResourceUsage
	resourceUsageLock sync.Mutex

	// deviceStatsReporter is used to lookup resource usage for alloc devices
	deviceStatsReporter cinterfaces.DeviceStatsReporter

	// csiManager is used to manage the mounting of CSI volumes into tasks
	csiManager csimanager.Manager

	// devicemanager is used to mount devices as well as lookup device
	// statistics
	devicemanager devicemanager.Manager

	// cpusetCgroupPathGetter is used to lookup the cgroup path if supported by
	// the platform
	cpusetCgroupPathGetter cgutil.CgroupPathGetter

	// driverManager is used to dispense driver plugins and register event
	// handlers
	driverManager drivermanager.Manager

	// dynamicRegistry is where dynamic plugins should be registered.
	dynamicRegistry dynamicplugins.Registry

	// maxEvents is the capacity of the TaskEvents on the TaskState.
	// Defaults to defaultMaxEvents but overrideable for testing.
	maxEvents int

	// serversContactedCh is passed to TaskRunners so they can detect when
	// GetClientAllocs has been called in case of a failed restore.
	serversContactedCh <-chan struct{}

	// startConditionMetCtx is closed when the TaskRunner should start the task.
	startConditionMetCtx <-chan struct{}

	// waitOnServers defaults to false but will be set true if a restore
	// fails and the Run method should wait until serversContactedCh is
	// closed.
	waitOnServers bool

	networkIsolationLock sync.Mutex
	networkIsolationSpec *drivers.NetworkIsolationSpec

	allocHookResources *cstructs.AllocHookResources
}

type Config struct {
	Alloc        *structs.Allocation
	ClientConfig *config.Config
	Task         *structs.Task
	TaskDir      *allocdir.TaskDir
	Logger       log.Logger

	// Consul is the client to use for managing Consul service registrations
	Consul consul.ConsulServiceAPI

	// ConsulProxies is the client to use for looking up supported envoy
	// versions from Consul.
	ConsulProxies consul.SupportedProxiesAPI

	// ConsulSI is the client to use for managing Consul SI tokens
	ConsulSI consul.ServiceIdentityAPI

	// DynamicRegistry is where dynamic plugins should be registered.
	DynamicRegistry dynamicplugins.Registry

	// Vault is the client to use to derive and renew Vault tokens
	Vault vaultclient.VaultClient

	// StateDB is used to store and restore state.
	StateDB cstate.StateDB

	// StateUpdater is used to emit updated task state
	StateUpdater interfaces.TaskStateHandler

	// DeviceStatsReporter is used to lookup resource usage for alloc devices
	DeviceStatsReporter cinterfaces.DeviceStatsReporter

	// CSIManager is used to manage the mounting of CSI volumes into tasks
	CSIManager csimanager.Manager

	// CpusetCgroupPathGetter is used to lookup the cgroup path if supported by
	// the platform
	CpusetCgroupPathGetter cgutil.CgroupPathGetter

	// DeviceManager is used to mount devices as well as lookup device
	// statistics
	DeviceManager devicemanager.Manager

	// DriverManager is used to dispense driver plugins and register event
	// handlers
	DriverManager drivermanager.Manager

	// ServersContactedCh is closed when the first GetClientAllocs call to
	// servers succeeds and allocs are synced.
	ServersContactedCh chan struct{}

	// StartConditionMetCtx is closed when the TaskRunner should start the task.
	StartConditionMetCtx <-chan struct{}
}

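// The expected call pattern, pieced together from the API in this file, is
// roughly the following sketch (illustrative only; the real caller is the
// alloc runner, and error handling is elided):
//
//	tr, err := NewTaskRunner(cfg) // cfg is a populated *Config
//	go tr.Run()                   // Run should be started in a goroutine
//	<-tr.WaitCh()                 // closed once Run exits
//
// Shutdown stops the runner without touching the task; Kill ends the task.
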
func NewTaskRunner(config *Config) (*TaskRunner, error) {
	// Create a context for causing the runner to exit
	trCtx, trCancel := context.WithCancel(context.Background())

	// Create a context for killing the runner
	killCtx, killCancel := context.WithCancel(context.Background())

	// Initialize the environment builder
	envBuilder := taskenv.NewBuilder(
		config.ClientConfig.Node,
		config.Alloc,
		config.Task,
		config.ClientConfig.Region,
	)

	// Initialize state from alloc if it is set
	tstate := structs.NewTaskState()
	if ts := config.Alloc.TaskStates[config.Task.Name]; ts != nil {
		tstate = ts.Copy()
	}

	tr := &TaskRunner{
		alloc:                  config.Alloc,
		allocID:                config.Alloc.ID,
		clientConfig:           config.ClientConfig,
		task:                   config.Task,
		taskDir:                config.TaskDir,
		taskName:               config.Task.Name,
		taskLeader:             config.Task.Leader,
		envBuilder:             envBuilder,
		dynamicRegistry:        config.DynamicRegistry,
		consulServiceClient:    config.Consul,
		consulProxiesClient:    config.ConsulProxies,
		siClient:               config.ConsulSI,
		vaultClient:            config.Vault,
		state:                  tstate,
		localState:             state.NewLocalState(),
		stateDB:                config.StateDB,
		stateUpdater:           config.StateUpdater,
		deviceStatsReporter:    config.DeviceStatsReporter,
		killCtx:                killCtx,
		killCtxCancel:          killCancel,
		shutdownCtx:            trCtx,
		shutdownCtxCancel:      trCancel,
		triggerUpdateCh:        make(chan struct{}, triggerUpdateChCap),
		waitCh:                 make(chan struct{}),
		csiManager:             config.CSIManager,
		cpusetCgroupPathGetter: config.CpusetCgroupPathGetter,
		devicemanager:          config.DeviceManager,
		driverManager:          config.DriverManager,
		maxEvents:              defaultMaxEvents,
		serversContactedCh:     config.ServersContactedCh,
		startConditionMetCtx:   config.StartConditionMetCtx,
	}

	// Create the logger based on the allocation ID
	tr.logger = config.Logger.Named("task_runner").With("task", config.Task.Name)

	// Pull out the task's resources
	ares := tr.alloc.AllocatedResources
	if ares == nil {
		return nil, fmt.Errorf("no task resources found on allocation")
	}

	tres, ok := ares.Tasks[tr.taskName]
	if !ok {
		return nil, fmt.Errorf("no task resources found on allocation")
	}
	tr.taskResources = tres

	// Build the restart tracker.
	rp := config.Task.RestartPolicy
	if rp == nil {
		tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup)
		if tg == nil {
			tr.logger.Error("alloc missing task group")
			return nil, fmt.Errorf("alloc missing task group")
		}
		rp = tg.RestartPolicy
	}
	tr.restartTracker = restarts.NewRestartTracker(rp, tr.alloc.Job.Type, config.Task.Lifecycle)

	// Get the driver
	if err := tr.initDriver(); err != nil {
		tr.logger.Error("failed to create driver", "error", err)
		return nil, err
	}

	// Initialize the runner's hooks. Must come after initDriver so hooks
	// can use tr.driverCapabilities.
	tr.initHooks()

	// Initialize base labels
	tr.initLabels()

	// Initialize initial task received event
	tr.appendEvent(structs.NewTaskEvent(structs.TaskReceived))

	return tr, nil
}

func (tr *TaskRunner) initLabels() {
	alloc := tr.Alloc()
	tr.baseLabels = []metrics.Label{
		{
			Name:  "job",
			Value: alloc.Job.Name,
		},
		{
			Name:  "task_group",
			Value: alloc.TaskGroup,
		},
		{
			Name:  "alloc_id",
			Value: tr.allocID,
		},
		{
			Name:  "task",
			Value: tr.taskName,
		},
		{
			Name:  "namespace",
			Value: tr.alloc.Namespace,
		},
	}

	if tr.alloc.Job.ParentID != "" {
		tr.baseLabels = append(tr.baseLabels, metrics.Label{
			Name:  "parent_id",
			Value: tr.alloc.Job.ParentID,
		})
		if strings.Contains(tr.alloc.Job.Name, "/dispatch-") {
			tr.baseLabels = append(tr.baseLabels, metrics.Label{
				Name:  "dispatch_id",
				Value: strings.Split(tr.alloc.Job.Name, "/dispatch-")[1],
			})
		}
		if strings.Contains(tr.alloc.Job.Name, "/periodic-") {
			tr.baseLabels = append(tr.baseLabels, metrics.Label{
				Name:  "periodic_id",
				Value: strings.Split(tr.alloc.Job.Name, "/periodic-")[1],
			})
		}
	}
}

// MarkFailedDead marks a task as failed and not to run. It is meant to be
// invoked when alloc runner prestart hooks have failed.
// Should never be called alongside Run().
func (tr *TaskRunner) MarkFailedDead(reason string) {
	defer close(tr.waitCh)

	tr.stateLock.Lock()
	if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil {
		//TODO Nomad will be unable to restore this task; try to kill
		//     it now and fail? In general we prefer to leave running
		//     tasks running even if the agent encounters an error.
		tr.logger.Warn("error persisting local failed task state; may be unable to restore after a Nomad restart",
			"error", err)
	}
	tr.stateLock.Unlock()

	event := structs.NewTaskEvent(structs.TaskSetupFailure).
		SetDisplayMessage(reason).
		SetFailsTask()
	tr.UpdateState(structs.TaskStateDead, event)

	// Run the stop hooks in case task was a restored task that failed prestart
	if err := tr.stop(); err != nil {
		tr.logger.Error("stop failed while marking task dead", "error", err)
	}
}

// Run the TaskRunner. Starts the user's task or reattaches to a restored task.
// Run closes WaitCh when it exits. Should be started in a goroutine.
func (tr *TaskRunner) Run() {
	defer close(tr.waitCh)
	var result *drivers.ExitResult

	tr.stateLock.RLock()
	dead := tr.state.State == structs.TaskStateDead
	tr.stateLock.RUnlock()

	// If restoring a dead task, ensure the task is cleared and all post hooks
	// are called without additional state updates.
	if dead {
		// Do cleanup functions without emitting any additional events/work
		// to handle cases where we restored a dead task where the client
		// terminated after the task finished but before completing post-run
		// actions.
		tr.clearDriverHandle()
		tr.stateUpdater.TaskStateUpdated()
		if err := tr.stop(); err != nil {
			tr.logger.Error("stop failed on terminal task", "error", err)
		}
		return
	}

	// Updates are handled asynchronously with the other hooks but each
	// triggered update - whether due to alloc updates or a new vault token
	// - should be handled serially.
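	// Because triggerUpdateCh has capacity 1 and triggerUpdateHooks sends
	// non-blocking, concurrent updates coalesce: at most one trigger is ever
	// pending, and handleUpdates runs the hooks against the latest state.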
	go tr.handleUpdates()

	// If restore failed, wait until servers are contacted before running.
	// #1795
	if tr.waitOnServers {
		tr.logger.Info("task failed to restore; waiting to contact server before restarting")
		select {
		case <-tr.killCtx.Done():
			tr.logger.Info("task killed while waiting for server contact")
		case <-tr.shutdownCtx.Done():
			return
		case <-tr.serversContactedCh:
			tr.logger.Info("server contacted; unblocking waiting task")
		}
	}

	select {
	case <-tr.startConditionMetCtx:
		tr.logger.Debug("lifecycle start condition has been met, proceeding")
		// yay proceed
	case <-tr.killCtx.Done():
	case <-tr.shutdownCtx.Done():
		return
	}

MAIN:
	for !tr.shouldShutdown() {
		select {
		case <-tr.killCtx.Done():
			break MAIN
		case <-tr.shutdownCtx.Done():
			// TaskRunner was told to exit immediately
			return
		default:
		}

		// Run the prestart hooks
		if err := tr.prestart(); err != nil {
			tr.logger.Error("prestart failed", "error", err)
			tr.restartTracker.SetStartError(err)
			goto RESTART
		}

		select {
		case <-tr.killCtx.Done():
			break MAIN
		case <-tr.shutdownCtx.Done():
			// TaskRunner was told to exit immediately
			return
		default:
		}

		// Run the task
		if err := tr.runDriver(); err != nil {
			tr.logger.Error("running driver failed", "error", err)
			tr.restartTracker.SetStartError(err)
			goto RESTART
		}

		// Run the poststart hooks
		if err := tr.poststart(); err != nil {
			tr.logger.Error("poststart failed", "error", err)
		}

		// Grab the result proxy and wait for task to exit
	WAIT:
		{
			handle := tr.getDriverHandle()
			result = nil

			// Do *not* use tr.killCtx here as it would cause
			// Wait() to unblock before the task exits when Kill()
			// is called.
			if resultCh, err := handle.WaitCh(context.Background()); err != nil {
				tr.logger.Error("wait task failed", "error", err)
			} else {
				select {
				case <-tr.killCtx.Done():
					// We can go through the normal should-restart check since
					// the restart tracker knows the task was killed.
					result = tr.handleKill(resultCh)
				case <-tr.shutdownCtx.Done():
					// TaskRunner was told to exit immediately
					return
				case result = <-resultCh:
				}

				// WaitCh returned a result
				if retryWait := tr.handleTaskExitResult(result); retryWait {
					goto WAIT
				}
			}
		}

		// Clear the handle
		tr.clearDriverHandle()

		// Store the wait result on the restart tracker
		tr.restartTracker.SetExitResult(result)

		if err := tr.exited(); err != nil {
			tr.logger.Error("exited hooks failed", "error", err)
		}

	RESTART:
		restart, restartDelay := tr.shouldRestart()
		if !restart {
			break MAIN
		}

		// Actually restart by sleeping and also watching for destroy events
		select {
		case <-time.After(restartDelay):
		case <-tr.killCtx.Done():
			tr.logger.Trace("task killed between restarts", "delay", restartDelay)
			break MAIN
		case <-tr.shutdownCtx.Done():
			// TaskRunner was told to exit immediately
			tr.logger.Trace("gracefully shutting down during restart delay")
			return
		}
	}

	// Ensure handle is cleaned up. Restore could have recovered a task
	// that should be terminal, so if the handle still exists we should
	// kill it here.
	if tr.getDriverHandle() != nil {
		if result = tr.handleKill(nil); result != nil {
			tr.emitExitResultEvent(result)
		}

		tr.clearDriverHandle()

		if err := tr.exited(); err != nil {
			tr.logger.Error("exited hooks failed while cleaning up terminal task", "error", err)
		}
	}

	// Mark the task as dead
	tr.UpdateState(structs.TaskStateDead, nil)

	// Run the stop hooks
	if err := tr.stop(); err != nil {
		tr.logger.Error("stop failed", "error", err)
	}

	tr.logger.Debug("task run loop exiting")
}

func (tr *TaskRunner) shouldShutdown() bool {
	alloc := tr.Alloc()
	if alloc.ClientTerminalStatus() {
		return true
	}

	if !tr.IsPoststopTask() && alloc.ServerTerminalStatus() {
		return true
	}

	return false
}

// handleTaskExitResult handles the results returned by the task exiting. If
// retryWait is true, the caller should attempt to wait on the task again since
// it has not actually finished running. This can happen if the driver plugin
// has exited.
func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) (retryWait bool) {
	if result == nil {
		return false
	}

	if result.Err == bstructs.ErrPluginShutdown {
		dn := tr.Task().Driver
		tr.logger.Debug("driver plugin has shutdown; attempting to recover task", "driver", dn)

		// Initialize a new driver handle
		if err := tr.initDriver(); err != nil {
			tr.logger.Error("failed to initialize driver after it exited unexpectedly", "error", err, "driver", dn)
			return false
		}

		// Try to restore the handle
		tr.stateLock.RLock()
		h := tr.localState.TaskHandle
		net := tr.localState.DriverNetwork
		tr.stateLock.RUnlock()
		if !tr.restoreHandle(h, net) {
			tr.logger.Error("failed to restore handle on driver after it exited unexpectedly", "driver", dn)
			return false
		}

		tr.logger.Debug("task successfully recovered on driver", "driver", dn)
		return true
	}

	// Emit Terminated event
	tr.emitExitResultEvent(result)

	return false
}

// emitExitResultEvent emits a TaskTerminated event for an ExitResult.
func (tr *TaskRunner) emitExitResultEvent(result *drivers.ExitResult) {
	event := structs.NewTaskEvent(structs.TaskTerminated).
		SetExitCode(result.ExitCode).
		SetSignal(result.Signal).
		SetOOMKilled(result.OOMKilled).
		SetExitMessage(result.Err)

	tr.EmitEvent(event)

	if result.OOMKilled {
		metrics.IncrCounterWithLabels([]string{"client", "allocs", "oom_killed"}, 1, tr.baseLabels)
	}
}

// handleUpdates runs update hooks when triggerUpdateCh is ticked and exits
// when Run has returned. Should only be run in a goroutine from Run.
func (tr *TaskRunner) handleUpdates() {
	for {
		select {
		case <-tr.triggerUpdateCh:
		case <-tr.waitCh:
			return
		}

		// Non-terminal update; run hooks
		tr.updateHooks()
	}
}

// shouldRestart determines whether the task should be restarted and updates
// the task state unless the task is killed or terminated.
func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
	// Determine if we should restart
	state, when := tr.restartTracker.GetState()
	reason := tr.restartTracker.GetReason()
	switch state {
	case structs.TaskKilled:
		// Never restart an explicitly killed task. The Kill method handles
		// updating the server.
		tr.EmitEvent(structs.NewTaskEvent(state))
		return false, 0
	case structs.TaskNotRestarting, structs.TaskTerminated:
		tr.logger.Info("not restarting task", "reason", reason)
		if state == structs.TaskNotRestarting {
			tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting).SetRestartReason(reason).SetFailsTask())
		}
		return false, 0
	case structs.TaskRestarting:
		tr.logger.Info("restarting task", "reason", reason, "delay", when)
		tr.UpdateState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when).SetRestartReason(reason))
		return true, when
	default:
		tr.logger.Error("restart tracker returned unknown state", "state", state)
		return true, when
	}
}

// runDriver runs the driver and waits for it to exit.
// runDriver emits an appropriate task event on success/failure.
func (tr *TaskRunner) runDriver() error {

	taskConfig := tr.buildTaskConfig()
	if tr.cpusetCgroupPathGetter != nil {
		cpusetCgroupPath, err := tr.cpusetCgroupPathGetter(tr.killCtx)
		if err != nil {
			return err
		}
		taskConfig.Resources.LinuxResources.CpusetCgroupPath = cpusetCgroupPath
	}

	// Build hcl context variables
	vars, errs, err := tr.envBuilder.Build().AllValues()
	if err != nil {
		return fmt.Errorf("error building environment variables: %v", err)
	}

	// Handle per-key errors
	if len(errs) > 0 {
		keys := make([]string, 0, len(errs))
		for k, err := range errs {
			keys = append(keys, k)

			if tr.logger.IsTrace() {
				// Verbosely log every diagnostic for debugging
				tr.logger.Trace("error building environment variables", "key", k, "error", err)
			}
		}

		tr.logger.Warn("some environment variables not available for rendering", "keys", strings.Join(keys, ", "))
	}

	val, diag, diagErrs := hclutils.ParseHclInterface(tr.task.Config, tr.taskSchema, vars)
	if diag.HasErrors() {
		parseErr := multierror.Append(errors.New("failed to parse config: "), diagErrs...)
		tr.EmitEvent(structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(parseErr))
		return parseErr
	}

	if err := taskConfig.EncodeDriverConfig(val); err != nil {
		encodeErr := fmt.Errorf("failed to encode driver config: %v", err)
		tr.EmitEvent(structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(encodeErr))
		return encodeErr
	}

	// If there's already a task handle (e.g. from a Restore) there's nothing
	// to do except update state.
	if tr.getDriverHandle() != nil {
		// Ensure running state is persisted but do *not* append a new
		// task event as restoring is a client event and not relevant
		// to a task's lifecycle.
		if err := tr.updateStateImpl(structs.TaskStateRunning); err != nil {
			//TODO return error and destroy task to avoid an orphaned task?
			tr.logger.Warn("error persisting task state", "error", err)
		}
		return nil
	}

	// Start the job if there's no existing handle (or if RecoverTask failed)
	handle, net, err := tr.driver.StartTask(taskConfig)
	if err != nil {
		// The plugin has died, try relaunching it
		if err == bstructs.ErrPluginShutdown {
			tr.logger.Info("failed to start task because plugin shutdown unexpectedly; attempting to recover")
			if err := tr.initDriver(); err != nil {
				taskErr := fmt.Errorf("failed to initialize driver after it exited unexpectedly: %v", err)
				tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(taskErr))
				return taskErr
			}

			handle, net, err = tr.driver.StartTask(taskConfig)
			if err != nil {
				taskErr := fmt.Errorf("failed to start task after driver exited unexpectedly: %v", err)
				tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(taskErr))
				return taskErr
			}
		} else {
			// Do *NOT* wrap the error here without maintaining whether or not
			// it is Recoverable. You must emit a task event failure to be
			// considered Recoverable.
			tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
			return err
		}
	}

	tr.stateLock.Lock()
	tr.localState.TaskHandle = handle
	tr.localState.DriverNetwork = net
	if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil {
		//TODO Nomad will be unable to restore this task; try to kill
		//     it now and fail? In general we prefer to leave running
		//     tasks running even if the agent encounters an error.
		tr.logger.Warn("error persisting local task state; may be unable to restore after a Nomad restart",
			"error", err, "task_id", handle.Config.ID)
	}
	tr.stateLock.Unlock()

	tr.setDriverHandle(NewDriverHandle(tr.driver, taskConfig.ID, tr.Task(), net))

	// Emit an event that we started
	tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
	return nil
}

// initDriver retrieves the DriverPlugin from the plugin loader for this task.
func (tr *TaskRunner) initDriver() error {
	driver, err := tr.driverManager.Dispense(tr.Task().Driver)
	if err != nil {
		return err
	}
	tr.driver = driver

	schema, err := tr.driver.TaskConfigSchema()
	if err != nil {
		return err
	}
	spec, diag := hclspecutils.Convert(schema)
	if diag.HasErrors() {
		return multierror.Append(errors.New("failed to convert task schema"), diag.Errs()...)
	}
	tr.taskSchema = spec

	caps, err := tr.driver.Capabilities()
	if err != nil {
		return err
	}
	tr.driverCapabilities = caps

	return nil
}

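// With killBackoffBaseline = 5s and killBackoffLimit = 2m, the retry delays
// used by killTask below work out to 5s, 20s, 80s, and then 2m (capped) for
// the remaining attempts, after which the kill is abandoned once
// killFailureLimit (5) attempts have failed.
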
// handleKill is used to handle a request to kill a task. It will return
// the handle exit result if one is available and store any error in the task
// runner killErr value.
func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.ExitResult {
	// Run the pre-killing hooks
	tr.preKill()

	// Wait for the task's ShutdownDelay after running the prekill hooks.
	// This allows for things like service de-registration to run before
	// waiting to kill the task.
	if delay := tr.Task().ShutdownDelay; delay != 0 {
		tr.logger.Debug("waiting before killing task", "shutdown_delay", delay)

		select {
		case result := <-resultCh:
			return result
		case <-time.After(delay):
		}
	}

	// Tell the restart tracker that the task has been killed so it doesn't
	// attempt to restart it.
	tr.restartTracker.SetKilled()

	// Check whether the task has already exited
	select {
	case result := <-resultCh:
		return result
	default:
	}

	handle := tr.getDriverHandle()
	if handle == nil {
		return nil
	}

	// Kill the task using an exponential backoff in case of failures.
	result, killErr := tr.killTask(handle, resultCh)
	if killErr != nil {
		// We couldn't successfully destroy the resource created.
		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
		tr.setKillErr(killErr)
	}

	if result != nil {
		return result
	}

	// Block until the task has exited.
	if resultCh == nil {
		var err error
		resultCh, err = handle.WaitCh(tr.shutdownCtx)

		// The error should be nil or TaskNotFound; if it's something else then
		// a failure in the driver or transport layer occurred.
		if err != nil {
			if err == drivers.ErrTaskNotFound {
				return nil
			}
			tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
			tr.setKillErr(err)
			return nil
		}
	}

	select {
	case result := <-resultCh:
		return result
	case <-tr.shutdownCtx.Done():
		return nil
	}
}

// killTask kills the task handle. In the case that killing fails,
// killTask will retry with an exponential backoff and will give up at a
// given limit. Returns an error if the task could not be killed.
func (tr *TaskRunner) killTask(handle *DriverHandle, resultCh <-chan *drivers.ExitResult) (*drivers.ExitResult, error) {
	// Cap the number of times we attempt to kill the task.
	var err error
	for i := 0; i < killFailureLimit; i++ {
		if err = handle.Kill(); err != nil {
			if err == drivers.ErrTaskNotFound {
				tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID())
				return nil, nil
			}
			// Calculate the new backoff
			backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
			if backoff > killBackoffLimit {
				backoff = killBackoffLimit
			}

			tr.logger.Error("failed to kill task", "backoff", backoff, "error", err)
			select {
			case result := <-resultCh:
				return result, nil
			case <-time.After(backoff):
			}
		} else {
			// Kill was successful
			return nil, nil
		}
	}
	return nil, err
}

// persistLocalState persists local state to disk synchronously.
func (tr *TaskRunner) persistLocalState() error {
	tr.stateLock.RLock()
	defer tr.stateLock.RUnlock()

	return tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState)
}

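// As a worked example (hypothetical values): a task with MemoryMB=256 and
// MemoryMaxMB=512 gets MemoryLimitBytes = 512*1024*1024 from buildTaskConfig,
// since the larger max takes precedence, and reserved cores [0, 1, 3]
// serialize to CpusetCpus "0,1,3".
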
// buildTaskConfig builds a drivers.TaskConfig with a unique ID for the task.
// The ID is unique for every invocation; it is built from the alloc ID, task
// name, and 8 random characters.
func (tr *TaskRunner) buildTaskConfig() *drivers.TaskConfig {
	task := tr.Task()
	alloc := tr.Alloc()
	invocationid := uuid.Generate()[:8]
	taskResources := tr.taskResources
	ports := tr.Alloc().AllocatedResources.Shared.Ports
	env := tr.envBuilder.Build()
	tr.networkIsolationLock.Lock()
	defer tr.networkIsolationLock.Unlock()

	var dns *drivers.DNSConfig
	if alloc.AllocatedResources != nil && len(alloc.AllocatedResources.Shared.Networks) > 0 {
		allocDNS := alloc.AllocatedResources.Shared.Networks[0].DNS
		if allocDNS != nil {
			dns = &drivers.DNSConfig{
				Servers:  allocDNS.Servers,
				Searches: allocDNS.Searches,
				Options:  allocDNS.Options,
			}
		}
	}

	memoryLimit := taskResources.Memory.MemoryMB
	if max := taskResources.Memory.MemoryMaxMB; max > memoryLimit {
		memoryLimit = max
	}

	cpusetCpus := make([]string, len(taskResources.Cpu.ReservedCores))
	for i, v := range taskResources.Cpu.ReservedCores {
		cpusetCpus[i] = fmt.Sprintf("%d", v)
	}

	return &drivers.TaskConfig{
		ID:            fmt.Sprintf("%s/%s/%s", alloc.ID, task.Name, invocationid),
		Name:          task.Name,
		JobName:       alloc.Job.Name,
		JobID:         alloc.Job.ID,
		TaskGroupName: alloc.TaskGroup,
		Namespace:     alloc.Namespace,
		NodeName:      alloc.NodeName,
		NodeID:        alloc.NodeID,
		Resources: &drivers.Resources{
			NomadResources: taskResources,
			LinuxResources: &drivers.LinuxResources{
				MemoryLimitBytes: memoryLimit * 1024 * 1024,
				CPUShares:        taskResources.Cpu.CpuShares,
				CpusetCpus:       strings.Join(cpusetCpus, ","),
				PercentTicks:     float64(taskResources.Cpu.CpuShares) / float64(tr.clientConfig.Node.NodeResources.Cpu.CpuShares),
			},
			Ports: &ports,
		},
		Devices:          tr.hookResources.getDevices(),
		Mounts:           tr.hookResources.getMounts(),
		Env:              env.Map(),
		DeviceEnv:        env.DeviceEnv(),
		User:             task.User,
		AllocDir:         tr.taskDir.AllocDir,
		StdoutPath:       tr.logmonHookConfig.stdoutFifo,
		StderrPath:       tr.logmonHookConfig.stderrFifo,
		AllocID:          tr.allocID,
		NetworkIsolation: tr.networkIsolationSpec,
		DNS:              dns,
	}
}

// Restore task runner state. Called by AllocRunner.Restore after NewTaskRunner
// but before Run so no locks need to be acquired.
func (tr *TaskRunner) Restore() error {
	ls, ts, err := tr.stateDB.GetTaskRunnerState(tr.allocID, tr.taskName)
	if err != nil {
		return err
	}

	if ls != nil {
		ls.Canonicalize()
		tr.localState = ls
	}

	if ts != nil {
		ts.Canonicalize()
		tr.state = ts
	}

	// If a TaskHandle was persisted, ensure it is valid or destroy it.
	if taskHandle := tr.localState.TaskHandle; taskHandle != nil {
		//TODO if RecoverTask returned the DriverNetwork we wouldn't
		//     have to persist it at all!
		restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)

		// If the handle could not be restored, the alloc is
		// non-terminal, and the task isn't a system job: wait until
		// servers have been contacted before running. #1795
		if restored {
			return nil
		}

		alloc := tr.Alloc()
		if tr.state.State == structs.TaskStateDead || alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
			return nil
		}

		tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
		tr.waitOnServers = true

		ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
			SetDisplayMessage("failed to restore task; will not run until server is contacted")
		tr.UpdateState(structs.TaskStatePending, ev)
	}

	return nil
}

// restoreHandle ensures a TaskHandle is valid by calling Driver.RecoverTask
// and sets the driver handle. If the TaskHandle is not valid, DestroyTask is
// called.
func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *drivers.DriverNetwork) (success bool) {
	// Ensure handle is well-formed
	if taskHandle.Config == nil {
		return true
	}

	if err := tr.driver.RecoverTask(taskHandle); err != nil {
		if tr.TaskState().State != structs.TaskStateRunning {
			// RecoverTask is expected to fail if the task wasn't running.
			return true
		}

		tr.logger.Error("error recovering task; cleaning up",
			"error", err, "task_id", taskHandle.Config.ID)

		// Try to cleanup any existing task state in the plugin before restarting
		if err := tr.driver.DestroyTask(taskHandle.Config.ID, true); err != nil {
			// Ignore ErrTaskNotFound errors as ideally
			// this task has already been stopped and
			// therefore doesn't exist.
			if err != drivers.ErrTaskNotFound {
				tr.logger.Warn("error destroying unrecoverable task",
					"error", err, "task_id", taskHandle.Config.ID)
			}

			return false
		}

		return true
	}

	// Update driver handle on task runner
	tr.setDriverHandle(NewDriverHandle(tr.driver, taskHandle.Config.ID, tr.Task(), net))
	return true
}

// UpdateState sets the task runner's allocation state and triggers a server
// update.
func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
	tr.stateLock.Lock()
	defer tr.stateLock.Unlock()

	if event != nil {
		tr.logger.Trace("setting task state", "state", state, "event", event.Type)

		// Append the event
		tr.appendEvent(event)
	}

	// Update the state
	if err := tr.updateStateImpl(state); err != nil {
		// Only log the error, as persistence errors should not
		// affect task state.
		tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state)
	}

	// Store task handle for remote tasks
	if tr.driverCapabilities != nil && tr.driverCapabilities.RemoteTasks {
		tr.logger.Trace("storing remote task handle state")
		tr.localState.TaskHandle.Store(tr.state)
	}

	// Notify the alloc runner of the transition
	tr.stateUpdater.TaskStateUpdated()
}

// updateStateImpl updates the in-memory task state and persists to disk.
func (tr *TaskRunner) updateStateImpl(state string) error {

	// Update the task state
	oldState := tr.state.State
	taskState := tr.state
	taskState.State = state

	// Handle the state transition.
	switch state {
	case structs.TaskStateRunning:
		// Capture the start time if it is just starting
		if oldState != structs.TaskStateRunning {
			taskState.StartedAt = time.Now().UTC()
			metrics.IncrCounterWithLabels([]string{"client", "allocs", "running"}, 1, tr.baseLabels)
		}
	case structs.TaskStateDead:
		// Capture the finished time if not already set
		if taskState.FinishedAt.IsZero() {
			taskState.FinishedAt = time.Now().UTC()
		}

		// Emit metrics to indicate task completion or failure
		if taskState.Failed {
			metrics.IncrCounterWithLabels([]string{"client", "allocs", "failed"}, 1, tr.baseLabels)
		} else {
			metrics.IncrCounterWithLabels([]string{"client", "allocs", "complete"}, 1, tr.baseLabels)
		}
	}

	// Persist the state and event
	return tr.stateDB.PutTaskState(tr.allocID, tr.taskName, taskState)
}

// EmitEvent appends a new TaskEvent to this task's TaskState. The actual
// TaskState.State (pending, running, dead) is not changed. Use UpdateState to
// transition states.
// Events are persisted locally and sent to the server, but errors are simply
// logged. Use AppendEvent to append a new event without notifying the alloc
// runner.
func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) {
	tr.stateLock.Lock()
	defer tr.stateLock.Unlock()

	tr.appendEvent(event)

	if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil {
		// Only a warning because the next event/state-transition will
		// try to persist it again.
		tr.logger.Warn("error persisting event", "error", err, "event", event)
	}

	// Notify the alloc runner of the event
	tr.stateUpdater.TaskStateUpdated()
}

// AppendEvent appends a new TaskEvent to this task's TaskState. The actual
// TaskState.State (pending, running, dead) is not changed. Use UpdateState to
// transition states.
// Events are persisted locally and errors are simply logged. Use EmitEvent to
// also update the AllocRunner.
func (tr *TaskRunner) AppendEvent(event *structs.TaskEvent) {
	tr.stateLock.Lock()
	defer tr.stateLock.Unlock()

	tr.appendEvent(event)

	if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil {
		// Only a warning because the next event/state-transition will
		// try to persist it again.
		tr.logger.Warn("error persisting event", "error", err, "event", event)
	}
}

// appendEvent appends to the task's event slice. Caller must acquire stateLock.
func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error {
	// Ensure the event is populated with human readable strings
	event.PopulateEventDisplayMessage()

	// Propagate failure from event to task state
	if event.FailsTask {
		tr.state.Failed = true
	}

	// XXX This seems like a super awkward spot for this? Why not shouldRestart?
	// Update restart metrics
	if event.Type == structs.TaskRestarting {
		metrics.IncrCounterWithLabels([]string{"client", "allocs", "restart"}, 1, tr.baseLabels)
		tr.state.Restarts++
		tr.state.LastRestart = time.Unix(0, event.Time)
	}

	// Append event to slice
	appendTaskEvent(tr.state, event, tr.maxEvents)

	return nil
}

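// To summarize the three event entry points above: UpdateState transitions
// TaskState.State, persists it, and notifies the alloc runner; EmitEvent
// appends and persists an event and notifies the alloc runner; AppendEvent
// appends and persists only. All three funnel through appendEvent, which
// enforces the maxEvents cap via appendTaskEvent.
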
// WaitCh is closed when TaskRunner.Run exits.
func (tr *TaskRunner) WaitCh() <-chan struct{} {
	return tr.waitCh
}

// Update the running allocation with a new version received from the server.
// Calls Update hooks asynchronously with Run.
//
// This method is safe for calling concurrently with Run and does not modify
// the passed in allocation.
func (tr *TaskRunner) Update(update *structs.Allocation) {
	task := update.LookupTask(tr.taskName)
	if task == nil {
		// This should not happen and likely indicates a bug in the
		// server or client.
		tr.logger.Error("allocation update is missing task; killing",
			"group", update.TaskGroup)
		te := structs.NewTaskEvent(structs.TaskKilled).
			SetKillReason("update missing task").
			SetFailsTask()
		tr.Kill(context.Background(), te)
		return
	}

	// Update tr.alloc
	tr.setAlloc(update, task)

	// Trigger update hooks if not terminal
	if !update.TerminalStatus() {
		tr.triggerUpdateHooks()
	}
}

// SetNetworkIsolation is called by the PreRun allocation hook after configuring
// the network isolation for the allocation
func (tr *TaskRunner) SetNetworkIsolation(n *drivers.NetworkIsolationSpec) {
	tr.networkIsolationLock.Lock()
	tr.networkIsolationSpec = n
	tr.networkIsolationLock.Unlock()
}

// triggerUpdateHooks triggers an update if there isn't already one pending.
// Should be called instead of calling updateHooks directly to serialize runs
// of update hooks. TaskRunner state should be updated prior to triggering
// update hooks.
//
// Does not block.
func (tr *TaskRunner) triggerUpdateHooks() {
	select {
	case tr.triggerUpdateCh <- struct{}{}:
	default:
		// already an update hook pending
	}
}

// Shutdown TaskRunner gracefully without affecting the state of the task.
// Shutdown blocks until the main Run loop exits.
func (tr *TaskRunner) Shutdown() {
	tr.logger.Trace("shutting down")
	tr.shutdownCtxCancel()

	<-tr.WaitCh()

	// Run shutdown hooks to cleanup
	tr.shutdownHooks()

	// Persist once more
	tr.persistLocalState()
}

// LatestResourceUsage returns the last resource utilization datapoint
// collected. May return nil if the task is not running or no resource
// utilization has been collected yet.
func (tr *TaskRunner) LatestResourceUsage() *cstructs.TaskResourceUsage {
	tr.resourceUsageLock.Lock()
	ru := tr.resourceUsage
	tr.resourceUsageLock.Unlock()

	// Look up device statistics lazily when fetched, as currently we do not
	// emit any stats for them yet.
	if ru != nil && tr.deviceStatsReporter != nil {
		deviceResources := tr.taskResources.Devices
		ru.ResourceUsage.DeviceStats = tr.deviceStatsReporter.LatestDeviceResourceStats(deviceResources)
	}
	return ru
}

// UpdateStats updates and emits the latest stats from the driver.
func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) {
	tr.resourceUsageLock.Lock()
	tr.resourceUsage = ru
	tr.resourceUsageLock.Unlock()
	if ru != nil {
		tr.emitStats(ru)
	}
}

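// The gauges below are published under keys such as client.allocs.memory.rss
// and client.allocs.cpu.total_percent (metrics sinks may prepend their own
// prefix), each tagged with the baseLabels built in initLabels: job,
// task_group, alloc_id, task, and namespace.
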
//TODO Remove backwards compat or use tr.Alloc()?
func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
	alloc := tr.Alloc()
	var allocatedMem float32
	if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
		// Convert to bytes to match other memory metrics
		allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024
	}

	ms := ru.ResourceUsage.MemoryStats

	publishMetric := func(v uint64, reported, measured string) {
		if v != 0 || helper.SliceStringContains(ms.Measured, measured) {
			metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", reported},
				float32(v), tr.baseLabels)
		}
	}

	publishMetric(ms.RSS, "rss", "RSS")
	publishMetric(ms.Cache, "cache", "Cache")
	publishMetric(ms.Swap, "swap", "Swap")
	publishMetric(ms.Usage, "usage", "Usage")
	publishMetric(ms.MaxUsage, "max_usage", "Max Usage")
	publishMetric(ms.KernelUsage, "kernel_usage", "Kernel Usage")
	publishMetric(ms.KernelMaxUsage, "kernel_max_usage", "Kernel Max Usage")
	if allocatedMem > 0 {
		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "allocated"},
			allocatedMem, tr.baseLabels)
	}
}

//TODO Remove backwards compat or use tr.Alloc()?
func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) {
	alloc := tr.Alloc()
	var allocatedCPU float32
	if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
		allocatedCPU = float32(taskRes.Cpu.CpuShares)
	}

	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_percent"},
		float32(ru.ResourceUsage.CpuStats.Percent), tr.baseLabels)
	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "system"},
		float32(ru.ResourceUsage.CpuStats.SystemMode), tr.baseLabels)
	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "user"},
		float32(ru.ResourceUsage.CpuStats.UserMode), tr.baseLabels)
	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_time"},
		float32(ru.ResourceUsage.CpuStats.ThrottledTime), tr.baseLabels)
	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_periods"},
		float32(ru.ResourceUsage.CpuStats.ThrottledPeriods), tr.baseLabels)
	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_ticks"},
		float32(ru.ResourceUsage.CpuStats.TotalTicks), tr.baseLabels)
	if allocatedCPU > 0 {
		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "allocated"},
			allocatedCPU, tr.baseLabels)
	}
}

// emitStats emits resource usage stats of tasks to remote metrics collector
// sinks
func (tr *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
	if !tr.clientConfig.PublishAllocationMetrics {
		return
	}

	if ru.ResourceUsage.MemoryStats != nil {
		tr.setGaugeForMemory(ru)
	} else {
		tr.logger.Debug("Skipping memory stats for allocation", "reason", "MemoryStats is nil")
	}

	if ru.ResourceUsage.CpuStats != nil {
		tr.setGaugeForCPU(ru)
	} else {
		tr.logger.Debug("Skipping cpu stats for allocation", "reason", "CpuStats is nil")
	}
}

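// appendTaskEvent below behaves as a sliding window: once len(state.Events)
// reaches capacity (defaultMaxEvents, 10, unless overridden for testing), the
// oldest event is dropped before the new one is appended, so only the most
// recent events are retained.
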
// appendTaskEvent updates the task status by appending the new event.
func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent, capacity int) {
	if state.Events == nil {
		state.Events = make([]*structs.TaskEvent, 1, capacity)
		state.Events[0] = event
		return
	}

	// If we hit capacity, then shift it.
	if len(state.Events) == capacity {
		old := state.Events
		state.Events = make([]*structs.TaskEvent, 0, capacity)
		state.Events = append(state.Events, old[1:]...)
	}

	state.Events = append(state.Events, event)
}

func (tr *TaskRunner) TaskExecHandler() drivermanager.TaskExecHandler {
	// Check it is running
	handle := tr.getDriverHandle()
	if handle == nil {
		return nil
	}
	return handle.ExecStreaming
}

func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) {
	return tr.driver.Capabilities()
}

func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
	tr.allocHookResources = res
}