1package taskrunner
2
3import (
4	"context"
5	"sync"
6	"time"
7
8	hclog "github.com/hashicorp/go-hclog"
9	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
10	cstructs "github.com/hashicorp/nomad/client/structs"
11	"github.com/hashicorp/nomad/nomad/structs"
12	bstructs "github.com/hashicorp/nomad/plugins/base/structs"
13)
14
// StatsUpdater is the interface required by the statsHook to update stats.
// Satisfied by TaskRunner.
type StatsUpdater interface {
	// UpdateStats is called by the stats collection goroutine with every
	// resource usage sample it receives from the driver's stats channel.
	UpdateStats(*cstructs.TaskResourceUsage)
}
20
// statsHook manages the task stats collection goroutine. The goroutine is
// started by Poststart and stopped by Exited or Shutdown.
type statsHook struct {
	// updater receives each usage sample read from the driver; interval is
	// the collection interval handed to the driver's Stats call.
	updater  StatsUpdater
	interval time.Duration

	// cancel stops the running collection goroutine. It is set by Poststart,
	// called and cleared by Exited, and called by Shutdown. It is nil when
	// no collection has been started or after Exited has run.
	cancel context.CancelFunc

	// mu is held by Poststart, Exited and Shutdown while they access cancel.
	mu sync.Mutex

	// logger is the hook's logger, named after the hook in newStatsHook.
	logger hclog.Logger
}
33
34func newStatsHook(su StatsUpdater, interval time.Duration, logger hclog.Logger) *statsHook {
35	h := &statsHook{
36		updater:  su,
37		interval: interval,
38	}
39	h.logger = logger.Named(h.Name())
40	return h
41}
42
43func (*statsHook) Name() string {
44	return "stats_hook"
45}
46
47func (h *statsHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
48	h.mu.Lock()
49	defer h.mu.Unlock()
50
51	// This shouldn't happen, but better safe than risk leaking a goroutine
52	if h.cancel != nil {
53		h.logger.Debug("poststart called twice without exiting between")
54		h.cancel()
55	}
56
57	// Using a new context here because the existing context is for the scope of
58	// the Poststart request. If that context was used, stats collection would
59	// stop when the task was killed. It makes for more readable code and better
60	// follows the taskrunner hook model to create a new context that can be
61	// canceled on the Exited hook.
62	ctx, cancel := context.WithCancel(context.Background())
63	h.cancel = cancel
64	go h.collectResourceUsageStats(ctx, req.DriverStats)
65
66	return nil
67}
68
69func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
70	h.mu.Lock()
71	defer h.mu.Unlock()
72
73	if h.cancel == nil {
74		// No stats running
75		return nil
76	}
77
78	// Call cancel to stop stats collection
79	h.cancel()
80
81	// Clear cancel func so we don't double call for any reason
82	h.cancel = nil
83
84	return nil
85}
86
87// collectResourceUsageStats starts collecting resource usage stats of a Task.
88// Collection ends when the passed channel is closed
89func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interfaces.DriverStats) {
90
91MAIN:
92	ch, err := h.callStatsWithRetry(ctx, handle)
93	if err != nil {
94		return
95	}
96
97	for {
98		select {
99		case ru, ok := <-ch:
100			// if channel closes, re-establish a new one
101			if !ok {
102				goto MAIN
103			}
104
105			// Update stats on TaskRunner and emit them
106			h.updater.UpdateStats(ru)
107
108		case <-ctx.Done():
109			return
110		}
111	}
112}
113
114// callStatsWithRetry invokes handle driver Stats() functions and retries until channel is established
115// successfully.  Returns an error if it encounters a permanent error.
116//
117// It logs the errors with appropriate log levels; don't log returned error
118func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) {
119	var retry int
120
121MAIN:
122	if ctx.Err() != nil {
123		return nil, ctx.Err()
124	}
125
126	ch, err := handle.Stats(ctx, h.interval)
127	if err == nil {
128		return ch, nil
129	}
130
131	// Check if the driver doesn't implement stats
132	if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
133		h.logger.Debug("driver does not support stats")
134		return nil, err
135	}
136
137	// check if the error is terminal otherwise it's likely a
138	// transport error and we should retry
139	if re, ok := err.(*structs.RecoverableError); ok && re.IsUnrecoverable() {
140		h.logger.Error("failed to start stats collection for task with unrecoverable error", "error", err)
141		return nil, err
142	}
143
144	// We do not warn when the plugin is shutdown since this is
145	// likely because the driver plugin has unexpectedly exited,
146	// in which case sleeping and trying again or returning based
147	// on the stop channel is the correct behavior
148	if err == bstructs.ErrPluginShutdown {
149		h.logger.Debug("failed to fetching stats of task", "error", err)
150	} else {
151		h.logger.Error("failed to start stats collection for task", "error", err)
152	}
153
154	limit := time.Second * 5
155	backoff := 1 << (2 * uint64(retry)) * time.Second
156	if backoff > limit || retry > 5 {
157		backoff = limit
158	}
159
160	// Increment retry counter
161	retry++
162
163	time.Sleep(backoff)
164	goto MAIN
165}
166
167func (h *statsHook) Shutdown() {
168	h.mu.Lock()
169	defer h.mu.Unlock()
170
171	if h.cancel == nil {
172		return
173	}
174
175	h.cancel()
176}
177