package taskreaper

import (
	"context"
	"sort"
	"sync"
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/orchestrator"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
)

const (
	// maxDirty is the size threshold for running a task pruning operation.
	maxDirty = 1000
	// reaperBatchingInterval is how often to prune old tasks.
	reaperBatchingInterval = 250 * time.Millisecond
)

// A TaskReaper deletes old tasks when more than TaskHistoryRetentionLimit tasks
// exist for the same service/instance or service/nodeid combination.
type TaskReaper struct {
	store *store.MemoryStore

	// closeOnce ensures that stopChan is closed only once
	closeOnce sync.Once

	// taskHistory is the number of tasks to keep
	taskHistory int64

	// List of slot tuples to be inspected for task history cleanup.
	dirty map[orchestrator.SlotTuple]struct{}

	// List of tasks collected for cleanup, which includes two kinds of tasks
	// - serviceless orphaned tasks
	// - tasks with desired state REMOVE that have already been shut down
	cleanup  []string
	stopChan chan struct{}
	doneChan chan struct{}

	// tickSignal is a channel that, if non-nil and available, will be written
	// to in order to signal that a tick has occurred. Its sole purpose is for
	// testing code, to verify that task cleanup attempts are happening when
	// they should be.
	tickSignal chan struct{}
}

// New creates a new TaskReaper.
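//
// A minimal usage sketch (illustrative only; it assumes a context ctx and a
// *store.MemoryStore s already exist):
//
//	tr := taskreaper.New(s)
//	go tr.Run(ctx)
//	// ... later, on shutdown or when losing leadership:
//	tr.Stop()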
func New(store *store.MemoryStore) *TaskReaper {
	return &TaskReaper{
		store:    store,
		dirty:    make(map[orchestrator.SlotTuple]struct{}),
		stopChan: make(chan struct{}),
		doneChan: make(chan struct{}),
	}
}

// Run is the TaskReaper's watch loop which collects candidates for cleanup.
// Task history is mainly used in task restarts but is also available for administrative purposes.
// Note that the task history is stored per-slot-per-service for replicated services
// and per-node-per-service for global services. History does not apply to serviceless tasks
// since they are not attached to a service. In addition, the TaskReaper watch loop is also
// responsible for cleaning up tasks associated with slots that were removed as part of
// service scale down or service removal.
func (tr *TaskReaper) Run(ctx context.Context) {
	watcher, watchCancel := state.Watch(tr.store.WatchQueue(), api.EventCreateTask{}, api.EventUpdateTask{}, api.EventUpdateCluster{})

	defer func() {
		close(tr.doneChan)
		watchCancel()
	}()

	var orphanedTasks []*api.Task
	var removeTasks []*api.Task
	tr.store.View(func(readTx store.ReadTx) {
		var err error

		clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
		if err == nil && len(clusters) == 1 {
			tr.taskHistory = clusters[0].Spec.Orchestration.TaskHistoryRetentionLimit
		}

		// On startup, scan the entire store and inspect orphaned tasks from a previous life.
		orphanedTasks, err = store.FindTasks(readTx, store.ByTaskState(api.TaskStateOrphaned))
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to find Orphaned tasks in task reaper init")
		}
		removeTasks, err = store.FindTasks(readTx, store.ByDesiredState(api.TaskStateRemove))
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to find tasks with desired state REMOVE in task reaper init")
		}
	})

	if len(orphanedTasks)+len(removeTasks) > 0 {
		for _, t := range orphanedTasks {
			// Do not reap service tasks immediately.
			// Let them go through the regular history cleanup process
			// of checking TaskHistoryRetentionLimit.
			if t.ServiceID != "" {
				continue
			}

			// Serviceless tasks can be cleaned up right away since they are not attached to a service.
			tr.cleanup = append(tr.cleanup, t.ID)
		}
		// tasks with desired state REMOVE that have progressed beyond COMPLETE or
		// haven't been assigned yet can be cleaned up right away
		for _, t := range removeTasks {
			if t.Status.State < api.TaskStateAssigned || t.Status.State >= api.TaskStateCompleted {
				tr.cleanup = append(tr.cleanup, t.ID)
			}
		}
		// Clean up tasks in 'cleanup' right away
		if len(tr.cleanup) > 0 {
			tr.tick()
		}
	}

	// Clean up when we hit TaskHistoryRetentionLimit or when the timer expires,
	// whichever happens first.
	//
	// Specifically, the way this should work:
	// - Create a timer and immediately stop it. We don't want to fire the
	//   cleanup routine yet, because we just did a cleanup as part of the
	//   initialization above.
	// - Launch into an event loop
	// - When we receive an event, handle the event as needed
	// - After receiving the event:
	//   - If the batching threshold (maxDirty) is exceeded by dirty + cleanup,
	//     then immediately launch into the cleanup routine
	//   - Otherwise, if the timer is stopped, start it (reset).
	// - If the timer expires and the timer channel is signaled, then Stop the
	//   timer (so that it will be ready to be started again as needed), and
	//   execute the cleanup routine (tick)
	timer := time.NewTimer(reaperBatchingInterval)
	timer.Stop()

	// If the timer somehow expired before the Stop call above, there will be a
	// value in the timer.C channel. If there is such a value, we should drain
	// it out. This select statement allows us to drain that value if it's
	// present, or continue straight through otherwise.
	select {
	case <-timer.C:
	default:
	}

	// keep track with a boolean of whether the timer is currently stopped
	isTimerStopped := true

	// Watch for:
	// 1. EventCreateTask for cleaning slots, which is the best time to clean up that node/slot.
	// 2. EventUpdateTask for cleaning
	//    - serviceless orphaned tasks (when orchestrator updates the task status to ORPHANED)
	//    - tasks which have desired state REMOVE and have been shut down by the agent
	//      (these are tasks which are associated with slots removed as part of service
	//       remove or scale down)
	// 3. EventUpdateCluster for TaskHistoryRetentionLimit update.
	for {
		select {
		case event := <-watcher:
			switch v := event.(type) {
			case api.EventCreateTask:
				t := v.Task
				tr.dirty[orchestrator.SlotTuple{
					Slot:      t.Slot,
					ServiceID: t.ServiceID,
					NodeID:    t.NodeID,
				}] = struct{}{}
			case api.EventUpdateTask:
				t := v.Task
				// add serviceless orphaned tasks
				if t.Status.State >= api.TaskStateOrphaned && t.ServiceID == "" {
					tr.cleanup = append(tr.cleanup, t.ID)
				}
				// add tasks that are yet unassigned or have progressed beyond COMPLETE, with
				// desired state REMOVE. These tasks are associated with slots that were removed
				// as part of a service scale down or service removal.
				if t.DesiredState == api.TaskStateRemove && (t.Status.State < api.TaskStateAssigned || t.Status.State >= api.TaskStateCompleted) {
					tr.cleanup = append(tr.cleanup, t.ID)
				}
			case api.EventUpdateCluster:
				tr.taskHistory = v.Cluster.Spec.Orchestration.TaskHistoryRetentionLimit
			}

			if len(tr.dirty)+len(tr.cleanup) > maxDirty {
				// stop the timer, so we don't fire it. if we get another event
				// after we do this cleaning, we will reset the timer then
				timer.Stop()
				// if the timer had fired, drain out the value.
				select {
				case <-timer.C:
				default:
				}
				isTimerStopped = true
				tr.tick()
			} else if isTimerStopped {
				timer.Reset(reaperBatchingInterval)
				isTimerStopped = false
			}
		case <-timer.C:
			// no need to drain the timer channel here: receiving from timer.C
			// means the timer has expired and its channel is now empty.
			isTimerStopped = true
			tr.tick()
		case <-tr.stopChan:
			// even though this doesn't really matter in this context, it's
			// good hygiene to drain the value.
			timer.Stop()
			select {
			case <-timer.C:
			default:
			}
			return
		}
	}
}

// taskInTerminalState returns true if task is in a terminal state.
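// It relies on the ordering of api.TaskState values: every state numerically
// beyond TaskStateRunning is considered terminal.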
func taskInTerminalState(task *api.Task) bool {
	return task.Status.State > api.TaskStateRunning
}

// taskWillNeverRun returns true if task will never reach running state.
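// That is, the task has not yet been assigned to a node (actual state below
// TaskStateAssigned) while its desired state is already past TaskStateRunning.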
func taskWillNeverRun(task *api.Task) bool {
	return task.Status.State < api.TaskStateAssigned && task.DesiredState > api.TaskStateRunning
}

// tick performs task history cleanup.
func (tr *TaskReaper) tick() {
	// this signals that a tick has occurred. it exists solely for testing.
	if tr.tickSignal != nil {
		// try writing to this channel, but if it's full, fall straight through
		// and ignore it.
		select {
		case tr.tickSignal <- struct{}{}:
		default:
		}
	}

	if len(tr.dirty) == 0 && len(tr.cleanup) == 0 {
		return
	}

	defer func() {
		tr.cleanup = nil
	}()

	deleteTasks := make(map[string]struct{})
	for _, tID := range tr.cleanup {
		deleteTasks[tID] = struct{}{}
	}

	// Check history of dirty tasks for cleanup.
	// Note: the dirty set is cleaned out at the end of this tick iteration
	// in all but one scenario (documented below).
	// When tick() finishes, the tasks in a slot were either cleaned up,
	// or the slot was skipped because it didn't meet the criteria for cleaning.
	// Either way, we can discard the dirty set because future events on
	// that slot will cause it to be re-added to the dirty set
	// at that point.
	//
	// The only case when we keep the slot dirty is when there is more
	// than one running task present for a given slot.
	// In that case, we need to keep the slot dirty to allow it to be
	// cleaned when tick() is called next and one or more of the tasks
	// in that slot have stopped running.
	tr.store.View(func(tx store.ReadTx) {
		for dirty := range tr.dirty {
			service := store.GetService(tx, dirty.ServiceID)
			if service == nil {
				delete(tr.dirty, dirty)
				continue
			}

			taskHistory := tr.taskHistory

			// If MaxAttempts is set, keep at least one more than
			// that number of tasks (this overrides TaskHistoryRetentionLimit).
			// This is necessary to reconstruct restart history when the orchestrator starts up.
			// TODO(aaronl): Consider hiding tasks beyond the normal
			// retention limit in the UI.
			// TODO(aaronl): There are some ways to cut down the
			// number of retained tasks at the cost of more
			// complexity:
			//   - Don't force retention of tasks with an older spec
			//     version.
			//   - Don't force retention of tasks outside of the
			//     time window configured for restart lookback.
			if service.Spec.Task.Restart != nil && service.Spec.Task.Restart.MaxAttempts > 0 {
				taskHistory = int64(service.Spec.Task.Restart.MaxAttempts) + 1
			}

			// A negative value for TaskHistoryRetentionLimit means task history is never cleaned up.
			if taskHistory < 0 {
				delete(tr.dirty, dirty)
				continue
			}

			var historicTasks []*api.Task

			switch service.Spec.GetMode().(type) {
			case *api.ServiceSpec_Replicated:
				// Clean out the slot for which we received EventCreateTask.
				var err error
				historicTasks, err = store.FindTasks(tx, store.BySlot(dirty.ServiceID, dirty.Slot))
				if err != nil {
					continue
				}

			case *api.ServiceSpec_Global:
				// Clean out the node history in case of global services.
				tasksByNode, err := store.FindTasks(tx, store.ByNodeID(dirty.NodeID))
				if err != nil {
					continue
				}

				for _, t := range tasksByNode {
					if t.ServiceID == dirty.ServiceID {
						historicTasks = append(historicTasks, t)
					}
				}
			}

			if int64(len(historicTasks)) <= taskHistory {
				delete(tr.dirty, dirty)
				continue
			}

			// TODO(aaronl): This could filter for non-running tasks and use quickselect
			// instead of sorting the whole slice.
			// TODO(aaronl): This sort should really use lamport time instead of wall
			// clock time. We should store a Version in the Status field.
			sort.Sort(orchestrator.TasksByTimestamp(historicTasks))

			runningTasks := 0
			for _, t := range historicTasks {
				// Historical tasks can be considered for cleanup if:
				// 1. The task has reached a terminal state i.e. actual state beyond TaskStateRunning.
				// 2. The task has not yet become running and desired state is a terminal state i.e.
				// actual state not yet TaskStateAssigned and desired state beyond TaskStateRunning.
				if taskInTerminalState(t) || taskWillNeverRun(t) {
					deleteTasks[t.ID] = struct{}{}

					taskHistory++
					if int64(len(historicTasks)) <= taskHistory {
						break
					}
				} else {
					// all other tasks are counted as running.
					runningTasks++
				}
			}

			// The only case when we keep the slot dirty at the end of tick()
			// is when there is more than one running task present
			// for a given slot.
			// In that case, we keep the slot dirty to allow it to be
			// cleaned when tick() is called next and one or more of
			// the tasks in that slot have stopped running.
			if runningTasks <= 1 {
				delete(tr.dirty, dirty)
			}
		}
	})

	// Perform cleanup.
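	// Note that store.Batch applies these updates incrementally rather than in
	// one large write transaction, so a big backlog of deletions should not
	// block other store writers for long.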
	if len(deleteTasks) > 0 {
		tr.store.Batch(func(batch *store.Batch) error {
			for taskID := range deleteTasks {
				batch.Update(func(tx store.Tx) error {
					return store.DeleteTask(tx, taskID)
				})
			}
			return nil
		})
	}
}

// Stop stops the TaskReaper and waits for the main loop to exit.
// Stop can be called in two cases. One is when the manager is
// shutting down, and the other is when the manager (the leader) is
// becoming a follower. Since these two instances could race with
// each other, we use closeOnce here to ensure that stopChan is
// closed only once, avoiding a double-close panic.
func (tr *TaskReaper) Stop() {
	tr.closeOnce.Do(func() {
		close(tr.stopChan)
	})
	<-tr.doneChan
}