1/* 2 Copyright The containerd Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15*/ 16 17package server 18 19import ( 20 "io/ioutil" 21 "os" 22 "path/filepath" 23 goruntime "runtime" 24 "time" 25 26 "github.com/containerd/containerd" 27 containerdio "github.com/containerd/containerd/cio" 28 "github.com/containerd/containerd/errdefs" 29 containerdimages "github.com/containerd/containerd/images" 30 "github.com/containerd/containerd/log" 31 "github.com/containerd/containerd/platforms" 32 "github.com/containerd/typeurl" 33 "github.com/pkg/errors" 34 "golang.org/x/net/context" 35 runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" 36 37 cio "github.com/containerd/containerd/pkg/cri/io" 38 containerstore "github.com/containerd/containerd/pkg/cri/store/container" 39 sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" 40 ctrdutil "github.com/containerd/containerd/pkg/cri/util" 41 "github.com/containerd/containerd/pkg/netns" 42) 43 44// NOTE: The recovery logic has following assumption: when the cri plugin is down: 45// 1) Files (e.g. root directory, netns) and checkpoint maintained by the plugin MUST NOT be 46// touched. Or else, recovery logic for those containers/sandboxes may return error. 47// 2) Containerd containers may be deleted, but SHOULD NOT be added. Or else, recovery logic 48// for the newly added container/sandbox will return error, because there is no corresponding root 49// directory created. 
50// 3) Containerd container tasks may exit or be stopped, deleted. Even though current logic could 51// tolerant tasks being created or started, we prefer that not to happen. 52 53// recover recovers system state from containerd and status checkpoint. 54func (c *criService) recover(ctx context.Context) error { 55 // Recover all sandboxes. 56 sandboxes, err := c.client.Containers(ctx, filterLabel(containerKindLabel, containerKindSandbox)) 57 if err != nil { 58 return errors.Wrap(err, "failed to list sandbox containers") 59 } 60 for _, sandbox := range sandboxes { 61 sb, err := c.loadSandbox(ctx, sandbox) 62 if err != nil { 63 log.G(ctx).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID()) 64 continue 65 } 66 log.G(ctx).Debugf("Loaded sandbox %+v", sb) 67 if err := c.sandboxStore.Add(sb); err != nil { 68 return errors.Wrapf(err, "failed to add sandbox %q to store", sandbox.ID()) 69 } 70 if err := c.sandboxNameIndex.Reserve(sb.Name, sb.ID); err != nil { 71 return errors.Wrapf(err, "failed to reserve sandbox name %q", sb.Name) 72 } 73 } 74 75 // Recover all containers. 76 containers, err := c.client.Containers(ctx, filterLabel(containerKindLabel, containerKindContainer)) 77 if err != nil { 78 return errors.Wrap(err, "failed to list containers") 79 } 80 for _, container := range containers { 81 cntr, err := c.loadContainer(ctx, container) 82 if err != nil { 83 log.G(ctx).WithError(err).Errorf("Failed to load container %q", container.ID()) 84 continue 85 } 86 log.G(ctx).Debugf("Loaded container %+v", cntr) 87 if err := c.containerStore.Add(cntr); err != nil { 88 return errors.Wrapf(err, "failed to add container %q to store", container.ID()) 89 } 90 if err := c.containerNameIndex.Reserve(cntr.Name, cntr.ID); err != nil { 91 return errors.Wrapf(err, "failed to reserve container name %q", cntr.Name) 92 } 93 } 94 95 // Recover all images. 
96 cImages, err := c.client.ListImages(ctx) 97 if err != nil { 98 return errors.Wrap(err, "failed to list images") 99 } 100 c.loadImages(ctx, cImages) 101 102 // It's possible that containerd containers are deleted unexpectedly. In that case, 103 // we can't even get metadata, we should cleanup orphaned sandbox/container directories 104 // with best effort. 105 106 // Cleanup orphaned sandbox and container directories without corresponding containerd container. 107 for _, cleanup := range []struct { 108 cntrs []containerd.Container 109 base string 110 errMsg string 111 }{ 112 { 113 cntrs: sandboxes, 114 base: filepath.Join(c.config.RootDir, sandboxesDir), 115 errMsg: "failed to cleanup orphaned sandbox directories", 116 }, 117 { 118 cntrs: sandboxes, 119 base: filepath.Join(c.config.StateDir, sandboxesDir), 120 errMsg: "failed to cleanup orphaned volatile sandbox directories", 121 }, 122 { 123 cntrs: containers, 124 base: filepath.Join(c.config.RootDir, containersDir), 125 errMsg: "failed to cleanup orphaned container directories", 126 }, 127 { 128 cntrs: containers, 129 base: filepath.Join(c.config.StateDir, containersDir), 130 errMsg: "failed to cleanup orphaned volatile container directories", 131 }, 132 } { 133 if err := cleanupOrphanedIDDirs(ctx, cleanup.cntrs, cleanup.base); err != nil { 134 return errors.Wrap(err, cleanup.errMsg) 135 } 136 } 137 return nil 138} 139 140// loadContainerTimeout is the default timeout for loading a container/sandbox. 141// One container/sandbox hangs (e.g. containerd#2438) should not affect other 142// containers/sandboxes. 143// Most CRI container/sandbox related operations are per container, the ones 144// which handle multiple containers at a time are: 145// * ListPodSandboxes: Don't talk with containerd services. 146// * ListContainers: Don't talk with containerd services. 147// * ListContainerStats: Not in critical code path, a default timeout will 148// be applied at CRI level. 
// * Recovery logic: We should set a timeout for each container/sandbox recovery.
// * Event monitor: We should set a timeout for each container/sandbox event handling.
const loadContainerTimeout = 10 * time.Second

