// +build linux

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package v2

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/containerd/cgroups"
	eventstypes "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types/task"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/mount"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/pkg/oom"
	"github.com/containerd/containerd/pkg/process"
	"github.com/containerd/containerd/pkg/stdio"
	"github.com/containerd/containerd/runtime/v2/runc"
	"github.com/containerd/containerd/runtime/v2/runc/options"
	"github.com/containerd/containerd/runtime/v2/shim"
	taskAPI "github.com/containerd/containerd/runtime/v2/task"
	"github.com/containerd/containerd/sys/reaper"
	runcC "github.com/containerd/go-runc"
	"github.com/containerd/typeurl"
	"github.com/gogo/protobuf/proto"
	"github.com/gogo/protobuf/types"
	ptypes "github.com/gogo/protobuf/types"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/sys/unix"
)

var (
	_     = (taskAPI.TaskService)(&service{})
	empty = &ptypes.Empty{}
)

// groupLabels specifies how the shim groups services.
// Currently it supports a runc.v2-specific .group label and the
// standard k8s pod sandbox label. Order matters in this list.
var groupLabels = []string{
	"io.containerd.runc.v2.group",
	"io.kubernetes.cri.sandbox-id",
}

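// spec is a shallow view of the OCI runtime spec that only carries the
// annotations needed to resolve the group labels above.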
type spec struct {
	Annotations map[string]string `json:"annotations,omitempty"`
}

// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
	ep, err := oom.New(publisher)
	if err != nil {
		return nil, err
	}
	go ep.Run(ctx)
	s := &service{
		id:         id,
		context:    ctx,
		events:     make(chan interface{}, 128),
		ec:         reaper.Default.Subscribe(),
		ep:         ep,
		cancel:     shutdown,
		containers: make(map[string]*runc.Container),
	}
	go s.processExits()
	runcC.Monitor = reaper.Default
	if err := s.initPlatform(); err != nil {
		shutdown()
		return nil, errors.Wrap(err, "failed to initialize platform behavior")
	}
	go s.forward(ctx, publisher)
	return s, nil
}

// service is the shim implementation of a remote shim over GRPC
type service struct {
	mu          sync.Mutex
	eventSendMu sync.Mutex

	context  context.Context
	events   chan interface{}
	platform stdio.Platform
	ec       chan runcC.Exit
	ep       *oom.Epoller

	// id only used in cleanup case
	id string

	containers map[string]*runc.Container

	cancel func()
}

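// newCommand builds the exec.Cmd that re-execs the current binary as the
// long-lived shim process for the given id and containerd address.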
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	self, err := os.Executable()
	if err != nil {
		return nil, err
	}
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	args := []string{
		"-namespace", ns,
		"-id", id,
		"-address", containerdAddress,
	}
	cmd := exec.Command(self, args...)
	cmd.Dir = cwd
	cmd.Env = append(os.Environ(), "GOMAXPROCS=4")
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}
	return cmd, nil
}

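// readSpec decodes the annotations from the bundle's config.json in the
// current working directory.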
func readSpec() (*spec, error) {
	f, err := os.Open("config.json")
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var s spec
	if err := json.NewDecoder(f).Decode(&s); err != nil {
		return nil, err
	}
	return &s, nil
}

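// StartShim starts the shim process for the container, or reuses an existing
// shim socket when the container belongs to an existing group, and returns
// the address the shim is listening on.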
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (_ string, retErr error) {
	cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
	if err != nil {
		return "", err
	}
	grouping := id
	spec, err := readSpec()
	if err != nil {
		return "", err
	}
	for _, group := range groupLabels {
		if groupID, ok := spec.Annotations[group]; ok {
			grouping = groupID
			break
		}
	}
	address, err := shim.SocketAddress(ctx, grouping)
	if err != nil {
		return "", err
	}
	socket, err := shim.NewSocket(address)
	if err != nil {
		if strings.Contains(err.Error(), "address already in use") {
			if err := shim.WriteAddress("address", address); err != nil {
				return "", err
			}
			return address, nil
		}
		return "", err
	}
	defer socket.Close()
	f, err := socket.File()
	if err != nil {
		return "", err
	}
	defer f.Close()

	cmd.ExtraFiles = append(cmd.ExtraFiles, f)

	if err := cmd.Start(); err != nil {
		return "", err
	}
	defer func() {
		// kill the newly started shim process if anything below fails so it
		// is not leaked; retErr is the named error return, so this also fires
		// for errors returned from shadowed scopes below
		if retErr != nil {
			cmd.Process.Kill()
		}
	}()
	// make sure to wait after start
	go cmd.Wait()
	if err := shim.WriteAddress("address", address); err != nil {
		return "", err
	}
	if data, err := ioutil.ReadAll(os.Stdin); err == nil {
		if len(data) > 0 {
			var any types.Any
			if err := proto.Unmarshal(data, &any); err != nil {
				return "", err
			}
			v, err := typeurl.UnmarshalAny(&any)
			if err != nil {
				return "", err
			}
			if opts, ok := v.(*options.Options); ok {
				if opts.ShimCgroup != "" {
					cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup))
					if err != nil {
						return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
					}
					if err := cg.Add(cgroups.Process{
						Pid: cmd.Process.Pid,
					}); err != nil {
						return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
					}
				}
			}
		}
	}
	if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil {
		return "", errors.Wrap(err, "failed to adjust OOM score for shim")
	}
	return address, nil
}

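// Cleanup force-deletes any remaining runc container state for this shim and
// unmounts the bundle's rootfs.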
func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	path := filepath.Join(filepath.Dir(cwd), s.id)
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}

	runtime, err := runc.ReadRuntime(path)
	if err != nil {
		return nil, err
	}
	r := process.NewRunc(process.RuncRoot, path, ns, runtime, "", false)
	if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
		Force: true,
	}); err != nil {
		logrus.WithError(err).Warn("failed to remove runc container")
	}
	if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil {
		logrus.WithError(err).Warn("failed to cleanup rootfs mount")
	}
	return &taskAPI.DeleteResponse{
		ExitedAt:   time.Now(),
		ExitStatus: 128 + uint32(unix.SIGKILL),
	}, nil
}

// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	container, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}

	s.containers[r.ID] = container

	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO: &eventstypes.TaskIO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint: r.Checkpoint,
		Pid:        uint32(container.Pid()),
	})

	return &taskAPI.CreateTaskResponse{
		Pid: uint32(container.Pid()),
	}, nil
}

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}

	// hold the send lock so that the start events are sent before any exit events in the error case
	s.eventSendMu.Lock()
	p, err := container.Start(ctx, r)
	if err != nil {
		s.eventSendMu.Unlock()
		return nil, errdefs.ToGRPC(err)
	}
	if err := s.ep.Add(container.ID, container.Cgroup()); err != nil {
		logrus.WithError(err).Error("add cg to OOM monitor")
	}
	switch r.ExecID {
	case "":
		s.send(&eventstypes.TaskStart{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
		})
	default:
		s.send(&eventstypes.TaskExecStarted{
			ContainerID: container.ID,
			ExecID:      r.ExecID,
			Pid:         uint32(p.Pid()),
		})
	}
	s.eventSendMu.Unlock()
	return &taskAPI.StartResponse{
		Pid: uint32(p.Pid()),
	}, nil
}

// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Delete(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// if we deleted our init task, close the platform and send the task delete event
	if r.ExecID == "" {
		s.mu.Lock()
		delete(s.containers, r.ID)
		hasContainers := len(s.containers) > 0
		s.mu.Unlock()
		if s.platform != nil && !hasContainers {
			s.platform.Close()
		}
		s.send(&eventstypes.TaskDelete{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
			ExitStatus:  uint32(p.ExitStatus()),
			ExitedAt:    p.ExitedAt(),
		})
	}
	return &taskAPI.DeleteResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
		Pid:        uint32(p.Pid()),
	}, nil
}

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	ok, cancel := container.ReserveProcess(r.ExecID)
	if !ok {
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
	}
	process, err := container.Exec(ctx, r)
	if err != nil {
		cancel()
		return nil, errdefs.ToGRPC(err)
	}

	s.send(&eventstypes.TaskExecAdded{
		ContainerID: container.ID,
		ExecID:      process.ID(),
	})
	return empty, nil
}

// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.ResizePty(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, err
	}
	st, err := p.Status(ctx)
	if err != nil {
		return nil, err
	}
	status := task.StatusUnknown
	switch st {
	case "created":
		status = task.StatusCreated
	case "running":
		status = task.StatusRunning
	case "stopped":
		status = task.StatusStopped
	case "paused":
		status = task.StatusPaused
	case "pausing":
		status = task.StatusPausing
	}
	sio := p.Stdio()
	return &taskAPI.StateResponse{
		ID:         p.ID(),
		Bundle:     container.Bundle,
		Pid:        uint32(p.Pid()),
		Status:     status,
		Stdin:      sio.Stdin,
		Stdout:     sio.Stdout,
		Stderr:     sio.Stderr,
		Terminal:   sio.Terminal,
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Pause(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskPaused{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Resume(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskResumed{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Kill(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	pids, err := s.getContainerPids(ctx, r.ID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	var processes []*task.ProcessInfo
	for _, pid := range pids {
		pInfo := task.ProcessInfo{
			Pid: pid,
		}
		for _, p := range container.ExecdProcesses() {
			if p.Pid() == int(pid) {
				d := &options.ProcessDetails{
					ExecID: p.ID(),
				}
				a, err := typeurl.MarshalAny(d)
				if err != nil {
					return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
				}
				pInfo.Info = a
				break
			}
		}
		processes = append(processes, &pInfo)
	}
	return &taskAPI.PidsResponse{
		Processes: processes,
	}, nil
}

// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.CloseIO(ctx, r); err != nil {
		return nil, err
	}
	return empty, nil
}

// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Checkpoint(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Update(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	p.Wait()

	return &taskAPI.WaitResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
	var pid int
	if container, err := s.getContainer(r.ID); err == nil {
		pid = container.Pid()
	}
	return &taskAPI.ConnectResponse{
		ShimPid: uint32(os.Getpid()),
		TaskPid: uint32(pid),
	}, nil
}

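// Shutdown stops the shim, but only once it is no longer servicing any
// containers.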
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// return early if the shim is still servicing containers
	if len(s.containers) > 0 {
		return empty, nil
	}
	s.cancel()
	close(s.events)
	return empty, nil
}

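// Stats returns cgroup statistics for the container.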
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	cg := container.Cgroup()
	if cg == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
	}
	stats, err := cg.Stat(cgroups.IgnoreNotExist)
	if err != nil {
		return nil, err
	}
	data, err := typeurl.MarshalAny(stats)
	if err != nil {
		return nil, err
	}
	return &taskAPI.StatsResponse{
		Stats: data,
	}, nil
}

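// processExits forwards exit events from the reaper subscription to
// checkProcesses.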
func (s *service) processExits() {
	for e := range s.ec {
		s.checkProcesses(e)
	}
}

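// send queues an event for publishing to containerd.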
func (s *service) send(evt interface{}) {
	s.events <- evt
}

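// sendL queues an event while holding the event send lock so that exit events
// are not published ahead of in-flight start events.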
func (s *service) sendL(evt interface{}) {
	s.eventSendMu.Lock()
	s.events <- evt
	s.eventSendMu.Unlock()
}

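// checkProcesses matches a reaped pid to one of the shim's containers, kills
// the init process's remaining children when required, marks the process as
// exited, and emits a TaskExit event.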
func (s *service) checkProcesses(e runcC.Exit) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, container := range s.containers {
		if !container.HasPid(e.Pid) {
			continue
		}

		for _, p := range container.All() {
			if p.Pid() != e.Pid {
				continue
			}

			if ip, ok := p.(*process.Init); ok {
				shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
				if err != nil {
					log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
				}

				// Ensure all children are killed
				if shouldKillAll {
					if err := ip.KillAll(s.context); err != nil {
						logrus.WithError(err).WithField("id", ip.ID()).
							Error("failed to kill init's children")
					}
				}
			}

			p.SetExited(e.Status)
			s.sendL(&eventstypes.TaskExit{
				ContainerID: container.ID,
				ID:          p.ID(),
				Pid:         uint32(e.Pid),
				ExitStatus:  uint32(e.Status),
				ExitedAt:    p.ExitedAt(),
			})
			return
		}
		return
	}
}

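// shouldKillAllOnExit reports whether the remaining processes must be killed
// explicitly when the init process exits. If the bundle spec creates a new
// PID namespace, the kernel already kills every process in it when init dies,
// so no explicit kill is needed.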
func shouldKillAllOnExit(bundlePath string) (bool, error) {
	var bundleSpec specs.Spec
	bundleConfigContents, err := ioutil.ReadFile(filepath.Join(bundlePath, "config.json"))
	if err != nil {
		return false, err
	}
	if err := json.Unmarshal(bundleConfigContents, &bundleSpec); err != nil {
		return false, err
	}

	if bundleSpec.Linux != nil {
		for _, ns := range bundleSpec.Linux.Namespaces {
			if ns.Type == specs.PIDNamespace && ns.Path == "" {
				return false, nil
			}
		}
	}

	return true, nil
}

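// getContainerPids returns the pids of all processes inside the container as
// reported by the OCI runtime.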
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
	container, err := s.getContainer(id)
	if err != nil {
		return nil, err
	}
	p, err := container.Process("")
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
	if err != nil {
		return nil, err
	}
	pids := make([]uint32, 0, len(ps))
	for _, pid := range ps {
		pids = append(pids, uint32(pid))
	}
	return pids, nil
}

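// forward publishes queued events to containerd with a per-event timeout until
// the events channel is closed, then closes the publisher.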
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
	ns, _ := namespaces.Namespace(ctx)
	ctx = namespaces.WithNamespace(context.Background(), ns)
	for e := range s.events {
		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
		err := publisher.Publish(ctx, runc.GetTopic(e), e)
		cancel()
		if err != nil {
			logrus.WithError(err).Error("post event")
		}
	}
	publisher.Close()
}

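// getContainer looks up a container by id, returning a NotFound error if it
// has not been created.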
func (s *service) getContainer(id string) (*runc.Container, error) {
	s.mu.Lock()
	container := s.containers[id]
	s.mu.Unlock()
	if container == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
	}
	return container, nil
}

// initPlatform initializes a single epoll fd to manage our consoles. It
// should only be called once.
func (s *service) initPlatform() error {
	if s.platform != nil {
		return nil
	}
	p, err := runc.NewPlatform()
	if err != nil {
		return err
	}
	s.platform = p
	return nil
}