1package taskreaper 2 3import ( 4 "sort" 5 "sync" 6 "time" 7 8 "github.com/docker/swarmkit/api" 9 "github.com/docker/swarmkit/log" 10 "github.com/docker/swarmkit/manager/orchestrator" 11 "github.com/docker/swarmkit/manager/state" 12 "github.com/docker/swarmkit/manager/state/store" 13 "golang.org/x/net/context" 14) 15 16const ( 17 // maxDirty is the size threshold for running a task pruning operation. 18 maxDirty = 1000 19 // reaperBatchingInterval is how often to prune old tasks. 20 reaperBatchingInterval = 250 * time.Millisecond 21) 22 23// A TaskReaper deletes old tasks when more than TaskHistoryRetentionLimit tasks 24// exist for the same service/instance or service/nodeid combination. 25type TaskReaper struct { 26 store *store.MemoryStore 27 28 // closeOnce ensures that stopChan is closed only once 29 closeOnce sync.Once 30 31 // taskHistory is the number of tasks to keep 32 taskHistory int64 33 34 // List of slot tubles to be inspected for task history cleanup. 35 dirty map[orchestrator.SlotTuple]struct{} 36 37 // List of tasks collected for cleanup, which includes two kinds of tasks 38 // - serviceless orphaned tasks 39 // - tasks with desired state REMOVE that have already been shut down 40 cleanup []string 41 stopChan chan struct{} 42 doneChan chan struct{} 43 44 // tickSignal is a channel that, if non-nil and available, will be written 45 // to to signal that a tick has occurred. its sole purpose is for testing 46 // code, to verify that take cleanup attempts are happening when they 47 // should be. 48 tickSignal chan struct{} 49} 50 51// New creates a new TaskReaper. 52func New(store *store.MemoryStore) *TaskReaper { 53 return &TaskReaper{ 54 store: store, 55 dirty: make(map[orchestrator.SlotTuple]struct{}), 56 stopChan: make(chan struct{}), 57 doneChan: make(chan struct{}), 58 } 59} 60 61// Run is the TaskReaper's watch loop which collects candidates for cleanup. 62// Task history is mainly used in task restarts but is also available for administrative purposes. 63// Note that the task history is stored per-slot-per-service for replicated services 64// and per-node-per-service for global services. History does not apply to serviceless 65// since they are not attached to a service. In addition, the TaskReaper watch loop is also 66// responsible for cleaning up tasks associated with slots that were removed as part of 67// service scale down or service removal. 68func (tr *TaskReaper) Run(ctx context.Context) { 69 watcher, watchCancel := state.Watch(tr.store.WatchQueue(), api.EventCreateTask{}, api.EventUpdateTask{}, api.EventUpdateCluster{}) 70 71 defer func() { 72 close(tr.doneChan) 73 watchCancel() 74 }() 75 76 var orphanedTasks []*api.Task 77 var removeTasks []*api.Task 78 tr.store.View(func(readTx store.ReadTx) { 79 var err error 80 81 clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName)) 82 if err == nil && len(clusters) == 1 { 83 tr.taskHistory = clusters[0].Spec.Orchestration.TaskHistoryRetentionLimit 84 } 85 86 // On startup, scan the entire store and inspect orphaned tasks from previous life. 87 orphanedTasks, err = store.FindTasks(readTx, store.ByTaskState(api.TaskStateOrphaned)) 88 if err != nil { 89 log.G(ctx).WithError(err).Error("failed to find Orphaned tasks in task reaper init") 90 } 91 removeTasks, err = store.FindTasks(readTx, store.ByDesiredState(api.TaskStateRemove)) 92 if err != nil { 93 log.G(ctx).WithError(err).Error("failed to find tasks with desired state REMOVE in task reaper init") 94 } 95 }) 96 97 if len(orphanedTasks)+len(removeTasks) > 0 { 98 for _, t := range orphanedTasks { 99 // Do not reap service tasks immediately. 100 // Let them go through the regular history cleanup process 101 // of checking TaskHistoryRetentionLimit. 102 if t.ServiceID != "" { 103 continue 104 } 105 106 // Serviceless tasks can be cleaned up right away since they are not attached to a service. 107 tr.cleanup = append(tr.cleanup, t.ID) 108 } 109 // tasks with desired state REMOVE that have progressed beyond COMPLETE or 110 // haven't been assigned yet can be cleaned up right away 111 for _, t := range removeTasks { 112 if t.Status.State < api.TaskStateAssigned || t.Status.State >= api.TaskStateCompleted { 113 tr.cleanup = append(tr.cleanup, t.ID) 114 } 115 } 116 // Clean up tasks in 'cleanup' right away 117 if len(tr.cleanup) > 0 { 118 tr.tick() 119 } 120 } 121 122 // Clean up when we hit TaskHistoryRetentionLimit or when the timer expires, 123 // whichever happens first. 124 // 125 // Specifically, the way this should work: 126 // - Create a timer and immediately stop it. We don't want to fire the 127 // cleanup routine yet, because we just did a cleanup as part of the 128 // initialization above. 129 // - Launch into an event loop 130 // - When we receive an event, handle the event as needed 131 // - After receiving the event: 132 // - If minimum batch size (maxDirty) is exceeded with dirty + cleanup, 133 // then immediately launch into the cleanup routine 134 // - Otherwise, if the timer is stopped, start it (reset). 135 // - If the timer expires and the timer channel is signaled, then Stop the 136 // timer (so that it will be ready to be started again as needed), and 137 // execute the cleanup routine (tick) 138 timer := time.NewTimer(reaperBatchingInterval) 139 timer.Stop() 140 141 // If stop is somehow called AFTER the timer has expired, there will be a 142 // value in the timer.C channel. If there is such a value, we should drain 143 // it out. This select statement allows us to drain that value if it's 144 // present, or continue straight through otherwise. 145 select { 146 case <-timer.C: 147 default: 148 } 149 150 // keep track with a boolean of whether the timer is currently stopped 151 isTimerStopped := true 152 153 // Watch for: 154 // 1. EventCreateTask for cleaning slots, which is the best time to cleanup that node/slot. 155 // 2. EventUpdateTask for cleaning 156 // - serviceless orphaned tasks (when orchestrator updates the task status to ORPHANED) 157 // - tasks which have desired state REMOVE and have been shut down by the agent 158 // (these are tasks which are associated with slots removed as part of service 159 // remove or scale down) 160 // 3. EventUpdateCluster for TaskHistoryRetentionLimit update. 161 for { 162 select { 163 case event := <-watcher: 164 switch v := event.(type) { 165 case api.EventCreateTask: 166 t := v.Task 167 tr.dirty[orchestrator.SlotTuple{ 168 Slot: t.Slot, 169 ServiceID: t.ServiceID, 170 NodeID: t.NodeID, 171 }] = struct{}{} 172 case api.EventUpdateTask: 173 t := v.Task 174 // add serviceless orphaned tasks 175 if t.Status.State >= api.TaskStateOrphaned && t.ServiceID == "" { 176 tr.cleanup = append(tr.cleanup, t.ID) 177 } 178 // add tasks that are yet unassigned or have progressed beyond COMPLETE, with 179 // desired state REMOVE. These tasks are associated with slots that were removed 180 // as part of a service scale down or service removal. 181 if t.DesiredState == api.TaskStateRemove && (t.Status.State < api.TaskStateAssigned || t.Status.State >= api.TaskStateCompleted) { 182 tr.cleanup = append(tr.cleanup, t.ID) 183 } 184 case api.EventUpdateCluster: 185 tr.taskHistory = v.Cluster.Spec.Orchestration.TaskHistoryRetentionLimit 186 } 187 188 if len(tr.dirty)+len(tr.cleanup) > maxDirty { 189 // stop the timer, so we don't fire it. if we get another event 190 // after we do this cleaning, we will reset the timer then 191 timer.Stop() 192 // if the timer had fired, drain out the value. 193 select { 194 case <-timer.C: 195 default: 196 } 197 isTimerStopped = true 198 tr.tick() 199 } else { 200 if isTimerStopped { 201 timer.Reset(reaperBatchingInterval) 202 isTimerStopped = false 203 } 204 } 205 case <-timer.C: 206 // we can safely ignore draining off of the timer channel, because 207 // we already know that the timer has expired. 208 isTimerStopped = true 209 tr.tick() 210 case <-tr.stopChan: 211 // even though this doesn't really matter in this context, it's 212 // good hygiene to drain the value. 213 timer.Stop() 214 select { 215 case <-timer.C: 216 default: 217 } 218 return 219 } 220 } 221} 222 223// taskInTerminalState returns true if task is in a terminal state. 224func taskInTerminalState(task *api.Task) bool { 225 return task.Status.State > api.TaskStateRunning 226} 227 228// taskWillNeverRun returns true if task will never reach running state. 229func taskWillNeverRun(task *api.Task) bool { 230 return task.Status.State < api.TaskStateAssigned && task.DesiredState > api.TaskStateRunning 231} 232 233// tick performs task history cleanup. 234func (tr *TaskReaper) tick() { 235 // this signals that a tick has occurred. it exists solely for testing. 236 if tr.tickSignal != nil { 237 // try writing to this channel, but if it's full, fall straight through 238 // and ignore it. 239 select { 240 case tr.tickSignal <- struct{}{}: 241 default: 242 } 243 } 244 245 if len(tr.dirty) == 0 && len(tr.cleanup) == 0 { 246 return 247 } 248 249 defer func() { 250 tr.cleanup = nil 251 }() 252 253 deleteTasks := make(map[string]struct{}) 254 for _, tID := range tr.cleanup { 255 deleteTasks[tID] = struct{}{} 256 } 257 258 // Check history of dirty tasks for cleanup. 259 // Note: Clean out the dirty set at the end of this tick iteration 260 // in all but one scenarios (documented below). 261 // When tick() finishes, the tasks in the slot were either cleaned up, 262 // or it was skipped because it didn't meet the criteria for cleaning. 263 // Either way, we can discard the dirty set because future events on 264 // that slot will cause the task to be readded to the dirty set 265 // at that point. 266 // 267 // The only case when we keep the slot dirty is when there are more 268 // than one running tasks present for a given slot. 269 // In that case, we need to keep the slot dirty to allow it to be 270 // cleaned when tick() is called next and one or more the tasks 271 // in that slot have stopped running. 272 tr.store.View(func(tx store.ReadTx) { 273 for dirty := range tr.dirty { 274 service := store.GetService(tx, dirty.ServiceID) 275 if service == nil { 276 delete(tr.dirty, dirty) 277 continue 278 } 279 280 taskHistory := tr.taskHistory 281 282 // If MaxAttempts is set, keep at least one more than 283 // that number of tasks (this overrides TaskHistoryRetentionLimit). 284 // This is necessary to reconstruct restart history when the orchestrator starts up. 285 // TODO(aaronl): Consider hiding tasks beyond the normal 286 // retention limit in the UI. 287 // TODO(aaronl): There are some ways to cut down the 288 // number of retained tasks at the cost of more 289 // complexity: 290 // - Don't force retention of tasks with an older spec 291 // version. 292 // - Don't force retention of tasks outside of the 293 // time window configured for restart lookback. 294 if service.Spec.Task.Restart != nil && service.Spec.Task.Restart.MaxAttempts > 0 { 295 taskHistory = int64(service.Spec.Task.Restart.MaxAttempts) + 1 296 } 297 298 // Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history. 299 if taskHistory < 0 { 300 delete(tr.dirty, dirty) 301 continue 302 } 303 304 var historicTasks []*api.Task 305 306 switch service.Spec.GetMode().(type) { 307 case *api.ServiceSpec_Replicated: 308 // Clean out the slot for which we received EventCreateTask. 309 var err error 310 historicTasks, err = store.FindTasks(tx, store.BySlot(dirty.ServiceID, dirty.Slot)) 311 if err != nil { 312 continue 313 } 314 315 case *api.ServiceSpec_Global: 316 // Clean out the node history in case of global services. 317 tasksByNode, err := store.FindTasks(tx, store.ByNodeID(dirty.NodeID)) 318 if err != nil { 319 continue 320 } 321 322 for _, t := range tasksByNode { 323 if t.ServiceID == dirty.ServiceID { 324 historicTasks = append(historicTasks, t) 325 } 326 } 327 } 328 329 if int64(len(historicTasks)) <= taskHistory { 330 delete(tr.dirty, dirty) 331 continue 332 } 333 334 // TODO(aaronl): This could filter for non-running tasks and use quickselect 335 // instead of sorting the whole slice. 336 // TODO(aaronl): This sort should really use lamport time instead of wall 337 // clock time. We should store a Version in the Status field. 338 sort.Sort(orchestrator.TasksByTimestamp(historicTasks)) 339 340 runningTasks := 0 341 for _, t := range historicTasks { 342 // Historical tasks can be considered for cleanup if: 343 // 1. The task has reached a terminal state i.e. actual state beyond TaskStateRunning. 344 // 2. The task has not yet become running and desired state is a terminal state i.e. 345 // actual state not yet TaskStateAssigned and desired state beyond TaskStateRunning. 346 if taskInTerminalState(t) || taskWillNeverRun(t) { 347 deleteTasks[t.ID] = struct{}{} 348 349 taskHistory++ 350 if int64(len(historicTasks)) <= taskHistory { 351 break 352 } 353 } else { 354 // all other tasks are counted as running. 355 runningTasks++ 356 } 357 } 358 359 // The only case when we keep the slot dirty at the end of tick() 360 // is when there are more than one running tasks present 361 // for a given slot. 362 // In that case, we keep the slot dirty to allow it to be 363 // cleaned when tick() is called next and one or more of 364 // the tasks in that slot have stopped running. 365 if runningTasks <= 1 { 366 delete(tr.dirty, dirty) 367 } 368 } 369 }) 370 371 // Perform cleanup. 372 if len(deleteTasks) > 0 { 373 tr.store.Batch(func(batch *store.Batch) error { 374 for taskID := range deleteTasks { 375 batch.Update(func(tx store.Tx) error { 376 return store.DeleteTask(tx, taskID) 377 }) 378 } 379 return nil 380 }) 381 } 382} 383 384// Stop stops the TaskReaper and waits for the main loop to exit. 385// Stop can be called in two cases. One when the manager is 386// shutting down, and the other when the manager (the leader) is 387// becoming a follower. Since these two instances could race with 388// each other, we use closeOnce here to ensure that TaskReaper.Stop() 389// is called only once to avoid a panic. 390func (tr *TaskReaper) Stop() { 391 tr.closeOnce.Do(func() { 392 close(tr.stopChan) 393 }) 394 <-tr.doneChan 395} 396