// loadContainer loads container from containerd and status checkpoint.
//
// It reconciles the checkpointed CRI status with the live containerd task
// state (if any). When reconciliation fails, the error is only logged and
// the container is still returned with status.Unknown set, so the fields
// recovered from the checkpoint are preserved.
func (c *criService) loadContainer(ctx context.Context, cntr containerd.Container) (containerstore.Container, error) {
	// Bound the whole load so one hung container cannot stall recovery of the rest.
	ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout)
	defer cancel()
	id := cntr.ID()
	containerDir := c.getContainerRootDir(id)
	volatileContainerDir := c.getVolatileContainerRootDir(id)
	var container containerstore.Container
	// Load container metadata stored as a containerd container extension.
	exts, err := cntr.Extensions(ctx)
	if err != nil {
		return container, errors.Wrap(err, "failed to get container extensions")
	}
	ext, ok := exts[containerMetadataExtension]
	if !ok {
		return container, errors.Errorf("metadata extension %q not found", containerMetadataExtension)
	}
	data, err := typeurl.UnmarshalAny(&ext)
	if err != nil {
		return container, errors.Wrapf(err, "failed to unmarshal metadata extension %q", ext)
	}
	meta := data.(*containerstore.Metadata)

	// Load status from checkpoint. If the checkpoint is unreadable, fall back
	// to an unknown status rather than failing the whole load.
	status, err := containerstore.LoadStatus(containerDir, id)
	if err != nil {
		log.G(ctx).WithError(err).Warnf("Failed to load container status for %q", id)
		status = unknownContainerStatus()
	}

	var containerIO *cio.ContainerIO
	err = func() error {
		// Load up-to-date status from containerd. The attach callback
		// re-establishes container IO on the existing FIFOs and re-creates
		// the CRI log writers for meta.LogPath.
		t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (_ containerdio.IO, err error) {
			stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty())
			if err != nil {
				return nil, err
			}
			// Close the log writers if anything below fails, so they don't leak.
			defer func() {
				if err != nil {
					if stdoutWC != nil {
						stdoutWC.Close()
					}
					if stderrWC != nil {
						stderrWC.Close()
					}
				}
			}()
			containerIO, err = cio.NewContainerIO(id,
				cio.WithFIFOs(fifos),
			)
			if err != nil {
				return nil, err
			}
			containerIO.AddOutput("log", stdoutWC, stderrWC)
			containerIO.Pipe()
			return containerIO, nil
		})
		if err != nil && !errdefs.IsNotFound(err) {
			return errors.Wrap(err, "failed to load task")
		}
		var s containerd.Status
		var notFound bool
		if errdefs.IsNotFound(err) {
			// Task is not found.
			notFound = true
		} else {
			// Task is found. Get task status.
			s, err = t.Status(ctx)
			if err != nil {
				// It's still possible that task is deleted during this window.
				if !errdefs.IsNotFound(err) {
					return errors.Wrap(err, "failed to get task status")
				}
				notFound = true
			}
		}
		if notFound {
			// Task is not created or has been deleted, use the checkpointed status
			// to generate container status.
			switch status.State() {
			case runtime.ContainerState_CONTAINER_CREATED:
				// NOTE: Another possibility is that we've tried to start the container, but
				// containerd got restarted during that. In that case, we still
				// treat the container as `CREATED`.
				// Recreate fresh FIFOs so the container can still be started later.
				containerIO, err = cio.NewContainerIO(id,
					cio.WithNewFIFOs(volatileContainerDir, meta.Config.GetTty(), meta.Config.GetStdin()),
				)
				if err != nil {
					return errors.Wrap(err, "failed to create container io")
				}
			case runtime.ContainerState_CONTAINER_RUNNING:
				// Container was in running state, but its task has been deleted,
				// set unknown exited state. Container io is not needed in this case.
				status.FinishedAt = time.Now().UnixNano()
				status.ExitCode = unknownExitCode
				status.Reason = unknownExitReason
			default:
				// Container is in exited/unknown state, return the status as it is.
			}
		} else {
			// Task status is found. Update container status based on the up-to-date task status.
			switch s.Status {
			case containerd.Created:
				// Task has been created, but not started yet. This could only happen if containerd
				// gets restarted during container start.
				// Container must be in `CREATED` state.
				if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
					return errors.Wrap(err, "failed to delete task")
				}
				if status.State() != runtime.ContainerState_CONTAINER_CREATED {
					return errors.Errorf("unexpected container state for created task: %q", status.State())
				}
			case containerd.Running:
				// Task is running. Container must be in `RUNNING` state, based on our assumption that
				// "task should not be started when containerd is down".
				switch status.State() {
				case runtime.ContainerState_CONTAINER_EXITED:
					return errors.Errorf("unexpected container state for running task: %q", status.State())
				case runtime.ContainerState_CONTAINER_RUNNING:
				default:
					// This may happen if containerd gets restarted after task is started, but
					// before status is checkpointed.
					status.StartedAt = time.Now().UnixNano()
					status.Pid = t.Pid()
				}
				// Wait for the task for exit monitor.
				// wait is a long running background request, no timeout needed.
				exitCh, err := t.Wait(ctrdutil.NamespacedContext())
				if err != nil {
					if !errdefs.IsNotFound(err) {
						return errors.Wrap(err, "failed to wait for task")
					}
					// Container was in running state, but its task has been deleted,
					// set unknown exited state.
					status.FinishedAt = time.Now().UnixNano()
					status.ExitCode = unknownExitCode
					status.Reason = unknownExitReason
				} else {
					// Start exit monitor.
					c.eventMonitor.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh)
				}
			case containerd.Stopped:
				// Task is stopped. Update status and delete the task.
				if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
					return errors.Wrap(err, "failed to delete task")
				}
				status.FinishedAt = s.ExitTime.UnixNano()
				status.ExitCode = int32(s.ExitStatus)
			default:
				return errors.Errorf("unexpected task status %q", s.Status)
			}
		}
		return nil
	}()
	if err != nil {
		log.G(ctx).WithError(err).Errorf("Failed to load container status for %q", id)
		// Only set the unknown field in this case, because other fields may
		// contain useful information loaded from the checkpoint.
		status.Unknown = true
	}
	opts := []containerstore.Opts{
		containerstore.WithStatus(status, containerDir),
		containerstore.WithContainer(cntr),
	}
	// containerIO could be nil for container in unknown state.
	if containerIO != nil {
		opts = append(opts, containerstore.WithContainerIO(containerIO))
	}
	return containerstore.NewContainer(*meta, opts...)
}

