1package taskrunner 2 3import ( 4 "context" 5 "sync" 6 "time" 7 8 hclog "github.com/hashicorp/go-hclog" 9 "github.com/hashicorp/nomad/client/allocrunner/interfaces" 10 cstructs "github.com/hashicorp/nomad/client/structs" 11 "github.com/hashicorp/nomad/nomad/structs" 12 bstructs "github.com/hashicorp/nomad/plugins/base/structs" 13) 14 15// StatsUpdater is the interface required by the StatsHook to update stats. 16// Satisfied by TaskRunner. 17type StatsUpdater interface { 18 UpdateStats(*cstructs.TaskResourceUsage) 19} 20 21// statsHook manages the task stats collection goroutine. 22type statsHook struct { 23 updater StatsUpdater 24 interval time.Duration 25 26 // cancel is called by Exited 27 cancel context.CancelFunc 28 29 mu sync.Mutex 30 31 logger hclog.Logger 32} 33 34func newStatsHook(su StatsUpdater, interval time.Duration, logger hclog.Logger) *statsHook { 35 h := &statsHook{ 36 updater: su, 37 interval: interval, 38 } 39 h.logger = logger.Named(h.Name()) 40 return h 41} 42 43func (*statsHook) Name() string { 44 return "stats_hook" 45} 46 47func (h *statsHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error { 48 h.mu.Lock() 49 defer h.mu.Unlock() 50 51 // This shouldn't happen, but better safe than risk leaking a goroutine 52 if h.cancel != nil { 53 h.logger.Debug("poststart called twice without exiting between") 54 h.cancel() 55 } 56 57 // Using a new context here because the existing context is for the scope of 58 // the Poststart request. If that context was used, stats collection would 59 // stop when the task was killed. It makes for more readable code and better 60 // follows the taskrunner hook model to create a new context that can be 61 // canceled on the Exited hook. 62 ctx, cancel := context.WithCancel(context.Background()) 63 h.cancel = cancel 64 go h.collectResourceUsageStats(ctx, req.DriverStats) 65 66 return nil 67} 68 69func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error { 70 h.mu.Lock() 71 defer h.mu.Unlock() 72 73 if h.cancel == nil { 74 // No stats running 75 return nil 76 } 77 78 // Call cancel to stop stats collection 79 h.cancel() 80 81 // Clear cancel func so we don't double call for any reason 82 h.cancel = nil 83 84 return nil 85} 86 87// collectResourceUsageStats starts collecting resource usage stats of a Task. 88// Collection ends when the passed channel is closed 89func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interfaces.DriverStats) { 90 91MAIN: 92 ch, err := h.callStatsWithRetry(ctx, handle) 93 if err != nil { 94 return 95 } 96 97 for { 98 select { 99 case ru, ok := <-ch: 100 // if channel closes, re-establish a new one 101 if !ok { 102 goto MAIN 103 } 104 105 // Update stats on TaskRunner and emit them 106 h.updater.UpdateStats(ru) 107 108 case <-ctx.Done(): 109 return 110 } 111 } 112} 113 114// callStatsWithRetry invokes handle driver Stats() functions and retries until channel is established 115// successfully. Returns an error if it encounters a permanent error. 116// 117// It logs the errors with appropriate log levels; don't log returned error 118func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) { 119 var retry int 120 121MAIN: 122 if ctx.Err() != nil { 123 return nil, ctx.Err() 124 } 125 126 ch, err := handle.Stats(ctx, h.interval) 127 if err == nil { 128 return ch, nil 129 } 130 131 // Check if the driver doesn't implement stats 132 if err.Error() == cstructs.DriverStatsNotImplemented.Error() { 133 h.logger.Debug("driver does not support stats") 134 return nil, err 135 } 136 137 // check if the error is terminal otherwise it's likely a 138 // transport error and we should retry 139 if re, ok := err.(*structs.RecoverableError); ok && re.IsUnrecoverable() { 140 h.logger.Error("failed to start stats collection for task with unrecoverable error", "error", err) 141 return nil, err 142 } 143 144 // We do not warn when the plugin is shutdown since this is 145 // likely because the driver plugin has unexpectedly exited, 146 // in which case sleeping and trying again or returning based 147 // on the stop channel is the correct behavior 148 if err == bstructs.ErrPluginShutdown { 149 h.logger.Debug("failed to fetching stats of task", "error", err) 150 } else { 151 h.logger.Error("failed to start stats collection for task", "error", err) 152 } 153 154 limit := time.Second * 5 155 backoff := 1 << (2 * uint64(retry)) * time.Second 156 if backoff > limit || retry > 5 { 157 backoff = limit 158 } 159 160 // Increment retry counter 161 retry++ 162 163 time.Sleep(backoff) 164 goto MAIN 165} 166 167func (h *statsHook) Shutdown() { 168 h.mu.Lock() 169 defer h.mu.Unlock() 170 171 if h.cancel == nil { 172 return 173 } 174 175 h.cancel() 176} 177