/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package server

import (
	"io/ioutil"
	"os"
	"path/filepath"
	goruntime "runtime"
	"time"

	"github.com/containerd/containerd"
	containerdio "github.com/containerd/containerd/cio"
	"github.com/containerd/containerd/errdefs"
	containerdimages "github.com/containerd/containerd/images"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/platforms"
	"github.com/containerd/typeurl"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"

	cio "github.com/containerd/containerd/pkg/cri/io"
	containerstore "github.com/containerd/containerd/pkg/cri/store/container"
	sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
	ctrdutil "github.com/containerd/containerd/pkg/cri/util"
	"github.com/containerd/containerd/pkg/netns"
)

// NOTE: The recovery logic makes the following assumptions about the period when the
// cri plugin is down:
// 1) Files (e.g. root directory, netns) and checkpoints maintained by the plugin MUST NOT
// be touched. Otherwise, recovery for those containers/sandboxes may return an error.
// 2) Containerd containers may be deleted, but SHOULD NOT be added. Otherwise, recovery
// for the newly added container/sandbox will return an error, because no corresponding
// root directory has been created.
// 3) Containerd container tasks may exit, or be stopped or deleted. Even though the
// current logic can tolerate tasks being created or started, we prefer that not to happen.

// recover recovers system state from containerd and status checkpoint.
func (c *criService) recover(ctx context.Context) error {
	// Recover all sandboxes.
	sandboxes, err := c.client.Containers(ctx, filterLabel(containerKindLabel, containerKindSandbox))
	if err != nil {
		return errors.Wrap(err, "failed to list sandbox containers")
	}
	for _, sandbox := range sandboxes {
		sb, err := c.loadSandbox(ctx, sandbox)
		if err != nil {
			log.G(ctx).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID())
			continue
		}
		log.G(ctx).Debugf("Loaded sandbox %+v", sb)
		if err := c.sandboxStore.Add(sb); err != nil {
			return errors.Wrapf(err, "failed to add sandbox %q to store", sandbox.ID())
		}
		if err := c.sandboxNameIndex.Reserve(sb.Name, sb.ID); err != nil {
			return errors.Wrapf(err, "failed to reserve sandbox name %q", sb.Name)
		}
	}

	// Recover all containers.
	containers, err := c.client.Containers(ctx, filterLabel(containerKindLabel, containerKindContainer))
	if err != nil {
		return errors.Wrap(err, "failed to list containers")
	}
	for _, container := range containers {
		cntr, err := c.loadContainer(ctx, container)
		if err != nil {
			log.G(ctx).WithError(err).Errorf("Failed to load container %q", container.ID())
			continue
		}
		log.G(ctx).Debugf("Loaded container %+v", cntr)
		if err := c.containerStore.Add(cntr); err != nil {
			return errors.Wrapf(err, "failed to add container %q to store", container.ID())
		}
		if err := c.containerNameIndex.Reserve(cntr.Name, cntr.ID); err != nil {
			return errors.Wrapf(err, "failed to reserve container name %q", cntr.Name)
		}
	}

	// Recover all images.
	cImages, err := c.client.ListImages(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to list images")
	}
	c.loadImages(ctx, cImages)

	// It's possible that containerd containers are deleted unexpectedly. In that case,
	// we can't even get the metadata, so we clean up orphaned sandbox/container
	// directories on a best-effort basis.

	// Cleanup orphaned sandbox and container directories without corresponding containerd container.
	for _, cleanup := range []struct {
		cntrs  []containerd.Container
		base   string
		errMsg string
	}{
		{
			cntrs:  sandboxes,
			base:   filepath.Join(c.config.RootDir, sandboxesDir),
			errMsg: "failed to cleanup orphaned sandbox directories",
		},
		{
			cntrs:  sandboxes,
			base:   filepath.Join(c.config.StateDir, sandboxesDir),
			errMsg: "failed to cleanup orphaned volatile sandbox directories",
		},
		{
			cntrs:  containers,
			base:   filepath.Join(c.config.RootDir, containersDir),
			errMsg: "failed to cleanup orphaned container directories",
		},
		{
			cntrs:  containers,
			base:   filepath.Join(c.config.StateDir, containersDir),
			errMsg: "failed to cleanup orphaned volatile container directories",
		},
	} {
		if err := cleanupOrphanedIDDirs(ctx, cleanup.cntrs, cleanup.base); err != nil {
			return errors.Wrap(err, cleanup.errMsg)
		}
	}
	return nil
}

// loadContainerTimeout is the default timeout for loading a container/sandbox.
// A single hanging container/sandbox (e.g. containerd#2438) should not affect
// other containers/sandboxes.
// Most CRI container/sandbox related operations are per container; the ones
// which handle multiple containers at a time are:
// * ListPodSandboxes: Doesn't talk to containerd services.
// * ListContainers: Doesn't talk to containerd services.
// * ListContainerStats: Not in the critical code path; a default timeout will
// be applied at the CRI level.
// * Recovery logic: We should set a timeout for each container/sandbox recovery.
// * Event monitor: We should set a timeout for each container/sandbox event handling.
const loadContainerTimeout = 10 * time.Second

// loadContainer loads container from containerd and status checkpoint.
func (c *criService) loadContainer(ctx context.Context, cntr containerd.Container) (containerstore.Container, error) {
	ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout)
	defer cancel()
	id := cntr.ID()
	containerDir := c.getContainerRootDir(id)
	volatileContainerDir := c.getVolatileContainerRootDir(id)
	var container containerstore.Container
	// Load container metadata.
	exts, err := cntr.Extensions(ctx)
	if err != nil {
		return container, errors.Wrap(err, "failed to get container extensions")
	}
	ext, ok := exts[containerMetadataExtension]
	if !ok {
		return container, errors.Errorf("metadata extension %q not found", containerMetadataExtension)
	}
	data, err := typeurl.UnmarshalAny(&ext)
	if err != nil {
		return container, errors.Wrapf(err, "failed to unmarshal metadata extension %q", ext)
	}
	meta := data.(*containerstore.Metadata)

	// Load status from checkpoint.
	status, err := containerstore.LoadStatus(containerDir, id)
	if err != nil {
		log.G(ctx).WithError(err).Warnf("Failed to load container status for %q", id)
		status = unknownContainerStatus()
	}

	var containerIO *cio.ContainerIO
	err = func() error {
		// Load up-to-date status from containerd.
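		// The attach function passed to Task below is only invoked when the task
		// still exists; it recreates the CRI log writers and reattaches them to
		// the task's existing FIFOs, so output keeps flowing to the container log
		// after the plugin restarts.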
		t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (_ containerdio.IO, err error) {
			stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty())
			if err != nil {
				return nil, err
			}
			defer func() {
				if err != nil {
					if stdoutWC != nil {
						stdoutWC.Close()
					}
					if stderrWC != nil {
						stderrWC.Close()
					}
				}
			}()
			containerIO, err = cio.NewContainerIO(id,
				cio.WithFIFOs(fifos),
			)
			if err != nil {
				return nil, err
			}
			containerIO.AddOutput("log", stdoutWC, stderrWC)
			containerIO.Pipe()
			return containerIO, nil
		})
		if err != nil && !errdefs.IsNotFound(err) {
			return errors.Wrap(err, "failed to load task")
		}
		var s containerd.Status
		var notFound bool
		if errdefs.IsNotFound(err) {
			// Task is not found.
			notFound = true
		} else {
			// Task is found. Get task status.
			s, err = t.Status(ctx)
			if err != nil {
				// It's still possible that task is deleted during this window.
				if !errdefs.IsNotFound(err) {
					return errors.Wrap(err, "failed to get task status")
				}
				notFound = true
			}
		}
		if notFound {
			// Task is not created or has been deleted, use the checkpointed status
			// to generate container status.
			switch status.State() {
			case runtime.ContainerState_CONTAINER_CREATED:
				// NOTE: Another possibility is that we've tried to start the container, but
				// containerd got restarted while starting it. In that case, we still
				// treat the container as `CREATED`.
				containerIO, err = cio.NewContainerIO(id,
					cio.WithNewFIFOs(volatileContainerDir, meta.Config.GetTty(), meta.Config.GetStdin()),
				)
				if err != nil {
					return errors.Wrap(err, "failed to create container io")
				}
			case runtime.ContainerState_CONTAINER_RUNNING:
				// Container was in running state, but its task has been deleted,
				// set unknown exited state. Container io is not needed in this case.
				status.FinishedAt = time.Now().UnixNano()
				status.ExitCode = unknownExitCode
				status.Reason = unknownExitReason
			default:
				// Container is in exited/unknown state, return the status as it is.
			}
		} else {
			// Task status is found. Update container status based on the up-to-date task status.
			switch s.Status {
			case containerd.Created:
				// Task has been created, but not started yet. This could only happen if containerd
				// gets restarted during container start.
				// Container must be in `CREATED` state.
				if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
					return errors.Wrap(err, "failed to delete task")
				}
				if status.State() != runtime.ContainerState_CONTAINER_CREATED {
					return errors.Errorf("unexpected container state for created task: %q", status.State())
				}
			case containerd.Running:
				// Task is running. Container must be in `RUNNING` state, based on our assumption that
				// "task should not be started when containerd is down".
				switch status.State() {
				case runtime.ContainerState_CONTAINER_EXITED:
					return errors.Errorf("unexpected container state for running task: %q", status.State())
				case runtime.ContainerState_CONTAINER_RUNNING:
				default:
					// This may happen if containerd gets restarted after task is started, but
					// before status is checkpointed.
					status.StartedAt = time.Now().UnixNano()
					status.Pid = t.Pid()
				}
				// Wait for the task for exit monitor.
				// wait is a long running background request, no timeout needed.
				exitCh, err := t.Wait(ctrdutil.NamespacedContext())
				if err != nil {
					if !errdefs.IsNotFound(err) {
						return errors.Wrap(err, "failed to wait for task")
					}
					// Container was in running state, but its task has been deleted,
					// set unknown exited state.
					status.FinishedAt = time.Now().UnixNano()
					status.ExitCode = unknownExitCode
					status.Reason = unknownExitReason
				} else {
					// Start exit monitor.
					c.eventMonitor.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh)
				}
			case containerd.Stopped:
				// Task is stopped. Update the status and delete the task.
				if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
					return errors.Wrap(err, "failed to delete task")
				}
				status.FinishedAt = s.ExitTime.UnixNano()
				status.ExitCode = int32(s.ExitStatus)
			default:
				return errors.Errorf("unexpected task status %q", s.Status)
			}
		}
		return nil
	}()
	if err != nil {
		log.G(ctx).WithError(err).Errorf("Failed to load container status for %q", id)
		// Only set the unknown field in this case, because other fields may
		// contain useful information loaded from the checkpoint.
		status.Unknown = true
	}
	opts := []containerstore.Opts{
		containerstore.WithStatus(status, containerDir),
		containerstore.WithContainer(cntr),
	}
	// containerIO could be nil for container in unknown state.
	if containerIO != nil {
		opts = append(opts, containerstore.WithContainerIO(containerIO))
	}
	return containerstore.NewContainer(*meta, opts...)
}

// loadSandbox loads sandbox from containerd.
func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) {
	ctx, cancel := context.WithTimeout(ctx, loadContainerTimeout)
	defer cancel()
	var sandbox sandboxstore.Sandbox
	// Load sandbox metadata.
	exts, err := cntr.Extensions(ctx)
	if err != nil {
		return sandbox, errors.Wrap(err, "failed to get sandbox container extensions")
	}
	ext, ok := exts[sandboxMetadataExtension]
	if !ok {
		return sandbox, errors.Errorf("metadata extension %q not found", sandboxMetadataExtension)
	}
	data, err := typeurl.UnmarshalAny(&ext)
	if err != nil {
		return sandbox, errors.Wrapf(err, "failed to unmarshal metadata extension %q", ext)
	}
	meta := data.(*sandboxstore.Metadata)

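	// Load the sandbox status. If this fails, the error is only logged and the
	// sandbox is still returned below, so that it can be registered, inspected
	// and eventually removed.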
	s, err := func() (sandboxstore.Status, error) {
		status := unknownSandboxStatus()
		// Load sandbox created timestamp.
		info, err := cntr.Info(ctx)
		if err != nil {
			return status, errors.Wrap(err, "failed to get sandbox container info")
		}
		status.CreatedAt = info.CreatedAt

		// Load sandbox state.
		t, err := cntr.Task(ctx, nil)
		if err != nil && !errdefs.IsNotFound(err) {
			return status, errors.Wrap(err, "failed to load task")
		}
		var taskStatus containerd.Status
		var notFound bool
		if errdefs.IsNotFound(err) {
			// Task is not found.
			notFound = true
		} else {
			// Task is found. Get task status.
			taskStatus, err = t.Status(ctx)
			if err != nil {
				// It's still possible that task is deleted during this window.
				if !errdefs.IsNotFound(err) {
					return status, errors.Wrap(err, "failed to get task status")
				}
				notFound = true
			}
		}
		if notFound {
			// Task does not exist, set sandbox state as NOTREADY.
			status.State = sandboxstore.StateNotReady
		} else {
			if taskStatus.Status == containerd.Running {
				// Wait for the task for sandbox monitor.
				// wait is a long running background request, no timeout needed.
				exitCh, err := t.Wait(ctrdutil.NamespacedContext())
				if err != nil {
					if !errdefs.IsNotFound(err) {
						return status, errors.Wrap(err, "failed to wait for task")
					}
					status.State = sandboxstore.StateNotReady
				} else {
					// Task is running, set sandbox state as READY.
					status.State = sandboxstore.StateReady
					status.Pid = t.Pid()
					c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
				}
			} else {
				// Task is not running. Delete the task and set sandbox state as NOTREADY.
				if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
					return status, errors.Wrap(err, "failed to delete task")
				}
				status.State = sandboxstore.StateNotReady
			}
		}
		return status, nil
	}()
	if err != nil {
		log.G(ctx).WithError(err).Errorf("Failed to load sandbox status for %q", cntr.ID())
	}

	sandbox = sandboxstore.NewSandbox(*meta, s)
	sandbox.Container = cntr

	// Load network namespace.
	if goruntime.GOOS != "windows" &&
		meta.Config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE {
		// Don't need to load netns for host network sandbox.
		return sandbox, nil
	}
	sandbox.NetNS = netns.LoadNetNS(meta.NetNSPath)

	// It doesn't matter whether the task is running or not. If it is running, the
	// sandbox status will be `READY`; if it is not, the sandbox status will be
	// `NOT_READY`, and kubelet will stop the sandbox, which properly cleans up everything.
	return sandbox, nil
}

// loadImages loads images from containerd.
func (c *criService) loadImages(ctx context.Context, cImages []containerd.Image) {
	snapshotter := c.config.ContainerdConfig.Snapshotter
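	// For each image, verify that its content is complete, warn if it has not
	// been unpacked into the configured snapshotter, and refresh its reference
	// in the CRI image store.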
	for _, i := range cImages {
		ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default())
		if err != nil {
			log.G(ctx).WithError(err).Errorf("Failed to check image content readiness for %q", i.Name())
			continue
		}
		if !ok {
			log.G(ctx).Warnf("The image content readiness for %q is not ok", i.Name())
			continue
		}
		// Checking existence of top-level snapshot for each image being recovered.
		unpacked, err := i.IsUnpacked(ctx, snapshotter)
		if err != nil {
			log.G(ctx).WithError(err).Warnf("Failed to check whether image is unpacked for image %s", i.Name())
			continue
		}
		if !unpacked {
			log.G(ctx).Warnf("The image %s is not unpacked.", i.Name())
			// TODO(random-liu): Consider whether we should try unpack here.
		}
		if err := c.updateImage(ctx, i.Name()); err != nil {
			log.G(ctx).WithError(err).Warnf("Failed to update reference for image %q", i.Name())
			continue
		}
		log.G(ctx).Debugf("Loaded image %q", i.Name())
	}
}

func cleanupOrphanedIDDirs(ctx context.Context, cntrs []containerd.Container, base string) error {
	// Cleanup orphaned id directories.
	dirs, err := ioutil.ReadDir(base)
	if err != nil && !os.IsNotExist(err) {
		return errors.Wrap(err, "failed to read base directory")
	}
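	// Build a set of live container IDs; any directory under base whose name is
	// not in this set is considered orphaned and removed below.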
	idsMap := make(map[string]containerd.Container)
	for _, cntr := range cntrs {
		idsMap[cntr.ID()] = cntr
	}
	for _, d := range dirs {
		if !d.IsDir() {
			log.G(ctx).Warnf("Invalid file %q found in base directory %q", d.Name(), base)
			continue
		}
		if _, ok := idsMap[d.Name()]; ok {
			// Do not remove id directory if corresponding container is found.
			continue
		}
		dir := filepath.Join(base, d.Name())
		if err := ensureRemoveAll(ctx, dir); err != nil {
			log.G(ctx).WithError(err).Warnf("Failed to remove id directory %q", dir)
		} else {
			log.G(ctx).Debugf("Cleanup orphaned id directory %q", dir)
		}
	}
	return nil
}