// loadSandbox loads sandbox from containerd.
//
// Status reconciliation errors are only logged; the sandbox is still
// returned so that kubelet can stop it and trigger proper cleanup.
func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) {
	// Bound the whole load so one hung sandbox cannot stall recovery of the rest.
	ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout)
	defer cancel()
	var sandbox sandboxstore.Sandbox
	// Load sandbox metadata stored as a containerd container extension.
	exts, err := cntr.Extensions(ctx)
	if err != nil {
		return sandbox, errors.Wrap(err, "failed to get sandbox container extensions")
	}
	ext, ok := exts[sandboxMetadataExtension]
	if !ok {
		return sandbox, errors.Errorf("metadata extension %q not found", sandboxMetadataExtension)
	}
	data, err := typeurl.UnmarshalAny(&ext)
	if err != nil {
		return sandbox, errors.Wrapf(err, "failed to unmarshal metadata extension %q", ext)
	}
	meta := data.(*sandboxstore.Metadata)

	s, err := func() (sandboxstore.Status, error) {
		status := unknownSandboxStatus()
		// Load sandbox created timestamp.
		info, err := cntr.Info(ctx)
		if err != nil {
			return status, errors.Wrap(err, "failed to get sandbox container info")
		}
		status.CreatedAt = info.CreatedAt

		// Load sandbox state.
		t, err := cntr.Task(ctx, nil)
		if err != nil && !errdefs.IsNotFound(err) {
			return status, errors.Wrap(err, "failed to load task")
		}
		var taskStatus containerd.Status
		var notFound bool
		if errdefs.IsNotFound(err) {
			// Task is not found.
			notFound = true
		} else {
			// Task is found. Get task status.
			taskStatus, err = t.Status(ctx)
			if err != nil {
				// It's still possible that task is deleted during this window.
				if !errdefs.IsNotFound(err) {
					return status, errors.Wrap(err, "failed to get task status")
				}
				notFound = true
			}
		}
		if notFound {
			// Task does not exist, set sandbox state as NOTREADY.
			status.State = sandboxstore.StateNotReady
		} else {
			if taskStatus.Status == containerd.Running {
				// Wait for the task for sandbox monitor.
				// wait is a long running background request, no timeout needed.
				exitCh, err := t.Wait(ctrdutil.NamespacedContext())
				if err != nil {
					if !errdefs.IsNotFound(err) {
						return status, errors.Wrap(err, "failed to wait for task")
					}
					// Task was deleted between Status and Wait; treat as not ready.
					status.State = sandboxstore.StateNotReady
				} else {
					// Task is running, set sandbox state as READY.
					status.State = sandboxstore.StateReady
					status.Pid = t.Pid()
					c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
				}
			} else {
				// Task is not running. Delete the task and set sandbox state as NOTREADY.
				if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
					return status, errors.Wrap(err, "failed to delete task")
				}
				status.State = sandboxstore.StateNotReady
			}
		}
		return status, nil
	}()
	if err != nil {
		log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID())
	}

	sandbox = sandboxstore.NewSandbox(*meta, s)
	sandbox.Container = cntr

	// Load network namespace.
	if goruntime.GOOS != "windows" &&
		meta.Config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE {
		// Don't need to load netns for host network sandbox.
		return sandbox, nil
	}
	sandbox.NetNS = netns.LoadNetNS(meta.NetNSPath)

	// It doesn't matter whether task is running or not. If it is running, sandbox
	// status will be `READY`; if it is not running, sandbox status will be `NOT_READY`,
	// kubelet will stop the sandbox which will properly cleanup everything.
	return sandbox, nil
}

