package remote // import "github.com/docker/docker/libcontainerd/remote"

import (
	"context"
	"encoding/json"
	"io"
	"os"
	"path/filepath"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/containerd/containerd"
	apievents "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types"
	"github.com/containerd/containerd/archive"
	"github.com/containerd/containerd/cio"
	"github.com/containerd/containerd/content"
	containerderrors "github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/events"
	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/runtime/linux/runctypes"
	v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
	"github.com/containerd/typeurl"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/libcontainerd/queue"
	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
	"github.com/docker/docker/pkg/ioutils"
	v1 "github.com/opencontainers/image-spec/specs-go/v1"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// DockerContainerBundlePath is the label key pointing to the container's bundle path
const DockerContainerBundlePath = "com.docker/engine.bundle.path"

type client struct {
	client   *containerd.Client
	stateDir string
	logger   *logrus.Entry
	ns       string

	backend         libcontainerdtypes.Backend
	eventQ          queue.Queue
	oomMu           sync.Mutex
	oom             map[string]bool
	v2runcoptionsMu sync.Mutex
	// v2runcoptions is used for copying options specified on Create() to Start()
	v2runcoptions map[string]v2runcoptions.Options
}

// NewClient creates a new libcontainerd client from a containerd client
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
	c := &client{
		client:        cli,
		stateDir:      stateDir,
		logger:        logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
		ns:            ns,
		backend:       b,
		oom:           make(map[string]bool),
		v2runcoptions: make(map[string]v2runcoptions.Options),
	}

	go c.processEventStream(ctx, ns)

	return c, nil
}

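// Version returns the version of the connected containerd daemon.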
func (c *client) Version(ctx context.Context) (containerd.Version, error) {
	return c.client.Version(ctx)
}

// Restore loads the containerd container.
// It should not be called concurrently with any other operation for the given ID.
func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) {
	var dio *cio.DirectIO
	defer func() {
		if err != nil && dio != nil {
			dio.Cancel()
			dio.Close()
		}
		err = wrapError(err)
	}()

	ctr, err := c.client.LoadContainer(ctx, id)
	if err != nil {
		return false, -1, nil, errors.WithStack(wrapError(err))
	}

	attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
		// dio must be assigned to the dio variable defined above so that
		// the deferred cleanup can cancel and close it on error
		dio, err = c.newDirectIO(ctx, fifos)
		if err != nil {
			return nil, err
		}
		return attachStdio(dio)
	}
	t, err := ctr.Task(ctx, attachIO)
	if err != nil && !containerderrors.IsNotFound(err) {
		return false, -1, nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
	}

	if t != nil {
		s, err := t.Status(ctx)
		if err != nil {
			return false, -1, nil, errors.Wrap(wrapError(err), "error getting task status")
		}
		alive = s.Status != containerd.Stopped
		pid = int(t.Pid())
	}

	c.logger.WithFields(logrus.Fields{
		"container": id,
		"alive":     alive,
		"pid":       pid,
	}).Debug("restored container")

	return alive, pid, &restoredProcess{
		p: t,
	}, nil
}

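// Create creates the containerd container for the given id using the
// provided OCI spec, shim, and runtime options, associating the bundle
// directory with the container via WithBundle.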
func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error {
	bdir := c.bundleDir(id)
	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")

	newOpts := []containerd.NewContainerOpts{
		containerd.WithSpec(ociSpec),
		containerd.WithRuntime(shim, runtimeOptions),
		WithBundle(bdir, ociSpec),
	}
	opts = append(opts, newOpts...)

	_, err := c.client.NewContainer(ctx, id, opts...)
	if err != nil {
		if containerderrors.IsAlreadyExists(err) {
			return errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return wrapError(err)
	}
	if x, ok := runtimeOptions.(*v2runcoptions.Options); ok {
		c.v2runcoptionsMu.Lock()
		c.v2runcoptions[id] = *x
		c.v2runcoptionsMu.Unlock()
	}
	return nil
}

// Start creates and starts a task for the specified containerd id
func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
	ctr, err := c.getContainer(ctx, id)
	if err != nil {
		return -1, err
	}
	var (
		cp             *types.Descriptor
		t              containerd.Task
		rio            cio.IO
		stdinCloseSync = make(chan struct{})
	)

	if checkpointDir != "" {
		// write checkpoint to the content store
		tar := archive.Diff(ctx, "", checkpointDir)
		cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
		// remove the checkpoint when we're done
		defer func() {
			if cp != nil {
				err := c.client.ContentStore().Delete(context.Background(), cp.Digest)
				if err != nil {
					c.logger.WithError(err).WithFields(logrus.Fields{
						"ref":    checkpointDir,
						"digest": cp.Digest,
					}).Warnf("failed to delete temporary checkpoint entry")
				}
			}
		}()
		if err := tar.Close(); err != nil {
			return -1, errors.Wrap(err, "failed to close checkpoint tar stream")
		}
		if err != nil {
			return -1, errors.Wrap(err, "failed to upload checkpoint to containerd")
		}
	}

	spec, err := ctr.Spec(ctx)
	if err != nil {
		return -1, errors.Wrap(err, "failed to retrieve spec")
	}
	labels, err := ctr.Labels(ctx)
	if err != nil {
		return -1, errors.Wrap(err, "failed to retrieve labels")
	}
	bundle := labels[DockerContainerBundlePath]
	uid, gid := getSpecUser(spec)

	taskOpts := []containerd.NewTaskOpts{
		func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			info.Checkpoint = cp
			return nil
		},
	}

	if runtime.GOOS != "windows" {
		taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			c.v2runcoptionsMu.Lock()
			opts, ok := c.v2runcoptions[id]
			c.v2runcoptionsMu.Unlock()
			if ok {
				opts.IoUid = uint32(uid)
				opts.IoGid = uint32(gid)
				info.Options = &opts
			} else {
				info.Options = &runctypes.CreateOptions{
					IoUid:       uint32(uid),
					IoGid:       uint32(gid),
					NoPivotRoot: os.Getenv("DOCKER_RAMDISK") != "",
				}
			}
			return nil
		})
	} else {
		taskOpts = append(taskOpts, withLogLevel(c.logger.Level))
	}

	t, err = ctr.NewTask(ctx,
		func(id string) (cio.IO, error) {
			fifos := newFIFOSet(bundle, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal)

			rio, err = c.createIO(fifos, id, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio)
			return rio, err
		},
		taskOpts...,
	)
	if err != nil {
		close(stdinCloseSync)
		if rio != nil {
			rio.Cancel()
			rio.Close()
		}
		return -1, wrapError(err)
	}

	// Signal c.createIO that it can call CloseIO
	close(stdinCloseSync)

	if err := t.Start(ctx); err != nil {
		if _, err := t.Delete(ctx); err != nil {
			c.logger.WithError(err).WithField("container", id).
				Error("failed to delete task after failed start")
		}
		return -1, wrapError(err)
	}

	return int(t.Pid()), nil
}

// Exec creates an exec process.
//
// The containerd client calls Exec to register the exec config in the shim
// side. When the client calls Start, the shim will create the stdin fifo if
// needed. For the container's main process, however, the stdin fifo is
// created during Create, not Start. The stdinCloseSync channel should be
// closed after starting the exec process.
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
	ctr, err := c.getContainer(ctx, containerID)
	if err != nil {
		return -1, err
	}
	t, err := ctr.Task(ctx, nil)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running")))
		}
		return -1, wrapError(err)
	}

	var (
		p              containerd.Process
		rio            cio.IO
		stdinCloseSync = make(chan struct{})
	)

	labels, err := ctr.Labels(ctx)
	if err != nil {
		return -1, wrapError(err)
	}

	fifos := newFIFOSet(labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)

	defer func() {
		if err != nil {
			if rio != nil {
				rio.Cancel()
				rio.Close()
			}
		}
	}()

	p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
		rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
		return rio, err
	})
	if err != nil {
		close(stdinCloseSync)
		if containerderrors.IsAlreadyExists(err) {
			return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return -1, wrapError(err)
	}

	// Signal c.createIO that it can call CloseIO
	//
	// the stdin of the exec process will be created after p.Start in containerd
	defer close(stdinCloseSync)

	if err = p.Start(ctx); err != nil {
		// use a new context for cleanup because the old one may have been
		// cancelled by the user, but leave a timeout to make sure we are not
		// waiting forever if containerd is unresponsive, and to work around
		// fifo cancelling issues in older containerd-shim
		ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
		defer cancel()
		p.Delete(ctx)
		return -1, wrapError(err)
	}
	return int(p.Pid()), nil
}

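// SignalProcess sends the given signal to the process with processID in the
// specified container.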
func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
	p, err := c.getProcess(ctx, containerID, processID)
	if err != nil {
		return err
	}
	return wrapError(p.Kill(ctx, syscall.Signal(signal)))
}

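// ResizeTerminal resizes the terminal of the given process to the given
// width and height.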
func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
	p, err := c.getProcess(ctx, containerID, processID)
	if err != nil {
		return err
	}

	return p.Resize(ctx, uint32(width), uint32(height))
}

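// CloseStdin closes the stdin stream of the given process.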
func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error {
	p, err := c.getProcess(ctx, containerID, processID)
	if err != nil {
		return err
	}

	return p.CloseIO(ctx, containerd.WithStdinCloser)
}

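// Pause pauses the container's init task.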
func (c *client) Pause(ctx context.Context, containerID string) error {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return err
	}

	return wrapError(p.(containerd.Task).Pause(ctx))
}

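// Resume resumes a previously paused container task.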
func (c *client) Resume(ctx context.Context, containerID string) error {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return err
	}

	return p.(containerd.Task).Resume(ctx)
}

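// Stats returns a point-in-time metrics sample for the container's task,
// converted to the libcontainerd Stats type.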
func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return nil, err
	}

	m, err := p.(containerd.Task).Metrics(ctx)
	if err != nil {
		return nil, err
	}

	v, err := typeurl.UnmarshalAny(m.Data)
	if err != nil {
		return nil, err
	}
	return libcontainerdtypes.InterfaceToStats(m.Timestamp, v), nil
}

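// ListPids returns the PIDs of all processes running inside the container.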
func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return nil, err
	}

	pis, err := p.(containerd.Task).Pids(ctx)
	if err != nil {
		return nil, err
	}

	var pids []uint32
	for _, i := range pis {
		pids = append(pids, i.Pid)
	}

	return pids, nil
}

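// Summary returns detailed process information for each process running
// inside the container.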
func (c *client) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return nil, err
	}

	pis, err := p.(containerd.Task).Pids(ctx)
	if err != nil {
		return nil, err
	}

	var infos []libcontainerdtypes.Summary
	for _, pi := range pis {
		i, err := typeurl.UnmarshalAny(pi.Info)
		if err != nil {
			return nil, errors.Wrap(err, "unable to decode process details")
		}
		s, err := summaryFromInterface(i)
		if err != nil {
			return nil, err
		}
		infos = append(infos, *s)
	}

	return infos, nil
}

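// restoredProcess wraps a containerd process (possibly nil) recovered from a
// previous daemon instance by Restore.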
type restoredProcess struct {
	p containerd.Process
}

// Delete returns the restored process's exit status and exit time, or a
// generic 255 status with the current time if the process is gone or the
// delete fails.
func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) {
	if p.p == nil {
		return 255, time.Now(), nil
	}
	status, err := p.p.Delete(ctx)
	if err != nil {
		return 255, time.Now(), nil
	}
	return status.ExitCode(), status.ExitTime(), nil
}

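// DeleteTask deletes the container's task, returning its exit code and exit
// time. Errors are swallowed and reported as exit code 255 with the current
// time.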
func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return 255, time.Now(), nil
	}

	status, err := p.Delete(ctx)
	if err != nil {
		return 255, time.Now(), nil
	}
	return status.ExitCode(), status.ExitTime(), nil
}

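// Delete removes the containerd container, clears the cached OOM and runc
// options state, and removes the bundle directory unless the
// LIBCONTAINERD_NOCLEAN environment variable is set to "1".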
func (c *client) Delete(ctx context.Context, containerID string) error {
	ctr, err := c.getContainer(ctx, containerID)
	if err != nil {
		return err
	}
	labels, err := ctr.Labels(ctx)
	if err != nil {
		return err
	}
	bundle := labels[DockerContainerBundlePath]
	if err := ctr.Delete(ctx); err != nil {
		return wrapError(err)
	}
	c.oomMu.Lock()
	delete(c.oom, containerID)
	c.oomMu.Unlock()
	c.v2runcoptionsMu.Lock()
	delete(c.v2runcoptions, containerID)
	c.v2runcoptionsMu.Unlock()
	if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
		if err := os.RemoveAll(bundle); err != nil {
			c.logger.WithError(err).WithFields(logrus.Fields{
				"container": containerID,
				"bundle":    bundle,
			}).Error("failed to remove state dir")
		}
	}
	return nil
}

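// Status returns the status of the container's init task.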
func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) {
	t, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return containerd.Unknown, err
	}
	s, err := t.Status(ctx)
	if err != nil {
		return containerd.Unknown, wrapError(err)
	}
	return s.Status, nil
}

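// getCheckpointOptions returns a CheckpointTaskOpts that sets the Exit flag
// on the runtime-appropriate checkpoint options (v2 runc shim options if the
// container was created with them, legacy runctypes otherwise).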
func (c *client) getCheckpointOptions(id string, exit bool) containerd.CheckpointTaskOpts {
	return func(r *containerd.CheckpointTaskInfo) error {
		if r.Options == nil {
			c.v2runcoptionsMu.Lock()
			_, isV2 := c.v2runcoptions[id]
			c.v2runcoptionsMu.Unlock()

			if isV2 {
				r.Options = &v2runcoptions.CheckpointOptions{Exit: exit}
			} else {
				r.Options = &runctypes.CheckpointOptions{Exit: exit}
			}
			return nil
		}

		switch opts := r.Options.(type) {
		case *v2runcoptions.CheckpointOptions:
			opts.Exit = exit
		case *runctypes.CheckpointOptions:
			opts.Exit = exit
		}

		return nil
	}
}

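// CreateCheckpoint checkpoints the container's task and extracts the
// checkpoint data from the content store into checkpointDir, passing the
// exit flag through to the runtime's checkpoint options. The temporary
// checkpoint image is deleted from containerd when done.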
func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return err
	}

	opts := []containerd.CheckpointTaskOpts{c.getCheckpointOptions(containerID, exit)}
	img, err := p.(containerd.Task).Checkpoint(ctx, opts...)
	if err != nil {
		return wrapError(err)
	}
	// Whatever happens, delete the checkpoint from containerd
	defer func() {
		err := c.client.ImageService().Delete(context.Background(), img.Name())
		if err != nil {
			c.logger.WithError(err).WithField("digest", img.Target().Digest).
				Warnf("failed to delete checkpoint image")
		}
	}()

	b, err := content.ReadBlob(ctx, c.client.ContentStore(), img.Target())
	if err != nil {
		return errdefs.System(errors.Wrap(err, "failed to retrieve checkpoint data"))
	}
	var index v1.Index
	if err := json.Unmarshal(b, &index); err != nil {
		return errdefs.System(errors.Wrap(err, "failed to decode checkpoint data"))
	}

	var cpDesc *v1.Descriptor
	for _, m := range index.Manifests {
		if m.MediaType == images.MediaTypeContainerd1Checkpoint {
			m := m // copy the loop variable before taking its address
			cpDesc = &m
			break
		}
	}
	if cpDesc == nil {
		// err is nil at this point; return an explicit error rather than
		// wrapping it, which would silently return nil
		return errdefs.System(errors.New("invalid checkpoint"))
	}

	rat, err := c.client.ContentStore().ReaderAt(ctx, *cpDesc)
	if err != nil {
		return errdefs.System(errors.Wrap(err, "failed to get checkpoint reader"))
	}
	defer rat.Close()
	_, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
	if err != nil {
		return errdefs.System(errors.Wrap(err, "failed to read checkpoint data"))
	}

	return err
}

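// getContainer loads the containerd container for the given id, translating
// a containerd "not found" error into a Docker NotFound error.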
func (c *client) getContainer(ctx context.Context, id string) (containerd.Container, error) {
	ctr, err := c.client.LoadContainer(ctx, id)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
		}
		return nil, wrapError(err)
	}
	return ctr, nil
}

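// getProcess returns the container's init task, or, for a non-init
// processID, loads the matching exec process from the task.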
func (c *client) getProcess(ctx context.Context, containerID, processID string) (containerd.Process, error) {
	ctr, err := c.getContainer(ctx, containerID)
	if err != nil {
		return nil, err
	}
	t, err := ctr.Task(ctx, nil)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running")))
		}
		return nil, wrapError(err)
	}
	if processID == libcontainerdtypes.InitProcessName {
		return t, nil
	}
	p, err := t.LoadProcess(ctx, processID, nil)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec")))
		}
		return nil, wrapError(err)
	}
	return p, nil
}

// createIO creates the io to be used by a process.
// The process is looked up lazily, after stdinCloseSync is closed, because
// at the time the stdin close wrapper is created the process may not yet
// have been registered with containerd.
func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
	var (
		io  *cio.DirectIO
		err error
	)
	io, err = c.newDirectIO(context.Background(), fifos)
	if err != nil {
		return nil, err
	}

	if io.Stdin != nil {
		var (
			err       error
			stdinOnce sync.Once
		)
		pipe := io.Stdin
		io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
			stdinOnce.Do(func() {
				err = pipe.Close()
				// Do the rest in a new routine to avoid a deadlock if the
				// Exec/Start call failed.
				go func() {
					<-stdinCloseSync
					p, err := c.getProcess(context.Background(), containerID, processID)
					if err == nil {
						err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
						if err != nil && strings.Contains(err.Error(), "transport is closing") {
							err = nil
						}
					}
				}()
			})
			return err
		})
	}

	rio, err := attachStdio(io)
	if err != nil {
		io.Cancel()
		io.Close()
	}
	return rio, err
}

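// processEvent queues the handling of an event so that events for the same
// container are processed in order. For exit events of exec processes it
// also cleans up the process's fifos and deletes the process.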
func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
	c.eventQ.Append(ei.ContainerID, func() {
		err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
		if err != nil {
			c.logger.WithError(err).WithFields(logrus.Fields{
				"container":  ei.ContainerID,
				"event":      et,
				"event-info": ei,
			}).Error("failed to process event")
		}

		if et == libcontainerdtypes.EventExit && ei.ProcessID != ei.ContainerID {
			p, err := c.getProcess(ctx, ei.ContainerID, ei.ProcessID)
			if err != nil {
				c.logger.WithError(errors.New("no such process")).
					WithFields(logrus.Fields{
						"error":     err,
						"container": ei.ContainerID,
						"process":   ei.ProcessID,
					}).Error("exit event")
				return
			}

			ctr, err := c.getContainer(ctx, ei.ContainerID)
			if err != nil {
				c.logger.WithFields(logrus.Fields{
					"container": ei.ContainerID,
					"error":     err,
				}).Error("failed to find container")
			} else {
				labels, err := ctr.Labels(ctx)
				if err != nil {
					c.logger.WithFields(logrus.Fields{
						"container": ei.ContainerID,
						"error":     err,
					}).Error("failed to get container labels")
					return
				}
				newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
			}
			_, err = p.Delete(context.Background())
			if err != nil {
				c.logger.WithError(err).WithFields(logrus.Fields{
					"container": ei.ContainerID,
					"process":   ei.ProcessID,
				}).Warn("failed to delete process")
			}
		}
	})
}

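// waitServe blocks until the containerd API is ready to serve requests or
// the context is cancelled, returning true in the former case.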
func (c *client) waitServe(ctx context.Context) bool {
	t := 100 * time.Millisecond
	delay := time.NewTimer(t)
	if !delay.Stop() {
		<-delay.C
	}
	defer delay.Stop()

	// `IsServing` will actually block until the service is ready.
	// However it can return early, so we'll loop with a delay to handle it.
	for {
		serving, err := c.client.IsServing(ctx)
		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
				return false
			}
			logrus.WithError(err).Warn("Error while testing if containerd API is ready")
		}

		if serving {
			return true
		}

		delay.Reset(t)
		select {
		case <-ctx.Done():
			return false
		case <-delay.C:
		}
	}
}

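// processEventStream subscribes to containerd's task events for the given
// namespace and dispatches them to the backend. If the stream fails for any
// reason other than cancellation, it waits for containerd to become ready
// again and restarts itself.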
func (c *client) processEventStream(ctx context.Context, ns string) {
	var (
		err error
		ev  *events.Envelope
		et  libcontainerdtypes.EventType
		ei  libcontainerdtypes.EventInfo
	)

	// Create a new context specifically for this subscription.
	// The context must be cancelled to cancel the subscription.
	// In cases where we have to restart event stream processing,
	// we'll need the original context because this one will be cancelled.
	subCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Filter on both namespace *and* topic. To create an "and" filter,
	// this must be a single, comma-separated string
	eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")

	c.logger.Debug("processing event stream")

	for {
		var oomKilled bool
		select {
		case err = <-errC:
			if err != nil {
				errStatus, ok := status.FromError(err)
				if !ok || errStatus.Code() != codes.Canceled {
					c.logger.WithError(err).Error("Failed to get event")
					c.logger.Info("Waiting for containerd to be ready to restart event processing")
					if c.waitServe(ctx) {
						go c.processEventStream(ctx, ns)
						return
					}
				}
				c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
			}
			return
		case ev = <-eventStream:
			if ev.Event == nil {
				c.logger.WithField("event", ev).Warn("invalid event")
				continue
			}

			v, err := typeurl.UnmarshalAny(ev.Event)
			if err != nil {
				c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
				continue
			}

			c.logger.WithField("topic", ev.Topic).Debug("event")

			switch t := v.(type) {
			case *apievents.TaskCreate:
				et = libcontainerdtypes.EventCreate
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ContainerID,
					Pid:         t.Pid,
				}
			case *apievents.TaskStart:
				et = libcontainerdtypes.EventStart
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ContainerID,
					Pid:         t.Pid,
				}
			case *apievents.TaskExit:
				et = libcontainerdtypes.EventExit
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ID,
					Pid:         t.Pid,
					ExitCode:    t.ExitStatus,
					ExitedAt:    t.ExitedAt,
				}
			case *apievents.TaskOOM:
				et = libcontainerdtypes.EventOOM
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					OOMKilled:   true,
				}
				oomKilled = true
			case *apievents.TaskExecAdded:
				et = libcontainerdtypes.EventExecAdded
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ExecID,
				}
			case *apievents.TaskExecStarted:
				et = libcontainerdtypes.EventExecStarted
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ExecID,
					Pid:         t.Pid,
				}
			case *apievents.TaskPaused:
				et = libcontainerdtypes.EventPaused
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				}
			case *apievents.TaskResumed:
				et = libcontainerdtypes.EventResumed
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				}
			case *apievents.TaskDelete:
				c.logger.WithFields(logrus.Fields{
					"topic":     ev.Topic,
					"type":      reflect.TypeOf(t),
					"container": t.ContainerID},
				).Info("ignoring event")
				continue
			default:
				c.logger.WithFields(logrus.Fields{
					"topic": ev.Topic,
					"type":  reflect.TypeOf(t)},
				).Info("ignoring event")
				continue
			}

			// Remember that the container was OOM-killed so that subsequent
			// events for it (e.g. the exit event) carry the flag as well.
			c.oomMu.Lock()
			if oomKilled {
				c.oom[ei.ContainerID] = true
			}
			ei.OOMKilled = c.oom[ei.ContainerID]
			c.oomMu.Unlock()

			c.processEvent(ctx, et, ei)
		}
	}
}

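// writeContent writes the checkpoint tar stream into containerd's content
// store under the given ref and returns a descriptor for it.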
func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
	writer, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
	if err != nil {
		return nil, err
	}
	defer writer.Close()
	size, err := io.Copy(writer, r)
	if err != nil {
		return nil, err
	}
	labels := map[string]string{
		"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
	}
	if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
		return nil, err
	}
	return &types.Descriptor{
		MediaType: mediaType,
		Digest:    writer.Digest(),
		Size_:     size,
	}, nil
}

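// bundleDir returns the path of the container's bundle directory under the
// client's state dir.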
func (c *client) bundleDir(id string) string {
	return filepath.Join(c.stateDir, id)
}

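// wrapError translates containerd and message-based "not found" errors into
// Docker NotFound errors; all other errors pass through unchanged.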
func wrapError(err error) error {
	switch {
	case err == nil:
		return nil
	case containerderrors.IsNotFound(err):
		return errdefs.NotFound(err)
	}

	msg := err.Error()
	for _, s := range []string{"container does not exist", "not found", "no such container"} {
		if strings.Contains(msg, s) {
			return errdefs.NotFound(err)
		}
	}
	return err
}