package restart

import (
	"container/list"
	"context"
	"errors"
	"sync"
	"time"

	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/api/defaults"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/orchestrator"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	gogotypes "github.com/gogo/protobuf/types"
)

const defaultOldTaskTimeout = time.Minute

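// restartedInstance records when a single restart took place, so that
// restarts can be counted against Restart.MaxAttempts within
// Restart.Window.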
type restartedInstance struct {
	timestamp time.Time
}

type instanceRestartInfo struct {
	// counter of restarts for this instance.
	totalRestarts uint64
	// Linked list of restartedInstance structs. Only used when
	// Restart.MaxAttempts and Restart.Window are both
	// nonzero.
	restartedInstances *list.List
	// Why is specVersion in this structure and not in the map key? While
	// putting it in the key would be a very simple solution, it wouldn't
	// be easy to clean up map entries corresponding to old specVersions.
	// Making the key version-agnostic and clearing the value whenever the
	// version changes avoids the issue of stale map entries for old
	// versions.
	specVersion api.Version
}

type delayedStart struct {
	// cancel is called to cancel the delayed start.
	cancel func()
	// doneCh is closed once the delayed start has either completed or been
	// cancelled.
	doneCh chan struct{}

	// waiter is set to true if the next restart is waiting for this delay
	// to complete.
	waiter bool
}

// Supervisor initiates and manages restarts. It's responsible for
// delaying restarts when applicable.
type Supervisor struct {
	mu    sync.Mutex
	store *store.MemoryStore
	// delays tracks pending delayed starts, keyed by the ID of the task
	// waiting to start.
	delays map[string]*delayedStart
	// historyByService maps service ID to slot tuple to the restart
	// history for that instance.
	historyByService map[string]map[orchestrator.SlotTuple]*instanceRestartInfo
	// TaskTimeout is the maximum amount of time DelayStart waits for an
	// old task to stop before starting its replacement anyway.
	TaskTimeout time.Duration
}

// NewSupervisor creates a new restart Supervisor.
func NewSupervisor(store *store.MemoryStore) *Supervisor {
	return &Supervisor{
		store:            store,
		delays:           make(map[string]*delayedStart),
		historyByService: make(map[string]map[orchestrator.SlotTuple]*instanceRestartInfo),
		TaskTimeout:      defaultOldTaskTimeout,
	}
}
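
// A minimal usage sketch (illustrative only; ctx, memoryStore, cluster,
// service and task are placeholders supplied by the caller). The
// Update/Restart portion mirrors what waitRestart below does internally:
//
//	supervisor := NewSupervisor(memoryStore)
//	err := memoryStore.Update(func(tx store.Tx) error {
//		return supervisor.Restart(ctx, tx, cluster, service, *task)
//	})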

// waitRestart waits for the given task's pending delayed start to finish,
// then attempts the restart again. It is used when Restart is called on a
// task that is itself still waiting out a restart delay.
func (r *Supervisor) waitRestart(ctx context.Context, oldDelay *delayedStart, cluster *api.Cluster, taskID string) {
	// Wait for the last restart delay to elapse.
	select {
	case <-oldDelay.doneCh:
	case <-ctx.Done():
		return
	}

	// Start the next restart
	err := r.store.Update(func(tx store.Tx) error {
		t := store.GetTask(tx, taskID)
		if t == nil {
			return nil
		}
		if t.DesiredState > api.TaskStateRunning {
			return nil
		}
		service := store.GetService(tx, t.ServiceID)
		if service == nil {
			return nil
		}
		return r.Restart(ctx, tx, cluster, service, *t)
	})

	if err != nil {
		log.G(ctx).WithError(err).Errorf("failed to restart task after waiting for previous restart")
	}
}

// Restart initiates a new task to replace t if appropriate under the service's
// restart policy.
func (r *Supervisor) Restart(ctx context.Context, tx store.Tx, cluster *api.Cluster, service *api.Service, t api.Task) error {
	// TODO(aluzzardi): This function should not depend on `service`.

	// Is the old task still in the process of restarting? If so, wait for
	// its restart delay to elapse, to avoid tight restart loops (for
	// example, when the image doesn't exist).
	r.mu.Lock()
	oldDelay, ok := r.delays[t.ID]
	if ok {
		if !oldDelay.waiter {
			oldDelay.waiter = true
			go r.waitRestart(ctx, oldDelay, cluster, t.ID)
		}
		r.mu.Unlock()
		return nil
	}
	r.mu.Unlock()

	// Sanity check: was the task shut down already by a separate call to
	// Restart? If so, we must avoid restarting it, because this will create
	// an extra task. This should never happen unless there is a bug.
	if t.DesiredState > api.TaskStateRunning {
		return errors.New("Restart called on task that was already shut down")
	}

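	// Mark the old task for shutdown; if the restart policy allows it, a
	// replacement task is created below.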
	t.DesiredState = api.TaskStateShutdown
	err := store.UpdateTask(tx, &t)
	if err != nil {
		log.G(ctx).WithError(err).Errorf("failed to set task desired state to shutdown")
		return err
	}

	if !r.shouldRestart(ctx, &t, service) {
		return nil
	}

	var restartTask *api.Task

	if orchestrator.IsReplicatedService(service) {
		restartTask = orchestrator.NewTask(cluster, service, t.Slot, "")
	} else if orchestrator.IsGlobalService(service) {
		restartTask = orchestrator.NewTask(cluster, service, 0, t.NodeID)
	} else {
		log.G(ctx).Error("service not supported by restart supervisor")
		return nil
	}

	n := store.GetNode(tx, t.NodeID)

	restartTask.DesiredState = api.TaskStateReady

	var restartDelay time.Duration
	// Restart delay is not applied to drained nodes
	if n == nil || n.Spec.Availability != api.NodeAvailabilityDrain {
		if t.Spec.Restart != nil && t.Spec.Restart.Delay != nil {
			var err error
			restartDelay, err = gogotypes.DurationFromProto(t.Spec.Restart.Delay)
			if err != nil {
				log.G(ctx).WithError(err).Error("invalid restart delay; using default")
				restartDelay, _ = gogotypes.DurationFromProto(defaults.Service.Task.Restart.Delay)
			}
		} else {
			restartDelay, _ = gogotypes.DurationFromProto(defaults.Service.Task.Restart.Delay)
		}
	}

	waitStop := true

	// Normally we wait for the old task to stop running, but we skip this
	// if the old task is already dead or the node it's assigned to is down.
	if (n != nil && n.Status.State == api.NodeStatus_DOWN) || t.Status.State > api.TaskStateRunning {
		waitStop = false
	}

	if err := store.CreateTask(tx, restartTask); err != nil {
		log.G(ctx).WithError(err).WithField("task.id", restartTask.ID).Error("task create failed")
		return err
	}

	tuple := orchestrator.SlotTuple{
		Slot:      restartTask.Slot,
		ServiceID: restartTask.ServiceID,
		NodeID:    restartTask.NodeID,
	}
	r.RecordRestartHistory(tuple, restartTask)

	r.DelayStart(ctx, tx, &t, restartTask.ID, restartDelay, waitStop)
	return nil
}

// shouldRestart returns true if a task should be restarted according to the
// restart policy.
func (r *Supervisor) shouldRestart(ctx context.Context, t *api.Task, service *api.Service) bool {
	// TODO(aluzzardi): This function should not depend on `service`.
	condition := orchestrator.RestartCondition(t)

	if condition != api.RestartOnAny &&
		(condition != api.RestartOnFailure || t.Status.State == api.TaskStateCompleted) {
		return false
	}

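	// MaxAttempts == 0 (or no restart policy at all) means the number of
	// restarts is unlimited, so there is no history to consult.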
	if t.Spec.Restart == nil || t.Spec.Restart.MaxAttempts == 0 {
		return true
	}

	instanceTuple := orchestrator.SlotTuple{
		Slot:      t.Slot,
		ServiceID: t.ServiceID,
	}

	// Slot is not meaningful for "global" tasks, so they need to be
	// indexed by NodeID.
	if orchestrator.IsGlobalService(service) {
		instanceTuple.NodeID = t.NodeID
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	restartInfo := r.historyByService[t.ServiceID][instanceTuple]
	if restartInfo == nil || (t.SpecVersion != nil && *t.SpecVersion != restartInfo.specVersion) {
		return true
	}

	if t.Spec.Restart.Window == nil || (t.Spec.Restart.Window.Seconds == 0 && t.Spec.Restart.Window.Nanos == 0) {
		return restartInfo.totalRestarts < t.Spec.Restart.MaxAttempts
	}

	if restartInfo.restartedInstances == nil {
		return true
	}

	window, err := gogotypes.DurationFromProto(t.Spec.Restart.Window)
	if err != nil {
		log.G(ctx).WithError(err).Error("invalid restart lookback window")
		return restartInfo.totalRestarts < t.Spec.Restart.MaxAttempts
	}

	var timestamp time.Time
	// Prefer the manager's timestamp over the agent's, since manager
	// clocks are more trustworthy.
	if t.Status.AppliedAt != nil {
		timestamp, err = gogotypes.TimestampFromProto(t.Status.AppliedAt)
		if err != nil {
			log.G(ctx).WithError(err).Error("invalid task status AppliedAt timestamp")
			return restartInfo.totalRestarts < t.Spec.Restart.MaxAttempts
		}
	} else {
		// It's safe to call TimestampFromProto with a nil timestamp
		timestamp, err = gogotypes.TimestampFromProto(t.Status.Timestamp)
		if t.Status.Timestamp == nil || err != nil {
			log.G(ctx).WithError(err).Error("invalid task completion timestamp")
			return restartInfo.totalRestarts < t.Spec.Restart.MaxAttempts
		}
	}
	lookback := timestamp.Add(-window)

	numRestarts := uint64(restartInfo.restartedInstances.Len())

	// Disregard any restarts that happened before the lookback window,
	// and remove them from the linked list since they will no longer
	// be relevant to figuring out if tasks should be restarted going
	// forward.
	var next *list.Element
	for e := restartInfo.restartedInstances.Front(); e != nil; e = next {
		next = e.Next()

		if e.Value.(restartedInstance).timestamp.After(lookback) {
			break
		}
		restartInfo.restartedInstances.Remove(e)
		numRestarts--
	}

	// Ignore restarts that happened at or after the timestamp of the task
	// we're looking at; only earlier restarts count against the limit.
	for e2 := restartInfo.restartedInstances.Back(); e2 != nil; e2 = e2.Prev() {
		if e2.Value.(restartedInstance).timestamp.Before(timestamp) {
			break
		}
		numRestarts--
	}

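	// If pruning emptied the list, drop it entirely so that future calls
	// see no restarts within the window.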
	if restartInfo.restartedInstances.Len() == 0 {
		restartInfo.restartedInstances = nil
	}

	return numRestarts < t.Spec.Restart.MaxAttempts
}

// UpdatableTasksInSlot returns the set of tasks that should be passed to the
// updater from this slot, or an empty slice if none should be. An updatable
// slot either has at least one task with desired state <= RUNNING, or its
// most recent task has stopped running and should not be restarted. The latter
// case is for making sure that tasks that shouldn't normally be restarted will
// still be handled by rolling updates when they become outdated. There is a
// special case for rollbacks to make sure that a rollback always takes the
// service to a converged state, instead of ignoring tasks with the original
// spec that stopped running and shouldn't be restarted according to the
// restart policy.
func (r *Supervisor) UpdatableTasksInSlot(ctx context.Context, slot orchestrator.Slot, service *api.Service) orchestrator.Slot {
	if len(slot) < 1 {
		return nil
	}

	var updatable orchestrator.Slot
	for _, t := range slot {
		if t.DesiredState <= api.TaskStateRunning {
			updatable = append(updatable, t)
		}
	}
	if len(updatable) > 0 {
		return updatable
	}

	if service.UpdateStatus != nil && service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_STARTED {
		return nil
	}

	// Find most recent task
	byTimestamp := orchestrator.TasksByTimestamp(slot)
	newestIndex := 0
	for i := 1; i != len(slot); i++ {
		if byTimestamp.Less(newestIndex, i) {
			newestIndex = i
		}
	}

	if !r.shouldRestart(ctx, slot[newestIndex], service) {
		return orchestrator.Slot{slot[newestIndex]}
	}
	return nil
}

// RecordRestartHistory updates the historyByService map to reflect the restart
// represented by replacementTask.
func (r *Supervisor) RecordRestartHistory(tuple orchestrator.SlotTuple, replacementTask *api.Task) {
	if replacementTask.Spec.Restart == nil || replacementTask.Spec.Restart.MaxAttempts == 0 {
		// No limit on the number of restarts, so no need to record
		// history.
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	serviceID := replacementTask.ServiceID
	if r.historyByService[serviceID] == nil {
		r.historyByService[serviceID] = make(map[orchestrator.SlotTuple]*instanceRestartInfo)
	}
	if r.historyByService[serviceID][tuple] == nil {
		r.historyByService[serviceID][tuple] = &instanceRestartInfo{}
	}

	restartInfo := r.historyByService[serviceID][tuple]

	if replacementTask.SpecVersion != nil && *replacementTask.SpecVersion != restartInfo.specVersion {
		// This task has a different SpecVersion from the one we're
		// tracking. Most likely, the service was updated. Past failures
		// shouldn't count against the new service definition, so clear
		// the history for this instance.
		*restartInfo = instanceRestartInfo{
			specVersion: *replacementTask.SpecVersion,
		}
	}

	restartInfo.totalRestarts++

	if replacementTask.Spec.Restart.Window != nil && (replacementTask.Spec.Restart.Window.Seconds != 0 || replacementTask.Spec.Restart.Window.Nanos != 0) {
		if restartInfo.restartedInstances == nil {
			restartInfo.restartedInstances = list.New()
		}

		// it's okay to call TimestampFromProto with a nil argument
		timestamp, err := gogotypes.TimestampFromProto(replacementTask.Meta.CreatedAt)
		if replacementTask.Meta.CreatedAt == nil || err != nil {
			timestamp = time.Now()
		}

		restartedInstance := restartedInstance{
			timestamp: timestamp,
		}

		restartInfo.restartedInstances.PushBack(restartedInstance)
	}
}

// DelayStart starts a timer that moves the task from READY to RUNNING once:
// - The restart delay has elapsed (if applicable)
// - The old task that it's replacing has stopped running (or this times out)
// It must be called during an Update transaction to ensure that it does not
// miss events. The purpose of the store.Tx argument is to avoid accidental
// calls outside an Update transaction. The returned channel is closed once
// the delayed start has either completed or been cancelled.
func (r *Supervisor) DelayStart(ctx context.Context, _ store.Tx, oldTask *api.Task, newTaskID string, delay time.Duration, waitStop bool) <-chan struct{} {
	ctx, cancel := context.WithCancel(context.Background())
	doneCh := make(chan struct{})

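	// If a delayed start is already registered for this task, cancel it
	// and wait for it to finish before registering a new one.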
	r.mu.Lock()
	for {
		oldDelay, ok := r.delays[newTaskID]
		if !ok {
			break
		}
		oldDelay.cancel()
		r.mu.Unlock()
		// Note that this channel read should only block for a very
		// short time, because we cancelled the existing delay and
		// that should cause it to stop immediately.
		<-oldDelay.doneCh
		r.mu.Lock()
	}
	r.delays[newTaskID] = &delayedStart{cancel: cancel, doneCh: doneCh}
	r.mu.Unlock()

	var watch chan events.Event
	cancelWatch := func() {}

	waitForTask := waitStop && oldTask != nil && oldTask.Status.State <= api.TaskStateRunning

	if waitForTask {
		// Wait for either the old task to complete, or the old task's
		// node to become unavailable.
		watch, cancelWatch = state.Watch(
			r.store.WatchQueue(),
			api.EventUpdateTask{
				Task:   &api.Task{ID: oldTask.ID, Status: api.TaskStatus{State: api.TaskStateRunning}},
				Checks: []api.TaskCheckFunc{api.TaskCheckID, state.TaskCheckStateGreaterThan},
			},
			api.EventUpdateNode{
				Node:   &api.Node{ID: oldTask.NodeID, Status: api.NodeStatus{State: api.NodeStatus_DOWN}},
				Checks: []api.NodeCheckFunc{api.NodeCheckID, state.NodeCheckState},
			},
			api.EventDeleteNode{
				Node:   &api.Node{ID: oldTask.NodeID},
				Checks: []api.NodeCheckFunc{api.NodeCheckID},
			},
		)
	}

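	// The delayed start runs in the background. doneCh is closed when the
	// goroutine exits, whether it started the task or was cancelled.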
	go func() {
		defer func() {
			cancelWatch()
			r.mu.Lock()
			delete(r.delays, newTaskID)
			r.mu.Unlock()
			close(doneCh)
		}()

		oldTaskTimer := time.NewTimer(r.TaskTimeout)
		defer oldTaskTimer.Stop()

		// Wait for the delay to elapse, if one is specified.
		if delay != 0 {
			select {
			case <-time.After(delay):
			case <-ctx.Done():
				return
			}
		}

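		// Wait for the old task to stop (or its node to become
		// unavailable), but give up after TaskTimeout so a stuck task
		// cannot block the replacement indefinitely.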
		if waitForTask {
			select {
			case <-watch:
			case <-oldTaskTimer.C:
			case <-ctx.Done():
				return
			}
		}

		err := r.store.Update(func(tx store.Tx) error {
			err := r.StartNow(tx, newTaskID)
			if err != nil {
				log.G(ctx).WithError(err).WithField("task.id", newTaskID).Error("moving task out of delayed state failed")
			}
			return nil
		})
		if err != nil {
			log.G(ctx).WithError(err).WithField("task.id", newTaskID).Error("task restart transaction failed")
		}
	}()

	return doneCh
}

// StartNow sets the task's desired state to RUNNING so it will proceed to
// start up.
func (r *Supervisor) StartNow(tx store.Tx, taskID string) error {
	t := store.GetTask(tx, taskID)
	if t == nil || t.DesiredState >= api.TaskStateRunning {
		return nil
	}
	t.DesiredState = api.TaskStateRunning
	return store.UpdateTask(tx, t)
}

// Cancel cancels a pending restart for taskID, if any, and waits for the
// corresponding delayed start to finish.
func (r *Supervisor) Cancel(taskID string) {
	r.mu.Lock()
	delay, ok := r.delays[taskID]
	r.mu.Unlock()

	if !ok {
		return
	}

	delay.cancel()
	<-delay.doneCh
}

// CancelAll aborts all pending restarts and waits for any instances of
// StartNow that have already triggered to complete.
func (r *Supervisor) CancelAll() {
	var cancelled []*delayedStart

	r.mu.Lock()
	for _, delay := range r.delays {
		delay.cancel()
		cancelled = append(cancelled, delay)
	}
	r.mu.Unlock()

	for _, delay := range cancelled {
		<-delay.doneCh
	}
}

// ClearServiceHistory forgets restart history related to a given service ID.
func (r *Supervisor) ClearServiceHistory(serviceID string) {
	r.mu.Lock()
	delete(r.historyByService, serviceID)
	r.mu.Unlock()
}