// +build linux

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package v2

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"github.com/containerd/cgroups"
	cgroupsv2 "github.com/containerd/cgroups/v2"
	eventstypes "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types/task"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/mount"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/pkg/oom"
	oomv1 "github.com/containerd/containerd/pkg/oom/v1"
	oomv2 "github.com/containerd/containerd/pkg/oom/v2"
	"github.com/containerd/containerd/pkg/process"
	"github.com/containerd/containerd/pkg/stdio"
	"github.com/containerd/containerd/pkg/userns"
	"github.com/containerd/containerd/runtime/v2/runc"
	"github.com/containerd/containerd/runtime/v2/runc/options"
	"github.com/containerd/containerd/runtime/v2/shim"
	taskAPI "github.com/containerd/containerd/runtime/v2/task"
	"github.com/containerd/containerd/sys/reaper"
	runcC "github.com/containerd/go-runc"
	"github.com/containerd/typeurl"
	"github.com/gogo/protobuf/proto"
	ptypes "github.com/gogo/protobuf/types"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/sys/unix"
)

var (
	_     = (taskAPI.TaskService)(&service{})
	empty = &ptypes.Empty{}
)
// groupLabels specifies how the shim groups services.
// It currently supports the runc.v2-specific .group label and the
// standard k8s pod sandbox label. Order matters in this list.
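//
// For illustration (a hypothetical sandbox ID, not taken from this
// repository), a bundle whose config.json contains
//
//	"annotations": {
//		"io.kubernetes.cri.sandbox-id": "abc123"
//	}
//
// is served by the same shim as the other containers in sandbox "abc123".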
var groupLabels = []string{
	"io.containerd.runc.v2.group",
	"io.kubernetes.cri.sandbox-id",
}

type spec struct {
	Annotations map[string]string `json:"annotations,omitempty"`
}

// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
	var (
		ep  oom.Watcher
		err error
	)
	if cgroups.Mode() == cgroups.Unified {
		ep, err = oomv2.New(publisher)
	} else {
		ep, err = oomv1.New(publisher)
	}
	if err != nil {
		return nil, err
	}
	go ep.Run(ctx)
	s := &service{
		id:         id,
		context:    ctx,
		events:     make(chan interface{}, 128),
		ec:         reaper.Default.Subscribe(),
		ep:         ep,
		cancel:     shutdown,
		containers: make(map[string]*runc.Container),
	}
	go s.processExits()
	runcC.Monitor = reaper.Default
	if err := s.initPlatform(); err != nil {
		shutdown()
		return nil, errors.Wrap(err, "failed to initialize platform behavior")
	}
	go s.forward(ctx, publisher)

	if address, err := shim.ReadAddress("address"); err == nil {
		s.shimAddress = address
	}
	return s, nil
}

// service is the shim implementation of a remote shim over GRPC
type service struct {
	mu          sync.Mutex
	eventSendMu sync.Mutex

	context  context.Context
	events   chan interface{}
	platform stdio.Platform
	ec       chan runcC.Exit
	ep       oom.Watcher
	// id is only used in the cleanup case
	id string

	containers map[string]*runc.Container

	shimAddress string
	cancel      func()
}

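// newCommand returns the command used to re-exec the current binary as a
// shim process for the given id, running in the caller's working directory
// with Setpgid so the shim gets its own process group. For illustration
// (hypothetical binary path, id, and socket path), the resulting invocation
// resembles:
//
//	/usr/local/bin/containerd-shim-runc-v2 -namespace k8s.io -id abc123 -address /run/containerd/containerd.sock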
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	self, err := os.Executable()
	if err != nil {
		return nil, err
	}
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	args := []string{
		"-namespace", ns,
		"-id", id,
		"-address", containerdAddress,
	}
	cmd := exec.Command(self, args...)
	cmd.Dir = cwd
	cmd.Env = append(os.Environ(), "GOMAXPROCS=4")
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}
	return cmd, nil
}

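// readSpec decodes only the annotations from the bundle's config.json in the
// current working directory.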
func readSpec() (*spec, error) {
	f, err := os.Open("config.json")
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var s spec
	if err := json.NewDecoder(f).Decode(&s); err != nil {
		return nil, err
	}
	return &s, nil
}

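// StartShim starts a new shim process for the task, or reuses an existing
// shim when the task's grouping label points at one, and returns the socket
// address clients should connect to.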
func (s *service) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) {
	cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress)
	if err != nil {
		return "", err
	}
	grouping := opts.ID
	spec, err := readSpec()
	if err != nil {
		return "", err
	}
	for _, group := range groupLabels {
		if groupID, ok := spec.Annotations[group]; ok {
			grouping = groupID
			break
		}
	}
	address, err := shim.SocketAddress(ctx, opts.Address, grouping)
	if err != nil {
		return "", err
	}

	socket, err := shim.NewSocket(address)
	if err != nil {
		// This can only happen if the socket was not cleaned up by the
		// shim's cleanup method (a bug), or if we are using the grouping
		// functionality and the new process should be served by the same
		// shim as an existing container.
		if !shim.SocketEaddrinuse(err) {
			return "", errors.Wrap(err, "create new shim socket")
		}
		if shim.CanConnect(address) {
			if err := shim.WriteAddress("address", address); err != nil {
				return "", errors.Wrap(err, "write existing socket for shim")
			}
			return address, nil
		}
		if err := shim.RemoveSocket(address); err != nil {
			return "", errors.Wrap(err, "remove pre-existing socket")
		}
		if socket, err = shim.NewSocket(address); err != nil {
			return "", errors.Wrap(err, "try create new shim socket 2x")
		}
	}
	defer func() {
		if retErr != nil {
			socket.Close()
			_ = shim.RemoveSocket(address)
		}
	}()

	// make sure the re-exec'd shim-v2 binary can pick up the address if it needs it
	if err := shim.WriteAddress("address", address); err != nil {
		return "", err
	}

	f, err := socket.File()
	if err != nil {
		return "", err
	}

	cmd.ExtraFiles = append(cmd.ExtraFiles, f)

	if err := cmd.Start(); err != nil {
		f.Close()
		return "", err
	}
	defer func() {
		if retErr != nil {
			cmd.Process.Kill()
		}
	}()
	// make sure to wait after start so the shim process is reaped when it exits
	go cmd.Wait()
	if data, err := ioutil.ReadAll(os.Stdin); err == nil {
		if len(data) > 0 {
			var any ptypes.Any
			if err := proto.Unmarshal(data, &any); err != nil {
				return "", err
			}
			v, err := typeurl.UnmarshalAny(&any)
			if err != nil {
				return "", err
			}
			if opts, ok := v.(*options.Options); ok {
				if opts.ShimCgroup != "" {
					if cgroups.Mode() == cgroups.Unified {
						if err := cgroupsv2.VerifyGroupPath(opts.ShimCgroup); err != nil {
							return "", errors.Wrapf(err, "failed to verify cgroup path %q", opts.ShimCgroup)
						}
						cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup)
						if err != nil {
							return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
						}
						if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
							return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
						}
					} else {
						cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup))
						if err != nil {
							return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
						}
						if err := cg.Add(cgroups.Process{
							Pid: cmd.Process.Pid,
						}); err != nil {
							return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
						}
					}
				}
			}
		}
	}
	if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil {
		return "", errors.Wrap(err, "failed to adjust OOM score for shim")
	}
	return address, nil
}

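// Cleanup force-deletes the runc container for this shim's id and unmounts
// the bundle rootfs, reporting the task as killed.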
func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}

	path := filepath.Join(filepath.Dir(cwd), s.id)
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	runtime, err := runc.ReadRuntime(path)
	if err != nil {
		return nil, err
	}
	opts, err := runc.ReadOptions(path)
	if err != nil {
		return nil, err
	}
	root := process.RuncRoot
	if opts != nil && opts.Root != "" {
		root = opts.Root
	}

	r := process.NewRunc(root, path, ns, runtime, "", false)
	if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
		Force: true,
	}); err != nil {
		logrus.WithError(err).Warn("failed to remove runc container")
	}
	if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil {
		logrus.WithError(err).Warn("failed to cleanup rootfs mount")
	}
	return &taskAPI.DeleteResponse{
		ExitedAt:   time.Now(),
		ExitStatus: 128 + uint32(unix.SIGKILL),
	}, nil
}

// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	container, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}

	s.containers[r.ID] = container

	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO: &eventstypes.TaskIO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint: r.Checkpoint,
		Pid:        uint32(container.Pid()),
	})

	return &taskAPI.CreateTaskResponse{
		Pid: uint32(container.Pid()),
	}, nil
}

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}

	// hold the send lock so that the start events are sent before any exit events in the error case
	s.eventSendMu.Lock()
	p, err := container.Start(ctx, r)
	if err != nil {
		s.eventSendMu.Unlock()
		return nil, errdefs.ToGRPC(err)
	}

	switch r.ExecID {
	case "":
		switch cg := container.Cgroup().(type) {
		case cgroups.Cgroup:
			if err := s.ep.Add(container.ID, cg); err != nil {
				logrus.WithError(err).Error("add cg to OOM monitor")
			}
		case *cgroupsv2.Manager:
			allControllers, err := cg.RootControllers()
			if err != nil {
				logrus.WithError(err).Error("failed to get root controllers")
			} else {
				if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil {
					if userns.RunningInUserNS() {
						logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers)
					} else {
						logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
					}
				}
			}
			if err := s.ep.Add(container.ID, cg); err != nil {
				logrus.WithError(err).Error("add cg to OOM monitor")
			}
		}

		s.send(&eventstypes.TaskStart{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
		})
	default:
		s.send(&eventstypes.TaskExecStarted{
			ContainerID: container.ID,
			ExecID:      r.ExecID,
			Pid:         uint32(p.Pid()),
		})
	}
	s.eventSendMu.Unlock()
	return &taskAPI.StartResponse{
		Pid: uint32(p.Pid()),
	}, nil
}

// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Delete(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// if we deleted an init task, send the task delete event
	if r.ExecID == "" {
		s.mu.Lock()
		delete(s.containers, r.ID)
		s.mu.Unlock()
		s.send(&eventstypes.TaskDelete{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
			ExitStatus:  uint32(p.ExitStatus()),
			ExitedAt:    p.ExitedAt(),
		})
	}
	return &taskAPI.DeleteResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
		Pid:        uint32(p.Pid()),
	}, nil
}

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	ok, cancel := container.ReserveProcess(r.ExecID)
	if !ok {
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
	}
	process, err := container.Exec(ctx, r)
	if err != nil {
		cancel()
		return nil, errdefs.ToGRPC(err)
	}

	s.send(&eventstypes.TaskExecAdded{
		ContainerID: container.ID,
		ExecID:      process.ID(),
	})
	return empty, nil
}

// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.ResizePty(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	st, err := p.Status(ctx)
	if err != nil {
		return nil, err
	}
	status := task.StatusUnknown
	switch st {
	case "created":
		status = task.StatusCreated
	case "running":
		status = task.StatusRunning
	case "stopped":
		status = task.StatusStopped
	case "paused":
		status = task.StatusPaused
	case "pausing":
		status = task.StatusPausing
	}
	sio := p.Stdio()
	return &taskAPI.StateResponse{
		ID:         p.ID(),
		Bundle:     container.Bundle,
		Pid:        uint32(p.Pid()),
		Status:     status,
		Stdin:      sio.Stdin,
		Stdout:     sio.Stdout,
		Stderr:     sio.Stderr,
		Terminal:   sio.Terminal,
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Pause(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskPaused{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Resume(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskResumed{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Kill(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	pids, err := s.getContainerPids(ctx, r.ID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	var processes []*task.ProcessInfo
	for _, pid := range pids {
		pInfo := task.ProcessInfo{
			Pid: pid,
		}
		for _, p := range container.ExecdProcesses() {
			if p.Pid() == int(pid) {
				d := &options.ProcessDetails{
					ExecID: p.ID(),
				}
				a, err := typeurl.MarshalAny(d)
				if err != nil {
					return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
				}
				pInfo.Info = a
				break
			}
		}
		processes = append(processes, &pInfo)
	}
	return &taskAPI.PidsResponse{
		Processes: processes,
	}, nil
}

// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.CloseIO(ctx, r); err != nil {
		return nil, err
	}
	return empty, nil
}

// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Checkpoint(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Update(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	p.Wait()

	return &taskAPI.WaitResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
	var pid int
	if container, err := s.getContainer(r.ID); err == nil {
		pid = container.Pid()
	}
	return &taskAPI.ConnectResponse{
		ShimPid: uint32(os.Getpid()),
		TaskPid: uint32(pid),
	}, nil
}

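// Shutdown stops the shim, but only once it is no longer servicing any
// containers; it closes the platform, removes the shim socket, and cancels
// the shim's context.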
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// return early if the shim is still servicing containers
	if len(s.containers) > 0 {
		return empty, nil
	}

	if s.platform != nil {
		s.platform.Close()
	}

	if s.shimAddress != "" {
		_ = shim.RemoveSocket(s.shimAddress)
	}

	// make sure that temporary resources have been cleaned up before
	// shutting down the service.
	s.cancel()
	close(s.events)
	return empty, nil
}

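// Stats returns the container's cgroup statistics (cgroup v1 or v2),
// marshaled into a typed Any.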
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	cgx := container.Cgroup()
	if cgx == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
	}
	var statsx interface{}
	switch cg := cgx.(type) {
	case cgroups.Cgroup:
		stats, err := cg.Stat(cgroups.IgnoreNotExist)
		if err != nil {
			return nil, err
		}
		statsx = stats
	case *cgroupsv2.Manager:
		stats, err := cg.Stat()
		if err != nil {
			return nil, err
		}
		statsx = stats
	default:
		return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg)
	}
	data, err := typeurl.MarshalAny(statsx)
	if err != nil {
		return nil, err
	}
	return &taskAPI.StatsResponse{
		Stats: data,
	}, nil
}

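// processExits drains exit notifications from the reaper and matches each
// one against the shim's tracked processes.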
func (s *service) processExits() {
	for e := range s.ec {
		s.checkProcesses(e)
	}
}

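// send queues an event for publishing without taking eventSendMu.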
func (s *service) send(evt interface{}) {
	s.events <- evt
}

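// sendL queues an event while holding eventSendMu, so exit events published
// from checkProcesses cannot overtake the start events that Start emits
// under the same lock.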
func (s *service) sendL(evt interface{}) {
	s.eventSendMu.Lock()
	s.events <- evt
	s.eventSendMu.Unlock()
}

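// checkProcesses finds the tracked process matching an exit, kills the
// remaining children of an exited init when the bundle requests it, records
// the exit status, and publishes a TaskExit event.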
func (s *service) checkProcesses(e runcC.Exit) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, container := range s.containers {
		if !container.HasPid(e.Pid) {
			continue
		}

		for _, p := range container.All() {
			if p.Pid() != e.Pid {
				continue
			}

			if ip, ok := p.(*process.Init); ok {
				// Ensure all children are killed
				if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
					if err := ip.KillAll(s.context); err != nil {
						logrus.WithError(err).WithField("id", ip.ID()).
							Error("failed to kill init's children")
					}
				}
			}

			p.SetExited(e.Status)
			s.sendL(&eventstypes.TaskExit{
				ContainerID: container.ID,
				ID:          p.ID(),
				Pid:         uint32(e.Pid),
				ExitStatus:  uint32(e.Status),
				ExitedAt:    p.ExitedAt(),
			})
			return
		}
		return
	}
}

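// getContainerPids asks the runtime for every pid running inside the
// container identified by id.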
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
	container, err := s.getContainer(id)
	if err != nil {
		return nil, err
	}
	p, err := container.Process("")
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
	if err != nil {
		return nil, err
	}
	pids := make([]uint32, 0, len(ps))
	for _, pid := range ps {
		pids = append(pids, uint32(pid))
	}
	return pids, nil
}

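// forward publishes queued events to containerd under the shim's namespace
// until the events channel is closed.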
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
	ns, _ := namespaces.Namespace(ctx)
	ctx = namespaces.WithNamespace(context.Background(), ns)
	for e := range s.events {
		err := publisher.Publish(ctx, runc.GetTopic(e), e)
		if err != nil {
			logrus.WithError(err).Error("post event")
		}
	}
	publisher.Close()
}

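// getContainer looks up a container by id under the service lock.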
func (s *service) getContainer(id string) (*runc.Container, error) {
	s.mu.Lock()
	container := s.containers[id]
	s.mu.Unlock()
	if container == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
	}
	return container, nil
}

// initPlatform initializes a single epoll fd to manage our consoles. It
// must only be called once.
func (s *service) initPlatform() error {
	if s.platform != nil {
		return nil
	}
	p, err := runc.NewPlatform()
	if err != nil {
		return err
	}
	s.platform = p
	return nil
}