// loadImages loads images from containerd.
426func (c *criService) loadImages(ctx context.Context, cImages []containerd.Image) { 427 snapshotter := c.config.ContainerdConfig.Snapshotter 428 for _, i := range cImages { 429 ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default()) 430 if err != nil { 431 log.G(ctx).WithError(err).Errorf("Failed to check image content readiness for %q", i.Name()) 432 continue 433 } 434 if !ok { 435 log.G(ctx).Warnf("The image content readiness for %q is not ok", i.Name()) 436 continue 437 } 438 // Checking existence of top-level snapshot for each image being recovered. 439 unpacked, err := i.IsUnpacked(ctx, snapshotter) 440 if err != nil { 441 log.G(ctx).WithError(err).Warnf("Failed to check whether image is unpacked for image %s", i.Name()) 442 continue 443 } 444 if !unpacked { 445 log.G(ctx).Warnf("The image %s is not unpacked.", i.Name()) 446 // TODO(random-liu): Consider whether we should try unpack here. 447 } 448 if err := c.updateImage(ctx, i.Name()); err != nil { 449 log.G(ctx).WithError(err).Warnf("Failed to update reference for image %q", i.Name()) 450 continue 451 } 452 log.G(ctx).Debugf("Loaded image %q", i.Name()) 453 } 454} 455 456func cleanupOrphanedIDDirs(ctx context.Context, cntrs []containerd.Container, base string) error { 457 // Cleanup orphaned id directories. 458 dirs, err := ioutil.ReadDir(base) 459 if err != nil && !os.IsNotExist(err) { 460 return errors.Wrap(err, "failed to read base directory") 461 } 462 idsMap := make(map[string]containerd.Container) 463 for _, cntr := range cntrs { 464 idsMap[cntr.ID()] = cntr 465 } 466 for _, d := range dirs { 467 if !d.IsDir() { 468 log.G(ctx).Warnf("Invalid file %q found in base directory %q", d.Name(), base) 469 continue 470 } 471 if _, ok := idsMap[d.Name()]; ok { 472 // Do not remove id directory if corresponding container is found. 
473 continue 474 } 475 dir := filepath.Join(base, d.Name()) 476 if err := ensureRemoveAll(ctx, dir); err != nil { 477 log.G(ctx).WithError(err).Warnf("Failed to remove id directory %q", dir) 478 } else { 479 log.G(ctx).Debugf("Cleanup orphaned id directory %q", dir) 480 } 481 } 482 return nil 483} 484