// +build linux

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
package v2

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"github.com/containerd/cgroups"
	cgroupsv2 "github.com/containerd/cgroups/v2"
	eventstypes "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types/task"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/mount"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/pkg/oom"
	oomv1 "github.com/containerd/containerd/pkg/oom/v1"
	oomv2 "github.com/containerd/containerd/pkg/oom/v2"
	"github.com/containerd/containerd/pkg/process"
	"github.com/containerd/containerd/pkg/stdio"
	"github.com/containerd/containerd/runtime/v2/runc"
	"github.com/containerd/containerd/runtime/v2/runc/options"
	"github.com/containerd/containerd/runtime/v2/shim"
	taskAPI "github.com/containerd/containerd/runtime/v2/task"
	"github.com/containerd/containerd/sys"
	"github.com/containerd/containerd/sys/reaper"
	runcC "github.com/containerd/go-runc"
	"github.com/containerd/typeurl"
	"github.com/gogo/protobuf/proto"
	ptypes "github.com/gogo/protobuf/types"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/sys/unix"
)

var (
	_     = (taskAPI.TaskService)(&service{})
	empty = &ptypes.Empty{}
)

// groupLabels specifies how the shim groups services.
// It currently supports a runc.v2-specific .group label and the
// standard Kubernetes pod sandbox label. Order matters in this list.
var groupLabels = []string{
	"io.containerd.runc.v2.group",
	"io.kubernetes.cri.sandbox-id",
}

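// spec is a trimmed-down view of the OCI runtime spec: only the annotations
// are needed here, to pick a grouping key for the shim.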
type spec struct {
	Annotations map[string]string `json:"annotations,omitempty"`
}

// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
	var (
		ep  oom.Watcher
		err error
	)
	if cgroups.Mode() == cgroups.Unified {
		ep, err = oomv2.New(publisher)
	} else {
		ep, err = oomv1.New(publisher)
	}
	if err != nil {
		return nil, err
	}
	go ep.Run(ctx)
	s := &service{
		id:         id,
		context:    ctx,
		events:     make(chan interface{}, 128),
		ec:         reaper.Default.Subscribe(),
		ep:         ep,
		cancel:     shutdown,
		containers: make(map[string]*runc.Container),
	}
	go s.processExits()
	runcC.Monitor = reaper.Default
	if err := s.initPlatform(); err != nil {
		shutdown()
		return nil, errors.Wrap(err, "failed to initialize platform behavior")
	}
	go s.forward(ctx, publisher)

	if address, err := shim.ReadAddress("address"); err == nil {
		s.shimAddress = address
	}
	return s, nil
}

// service is the shim implementation of a remote shim over GRPC
type service struct {
	mu          sync.Mutex
	eventSendMu sync.Mutex

	context  context.Context
	events   chan interface{}
	platform stdio.Platform
	ec       chan runcC.Exit
	ep       oom.Watcher

	// id is only used in the cleanup case
	id string

	containers map[string]*runc.Container

	shimAddress string
	cancel      func()
}

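// newCommand builds the exec.Cmd used to re-exec this binary as a detached
// shim process (in its own process group) for the given container id. The
// containerdBinary and containerdTTRPCAddress parameters are part of the
// call signature but are not used when constructing the command here.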
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	self, err := os.Executable()
	if err != nil {
		return nil, err
	}
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	args := []string{
		"-namespace", ns,
		"-id", id,
		"-address", containerdAddress,
	}
	cmd := exec.Command(self, args...)
	cmd.Dir = cwd
	cmd.Env = append(os.Environ(), "GOMAXPROCS=4")
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}
	return cmd, nil
}

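// readSpec loads just the annotations from the config.json in the current
// working directory, which is the bundle directory when containerd invokes
// StartShim.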
func readSpec() (*spec, error) {
	f, err := os.Open("config.json")
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var s spec
	if err := json.NewDecoder(f).Decode(&s); err != nil {
		return nil, err
	}
	return &s, nil
}

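// StartShim launches the shim process for a container, or hands back the
// address of an already-running shim when a grouping annotation maps the
// container onto an existing group. On success it writes the socket address
// to the "address" file, optionally joins the shim to the cgroup named in
// the runtime options received on stdin, and adjusts the shim's OOM score.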
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (_ string, retErr error) {
	cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
	if err != nil {
		return "", err
	}
	grouping := id
	spec, err := readSpec()
	if err != nil {
		return "", err
	}
	for _, group := range groupLabels {
		if groupID, ok := spec.Annotations[group]; ok {
			grouping = groupID
			break
		}
	}
	address, err := shim.SocketAddress(ctx, containerdAddress, grouping)
	if err != nil {
		return "", err
	}

	socket, err := shim.NewSocket(address)
	if err != nil {
		// This can only happen if there is a bug and the socket was not
		// cleaned up in the shim's cleanup method, or if we are using the
		// grouping functionality, in which case the new process should run
		// in the same shim as an existing container.
		if !shim.SocketEaddrinuse(err) {
			return "", errors.Wrap(err, "create new shim socket")
		}
		if shim.CanConnect(address) {
			if err := shim.WriteAddress("address", address); err != nil {
				return "", errors.Wrap(err, "write existing socket for shim")
			}
			return address, nil
		}
		if err := shim.RemoveSocket(address); err != nil {
			return "", errors.Wrap(err, "remove pre-existing socket")
		}
		if socket, err = shim.NewSocket(address); err != nil {
			return "", errors.Wrap(err, "create new shim socket (second attempt)")
		}
	}
	defer func() {
		if retErr != nil {
			socket.Close()
			_ = shim.RemoveSocket(address)
		}
	}()
	f, err := socket.File()
	if err != nil {
		return "", err
	}

	cmd.ExtraFiles = append(cmd.ExtraFiles, f)

	if err := cmd.Start(); err != nil {
		f.Close()
		return "", err
	}
	defer func() {
		if retErr != nil {
			cmd.Process.Kill()
		}
	}()
	// make sure to wait after start so the shim process is reaped once it exits
	go cmd.Wait()
	if err := shim.WriteAddress("address", address); err != nil {
		return "", err
	}
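	// Runtime options may arrive serialized on stdin; if they name a shim
	// cgroup, move this shim process into it (via the v2 or v1 cgroups API,
	// depending on the host's cgroup mode).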
	if data, err := ioutil.ReadAll(os.Stdin); err == nil {
		if len(data) > 0 {
			var any ptypes.Any
			if err := proto.Unmarshal(data, &any); err != nil {
				return "", err
			}
			v, err := typeurl.UnmarshalAny(&any)
			if err != nil {
				return "", err
			}
			if opts, ok := v.(*options.Options); ok {
				if opts.ShimCgroup != "" {
					if cgroups.Mode() == cgroups.Unified {
						if err := cgroupsv2.VerifyGroupPath(opts.ShimCgroup); err != nil {
							return "", errors.Wrapf(err, "failed to verify cgroup path %q", opts.ShimCgroup)
						}
						cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup)
						if err != nil {
							return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
						}
						if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
							return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
						}
					} else {
						cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup))
						if err != nil {
							return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
						}
						if err := cg.Add(cgroups.Process{
							Pid: cmd.Process.Pid,
						}); err != nil {
							return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
						}
					}
				}
			}
		}
	}
	if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil {
		return "", errors.Wrap(err, "failed to adjust OOM score for shim")
	}
	return address, nil
}

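// Cleanup force-deletes the runc container for this shim's id and unmounts
// the bundle rootfs, reporting a SIGKILL exit status. It is the path used to
// reclaim resources when the shim cannot service a normal delete.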
func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	path := filepath.Join(filepath.Dir(cwd), s.id)
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	runtime, err := runc.ReadRuntime(path)
	if err != nil {
		return nil, err
	}
	opts, err := runc.ReadOptions(path)
	if err != nil {
		return nil, err
	}
	root := process.RuncRoot
	if opts != nil && opts.Root != "" {
		root = opts.Root
	}

	r := process.NewRunc(root, path, ns, runtime, "", false)
	if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
		Force: true,
	}); err != nil {
		logrus.WithError(err).Warn("failed to remove runc container")
	}
	if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil {
		logrus.WithError(err).Warn("failed to cleanup rootfs mount")
	}
	return &taskAPI.DeleteResponse{
		ExitedAt:   time.Now(),
		ExitStatus: 128 + uint32(unix.SIGKILL),
	}, nil
}

// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	container, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}

	s.containers[r.ID] = container

	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO: &eventstypes.TaskIO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint: r.Checkpoint,
		Pid:        uint32(container.Pid()),
	})

	return &taskAPI.CreateTaskResponse{
		Pid: uint32(container.Pid()),
	}, nil
}

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}

	// hold the send lock so that the start events are sent before any exit events in the error case
	s.eventSendMu.Lock()
	p, err := container.Start(ctx, r)
	if err != nil {
		s.eventSendMu.Unlock()
		return nil, errdefs.ToGRPC(err)
	}

	switch r.ExecID {
	case "":
		switch cg := container.Cgroup().(type) {
		case cgroups.Cgroup:
			if err := s.ep.Add(container.ID, cg); err != nil {
				logrus.WithError(err).Error("add cg to OOM monitor")
			}
		case *cgroupsv2.Manager:
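			// Best effort: enable all available controllers on the
			// container's cgroup before registering it with the OOM
			// watcher; failures are only logged (at debug level inside a
			// user namespace, where this is expected to fail).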
			allControllers, err := cg.RootControllers()
			if err != nil {
				logrus.WithError(err).Error("failed to get root controllers")
			} else {
				if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil {
					if sys.RunningInUserNS() {
						logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers)
					} else {
						logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
					}
				}
			}
			if err := s.ep.Add(container.ID, cg); err != nil {
				logrus.WithError(err).Error("add cg to OOM monitor")
			}
		}

		s.send(&eventstypes.TaskStart{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
		})
	default:
		s.send(&eventstypes.TaskExecStarted{
			ContainerID: container.ID,
			ExecID:      r.ExecID,
			Pid:         uint32(p.Pid()),
		})
	}
	s.eventSendMu.Unlock()
	return &taskAPI.StartResponse{
		Pid: uint32(p.Pid()),
	}, nil
}

// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Delete(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// if we deleted an init task, send the task delete event
	if r.ExecID == "" {
		s.mu.Lock()
		delete(s.containers, r.ID)
		s.mu.Unlock()
		s.send(&eventstypes.TaskDelete{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
			ExitStatus:  uint32(p.ExitStatus()),
			ExitedAt:    p.ExitedAt(),
		})
	}
	return &taskAPI.DeleteResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
		Pid:        uint32(p.Pid()),
	}, nil
}

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	ok, cancel := container.ReserveProcess(r.ExecID)
	if !ok {
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
	}
	process, err := container.Exec(ctx, r)
	if err != nil {
		cancel()
		return nil, errdefs.ToGRPC(err)
	}

	s.send(&eventstypes.TaskExecAdded{
		ContainerID: container.ID,
		ExecID:      process.ID(),
	})
	return empty, nil
}

// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.ResizePty(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, err
	}
	st, err := p.Status(ctx)
	if err != nil {
		return nil, err
	}
	status := task.StatusUnknown
	switch st {
	case "created":
		status = task.StatusCreated
	case "running":
		status = task.StatusRunning
	case "stopped":
		status = task.StatusStopped
	case "paused":
		status = task.StatusPaused
	case "pausing":
		status = task.StatusPausing
	}
	sio := p.Stdio()
	return &taskAPI.StateResponse{
		ID:         p.ID(),
		Bundle:     container.Bundle,
		Pid:        uint32(p.Pid()),
		Status:     status,
		Stdin:      sio.Stdin,
		Stdout:     sio.Stdout,
		Stderr:     sio.Stderr,
		Terminal:   sio.Terminal,
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Pause(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskPaused{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Resume(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskResumed{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Kill(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	pids, err := s.getContainerPids(ctx, r.ID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	var processes []*task.ProcessInfo
	for _, pid := range pids {
		pInfo := task.ProcessInfo{
			Pid: pid,
		}
		for _, p := range container.ExecdProcesses() {
			if p.Pid() == int(pid) {
				d := &options.ProcessDetails{
					ExecID: p.ID(),
				}
				a, err := typeurl.MarshalAny(d)
				if err != nil {
					return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
				}
				pInfo.Info = a
				break
			}
		}
		processes = append(processes, &pInfo)
	}
	return &taskAPI.PidsResponse{
		Processes: processes,
	}, nil
}

// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.CloseIO(ctx, r); err != nil {
		return nil, err
	}
	return empty, nil
}

// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Checkpoint(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Update(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	p.Wait()

	return &taskAPI.WaitResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
	var pid int
	if container, err := s.getContainer(r.ID); err == nil {
		pid = container.Pid()
	}
	return &taskAPI.ConnectResponse{
		ShimPid: uint32(os.Getpid()),
		TaskPid: uint32(pid),
	}, nil
}

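// Shutdown stops the shim. It is a no-op while containers are still being
// serviced; otherwise it cancels the shim context, closes the event channel
// and console platform, and removes the shim's socket.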
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	// return early if the shim is still servicing containers
	if len(s.containers) > 0 {
		s.mu.Unlock()
		return empty, nil
	}
	s.cancel()
	close(s.events)

	if s.platform != nil {
		s.platform.Close()
	}
	if s.shimAddress != "" {
		_ = shim.RemoveSocket(s.shimAddress)
	}
	return empty, nil
}

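// Stats returns cgroup statistics for the container, collected through the
// v1 or v2 cgroups API depending on the cgroup backing the container, and
// marshaled into an Any for transport.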
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	cgx := container.Cgroup()
	if cgx == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
	}
	var statsx interface{}
	switch cg := cgx.(type) {
	case cgroups.Cgroup:
		stats, err := cg.Stat(cgroups.IgnoreNotExist)
		if err != nil {
			return nil, err
		}
		statsx = stats
	case *cgroupsv2.Manager:
		stats, err := cg.Stat()
		if err != nil {
			return nil, err
		}
		statsx = stats
	default:
		return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg)
	}
	data, err := typeurl.MarshalAny(statsx)
	if err != nil {
		return nil, err
	}
	return &taskAPI.StatsResponse{
		Stats: data,
	}, nil
}

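// processExits delivers exit events from the reaper subscription to
// checkProcesses until the subscription channel is closed.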
func (s *service) processExits() {
	for e := range s.ec {
		s.checkProcesses(e)
	}
}

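// send queues an event for forwarding to containerd.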
func (s *service) send(evt interface{}) {
	s.events <- evt
}

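// sendL queues an event while holding eventSendMu, so that exit events are
// ordered after the corresponding start events (see Start).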
func (s *service) sendL(evt interface{}) {
	s.eventSendMu.Lock()
	s.events <- evt
	s.eventSendMu.Unlock()
}

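// checkProcesses matches a reaper exit event against the tracked containers'
// processes, kills an exited init's remaining children when the bundle is
// configured for it, records the exit status, and publishes a TaskExit event.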
func (s *service) checkProcesses(e runcC.Exit) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, container := range s.containers {
		if !container.HasPid(e.Pid) {
			continue
		}

		for _, p := range container.All() {
			if p.Pid() != e.Pid {
				continue
			}

			if ip, ok := p.(*process.Init); ok {
				// Ensure all children are killed
				if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
					if err := ip.KillAll(s.context); err != nil {
						logrus.WithError(err).WithField("id", ip.ID()).
							Error("failed to kill init's children")
					}
				}
			}

			p.SetExited(e.Status)
			s.sendL(&eventstypes.TaskExit{
				ContainerID: container.ID,
				ID:          p.ID(),
				Pid:         uint32(e.Pid),
				ExitStatus:  uint32(e.Status),
				ExitedAt:    p.ExitedAt(),
			})
			return
		}
		return
	}
}

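// getContainerPids lists the pids in the container by querying the OCI
// runtime's ps command through the init process's runtime handle.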
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
	container, err := s.getContainer(id)
	if err != nil {
		return nil, err
	}
	p, err := container.Process("")
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
	if err != nil {
		return nil, err
	}
	pids := make([]uint32, 0, len(ps))
	for _, pid := range ps {
		pids = append(pids, uint32(pid))
	}
	return pids, nil
}

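// forward publishes queued events to containerd under the shim's namespace,
// closing the publisher once the events channel has been drained.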
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
	ns, _ := namespaces.Namespace(ctx)
	ctx = namespaces.WithNamespace(context.Background(), ns)
	for e := range s.events {
		err := publisher.Publish(ctx, runc.GetTopic(e), e)
		if err != nil {
			logrus.WithError(err).Error("post event")
		}
	}
	publisher.Close()
}

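// getContainer looks up a container by id under the service lock.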
func (s *service) getContainer(id string) (*runc.Container, error) {
	s.mu.Lock()
	container := s.containers[id]
	s.mu.Unlock()
	if container == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
	}
	return container, nil
}

// initPlatform initializes a single epoll fd to manage our consoles. It
// should only be called once.
func (s *service) initPlatform() error {
	if s.platform != nil {
		return nil
	}
	p, err := runc.NewPlatform()
	if err != nil {
		return err
	}
	s.platform = p
	return nil
}