package taskrunner

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/hashicorp/nomad/client/lib/cgutil"

	metrics "github.com/armon/go-metrics"
	log "github.com/hashicorp/go-hclog"
	multierror "github.com/hashicorp/go-multierror"
	"github.com/hashicorp/hcl/v2/hcldec"
	"github.com/hashicorp/nomad/client/allocdir"
	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/consul"
	"github.com/hashicorp/nomad/client/devicemanager"
	"github.com/hashicorp/nomad/client/dynamicplugins"
	cinterfaces "github.com/hashicorp/nomad/client/interfaces"
	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
	"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
	cstate "github.com/hashicorp/nomad/client/state"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/client/vaultclient"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/helper/pluginutils/hclspecutils"
	"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/structs"
	bstructs "github.com/hashicorp/nomad/plugins/base/structs"
	"github.com/hashicorp/nomad/plugins/drivers"
)

const (
	// defaultMaxEvents is the default max capacity for task events on the
	// task state. Overrideable for testing.
	defaultMaxEvents = 10

	// killBackoffBaseline is the baseline time for exponential backoff while
	// killing a task.
	killBackoffBaseline = 5 * time.Second

	// killBackoffLimit is the limit of the exponential backoff for killing
	// the task.
	killBackoffLimit = 2 * time.Minute

	// killFailureLimit is how many times we will attempt to kill a task before
	// giving up and potentially leaking resources.
	killFailureLimit = 5

	// triggerUpdateChCap is the capacity for the triggerUpdateCh used for
	// triggering updates. It should be exactly 1 as even if multiple
	// updates have come in since the last one was handled, we only need to
	// handle the last one.
	triggerUpdateChCap = 1
)
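
// A sketch of the kill backoff schedule implied by the constants above (not
// part of the original source): killTask sleeps (1 << (2*i)) *
// killBackoffBaseline after failed attempt i, capped at killBackoffLimit, so
// the delays between the killFailureLimit attempts are roughly:
//
//	after attempt 0: 5s
//	after attempt 1: 20s
//	after attempt 2: 1m20s
//	after attempt 3: 2m (capped)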

type TaskRunner struct {
	// allocID, taskName, taskLeader, and taskResources are immutable so these fields may
	// be accessed without locks
	allocID       string
	taskName      string
	taskLeader    bool
	taskResources *structs.AllocatedTaskResources

	alloc     *structs.Allocation
	allocLock sync.Mutex

	clientConfig *config.Config

	// stateUpdater is used to emit updated task state
	stateUpdater interfaces.TaskStateHandler

	// state captures the state of the task for updating the allocation
	// Must acquire stateLock to access.
	state *structs.TaskState

	// localState captures the node-local state of the task for when the
	// Nomad agent restarts.
	// Must acquire stateLock to access.
	localState *state.LocalState

	// stateLock must be acquired when accessing state or localState.
	stateLock sync.RWMutex

	// stateDB is for persisting localState and taskState
	stateDB cstate.StateDB

	// shutdownCtx is used to exit the TaskRunner *without* affecting task state.
	shutdownCtx context.Context

	// shutdownCtxCancel causes the TaskRunner to exit immediately without
	// affecting task state. Useful for testing or graceful agent shutdown.
	shutdownCtxCancel context.CancelFunc

	// killCtx is the task runner's context representing the task's lifecycle.
	// The context is canceled when the task is killed.
	killCtx context.Context

	// killCtxCancel is called when killing a task.
	killCtxCancel context.CancelFunc

	// killErr is populated when killing a task. Access should be done via the
	// getter/setter.
	killErr     error
	killErrLock sync.Mutex

	// Logger is the logger for the task runner.
	logger log.Logger

	// triggerUpdateCh is ticked whenever update hooks need to be run and
	// must be created with cap=1 to signal a pending update and prevent
	// callers from deadlocking if the receiver has exited.
	triggerUpdateCh chan struct{}

	// waitCh is closed when the task runner has transitioned to a terminal
	// state
	waitCh chan struct{}

	// driver is the driver for the task.
	driver drivers.DriverPlugin

	// driverCapabilities is the set of capabilities the driver supports
	driverCapabilities *drivers.Capabilities

	// taskSchema is the hcl spec for the task driver configuration
	taskSchema hcldec.Spec

	// handleLock guards access to handle and handleResult
	handleLock sync.Mutex

	// handle to the running driver
	handle *DriverHandle

	// task is the task being run
	task     *structs.Task
	taskLock sync.RWMutex

	// taskDir is the directory structure for this task.
	taskDir *allocdir.TaskDir

	// envBuilder is used to build the task's environment
	envBuilder *taskenv.Builder

	// restartTracker is used to decide if the task should be restarted.
	restartTracker *restarts.RestartTracker

	// runnerHooks are task runner lifecycle hooks that should be run on state
	// transitions.
	runnerHooks []interfaces.TaskHook

	// hookResources captures the resources provided by hooks
	hookResources *hookResources

	// consulServiceClient is the client used by the consul service hook for
	// registering services and checks
	consulServiceClient consul.ConsulServiceAPI

	// consulProxiesClient is the client used by the envoy version hook for
	// asking consul what version of envoy nomad should inject into the connect
	// sidecar or gateway task.
	consulProxiesClient consul.SupportedProxiesAPI

	// siClient is the client used by the service identity hook for managing
	// service identity tokens
	siClient consul.ServiceIdentityAPI

	// vaultClient is the client to use to derive and renew Vault tokens
	vaultClient vaultclient.VaultClient

	// vaultToken is the current Vault token. It should be accessed with the
	// getter.
	vaultToken     string
	vaultTokenLock sync.Mutex

	// baseLabels are used when emitting tagged metrics. All task runner metrics
	// will have these tags, and optionally more.
	baseLabels []metrics.Label

	// logmonHookConfig is used to get the paths to the stdout and stderr fifos
	// to be passed to the driver for task logging
	logmonHookConfig *logmonHookConfig

	// resourceUsage is written via UpdateStats and read via
	// LatestResourceUsage. May be nil at any time.
	resourceUsage     *cstructs.TaskResourceUsage
	resourceUsageLock sync.Mutex

	// deviceStatsReporter is used to lookup resource usage for alloc devices
	deviceStatsReporter cinterfaces.DeviceStatsReporter

	// csiManager is used to manage the mounting of CSI volumes into tasks
	csiManager csimanager.Manager

	// devicemanager is used to mount devices as well as lookup device
	// statistics
	devicemanager devicemanager.Manager

	// cpusetCgroupPathGetter is used to lookup the cgroup path if supported by the platform
	cpusetCgroupPathGetter cgutil.CgroupPathGetter

	// driverManager is used to dispense driver plugins and register event
	// handlers
	driverManager drivermanager.Manager

	// dynamicRegistry is where dynamic plugins should be registered.
	dynamicRegistry dynamicplugins.Registry

	// maxEvents is the capacity of the TaskEvents on the TaskState.
	// Defaults to defaultMaxEvents but overrideable for testing.
	maxEvents int

	// serversContactedCh is passed to TaskRunners so they can detect when
	// GetClientAllocs has been called in case of a failed restore.
	serversContactedCh <-chan struct{}

	// startConditionMetCtx is closed when the task runner should start the task
	startConditionMetCtx <-chan struct{}

	// waitOnServers defaults to false but will be set true if a restore
	// fails and the Run method should wait until serversContactedCh is
	// closed.
	waitOnServers bool

	networkIsolationLock sync.Mutex
	networkIsolationSpec *drivers.NetworkIsolationSpec

	allocHookResources *cstructs.AllocHookResources
}

type Config struct {
	Alloc        *structs.Allocation
	ClientConfig *config.Config
	Task         *structs.Task
	TaskDir      *allocdir.TaskDir
	Logger       log.Logger

	// Consul is the client to use for managing Consul service registrations
	Consul consul.ConsulServiceAPI

	// ConsulProxies is the client to use for looking up supported envoy versions
	// from Consul.
	ConsulProxies consul.SupportedProxiesAPI

	// ConsulSI is the client to use for managing Consul SI tokens
	ConsulSI consul.ServiceIdentityAPI

	// DynamicRegistry is where dynamic plugins should be registered.
	DynamicRegistry dynamicplugins.Registry

	// Vault is the client to use to derive and renew Vault tokens
	Vault vaultclient.VaultClient

	// StateDB is used to store and restore state.
	StateDB cstate.StateDB

	// StateUpdater is used to emit updated task state
	StateUpdater interfaces.TaskStateHandler

	// DeviceStatsReporter is used to lookup resource usage for alloc devices
	DeviceStatsReporter cinterfaces.DeviceStatsReporter

	// CSIManager is used to manage the mounting of CSI volumes into tasks
	CSIManager csimanager.Manager

	// CpusetCgroupPathGetter is used to lookup the cgroup path if supported by the platform
	CpusetCgroupPathGetter cgutil.CgroupPathGetter

	// DeviceManager is used to mount devices as well as lookup device
	// statistics
	DeviceManager devicemanager.Manager

	// DriverManager is used to dispense driver plugins and register event
	// handlers
	DriverManager drivermanager.Manager

	// ServersContactedCh is closed when the first GetClientAllocs call to
	// servers succeeds and allocs are synced.
	ServersContactedCh chan struct{}

	// StartConditionMetCtx is closed when the task runner should start the task
	StartConditionMetCtx <-chan struct{}
}

func NewTaskRunner(config *Config) (*TaskRunner, error) {
	// Create a context for causing the runner to exit
	trCtx, trCancel := context.WithCancel(context.Background())

	// Create a context for killing the runner
	killCtx, killCancel := context.WithCancel(context.Background())

	// Initialize the environment builder
	envBuilder := taskenv.NewBuilder(
		config.ClientConfig.Node,
		config.Alloc,
		config.Task,
		config.ClientConfig.Region,
	)

	// Initialize state from alloc if it is set
	tstate := structs.NewTaskState()
	if ts := config.Alloc.TaskStates[config.Task.Name]; ts != nil {
		tstate = ts.Copy()
	}

	tr := &TaskRunner{
		alloc:                  config.Alloc,
		allocID:                config.Alloc.ID,
		clientConfig:           config.ClientConfig,
		task:                   config.Task,
		taskDir:                config.TaskDir,
		taskName:               config.Task.Name,
		taskLeader:             config.Task.Leader,
		envBuilder:             envBuilder,
		dynamicRegistry:        config.DynamicRegistry,
		consulServiceClient:    config.Consul,
		consulProxiesClient:    config.ConsulProxies,
		siClient:               config.ConsulSI,
		vaultClient:            config.Vault,
		state:                  tstate,
		localState:             state.NewLocalState(),
		stateDB:                config.StateDB,
		stateUpdater:           config.StateUpdater,
		deviceStatsReporter:    config.DeviceStatsReporter,
		killCtx:                killCtx,
		killCtxCancel:          killCancel,
		shutdownCtx:            trCtx,
		shutdownCtxCancel:      trCancel,
		triggerUpdateCh:        make(chan struct{}, triggerUpdateChCap),
		waitCh:                 make(chan struct{}),
		csiManager:             config.CSIManager,
		cpusetCgroupPathGetter: config.CpusetCgroupPathGetter,
		devicemanager:          config.DeviceManager,
		driverManager:          config.DriverManager,
		maxEvents:              defaultMaxEvents,
		serversContactedCh:     config.ServersContactedCh,
		startConditionMetCtx:   config.StartConditionMetCtx,
	}

	// Create the task-scoped logger
	tr.logger = config.Logger.Named("task_runner").With("task", config.Task.Name)

	// Pull out the task's resources
	ares := tr.alloc.AllocatedResources
	if ares == nil {
		return nil, fmt.Errorf("no task resources found on allocation")
	}

	tres, ok := ares.Tasks[tr.taskName]
	if !ok {
		return nil, fmt.Errorf("no task resources found on allocation")
	}
	tr.taskResources = tres

	// Build the restart tracker.
	rp := config.Task.RestartPolicy
	if rp == nil {
		tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup)
		if tg == nil {
			tr.logger.Error("alloc missing task group")
			return nil, fmt.Errorf("alloc missing task group")
		}
		rp = tg.RestartPolicy
	}
	tr.restartTracker = restarts.NewRestartTracker(rp, tr.alloc.Job.Type, config.Task.Lifecycle)

	// Get the driver
	if err := tr.initDriver(); err != nil {
		tr.logger.Error("failed to create driver", "error", err)
		return nil, err
	}

	// Initialize the runner's hooks. Must come after initDriver so hooks
	// can use tr.driverCapabilities
	tr.initHooks()

	// Initialize base labels
	tr.initLabels()

	// Append the initial task received event
	tr.appendEvent(structs.NewTaskEvent(structs.TaskReceived))

	return tr, nil
}
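
// A minimal usage sketch (not part of the original source; the variables are
// illustrative placeholders): the alloc runner constructs a TaskRunner, runs
// it in a goroutine, and waits on WaitCh for the task to reach a terminal
// state.
//
//	tr, err := NewTaskRunner(&Config{
//		Alloc:        alloc,        // *structs.Allocation
//		ClientConfig: clientConfig, // *config.Config
//		Task:         task,         // *structs.Task
//		TaskDir:      taskDir,      // *allocdir.TaskDir
//		Logger:       logger,       // log.Logger
//		// remaining clients and managers elided
//	})
//	if err != nil {
//		return err
//	}
//	go tr.Run()
//	<-tr.WaitCh()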

func (tr *TaskRunner) initLabels() {
	alloc := tr.Alloc()
	tr.baseLabels = []metrics.Label{
		{
			Name:  "job",
			Value: alloc.Job.Name,
		},
		{
			Name:  "task_group",
			Value: alloc.TaskGroup,
		},
		{
			Name:  "alloc_id",
			Value: tr.allocID,
		},
		{
			Name:  "task",
			Value: tr.taskName,
		},
		{
			Name:  "namespace",
			Value: tr.alloc.Namespace,
		},
	}

	if tr.alloc.Job.ParentID != "" {
		tr.baseLabels = append(tr.baseLabels, metrics.Label{
			Name:  "parent_id",
			Value: tr.alloc.Job.ParentID,
		})
		if strings.Contains(tr.alloc.Job.Name, "/dispatch-") {
			tr.baseLabels = append(tr.baseLabels, metrics.Label{
				Name:  "dispatch_id",
				Value: strings.Split(tr.alloc.Job.Name, "/dispatch-")[1],
			})
		}
		if strings.Contains(tr.alloc.Job.Name, "/periodic-") {
			tr.baseLabels = append(tr.baseLabels, metrics.Label{
				Name:  "periodic_id",
				Value: strings.Split(tr.alloc.Job.Name, "/periodic-")[1],
			})
		}
	}
}

// MarkFailedDead marks a task as failed and not to run. It is intended to be
// invoked when alloc runner prestart hooks fail.
// Should never be used together with Run().
func (tr *TaskRunner) MarkFailedDead(reason string) {
	defer close(tr.waitCh)

	tr.stateLock.Lock()
	if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil {
		//TODO Nomad will be unable to restore this task; try to kill
		//     it now and fail? In general we prefer to leave running
		//     tasks running even if the agent encounters an error.
		tr.logger.Warn("error persisting local failed task state; may be unable to restore after a Nomad restart",
			"error", err)
	}
	tr.stateLock.Unlock()

	event := structs.NewTaskEvent(structs.TaskSetupFailure).
		SetDisplayMessage(reason).
		SetFailsTask()
	tr.UpdateState(structs.TaskStateDead, event)

	// Run the stop hooks in case the task was a restored task that failed prestart
	if err := tr.stop(); err != nil {
		tr.logger.Error("stop failed while marking task dead", "error", err)
	}
}

// Run the TaskRunner. Starts the user's task or reattaches to a restored task.
// Run closes WaitCh when it exits. Should be started in a goroutine.
func (tr *TaskRunner) Run() {
	defer close(tr.waitCh)
	var result *drivers.ExitResult

	tr.stateLock.RLock()
	dead := tr.state.State == structs.TaskStateDead
	tr.stateLock.RUnlock()

	// If restoring a dead task, ensure the task is cleared and all post hooks
	// are called without additional state updates.
	if dead {
		// Do cleanup functions without emitting any additional events/work
		// to handle the case where we restored a dead task and the client
		// terminated after the task finished but before post-run actions
		// completed.
		tr.clearDriverHandle()
		tr.stateUpdater.TaskStateUpdated()
		if err := tr.stop(); err != nil {
			tr.logger.Error("stop failed on terminal task", "error", err)
		}
		return
	}

	// Updates are handled asynchronously with the other hooks but each
	// triggered update - whether due to alloc updates or a new vault token
	// - should be handled serially.
	go tr.handleUpdates()

	// If restore failed wait until servers are contacted before running.
	// #1795
	if tr.waitOnServers {
		tr.logger.Info("task failed to restore; waiting to contact server before restarting")
		select {
		case <-tr.killCtx.Done():
			tr.logger.Info("task killed while waiting for server contact")
		case <-tr.shutdownCtx.Done():
			return
		case <-tr.serversContactedCh:
			tr.logger.Info("server contacted; unblocking waiting task")
		}
	}

	select {
	case <-tr.startConditionMetCtx:
		tr.logger.Debug("lifecycle start condition has been met, proceeding")
		// yay proceed
	case <-tr.killCtx.Done():
	case <-tr.shutdownCtx.Done():
		return
	}

MAIN:
	for !tr.shouldShutdown() {
		select {
		case <-tr.killCtx.Done():
			break MAIN
		case <-tr.shutdownCtx.Done():
			// TaskRunner was told to exit immediately
			return
		default:
		}

		// Run the prestart hooks
		if err := tr.prestart(); err != nil {
			tr.logger.Error("prestart failed", "error", err)
			tr.restartTracker.SetStartError(err)
			goto RESTART
		}

		select {
		case <-tr.killCtx.Done():
			break MAIN
		case <-tr.shutdownCtx.Done():
			// TaskRunner was told to exit immediately
			return
		default:
		}

		// Run the task
		if err := tr.runDriver(); err != nil {
			tr.logger.Error("running driver failed", "error", err)
			tr.restartTracker.SetStartError(err)
			goto RESTART
		}

		// Run the poststart hooks
		if err := tr.poststart(); err != nil {
			tr.logger.Error("poststart failed", "error", err)
		}

		// Grab the result proxy and wait for task to exit
	WAIT:
		{
			handle := tr.getDriverHandle()
			result = nil

			// Do *not* use tr.killCtx here as it would cause
			// Wait() to unblock before the task exits when Kill()
			// is called.
			if resultCh, err := handle.WaitCh(context.Background()); err != nil {
				tr.logger.Error("wait task failed", "error", err)
			} else {
				select {
				case <-tr.killCtx.Done():
					// We can go through the normal should restart check since
					// the restart tracker knows it is killed
					result = tr.handleKill(resultCh)
				case <-tr.shutdownCtx.Done():
					// TaskRunner was told to exit immediately
					return
				case result = <-resultCh:
				}

				// WaitCh returned a result
				if retryWait := tr.handleTaskExitResult(result); retryWait {
					goto WAIT
				}
			}
		}

		// Clear the handle
		tr.clearDriverHandle()

		// Store the wait result on the restart tracker
		tr.restartTracker.SetExitResult(result)

		if err := tr.exited(); err != nil {
			tr.logger.Error("exited hooks failed", "error", err)
		}

	RESTART:
		restart, restartDelay := tr.shouldRestart()
		if !restart {
			break MAIN
		}

		// Actually restart by sleeping and also watching for destroy events
		select {
		case <-time.After(restartDelay):
		case <-tr.killCtx.Done():
			tr.logger.Trace("task killed between restarts", "delay", restartDelay)
			break MAIN
		case <-tr.shutdownCtx.Done():
			// TaskRunner was told to exit immediately
			tr.logger.Trace("gracefully shutting down during restart delay")
			return
		}
	}

	// Ensure handle is cleaned up. Restore could have recovered a task
	// that should be terminal, so if the handle still exists we should
	// kill it here.
	if tr.getDriverHandle() != nil {
		if result = tr.handleKill(nil); result != nil {
			tr.emitExitResultEvent(result)
		}

		tr.clearDriverHandle()

		if err := tr.exited(); err != nil {
			tr.logger.Error("exited hooks failed while cleaning up terminal task", "error", err)
		}
	}

	// Mark the task as dead
	tr.UpdateState(structs.TaskStateDead, nil)

	// Run the stop hooks
	if err := tr.stop(); err != nil {
		tr.logger.Error("stop failed", "error", err)
	}

	tr.logger.Debug("task run loop exiting")
}

func (tr *TaskRunner) shouldShutdown() bool {
	alloc := tr.Alloc()
	if alloc.ClientTerminalStatus() {
		return true
	}

	if !tr.IsPoststopTask() && alloc.ServerTerminalStatus() {
		return true
	}

	return false
}

// handleTaskExitResult handles the results returned by the task exiting. If
// retryWait is true, the caller should attempt to wait on the task again since
// it has not actually finished running. This can happen if the driver plugin
// has exited.
func (tr *TaskRunner) handleTaskExitResult(result *drivers.ExitResult) (retryWait bool) {
	if result == nil {
		return false
	}

	if result.Err == bstructs.ErrPluginShutdown {
		dn := tr.Task().Driver
		tr.logger.Debug("driver plugin has shutdown; attempting to recover task", "driver", dn)

		// Initialize a new driver handle
		if err := tr.initDriver(); err != nil {
			tr.logger.Error("failed to initialize driver after it exited unexpectedly", "error", err, "driver", dn)
			return false
		}

		// Try to restore the handle
		tr.stateLock.RLock()
		h := tr.localState.TaskHandle
		net := tr.localState.DriverNetwork
		tr.stateLock.RUnlock()
		if !tr.restoreHandle(h, net) {
			tr.logger.Error("failed to restore handle on driver after it exited unexpectedly", "driver", dn)
			return false
		}

		tr.logger.Debug("task successfully recovered on driver", "driver", dn)
		return true
	}

	// Emit Terminated event
	tr.emitExitResultEvent(result)

	return false
}

// emitExitResultEvent emits a TaskTerminated event for an ExitResult.
func (tr *TaskRunner) emitExitResultEvent(result *drivers.ExitResult) {
	event := structs.NewTaskEvent(structs.TaskTerminated).
		SetExitCode(result.ExitCode).
		SetSignal(result.Signal).
		SetOOMKilled(result.OOMKilled).
		SetExitMessage(result.Err)

	tr.EmitEvent(event)

	if result.OOMKilled {
		metrics.IncrCounterWithLabels([]string{"client", "allocs", "oom_killed"}, 1, tr.baseLabels)
	}
}

// handleUpdates runs update hooks when triggerUpdateCh is ticked and exits
// when Run has returned. Should only be run in a goroutine from Run.
func (tr *TaskRunner) handleUpdates() {
	for {
		select {
		case <-tr.triggerUpdateCh:
		case <-tr.waitCh:
			return
		}

		// Non-terminal update; run hooks
		tr.updateHooks()
	}
}

// shouldRestart determines whether the task should be restarted and updates
// the task state unless the task is killed or terminated.
func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
	// Determine if we should restart
	state, when := tr.restartTracker.GetState()
	reason := tr.restartTracker.GetReason()
	switch state {
	case structs.TaskKilled:
		// Never restart an explicitly killed task. Kill method handles
		// updating the server.
		tr.EmitEvent(structs.NewTaskEvent(state))
		return false, 0
	case structs.TaskNotRestarting, structs.TaskTerminated:
		tr.logger.Info("not restarting task", "reason", reason)
		if state == structs.TaskNotRestarting {
			tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting).SetRestartReason(reason).SetFailsTask())
		}
		return false, 0
	case structs.TaskRestarting:
		tr.logger.Info("restarting task", "reason", reason, "delay", when)
		tr.UpdateState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when).SetRestartReason(reason))
		return true, when
	default:
		tr.logger.Error("restart tracker returned unknown state", "state", state)
		return true, when
	}
}

// runDriver starts the task by invoking the driver plugin and emits an
// appropriate task event on success/failure.
func (tr *TaskRunner) runDriver() error {

	taskConfig := tr.buildTaskConfig()
	if tr.cpusetCgroupPathGetter != nil {
		cpusetCgroupPath, err := tr.cpusetCgroupPathGetter(tr.killCtx)
		if err != nil {
			return err
		}
		taskConfig.Resources.LinuxResources.CpusetCgroupPath = cpusetCgroupPath
	}

	// Build hcl context variables
	vars, errs, err := tr.envBuilder.Build().AllValues()
	if err != nil {
		return fmt.Errorf("error building environment variables: %v", err)
	}

	// Handle per-key errors
	if len(errs) > 0 {
		keys := make([]string, 0, len(errs))
		for k, err := range errs {
			keys = append(keys, k)

			if tr.logger.IsTrace() {
				// Verbosely log every diagnostic for debugging
				tr.logger.Trace("error building environment variables", "key", k, "error", err)
			}
		}

		tr.logger.Warn("some environment variables not available for rendering", "keys", strings.Join(keys, ", "))
	}

	val, diag, diagErrs := hclutils.ParseHclInterface(tr.task.Config, tr.taskSchema, vars)
	if diag.HasErrors() {
		parseErr := multierror.Append(errors.New("failed to parse config: "), diagErrs...)
		tr.EmitEvent(structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(parseErr))
		return parseErr
	}

	if err := taskConfig.EncodeDriverConfig(val); err != nil {
		encodeErr := fmt.Errorf("failed to encode driver config: %v", err)
		tr.EmitEvent(structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(encodeErr))
		return encodeErr
	}

	// If there's already a task handle (e.g. from a Restore) there's nothing
	// to do except update state.
	if tr.getDriverHandle() != nil {
		// Ensure running state is persisted but do *not* append a new
		// task event as restoring is a client event and not relevant
		// to a task's lifecycle.
		if err := tr.updateStateImpl(structs.TaskStateRunning); err != nil {
			//TODO return error and destroy task to avoid an orphaned task?
			tr.logger.Warn("error persisting task state", "error", err)
		}
		return nil
	}

	// Start the task if there's no existing handle (or if RecoverTask failed)
	handle, net, err := tr.driver.StartTask(taskConfig)
	if err != nil {
		// The plugin has died, try relaunching it
		if err == bstructs.ErrPluginShutdown {
			tr.logger.Info("failed to start task because plugin shutdown unexpectedly; attempting to recover")
			if err := tr.initDriver(); err != nil {
				taskErr := fmt.Errorf("failed to initialize driver after it exited unexpectedly: %v", err)
				tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(taskErr))
				return taskErr
			}

			handle, net, err = tr.driver.StartTask(taskConfig)
			if err != nil {
				taskErr := fmt.Errorf("failed to start task after driver exited unexpectedly: %v", err)
				tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(taskErr))
				return taskErr
			}
		} else {
			// Do *NOT* wrap the error here without preserving whether or not
			// it is Recoverable. A TaskDriverFailure event must be emitted
			// for the failure to be considered Recoverable.
			tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
			return err
		}
	}

	tr.stateLock.Lock()
	tr.localState.TaskHandle = handle
	tr.localState.DriverNetwork = net
	if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil {
		//TODO Nomad will be unable to restore this task; try to kill
		//     it now and fail? In general we prefer to leave running
		//     tasks running even if the agent encounters an error.
		tr.logger.Warn("error persisting local task state; may be unable to restore after a Nomad restart",
			"error", err, "task_id", handle.Config.ID)
	}
	tr.stateLock.Unlock()

	tr.setDriverHandle(NewDriverHandle(tr.driver, taskConfig.ID, tr.Task(), net))

	// Emit an event that we started
	tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
	return nil
}

// initDriver retrieves the DriverPlugin from the plugin loader for this task
func (tr *TaskRunner) initDriver() error {
	driver, err := tr.driverManager.Dispense(tr.Task().Driver)
	if err != nil {
		return err
	}
	tr.driver = driver

	schema, err := tr.driver.TaskConfigSchema()
	if err != nil {
		return err
	}
	spec, diag := hclspecutils.Convert(schema)
	if diag.HasErrors() {
		return multierror.Append(errors.New("failed to convert task schema"), diag.Errs()...)
	}
	tr.taskSchema = spec

	caps, err := tr.driver.Capabilities()
	if err != nil {
		return err
	}
	tr.driverCapabilities = caps

	return nil
}

// handleKill is used to handle a request to kill a task. It will return
// the handle exit result if one is available and store any error in the task
// runner killErr value.
func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.ExitResult {
	// Run the pre killing hooks
	tr.preKill()

	// Wait for the task's ShutdownDelay after running prekill hooks.
	// This allows things like service de-registration to run before the
	// task is killed.
	if delay := tr.Task().ShutdownDelay; delay != 0 {
		tr.logger.Debug("waiting before killing task", "shutdown_delay", delay)

		select {
		case result := <-resultCh:
			return result
		case <-time.After(delay):
		}
	}

	// Tell the restart tracker that the task has been killed so it doesn't
	// attempt to restart it.
	tr.restartTracker.SetKilled()

	// Check whether the task has already exited
	select {
	case result := <-resultCh:
		return result
	default:
	}

	handle := tr.getDriverHandle()
	if handle == nil {
		return nil
	}

	// Kill the task using an exponential backoff in case of failures.
	result, killErr := tr.killTask(handle, resultCh)
	if killErr != nil {
		// We couldn't successfully destroy the resource created.
		tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
		tr.setKillErr(killErr)
	}

	if result != nil {
		return result
	}

	// Block until task has exited.
	if resultCh == nil {
		var err error
		resultCh, err = handle.WaitCh(tr.shutdownCtx)

		// The error should be nil or TaskNotFound; anything else indicates a
		// failure in the driver or transport layer
		if err != nil {
			if err == drivers.ErrTaskNotFound {
				return nil
			}
			tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
			tr.setKillErr(err)
			return nil
		}
	}

	select {
	case result := <-resultCh:
		return result
	case <-tr.shutdownCtx.Done():
		return nil
	}
}

// killTask kills the task handle. In the case that killing fails,
// killTask will retry with an exponential backoff and will give up at a
// given limit. Returns an error if the task could not be killed.
func (tr *TaskRunner) killTask(handle *DriverHandle, resultCh <-chan *drivers.ExitResult) (*drivers.ExitResult, error) {
	// Cap the number of times we attempt to kill the task.
	var err error
	for i := 0; i < killFailureLimit; i++ {
		if err = handle.Kill(); err != nil {
			if err == drivers.ErrTaskNotFound {
				tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID())
				return nil, nil
			}
			// Calculate the new backoff
			backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
			if backoff > killBackoffLimit {
				backoff = killBackoffLimit
			}

			tr.logger.Error("failed to kill task", "backoff", backoff, "error", err)
			select {
			case result := <-resultCh:
				return result, nil
			case <-time.After(backoff):
			}
		} else {
			// Kill was successful
			return nil, nil
		}
	}
	return nil, err
}

// persistLocalState persists local state to disk synchronously.
func (tr *TaskRunner) persistLocalState() error {
	tr.stateLock.RLock()
	defer tr.stateLock.RUnlock()

	return tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState)
}

// buildTaskConfig builds a drivers.TaskConfig with a unique ID for the task.
// The ID is unique for every invocation; it is built from the alloc ID, the
// task name, and 8 random characters.
func (tr *TaskRunner) buildTaskConfig() *drivers.TaskConfig {
	task := tr.Task()
	alloc := tr.Alloc()
	invocationid := uuid.Generate()[:8]
	taskResources := tr.taskResources
	ports := tr.Alloc().AllocatedResources.Shared.Ports
	env := tr.envBuilder.Build()
	tr.networkIsolationLock.Lock()
	defer tr.networkIsolationLock.Unlock()

	var dns *drivers.DNSConfig
	if alloc.AllocatedResources != nil && len(alloc.AllocatedResources.Shared.Networks) > 0 {
		allocDNS := alloc.AllocatedResources.Shared.Networks[0].DNS
		if allocDNS != nil {
			dns = &drivers.DNSConfig{
				Servers:  allocDNS.Servers,
				Searches: allocDNS.Searches,
				Options:  allocDNS.Options,
			}
		}
	}

	memoryLimit := taskResources.Memory.MemoryMB
	if max := taskResources.Memory.MemoryMaxMB; max > memoryLimit {
		memoryLimit = max
	}

	cpusetCpus := make([]string, len(taskResources.Cpu.ReservedCores))
	for i, v := range taskResources.Cpu.ReservedCores {
		cpusetCpus[i] = fmt.Sprintf("%d", v)
	}

	return &drivers.TaskConfig{
		ID:            fmt.Sprintf("%s/%s/%s", alloc.ID, task.Name, invocationid),
		Name:          task.Name,
		JobName:       alloc.Job.Name,
		JobID:         alloc.Job.ID,
		TaskGroupName: alloc.TaskGroup,
		Namespace:     alloc.Namespace,
		NodeName:      alloc.NodeName,
		NodeID:        alloc.NodeID,
		Resources: &drivers.Resources{
			NomadResources: taskResources,
			LinuxResources: &drivers.LinuxResources{
				MemoryLimitBytes: memoryLimit * 1024 * 1024,
				CPUShares:        taskResources.Cpu.CpuShares,
				CpusetCpus:       strings.Join(cpusetCpus, ","),
				PercentTicks:     float64(taskResources.Cpu.CpuShares) / float64(tr.clientConfig.Node.NodeResources.Cpu.CpuShares),
			},
			Ports: &ports,
		},
		Devices:          tr.hookResources.getDevices(),
		Mounts:           tr.hookResources.getMounts(),
		Env:              env.Map(),
		DeviceEnv:        env.DeviceEnv(),
		User:             task.User,
		AllocDir:         tr.taskDir.AllocDir,
		StdoutPath:       tr.logmonHookConfig.stdoutFifo,
		StderrPath:       tr.logmonHookConfig.stderrFifo,
		AllocID:          tr.allocID,
		NetworkIsolation: tr.networkIsolationSpec,
		DNS:              dns,
	}
}
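
// For illustration (hypothetical values): alloc ID "1b2c3d4e-...", task name
// "web", and random suffix "9f3c0d1a" yield the driver task ID
// "1b2c3d4e-.../web/9f3c0d1a", so each (re)start of the same task gets a
// distinct ID.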

// Restore task runner state. Called by AllocRunner.Restore after NewTaskRunner
// but before Run so no locks need to be acquired.
func (tr *TaskRunner) Restore() error {
	ls, ts, err := tr.stateDB.GetTaskRunnerState(tr.allocID, tr.taskName)
	if err != nil {
		return err
	}

	if ls != nil {
		ls.Canonicalize()
		tr.localState = ls
	}

	if ts != nil {
		ts.Canonicalize()
		tr.state = ts
	}

	// If a TaskHandle was persisted, ensure it is valid or destroy it.
	if taskHandle := tr.localState.TaskHandle; taskHandle != nil {
		//TODO if RecoverTask returned the DriverNetwork we wouldn't
		//     have to persist it at all!
		restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)

		// If the handle could not be restored, the alloc is
		// non-terminal, and the task isn't a system job: wait until
		// servers have been contacted before running. #1795
		if restored {
			return nil
		}

		alloc := tr.Alloc()
		if tr.state.State == structs.TaskStateDead || alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
			return nil
		}

		tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
		tr.waitOnServers = true

		ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
			SetDisplayMessage("failed to restore task; will not run until server is contacted")
		tr.UpdateState(structs.TaskStatePending, ev)
	}

	return nil
}

// restoreHandle ensures a TaskHandle is valid by calling Driver.RecoverTask
// and sets the driver handle. If the TaskHandle is not valid, DestroyTask is
// called.
func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *drivers.DriverNetwork) (success bool) {
	// Ensure handle is well-formed
	if taskHandle.Config == nil {
		return true
	}

	if err := tr.driver.RecoverTask(taskHandle); err != nil {
		if tr.TaskState().State != structs.TaskStateRunning {
			// RecoverTask should fail if the Task wasn't running
			return true
		}

		tr.logger.Error("error recovering task; cleaning up",
			"error", err, "task_id", taskHandle.Config.ID)

		// Try to cleanup any existing task state in the plugin before restarting
		if err := tr.driver.DestroyTask(taskHandle.Config.ID, true); err != nil {
			// Ignore ErrTaskNotFound errors as ideally
			// this task has already been stopped and
			// therefore doesn't exist.
			if err != drivers.ErrTaskNotFound {
				tr.logger.Warn("error destroying unrecoverable task",
					"error", err, "task_id", taskHandle.Config.ID)
			}

			return false
		}

		return true
	}

	// Update driver handle on task runner
	tr.setDriverHandle(NewDriverHandle(tr.driver, taskHandle.Config.ID, tr.Task(), net))
	return true
}

// UpdateState sets the task runner's task state and triggers a server
// update.
func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
	tr.stateLock.Lock()
	defer tr.stateLock.Unlock()

	if event != nil {
		tr.logger.Trace("setting task state", "state", state, "event", event.Type)

		// Append the event
		tr.appendEvent(event)
	}

	// Update the state
	if err := tr.updateStateImpl(state); err != nil {
		// Only log the error, as persistence errors should not
		// affect task state.
		tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state)
	}

	// Store task handle for remote tasks
	if tr.driverCapabilities != nil && tr.driverCapabilities.RemoteTasks {
		tr.logger.Trace("storing remote task handle state")
		tr.localState.TaskHandle.Store(tr.state)
	}

	// Notify the alloc runner of the transition
	tr.stateUpdater.TaskStateUpdated()
}

// updateStateImpl updates the in-memory task state and persists to disk.
func (tr *TaskRunner) updateStateImpl(state string) error {

	// Update the task state
	oldState := tr.state.State
	taskState := tr.state
	taskState.State = state

	// Handle the state transition.
	switch state {
	case structs.TaskStateRunning:
		// Capture the start time if it is just starting
		if oldState != structs.TaskStateRunning {
			taskState.StartedAt = time.Now().UTC()
			metrics.IncrCounterWithLabels([]string{"client", "allocs", "running"}, 1, tr.baseLabels)
		}
	case structs.TaskStateDead:
		// Capture the finished time if not already set
		if taskState.FinishedAt.IsZero() {
			taskState.FinishedAt = time.Now().UTC()
		}

		// Emit metrics to indicate task completion or failure
		if taskState.Failed {
			metrics.IncrCounterWithLabels([]string{"client", "allocs", "failed"}, 1, tr.baseLabels)
		} else {
			metrics.IncrCounterWithLabels([]string{"client", "allocs", "complete"}, 1, tr.baseLabels)
		}
	}

	// Persist the state and event
	return tr.stateDB.PutTaskState(tr.allocID, tr.taskName, taskState)
}

// EmitEvent appends a new TaskEvent to this task's TaskState. The actual
// TaskState.State (pending, running, dead) is not changed. Use UpdateState to
// transition states.
// Events are persisted locally and sent to the server, but errors are simply
// logged. Use AppendEvent to add a new event without triggering a server
// update.
func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) {
	tr.stateLock.Lock()
	defer tr.stateLock.Unlock()

	tr.appendEvent(event)

	if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil {
		// Only a warning because the next event/state-transition will
		// try to persist it again.
		tr.logger.Warn("error persisting event", "error", err, "event", event)
	}

	// Notify the alloc runner of the event
	tr.stateUpdater.TaskStateUpdated()
}

// AppendEvent appends a new TaskEvent to this task's TaskState. The actual
// TaskState.State (pending, running, dead) is not changed. Use UpdateState to
// transition states.
// Events are persisted locally and errors are simply logged. Use EmitEvent
// to also update the AllocRunner.
func (tr *TaskRunner) AppendEvent(event *structs.TaskEvent) {
	tr.stateLock.Lock()
	defer tr.stateLock.Unlock()

	tr.appendEvent(event)

	if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil {
		// Only a warning because the next event/state-transition will
		// try to persist it again.
		tr.logger.Warn("error persisting event", "error", err, "event", event)
	}
}

// appendEvent to task's event slice. Caller must acquire stateLock.
func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error {
	// Ensure the event is populated with human readable strings
	event.PopulateEventDisplayMessage()

	// Propagate failure from event to task state
	if event.FailsTask {
		tr.state.Failed = true
	}

	// XXX This seems like a super awkward spot for this? Why not shouldRestart?
	// Update restart metrics
	if event.Type == structs.TaskRestarting {
		metrics.IncrCounterWithLabels([]string{"client", "allocs", "restart"}, 1, tr.baseLabels)
		tr.state.Restarts++
		tr.state.LastRestart = time.Unix(0, event.Time)
	}

	// Append event to slice
	appendTaskEvent(tr.state, event, tr.maxEvents)

	return nil
}

// WaitCh is closed when TaskRunner.Run exits.
func (tr *TaskRunner) WaitCh() <-chan struct{} {
	return tr.waitCh
}

// Update the running allocation with a new version received from the server.
// Calls Update hooks asynchronously with Run.
//
// This method is safe for calling concurrently with Run and does not modify
// the passed in allocation.
func (tr *TaskRunner) Update(update *structs.Allocation) {
	task := update.LookupTask(tr.taskName)
	if task == nil {
		// This should not happen and likely indicates a bug in the
		// server or client.
		tr.logger.Error("allocation update is missing task; killing",
			"group", update.TaskGroup)
		te := structs.NewTaskEvent(structs.TaskKilled).
			SetKillReason("update missing task").
			SetFailsTask()
		tr.Kill(context.Background(), te)
		return
	}

	// Update tr.alloc
	tr.setAlloc(update, task)

	// Trigger update hooks if not terminal
	if !update.TerminalStatus() {
		tr.triggerUpdateHooks()
	}
}

// SetNetworkIsolation is called by the PreRun allocation hook after configuring
// the network isolation for the allocation
func (tr *TaskRunner) SetNetworkIsolation(n *drivers.NetworkIsolationSpec) {
	tr.networkIsolationLock.Lock()
	tr.networkIsolationSpec = n
	tr.networkIsolationLock.Unlock()
}

// triggerUpdateHooks triggers an update if there isn't already one pending.
// Should be called instead of calling updateHooks directly to serialize runs
// of update hooks. TaskRunner state should be updated prior to triggering
// update hooks.
//
// Does not block.
func (tr *TaskRunner) triggerUpdateHooks() {
	select {
	case tr.triggerUpdateCh <- struct{}{}:
	default:
		// already an update hook pending
	}
}
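
// A sketch of the coalescing behavior (not part of the original source):
// because triggerUpdateCh has capacity 1, any number of triggers that arrive
// while an update is pending collapse into a single run of the update hooks.
//
//	tr.triggerUpdateHooks() // buffered; marks an update pending
//	tr.triggerUpdateHooks() // no-op; an update is already pending
//	// handleUpdates receives once and runs updateHooks once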

// Shutdown TaskRunner gracefully without affecting the state of the task.
// Shutdown blocks until the main Run loop exits.
func (tr *TaskRunner) Shutdown() {
	tr.logger.Trace("shutting down")
	tr.shutdownCtxCancel()

	<-tr.WaitCh()

	// Run shutdown hooks to cleanup
	tr.shutdownHooks()

	// Persist once more
	tr.persistLocalState()
}
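
// A minimal shutdown sketch (not part of the original source): on agent
// shutdown the caller invokes Shutdown, which cancels the run loop, blocks
// until Run exits, then runs shutdown hooks and persists local state.
//
//	go tr.Run()
//	// ... later, during agent shutdown ...
//	tr.Shutdown() // returns after Run has exited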

// LatestResourceUsage returns the last resource utilization datapoint
// collected. May return nil if the task is not running or no resource
// utilization has been collected yet.
func (tr *TaskRunner) LatestResourceUsage() *cstructs.TaskResourceUsage {
	tr.resourceUsageLock.Lock()
	ru := tr.resourceUsage
	tr.resourceUsageLock.Unlock()

	// Look up device statistics lazily when fetched, since we do not emit
	// stats for devices yet
	if ru != nil && tr.deviceStatsReporter != nil {
		deviceResources := tr.taskResources.Devices
		ru.ResourceUsage.DeviceStats = tr.deviceStatsReporter.LatestDeviceResourceStats(deviceResources)
	}
	return ru
}

// UpdateStats updates and emits the latest stats from the driver.
func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) {
	tr.resourceUsageLock.Lock()
	tr.resourceUsage = ru
	tr.resourceUsageLock.Unlock()
	if ru != nil {
		tr.emitStats(ru)
	}
}

//TODO Remove Backwardscompat or use tr.Alloc()?
func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
	alloc := tr.Alloc()
	var allocatedMem float32
	if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
		// Convert to bytes to match other memory metrics
		allocatedMem = float32(taskRes.Memory.MemoryMB) * 1024 * 1024
	}

	ms := ru.ResourceUsage.MemoryStats

	publishMetric := func(v uint64, reported, measured string) {
		if v != 0 || helper.SliceStringContains(ms.Measured, measured) {
			metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", reported},
				float32(v), tr.baseLabels)
		}
	}

	publishMetric(ms.RSS, "rss", "RSS")
	publishMetric(ms.Cache, "cache", "Cache")
	publishMetric(ms.Swap, "swap", "Swap")
	publishMetric(ms.Usage, "usage", "Usage")
	publishMetric(ms.MaxUsage, "max_usage", "Max Usage")
	publishMetric(ms.KernelUsage, "kernel_usage", "Kernel Usage")
	publishMetric(ms.KernelMaxUsage, "kernel_max_usage", "Kernel Max Usage")
	if allocatedMem > 0 {
		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "allocated"},
			allocatedMem, tr.baseLabels)
	}
}

//TODO Remove Backwardscompat or use tr.Alloc()?
func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) {
	alloc := tr.Alloc()
	var allocatedCPU float32
	if taskRes := alloc.AllocatedResources.Tasks[tr.taskName]; taskRes != nil {
		allocatedCPU = float32(taskRes.Cpu.CpuShares)
	}

	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_percent"},
		float32(ru.ResourceUsage.CpuStats.Percent), tr.baseLabels)
	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "system"},
		float32(ru.ResourceUsage.CpuStats.SystemMode), tr.baseLabels)
	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "user"},
		float32(ru.ResourceUsage.CpuStats.UserMode), tr.baseLabels)
	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_time"},
		float32(ru.ResourceUsage.CpuStats.ThrottledTime), tr.baseLabels)
	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_periods"},
		float32(ru.ResourceUsage.CpuStats.ThrottledPeriods), tr.baseLabels)
	metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_ticks"},
		float32(ru.ResourceUsage.CpuStats.TotalTicks), tr.baseLabels)
	if allocatedCPU > 0 {
		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "allocated"},
			allocatedCPU, tr.baseLabels)
	}
}

// emitStats emits resource usage stats of tasks to remote metrics collector
// sinks
func (tr *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
	if !tr.clientConfig.PublishAllocationMetrics {
		return
	}

	if ru.ResourceUsage.MemoryStats != nil {
		tr.setGaugeForMemory(ru)
	} else {
		tr.logger.Debug("Skipping memory stats for allocation", "reason", "MemoryStats is nil")
	}

	if ru.ResourceUsage.CpuStats != nil {
		tr.setGaugeForCPU(ru)
	} else {
		tr.logger.Debug("Skipping cpu stats for allocation", "reason", "CpuStats is nil")
	}
}

// appendTaskEvent updates the task status by appending the new event.
func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent, capacity int) {
	if state.Events == nil {
		state.Events = make([]*structs.TaskEvent, 1, capacity)
		state.Events[0] = event
		return
	}

	// If we hit capacity, then shift it.
	if len(state.Events) == capacity {
		old := state.Events
		state.Events = make([]*structs.TaskEvent, 0, capacity)
		state.Events = append(state.Events, old[1:]...)
	}

	state.Events = append(state.Events, event)
}
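
// Behavior sketch (hypothetical values): with capacity 3 and events
// [e1 e2 e3] already stored, appending e4 drops the oldest event and yields
// [e2 e3 e4]; the slice never grows beyond the configured capacity.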

func (tr *TaskRunner) TaskExecHandler() drivermanager.TaskExecHandler {
	// Check it is running
	handle := tr.getDriverHandle()
	if handle == nil {
		return nil
	}
	return handle.ExecStreaming
}

func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) {
	return tr.driver.Capabilities()
}

func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
	tr.allocHookResources = res
}