package taskreaper

import (
	"sort"
	"sync"
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/orchestrator"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	"golang.org/x/net/context"
)

const (
	// maxDirty is the size threshold for running a task pruning operation.
	maxDirty = 1000
	// reaperBatchingInterval is how often to prune old tasks.
	reaperBatchingInterval = 250 * time.Millisecond
)

// A TaskReaper deletes old tasks when more than TaskHistoryRetentionLimit tasks
// exist for the same service/instance or service/nodeid combination.
type TaskReaper struct {
	store *store.MemoryStore

	// closeOnce ensures that stopChan is closed only once
	closeOnce sync.Once

	// taskHistory is the number of tasks to keep per slot or node,
	// mirroring the cluster's TaskHistoryRetentionLimit.
	taskHistory int64

	// dirty is the set of slot tuples to be inspected for task history cleanup.
	dirty map[orchestrator.SlotTuple]struct{}

	// cleanup contains the IDs of tasks collected for cleanup, which covers two kinds of tasks:
	// - serviceless orphaned tasks
	// - tasks with desired state REMOVE that have already been shut down
	cleanup  []string
	stopChan chan struct{}
	doneChan chan struct{}

	// tickSignal is a channel that, if non-nil and available, will be written
	// to in order to signal that a tick has occurred. Its sole purpose is for
	// testing code, to verify that task cleanup attempts are happening when
	// they should be.
	tickSignal chan struct{}
}

// New creates a new TaskReaper.
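//
// A minimal usage sketch (assuming a configured MemoryStore and a cancellable
// context; the variable names here are illustrative only):
//
//	reaper := taskreaper.New(memoryStore)
//	go reaper.Run(ctx)
//	// ... later, during shutdown:
//	reaper.Stop()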
func New(store *store.MemoryStore) *TaskReaper {
	return &TaskReaper{
		store:    store,
		dirty:    make(map[orchestrator.SlotTuple]struct{}),
		stopChan: make(chan struct{}),
		doneChan: make(chan struct{}),
	}
}

// Run is the TaskReaper's watch loop which collects candidates for cleanup.
// Task history is mainly used in task restarts but is also available for administrative purposes.
// Note that task history is stored per-slot-per-service for replicated services
// and per-node-per-service for global services. History does not apply to serviceless tasks
// since they are not attached to a service. In addition, the TaskReaper watch loop is also
// responsible for cleaning up tasks associated with slots that were removed as part of
// service scale down or service removal.
func (tr *TaskReaper) Run(ctx context.Context) {
	watcher, watchCancel := state.Watch(tr.store.WatchQueue(), api.EventCreateTask{}, api.EventUpdateTask{}, api.EventUpdateCluster{})

	defer func() {
		close(tr.doneChan)
		watchCancel()
	}()

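	// orphanedTasks and removeTasks collect the results of a one-time scan of
	// the store at startup, picking up tasks that became orphaned or were
	// marked for removal during a previous life of the manager.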
	var orphanedTasks []*api.Task
	var removeTasks []*api.Task
	tr.store.View(func(readTx store.ReadTx) {
		var err error

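		// Read the cluster object so we pick up the configured
		// TaskHistoryRetentionLimit before processing any events.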
		clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
		if err == nil && len(clusters) == 1 {
			tr.taskHistory = clusters[0].Spec.Orchestration.TaskHistoryRetentionLimit
		}

		// On startup, scan the entire store and inspect orphaned tasks from a previous life.
		orphanedTasks, err = store.FindTasks(readTx, store.ByTaskState(api.TaskStateOrphaned))
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to find Orphaned tasks in task reaper init")
		}
		removeTasks, err = store.FindTasks(readTx, store.ByDesiredState(api.TaskStateRemove))
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to find tasks with desired state REMOVE in task reaper init")
		}
	})

	if len(orphanedTasks)+len(removeTasks) > 0 {
		for _, t := range orphanedTasks {
			// Do not reap service tasks immediately.
			// Let them go through the regular history cleanup process
			// of checking TaskHistoryRetentionLimit.
			if t.ServiceID != "" {
				continue
			}

			// Serviceless tasks can be cleaned up right away since they are not attached to a service.
			tr.cleanup = append(tr.cleanup, t.ID)
		}
		// Tasks with desired state REMOVE that have reached COMPLETE (or beyond),
		// or that haven't been assigned yet, can be cleaned up right away.
		for _, t := range removeTasks {
			if t.Status.State < api.TaskStateAssigned || t.Status.State >= api.TaskStateCompleted {
				tr.cleanup = append(tr.cleanup, t.ID)
			}
		}
		// Clean up the tasks in 'cleanup' right away.
		if len(tr.cleanup) > 0 {
			tr.tick()
		}
	}

	// Clean up when the size of the dirty set plus the cleanup list exceeds
	// maxDirty, or when the batching timer expires, whichever happens first.
	//
	// Specifically, the way this should work:
	// - Create a timer and immediately stop it. We don't want to fire the
	//   cleanup routine yet, because we just did a cleanup as part of the
	//   initialization above.
	// - Launch into an event loop
	// - When we receive an event, handle the event as needed
	// - After receiving the event:
	//   - If the batch size threshold (maxDirty) is exceeded by dirty + cleanup,
	//     then immediately launch into the cleanup routine
	//   - Otherwise, if the timer is stopped, start it (reset).
	// - If the timer expires and the timer channel is signaled, then Stop the
	//   timer (so that it will be ready to be started again as needed), and
	//   execute the cleanup routine (tick)
	timer := time.NewTimer(reaperBatchingInterval)
	timer.Stop()

	// If the timer somehow expired before the Stop call above, there will be a
	// value in the timer.C channel. If there is such a value, we should drain
	// it out. This select statement allows us to drain that value if it's
	// present, or continue straight through otherwise.
	select {
	case <-timer.C:
	default:
	}

	// keep track with a boolean of whether the timer is currently stopped
	isTimerStopped := true

	// Watch for:
	// 1. EventCreateTask for cleaning slots, which is the best time to clean up that node/slot.
	// 2. EventUpdateTask for cleaning
	//    - serviceless orphaned tasks (when the orchestrator updates the task status to ORPHANED)
	//    - tasks which have desired state REMOVE and have been shut down by the agent
	//      (these are tasks which are associated with slots removed as part of service
	//       removal or scale down)
	// 3. EventUpdateCluster for TaskHistoryRetentionLimit updates.
	for {
		select {
		case event := <-watcher:
			switch v := event.(type) {
			case api.EventCreateTask:
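				// A task was created in this slot (or on this node, for a
				// global service), so mark the slot tuple dirty and let the
				// next tick decide whether older tasks should be pruned.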
				t := v.Task
				tr.dirty[orchestrator.SlotTuple{
					Slot:      t.Slot,
					ServiceID: t.ServiceID,
					NodeID:    t.NodeID,
				}] = struct{}{}
			case api.EventUpdateTask:
				t := v.Task
				// add serviceless orphaned tasks
				if t.Status.State >= api.TaskStateOrphaned && t.ServiceID == "" {
					tr.cleanup = append(tr.cleanup, t.ID)
				}
				// add tasks with desired state REMOVE that are not yet assigned or have
				// reached COMPLETE (or beyond). These tasks are associated with slots that
				// were removed as part of a service scale down or service removal.
				if t.DesiredState == api.TaskStateRemove && (t.Status.State < api.TaskStateAssigned || t.Status.State >= api.TaskStateCompleted) {
					tr.cleanup = append(tr.cleanup, t.ID)
				}
			case api.EventUpdateCluster:
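				// Pick up changes to TaskHistoryRetentionLimit made through a
				// cluster spec update.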
				tr.taskHistory = v.Cluster.Spec.Orchestration.TaskHistoryRetentionLimit
			}

			if len(tr.dirty)+len(tr.cleanup) > maxDirty {
				// stop the timer, so we don't fire it. if we get another event
				// after we do this cleaning, we will reset the timer then
				timer.Stop()
				// if the timer had fired, drain out the value.
				select {
				case <-timer.C:
				default:
				}
				isTimerStopped = true
				tr.tick()
			} else {
				if isTimerStopped {
					timer.Reset(reaperBatchingInterval)
					isTimerStopped = false
				}
			}
		case <-timer.C:
			// No need to drain the timer channel here, because receiving from
			// it is how we learned that the timer has expired.
			isTimerStopped = true
			tr.tick()
		case <-tr.stopChan:
			// even though this doesn't really matter in this context, it's
			// good hygiene to drain the timer value if one is present.
			timer.Stop()
			select {
			case <-timer.C:
			default:
			}
			return
		}
	}
}

// taskInTerminalState returns true if task is in a terminal state.
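// Terminal here means the observed state has progressed beyond RUNNING
// (COMPLETE, SHUTDOWN, FAILED, and so on).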
func taskInTerminalState(task *api.Task) bool {
	return task.Status.State > api.TaskStateRunning
}

// taskWillNeverRun returns true if task will never reach running state.
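// For example, a task that is still PENDING but whose desired state has
// already been set to SHUTDOWN or REMOVE will never run.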
func taskWillNeverRun(task *api.Task) bool {
	return task.Status.State < api.TaskStateAssigned && task.DesiredState > api.TaskStateRunning
}

// tick performs task history cleanup.
func (tr *TaskReaper) tick() {
	// This signals that a tick has occurred. It exists solely for testing.
	if tr.tickSignal != nil {
		// try writing to this channel, but if it's full, fall straight through
		// and ignore it.
		select {
		case tr.tickSignal <- struct{}{}:
		default:
		}
	}

	if len(tr.dirty) == 0 && len(tr.cleanup) == 0 {
		return
	}

	defer func() {
		tr.cleanup = nil
	}()

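	// Seed the deletion set with the task IDs already collected in cleanup
	// (serviceless orphaned tasks and shut-down tasks with desired state REMOVE).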
	deleteTasks := make(map[string]struct{})
	for _, tID := range tr.cleanup {
		deleteTasks[tID] = struct{}{}
	}

	// Check the history of the dirty slots for cleanup.
	// Note: the dirty set is cleaned out at the end of this tick iteration
	// in all but one scenario (documented below).
	// When tick() finishes, the tasks in a slot have either been cleaned up,
	// or the slot was skipped because it didn't meet the criteria for cleaning.
	// Either way, we can discard the dirty set because future events on
	// that slot will cause it to be re-added to the dirty set
	// at that point.
	//
	// The only case in which we keep the slot dirty is when more than one
	// running task is present for a given slot.
	// In that case, we need to keep the slot dirty to allow it to be
	// cleaned when tick() is called next and one or more of the tasks
	// in that slot have stopped running.
	tr.store.View(func(tx store.ReadTx) {
		for dirty := range tr.dirty {
			service := store.GetService(tx, dirty.ServiceID)
			if service == nil {
				delete(tr.dirty, dirty)
				continue
			}

			taskHistory := tr.taskHistory

			// If MaxAttempts is set, keep at least one more than
			// that number of tasks (this overrides TaskHistoryRetentionLimit).
			// This is necessary to reconstruct restart history when the orchestrator starts up.
			// TODO(aaronl): Consider hiding tasks beyond the normal
			// retention limit in the UI.
			// TODO(aaronl): There are some ways to cut down the
			// number of retained tasks at the cost of more
			// complexity:
			//   - Don't force retention of tasks with an older spec
			//     version.
			//   - Don't force retention of tasks outside of the
			//     time window configured for restart lookback.
			if service.Spec.Task.Restart != nil && service.Spec.Task.Restart.MaxAttempts > 0 {
				taskHistory = int64(service.Spec.Task.Restart.MaxAttempts) + 1
			}

			// A negative value for TaskHistoryRetentionLimit is an indication to never clean up task history.
			if taskHistory < 0 {
				delete(tr.dirty, dirty)
				continue
			}

			var historicTasks []*api.Task

			switch service.Spec.GetMode().(type) {
			case *api.ServiceSpec_Replicated:
				// Clean out the slot for which we received EventCreateTask.
				var err error
				historicTasks, err = store.FindTasks(tx, store.BySlot(dirty.ServiceID, dirty.Slot))
				if err != nil {
					continue
				}

			case *api.ServiceSpec_Global:
				// Clean out the node history in case of global services.
				tasksByNode, err := store.FindTasks(tx, store.ByNodeID(dirty.NodeID))
				if err != nil {
					continue
				}

				for _, t := range tasksByNode {
					if t.ServiceID == dirty.ServiceID {
						historicTasks = append(historicTasks, t)
					}
				}
			}

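			// historicTasks now holds every task recorded for this slot
			// (replicated) or for this service on this node (global). If that
			// count is within the retention limit, there is nothing to prune.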
			if int64(len(historicTasks)) <= taskHistory {
				delete(tr.dirty, dirty)
				continue
			}

			// TODO(aaronl): This could filter for non-running tasks and use quickselect
			// instead of sorting the whole slice.
			// TODO(aaronl): This sort should really use lamport time instead of wall
			// clock time. We should store a Version in the Status field.
			sort.Sort(orchestrator.TasksByTimestamp(historicTasks))

			runningTasks := 0
			for _, t := range historicTasks {
				// Historical tasks can be considered for cleanup if:
				// 1. The task has reached a terminal state, i.e. actual state beyond TaskStateRunning.
				// 2. The task has not yet been assigned and its desired state is a terminal state,
				// i.e. actual state before TaskStateAssigned and desired state beyond TaskStateRunning.
				if taskInTerminalState(t) || taskWillNeverRun(t) {
					deleteTasks[t.ID] = struct{}{}

					taskHistory++
					if int64(len(historicTasks)) <= taskHistory {
						break
					}
				} else {
					// all other tasks are counted as running.
					runningTasks++
				}
			}

			// The only case in which we keep the slot dirty at the end of tick()
			// is when more than one running task is present
			// for a given slot.
			// In that case, we keep the slot dirty to allow it to be
			// cleaned when tick() is called next and one or more of
			// the tasks in that slot have stopped running.
			if runningTasks <= 1 {
				delete(tr.dirty, dirty)
			}
		}
	})

	// Perform cleanup.
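	// deleteTasks now contains the cleanup list plus any historic tasks that
	// exceeded the retention limit; remove them from the store in one batch.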
	if len(deleteTasks) > 0 {
		tr.store.Batch(func(batch *store.Batch) error {
			for taskID := range deleteTasks {
				batch.Update(func(tx store.Tx) error {
					return store.DeleteTask(tx, taskID)
				})
			}
			return nil
		})
	}
}

// Stop stops the TaskReaper and waits for the main loop to exit.
// Stop can be called in two cases: when the manager is shutting down,
// or when the manager (the leader) is becoming a follower. Since these
// two cases could race with each other, we use closeOnce here to ensure
// that TaskReaper.Stop() is called only once to avoid a panic.
func (tr *TaskReaper) Stop() {
	tr.closeOnce.Do(func() {
		close(tr.stopChan)
	})
	<-tr.doneChan
}