package restart

import (
	"container/list"
	"context"
	"errors"
	"sync"
	"time"

	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/api/defaults"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/orchestrator"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	gogotypes "github.com/gogo/protobuf/types"
)

const defaultOldTaskTimeout = time.Minute

type restartedInstance struct {
	timestamp time.Time
}

type instanceRestartInfo struct {
	// counter of restarts for this instance.
	totalRestarts uint64
	// Linked list of restartedInstance structs. Only used when
	// Restart.MaxAttempts and Restart.Window are both
	// nonzero.
	restartedInstances *list.List
	// Why is specVersion in this structure and not in the map key? While
	// putting it in the key would be a very simple solution, it wouldn't
	// be easy to clean up map entries corresponding to old specVersions.
	// Making the key version-agnostic and clearing the value whenever the
	// version changes avoids the issue of stale map entries for old
	// versions.
	specVersion api.Version
}

type delayedStart struct {
	// cancel is called to cancel the delayed start.
	cancel func()
	doneCh chan struct{}

	// waiter is set to true if the next restart is waiting for this delay
	// to complete.
	waiter bool
}

// Supervisor initiates and manages restarts. It's responsible for
// delaying restarts when applicable.
type Supervisor struct {
	mu               sync.Mutex
	store            *store.MemoryStore
	delays           map[string]*delayedStart
	historyByService map[string]map[orchestrator.SlotTuple]*instanceRestartInfo
	// TaskTimeout is the maximum time to wait for an old task to stop
	// before moving its replacement to RUNNING anyway.
	TaskTimeout time.Duration
}

// NewSupervisor creates a new restart Supervisor.
func NewSupervisor(store *store.MemoryStore) *Supervisor {
	return &Supervisor{
		store:            store,
		delays:           make(map[string]*delayedStart),
		historyByService: make(map[string]map[orchestrator.SlotTuple]*instanceRestartInfo),
		TaskTimeout:      defaultOldTaskTimeout,
	}
}

func (r *Supervisor) waitRestart(ctx context.Context, oldDelay *delayedStart, cluster *api.Cluster, taskID string) {
	// Wait for the last restart delay to elapse.
	select {
	case <-oldDelay.doneCh:
	case <-ctx.Done():
		return
	}

	// Start the next restart
	err := r.store.Update(func(tx store.Tx) error {
		t := store.GetTask(tx, taskID)
		if t == nil {
			return nil
		}
		if t.DesiredState > api.TaskStateRunning {
			return nil
		}
		service := store.GetService(tx, t.ServiceID)
		if service == nil {
			return nil
		}
		return r.Restart(ctx, tx, cluster, service, *t)
	})

	if err != nil {
		log.G(ctx).WithError(err).Errorf("failed to restart task after waiting for previous restart")
	}
}

// Restart initiates a new task to replace t if appropriate under the service's
// restart policy.
func (r *Supervisor) Restart(ctx context.Context, tx store.Tx, cluster *api.Cluster, service *api.Service, t api.Task) error {
	// TODO(aluzzardi): This function should not depend on `service`.

	// Is the old task still in the process of restarting? If so, wait for
	// its restart delay to elapse, to avoid tight restart loops (for
	// example, when the image doesn't exist).
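	// Only one goroutine needs to queue behind an in-flight delay: the
	// waiter flag below makes any further Restart calls for the same task
	// no-ops while waitRestart is already waiting on it.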
	r.mu.Lock()
	oldDelay, ok := r.delays[t.ID]
	if ok {
		if !oldDelay.waiter {
			oldDelay.waiter = true
			go r.waitRestart(ctx, oldDelay, cluster, t.ID)
		}
		r.mu.Unlock()
		return nil
	}
	r.mu.Unlock()

	// Sanity check: was the task shut down already by a separate call to
	// Restart? If so, we must avoid restarting it, because this will create
	// an extra task. This should never happen unless there is a bug.
	if t.DesiredState > api.TaskStateRunning {
		return errors.New("Restart called on task that was already shut down")
	}

	t.DesiredState = api.TaskStateShutdown
	err := store.UpdateTask(tx, &t)
	if err != nil {
		log.G(ctx).WithError(err).Errorf("failed to set task desired state to shutdown")
		return err
	}

	if !r.shouldRestart(ctx, &t, service) {
		return nil
	}

	var restartTask *api.Task

	if orchestrator.IsReplicatedService(service) {
		restartTask = orchestrator.NewTask(cluster, service, t.Slot, "")
	} else if orchestrator.IsGlobalService(service) {
		restartTask = orchestrator.NewTask(cluster, service, 0, t.NodeID)
	} else {
		log.G(ctx).Error("service not supported by restart supervisor")
		return nil
	}

	n := store.GetNode(tx, t.NodeID)

	restartTask.DesiredState = api.TaskStateReady

	var restartDelay time.Duration
	// Restart delay is not applied to drained nodes
	if n == nil || n.Spec.Availability != api.NodeAvailabilityDrain {
		if t.Spec.Restart != nil && t.Spec.Restart.Delay != nil {
			var err error
			restartDelay, err = gogotypes.DurationFromProto(t.Spec.Restart.Delay)
			if err != nil {
				log.G(ctx).WithError(err).Error("invalid restart delay; using default")
				restartDelay, _ = gogotypes.DurationFromProto(defaults.Service.Task.Restart.Delay)
			}
		} else {
			restartDelay, _ = gogotypes.DurationFromProto(defaults.Service.Task.Restart.Delay)
		}
	}

	waitStop := true

	// Normally we wait for the old task to stop running, but we skip this
	// if the old task is already dead or the node it's assigned to is down.
	if (n != nil && n.Status.State == api.NodeStatus_DOWN) || t.Status.State > api.TaskStateRunning {
		waitStop = false
	}

	if err := store.CreateTask(tx, restartTask); err != nil {
		log.G(ctx).WithError(err).WithField("task.id", restartTask.ID).Error("task create failed")
		return err
	}

	tuple := orchestrator.SlotTuple{
		Slot:      restartTask.Slot,
		ServiceID: restartTask.ServiceID,
		NodeID:    restartTask.NodeID,
	}
	r.RecordRestartHistory(tuple, restartTask)

	r.DelayStart(ctx, tx, &t, restartTask.ID, restartDelay, waitStop)
	return nil
}

// shouldRestart returns true if a task should be restarted according to the
// restart policy.
func (r *Supervisor) shouldRestart(ctx context.Context, t *api.Task, service *api.Service) bool {
	// TODO(aluzzardi): This function should not depend on `service`.
	condition := orchestrator.RestartCondition(t)

	if condition != api.RestartOnAny &&
		(condition != api.RestartOnFailure || t.Status.State == api.TaskStateCompleted) {
		return false
	}

	if t.Spec.Restart == nil || t.Spec.Restart.MaxAttempts == 0 {
		return true
	}

	instanceTuple := orchestrator.SlotTuple{
		Slot:      t.Slot,
		ServiceID: t.ServiceID,
	}

	// Slot is not meaningful for "global" tasks, so they need to be
	// indexed by NodeID.
	if orchestrator.IsGlobalService(service) {
		instanceTuple.NodeID = t.NodeID
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	restartInfo := r.historyByService[t.ServiceID][instanceTuple]
	if restartInfo == nil || (t.SpecVersion != nil && *t.SpecVersion != restartInfo.specVersion) {
		return true
	}

	if t.Spec.Restart.Window == nil || (t.Spec.Restart.Window.Seconds == 0 && t.Spec.Restart.Window.Nanos == 0) {
		return restartInfo.totalRestarts < t.Spec.Restart.MaxAttempts
	}

	if restartInfo.restartedInstances == nil {
		return true
	}

	window, err := gogotypes.DurationFromProto(t.Spec.Restart.Window)
	if err != nil {
		log.G(ctx).WithError(err).Error("invalid restart lookback window")
		return restartInfo.totalRestarts < t.Spec.Restart.MaxAttempts
	}

	var timestamp time.Time
	// Prefer the manager's timestamp over the agent's, since manager
	// clocks are more trustworthy.
	if t.Status.AppliedAt != nil {
		timestamp, err = gogotypes.TimestampFromProto(t.Status.AppliedAt)
		if err != nil {
			log.G(ctx).WithError(err).Error("invalid task status AppliedAt timestamp")
			return restartInfo.totalRestarts < t.Spec.Restart.MaxAttempts
		}
	} else {
		// It's safe to call TimestampFromProto with a nil timestamp
		timestamp, err = gogotypes.TimestampFromProto(t.Status.Timestamp)
		if t.Status.Timestamp == nil || err != nil {
			log.G(ctx).WithError(err).Error("invalid task completion timestamp")
			return restartInfo.totalRestarts < t.Spec.Restart.MaxAttempts
		}
	}
	lookback := timestamp.Add(-window)

	numRestarts := uint64(restartInfo.restartedInstances.Len())

	// Disregard any restarts that happened before the lookback window,
	// and remove them from the linked list since they will no longer
	// be relevant to figuring out if tasks should be restarted going
	// forward.
	var next *list.Element
	for e := restartInfo.restartedInstances.Front(); e != nil; e = next {
		next = e.Next()

		if e.Value.(restartedInstance).timestamp.After(lookback) {
			break
		}
		restartInfo.restartedInstances.Remove(e)
		numRestarts--
	}

	// Ignore restarts that didn't happen before the task we're looking at.
	for e2 := restartInfo.restartedInstances.Back(); e2 != nil; e2 = e2.Prev() {
		if e2.Value.(restartedInstance).timestamp.Before(timestamp) {
			break
		}
		numRestarts--
	}

	if restartInfo.restartedInstances.Len() == 0 {
		restartInfo.restartedInstances = nil
	}

	return numRestarts < t.Spec.Restart.MaxAttempts
}

// UpdatableTasksInSlot returns the set of tasks that should be passed to the
// updater from this slot, or an empty slice if none should be. A slot is
// updatable if it has at least one task with desired state <= RUNNING, or if
// its most recent task has stopped running and should not be restarted. The
// latter case makes sure that tasks that shouldn't normally be restarted will
// still be handled by rolling updates when they become outdated. There is a
// special case for rollbacks to make sure that a rollback always takes the
// service to a converged state, instead of ignoring tasks with the original
// spec that stopped running and shouldn't be restarted according to the
// restart policy.
func (r *Supervisor) UpdatableTasksInSlot(ctx context.Context, slot orchestrator.Slot, service *api.Service) orchestrator.Slot {
	if len(slot) < 1 {
		return nil
	}

	var updatable orchestrator.Slot
	for _, t := range slot {
		if t.DesiredState <= api.TaskStateRunning {
			updatable = append(updatable, t)
		}
	}
	if len(updatable) > 0 {
		return updatable
	}

	if service.UpdateStatus != nil && service.UpdateStatus.State == api.UpdateStatus_ROLLBACK_STARTED {
		return nil
	}

	// Find most recent task
	byTimestamp := orchestrator.TasksByTimestamp(slot)
	newestIndex := 0
	for i := 1; i != len(slot); i++ {
		if byTimestamp.Less(newestIndex, i) {
			newestIndex = i
		}
	}

	if !r.shouldRestart(ctx, slot[newestIndex], service) {
		return orchestrator.Slot{slot[newestIndex]}
	}
	return nil
}

// RecordRestartHistory updates the historyByService map to reflect the restart
// that created replacementTask.
func (r *Supervisor) RecordRestartHistory(tuple orchestrator.SlotTuple, replacementTask *api.Task) {
	if replacementTask.Spec.Restart == nil || replacementTask.Spec.Restart.MaxAttempts == 0 {
		// No limit on the number of restarts, so no need to record
		// history.
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	serviceID := replacementTask.ServiceID
	if r.historyByService[serviceID] == nil {
		r.historyByService[serviceID] = make(map[orchestrator.SlotTuple]*instanceRestartInfo)
	}
	if r.historyByService[serviceID][tuple] == nil {
		r.historyByService[serviceID][tuple] = &instanceRestartInfo{}
	}

	restartInfo := r.historyByService[serviceID][tuple]

	if replacementTask.SpecVersion != nil && *replacementTask.SpecVersion != restartInfo.specVersion {
		// This task has a different SpecVersion from the one we're
		// tracking. Most likely, the service was updated. Past failures
		// shouldn't count against the new service definition, so clear
		// the history for this instance.
		*restartInfo = instanceRestartInfo{
			specVersion: *replacementTask.SpecVersion,
		}
	}

	restartInfo.totalRestarts++

	if replacementTask.Spec.Restart.Window != nil && (replacementTask.Spec.Restart.Window.Seconds != 0 || replacementTask.Spec.Restart.Window.Nanos != 0) {
		if restartInfo.restartedInstances == nil {
			restartInfo.restartedInstances = list.New()
		}

		// It's okay to call TimestampFromProto with a nil argument
		timestamp, err := gogotypes.TimestampFromProto(replacementTask.Meta.CreatedAt)
		if replacementTask.Meta.CreatedAt == nil || err != nil {
			timestamp = time.Now()
		}

		restartedInstance := restartedInstance{
			timestamp: timestamp,
		}

		restartInfo.restartedInstances.PushBack(restartedInstance)
	}
}

// DelayStart starts a timer that moves the task from READY to RUNNING once:
// - The restart delay has elapsed (if applicable)
// - The old task that it's replacing has stopped running (or this times out)
// It must be called during an Update transaction to ensure that it does not
// miss events. The purpose of the store.Tx argument is to avoid accidental
// calls outside an Update transaction.
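// The returned channel is closed once the delayed start has either completed
// or been cancelled.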
func (r *Supervisor) DelayStart(ctx context.Context, _ store.Tx, oldTask *api.Task, newTaskID string, delay time.Duration, waitStop bool) <-chan struct{} {
	ctx, cancel := context.WithCancel(context.Background())
	doneCh := make(chan struct{})

	r.mu.Lock()
	for {
		oldDelay, ok := r.delays[newTaskID]
		if !ok {
			break
		}
		oldDelay.cancel()
		r.mu.Unlock()
		// Note that this channel read should only block for a very
		// short time, because we cancelled the existing delay and
		// that should cause it to stop immediately.
		<-oldDelay.doneCh
		r.mu.Lock()
	}
	r.delays[newTaskID] = &delayedStart{cancel: cancel, doneCh: doneCh}
	r.mu.Unlock()

	var watch chan events.Event
	cancelWatch := func() {}

	waitForTask := waitStop && oldTask != nil && oldTask.Status.State <= api.TaskStateRunning

	if waitForTask {
		// Wait for either the old task to complete, or the old task's
		// node to become unavailable.
		watch, cancelWatch = state.Watch(
			r.store.WatchQueue(),
			api.EventUpdateTask{
				Task:   &api.Task{ID: oldTask.ID, Status: api.TaskStatus{State: api.TaskStateRunning}},
				Checks: []api.TaskCheckFunc{api.TaskCheckID, state.TaskCheckStateGreaterThan},
			},
			api.EventUpdateNode{
				Node:   &api.Node{ID: oldTask.NodeID, Status: api.NodeStatus{State: api.NodeStatus_DOWN}},
				Checks: []api.NodeCheckFunc{api.NodeCheckID, state.NodeCheckState},
			},
			api.EventDeleteNode{
				Node:   &api.Node{ID: oldTask.NodeID},
				Checks: []api.NodeCheckFunc{api.NodeCheckID},
			},
		)
	}

	go func() {
		defer func() {
			cancelWatch()
			r.mu.Lock()
			delete(r.delays, newTaskID)
			r.mu.Unlock()
			close(doneCh)
		}()

		oldTaskTimer := time.NewTimer(r.TaskTimeout)
		defer oldTaskTimer.Stop()

		// Wait for the delay to elapse, if one is specified.
		if delay != 0 {
			select {
			case <-time.After(delay):
			case <-ctx.Done():
				return
			}
		}

		if waitForTask {
			select {
			case <-watch:
			case <-oldTaskTimer.C:
			case <-ctx.Done():
				return
			}
		}

		err := r.store.Update(func(tx store.Tx) error {
			err := r.StartNow(tx, newTaskID)
			if err != nil {
				log.G(ctx).WithError(err).WithField("task.id", newTaskID).Error("moving task out of delayed state failed")
			}
			return nil
		})
		if err != nil {
			log.G(ctx).WithError(err).WithField("task.id", newTaskID).Error("task restart transaction failed")
		}
	}()

	return doneCh
}

// StartNow moves the task into the RUNNING state so it will proceed to start
// up.
func (r *Supervisor) StartNow(tx store.Tx, taskID string) error {
	t := store.GetTask(tx, taskID)
	if t == nil || t.DesiredState >= api.TaskStateRunning {
		return nil
	}
	t.DesiredState = api.TaskStateRunning
	return store.UpdateTask(tx, t)
}

// Cancel cancels a pending restart.
func (r *Supervisor) Cancel(taskID string) {
	r.mu.Lock()
	delay, ok := r.delays[taskID]
	r.mu.Unlock()

	if !ok {
		return
	}

	delay.cancel()
	<-delay.doneCh
}

// CancelAll aborts all pending restarts and waits for any instances of
// StartNow that have already triggered to complete.
func (r *Supervisor) CancelAll() {
	r.mu.Lock()
	cancelled := make([]*delayedStart, 0, len(r.delays))
	for _, delay := range r.delays {
		delay.cancel()
		cancelled = append(cancelled, delay)
	}
	r.mu.Unlock()

	// Wait outside the lock, since the cleanup in DelayStart's goroutine
	// needs the lock to remove its entry from r.delays before closing
	// doneCh.
	for _, delay := range cancelled {
		<-delay.doneCh
	}
}

// ClearServiceHistory forgets restart history related to a given service ID.
func (r *Supervisor) ClearServiceHistory(serviceID string) {
	r.mu.Lock()
	delete(r.historyByService, serviceID)
	r.mu.Unlock()
}
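// Illustrative sketch (not part of the package API): roughly how an
// orchestrator might drive the Supervisor. It mirrors the pattern used by
// waitRestart above; ctx, cluster, memoryStore, and taskID are assumed to be
// in scope, and error handling is elided.
//
//	supervisor := NewSupervisor(memoryStore)
//
//	_ = memoryStore.Update(func(tx store.Tx) error {
//		t := store.GetTask(tx, taskID)
//		if t == nil || t.DesiredState > api.TaskStateRunning {
//			return nil
//		}
//		service := store.GetService(tx, t.ServiceID)
//		if service == nil {
//			return nil
//		}
//		// Shuts down the old task and, if the restart policy allows,
//		// creates and delays its replacement.
//		return supervisor.Restart(ctx, tx, cluster, service, *t)
//	})
//
//	// On shutdown, abort pending restarts and wait for in-flight ones.
//	supervisor.CancelAll()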