1// +build linux
2
3/*
4   Copyright The containerd Authors.
5
6   Licensed under the Apache License, Version 2.0 (the "License");
7   you may not use this file except in compliance with the License.
8   You may obtain a copy of the License at
9
10       http://www.apache.org/licenses/LICENSE-2.0
11
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17*/
18
19package v1
20
21import (
22	"context"
23	"io/ioutil"
24	"os"
25	"os/exec"
26	"path/filepath"
27	"sync"
28	"syscall"
29	"time"
30
31	"github.com/containerd/cgroups"
32	eventstypes "github.com/containerd/containerd/api/events"
33	"github.com/containerd/containerd/api/types/task"
34	"github.com/containerd/containerd/errdefs"
35	"github.com/containerd/containerd/mount"
36	"github.com/containerd/containerd/namespaces"
37	"github.com/containerd/containerd/pkg/oom"
38	oomv1 "github.com/containerd/containerd/pkg/oom/v1"
39	"github.com/containerd/containerd/pkg/process"
40	"github.com/containerd/containerd/pkg/stdio"
41	"github.com/containerd/containerd/runtime/v2/runc"
42	"github.com/containerd/containerd/runtime/v2/runc/options"
43	"github.com/containerd/containerd/runtime/v2/shim"
44	taskAPI "github.com/containerd/containerd/runtime/v2/task"
45	"github.com/containerd/containerd/sys/reaper"
46	runcC "github.com/containerd/go-runc"
47	"github.com/containerd/typeurl"
48	"github.com/gogo/protobuf/proto"
49	ptypes "github.com/gogo/protobuf/types"
50	"github.com/pkg/errors"
51	"github.com/sirupsen/logrus"
52	"golang.org/x/sys/unix"
53)
54
55var (
56	_     = (taskAPI.TaskService)(&service{})
57	empty = &ptypes.Empty{}
58)
59
60// New returns a new shim service that can be used via GRPC
61func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
62	ep, err := oomv1.New(publisher)
63	if err != nil {
64		return nil, err
65	}
66	go ep.Run(ctx)
67	s := &service{
68		id:      id,
69		context: ctx,
70		events:  make(chan interface{}, 128),
71		ec:      reaper.Default.Subscribe(),
72		ep:      ep,
73		cancel:  shutdown,
74	}
75	go s.processExits()
76	runcC.Monitor = reaper.Default
77	if err := s.initPlatform(); err != nil {
78		shutdown()
79		return nil, errors.Wrap(err, "failed to initialized platform behavior")
80	}
81	go s.forward(ctx, publisher)
82	return s, nil
83}
84
85// service is the shim implementation of a remote shim over GRPC
86type service struct {
87	mu          sync.Mutex
88	eventSendMu sync.Mutex
89
90	context  context.Context
91	events   chan interface{}
92	platform stdio.Platform
93	ec       chan runcC.Exit
94	ep       oom.Watcher
95
96	id        string
97	container *runc.Container
98
99	cancel func()
100}
101
102func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
103	ns, err := namespaces.NamespaceRequired(ctx)
104	if err != nil {
105		return nil, err
106	}
107	self, err := os.Executable()
108	if err != nil {
109		return nil, err
110	}
111	cwd, err := os.Getwd()
112	if err != nil {
113		return nil, err
114	}
115	args := []string{
116		"-namespace", ns,
117		"-id", id,
118		"-address", containerdAddress,
119	}
120	cmd := exec.Command(self, args...)
121	cmd.Dir = cwd
122	cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
123	cmd.SysProcAttr = &syscall.SysProcAttr{
124		Setpgid: true,
125	}
126	return cmd, nil
127}
128
129func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
130	cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
131	if err != nil {
132		return "", err
133	}
134	address, err := shim.SocketAddress(ctx, containerdAddress, id)
135	if err != nil {
136		return "", err
137	}
138	socket, err := shim.NewSocket(address)
139	if err != nil {
140		if !shim.SocketEaddrinuse(err) {
141			return "", err
142		}
143		if err := shim.RemoveSocket(address); err != nil {
144			return "", errors.Wrap(err, "remove already used socket")
145		}
146		if socket, err = shim.NewSocket(address); err != nil {
147			return "", err
148		}
149	}
150	f, err := socket.File()
151	if err != nil {
152		return "", err
153	}
154
155	cmd.ExtraFiles = append(cmd.ExtraFiles, f)
156
157	if err := cmd.Start(); err != nil {
158		return "", err
159	}
160	defer func() {
161		if err != nil {
162			_ = shim.RemoveSocket(address)
163			cmd.Process.Kill()
164		}
165	}()
166	// make sure to wait after start
167	go cmd.Wait()
168	if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil {
169		return "", err
170	}
171	if err := shim.WriteAddress("address", address); err != nil {
172		return "", err
173	}
174	if data, err := ioutil.ReadAll(os.Stdin); err == nil {
175		if len(data) > 0 {
176			var any ptypes.Any
177			if err := proto.Unmarshal(data, &any); err != nil {
178				return "", err
179			}
180			v, err := typeurl.UnmarshalAny(&any)
181			if err != nil {
182				return "", err
183			}
184			if opts, ok := v.(*options.Options); ok {
185				if opts.ShimCgroup != "" {
186					cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup))
187					if err != nil {
188						return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
189					}
190					if err := cg.Add(cgroups.Process{
191						Pid: cmd.Process.Pid,
192					}); err != nil {
193						return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
194					}
195				}
196			}
197		}
198	}
199	if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil {
200		return "", errors.Wrap(err, "failed to adjust OOM score for shim")
201	}
202	return address, nil
203}
204
205func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
206	path, err := os.Getwd()
207	if err != nil {
208		return nil, err
209	}
210	ns, err := namespaces.NamespaceRequired(ctx)
211	if err != nil {
212		return nil, err
213	}
214	runtime, err := runc.ReadRuntime(path)
215	if err != nil {
216		return nil, err
217	}
218	opts, err := runc.ReadOptions(path)
219	if err != nil {
220		return nil, err
221	}
222	root := process.RuncRoot
223	if opts != nil && opts.Root != "" {
224		root = opts.Root
225	}
226
227	r := process.NewRunc(root, path, ns, runtime, "", false)
228	if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
229		Force: true,
230	}); err != nil {
231		logrus.WithError(err).Warn("failed to remove runc container")
232	}
233	if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil {
234		logrus.WithError(err).Warn("failed to cleanup rootfs mount")
235	}
236	return &taskAPI.DeleteResponse{
237		ExitedAt:   time.Now(),
238		ExitStatus: 128 + uint32(unix.SIGKILL),
239	}, nil
240}
241
242// Create a new initial process and container with the underlying OCI runtime
243func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
244	s.mu.Lock()
245	defer s.mu.Unlock()
246
247	container, err := runc.NewContainer(ctx, s.platform, r)
248	if err != nil {
249		return nil, err
250	}
251
252	s.container = container
253
254	s.send(&eventstypes.TaskCreate{
255		ContainerID: r.ID,
256		Bundle:      r.Bundle,
257		Rootfs:      r.Rootfs,
258		IO: &eventstypes.TaskIO{
259			Stdin:    r.Stdin,
260			Stdout:   r.Stdout,
261			Stderr:   r.Stderr,
262			Terminal: r.Terminal,
263		},
264		Checkpoint: r.Checkpoint,
265		Pid:        uint32(container.Pid()),
266	})
267
268	return &taskAPI.CreateTaskResponse{
269		Pid: uint32(container.Pid()),
270	}, nil
271}
272
273// Start a process
274func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
275	container, err := s.getContainer()
276	if err != nil {
277		return nil, err
278	}
279
280	// hold the send lock so that the start events are sent before any exit events in the error case
281	s.eventSendMu.Lock()
282	p, err := container.Start(ctx, r)
283	if err != nil {
284		s.eventSendMu.Unlock()
285		return nil, errdefs.ToGRPC(err)
286	}
287	switch r.ExecID {
288	case "":
289		if cg, ok := container.Cgroup().(cgroups.Cgroup); ok {
290			if err := s.ep.Add(container.ID, cg); err != nil {
291				logrus.WithError(err).Error("add cg to OOM monitor")
292			}
293		} else {
294			logrus.WithError(errdefs.ErrNotImplemented).Error("add cg to OOM monitor")
295		}
296		s.send(&eventstypes.TaskStart{
297			ContainerID: container.ID,
298			Pid:         uint32(p.Pid()),
299		})
300	default:
301		s.send(&eventstypes.TaskExecStarted{
302			ContainerID: container.ID,
303			ExecID:      r.ExecID,
304			Pid:         uint32(p.Pid()),
305		})
306	}
307	s.eventSendMu.Unlock()
308	return &taskAPI.StartResponse{
309		Pid: uint32(p.Pid()),
310	}, nil
311}
312
313// Delete the initial process and container
314func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
315	container, err := s.getContainer()
316	if err != nil {
317		return nil, err
318	}
319	p, err := container.Delete(ctx, r)
320	if err != nil {
321		return nil, errdefs.ToGRPC(err)
322	}
323	// if we deleted our init task, close the platform and send the task delete event
324	if r.ExecID == "" {
325		if s.platform != nil {
326			s.platform.Close()
327		}
328		s.send(&eventstypes.TaskDelete{
329			ContainerID: container.ID,
330			Pid:         uint32(p.Pid()),
331			ExitStatus:  uint32(p.ExitStatus()),
332			ExitedAt:    p.ExitedAt(),
333		})
334	}
335	return &taskAPI.DeleteResponse{
336		ExitStatus: uint32(p.ExitStatus()),
337		ExitedAt:   p.ExitedAt(),
338		Pid:        uint32(p.Pid()),
339	}, nil
340}
341
342// Exec an additional process inside the container
343func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
344	container, err := s.getContainer()
345	if err != nil {
346		return nil, err
347	}
348	ok, cancel := container.ReserveProcess(r.ExecID)
349	if !ok {
350		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
351	}
352	process, err := container.Exec(ctx, r)
353	if err != nil {
354		cancel()
355		return nil, errdefs.ToGRPC(err)
356	}
357
358	s.send(&eventstypes.TaskExecAdded{
359		ContainerID: s.container.ID,
360		ExecID:      process.ID(),
361	})
362	return empty, nil
363}
364
365// ResizePty of a process
366func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
367	container, err := s.getContainer()
368	if err != nil {
369		return nil, err
370	}
371	if err := container.ResizePty(ctx, r); err != nil {
372		return nil, errdefs.ToGRPC(err)
373	}
374	return empty, nil
375}
376
377// State returns runtime state information for a process
378func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
379	p, err := s.getProcess(r.ExecID)
380	if err != nil {
381		return nil, err
382	}
383	st, err := p.Status(ctx)
384	if err != nil {
385		return nil, err
386	}
387	status := task.StatusUnknown
388	switch st {
389	case "created":
390		status = task.StatusCreated
391	case "running":
392		status = task.StatusRunning
393	case "stopped":
394		status = task.StatusStopped
395	case "paused":
396		status = task.StatusPaused
397	case "pausing":
398		status = task.StatusPausing
399	}
400	sio := p.Stdio()
401	return &taskAPI.StateResponse{
402		ID:         p.ID(),
403		Bundle:     s.container.Bundle,
404		Pid:        uint32(p.Pid()),
405		Status:     status,
406		Stdin:      sio.Stdin,
407		Stdout:     sio.Stdout,
408		Stderr:     sio.Stderr,
409		Terminal:   sio.Terminal,
410		ExitStatus: uint32(p.ExitStatus()),
411		ExitedAt:   p.ExitedAt(),
412	}, nil
413}
414
415// Pause the container
416func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
417	container, err := s.getContainer()
418	if err != nil {
419		return nil, err
420	}
421	if err := container.Pause(ctx); err != nil {
422		return nil, errdefs.ToGRPC(err)
423	}
424	s.send(&eventstypes.TaskPaused{
425		ContainerID: container.ID,
426	})
427	return empty, nil
428}
429
430// Resume the container
431func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
432	container, err := s.getContainer()
433	if err != nil {
434		return nil, err
435	}
436	if err := container.Resume(ctx); err != nil {
437		return nil, errdefs.ToGRPC(err)
438	}
439	s.send(&eventstypes.TaskResumed{
440		ContainerID: container.ID,
441	})
442	return empty, nil
443}
444
445// Kill a process with the provided signal
446func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
447	container, err := s.getContainer()
448	if err != nil {
449		return nil, err
450	}
451	if err := container.Kill(ctx, r); err != nil {
452		return nil, errdefs.ToGRPC(err)
453	}
454	return empty, nil
455}
456
457// Pids returns all pids inside the container
458func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
459	container, err := s.getContainer()
460	if err != nil {
461		return nil, err
462	}
463	pids, err := s.getContainerPids(ctx, r.ID)
464	if err != nil {
465		return nil, errdefs.ToGRPC(err)
466	}
467	var processes []*task.ProcessInfo
468	for _, pid := range pids {
469		pInfo := task.ProcessInfo{
470			Pid: pid,
471		}
472		for _, p := range container.ExecdProcesses() {
473			if p.Pid() == int(pid) {
474				d := &options.ProcessDetails{
475					ExecID: p.ID(),
476				}
477				a, err := typeurl.MarshalAny(d)
478				if err != nil {
479					return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
480				}
481				pInfo.Info = a
482				break
483			}
484		}
485		processes = append(processes, &pInfo)
486	}
487	return &taskAPI.PidsResponse{
488		Processes: processes,
489	}, nil
490}
491
492// CloseIO of a process
493func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
494	container, err := s.getContainer()
495	if err != nil {
496		return nil, err
497	}
498	if err := container.CloseIO(ctx, r); err != nil {
499		return nil, err
500	}
501	return empty, nil
502}
503
504// Checkpoint the container
505func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
506	container, err := s.getContainer()
507	if err != nil {
508		return nil, err
509	}
510	if err := container.Checkpoint(ctx, r); err != nil {
511		return nil, errdefs.ToGRPC(err)
512	}
513	return empty, nil
514}
515
516// Update a running container
517func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
518	container, err := s.getContainer()
519	if err != nil {
520		return nil, err
521	}
522	if err := container.Update(ctx, r); err != nil {
523		return nil, errdefs.ToGRPC(err)
524	}
525	return empty, nil
526}
527
528// Wait for a process to exit
529func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
530	container, err := s.getContainer()
531	if err != nil {
532		return nil, err
533	}
534	p, err := container.Process(r.ExecID)
535	if err != nil {
536		return nil, errdefs.ToGRPC(err)
537	}
538	p.Wait()
539
540	return &taskAPI.WaitResponse{
541		ExitStatus: uint32(p.ExitStatus()),
542		ExitedAt:   p.ExitedAt(),
543	}, nil
544}
545
546// Connect returns shim information such as the shim's pid
547func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
548	var pid int
549	if s.container != nil {
550		pid = s.container.Pid()
551	}
552	return &taskAPI.ConnectResponse{
553		ShimPid: uint32(os.Getpid()),
554		TaskPid: uint32(pid),
555	}, nil
556}
557
558func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
559	s.cancel()
560	close(s.events)
561	if address, err := shim.ReadAddress("address"); err == nil {
562		_ = shim.RemoveSocket(address)
563	}
564	return empty, nil
565}
566
567func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
568	cgx := s.container.Cgroup()
569	if cgx == nil {
570		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
571	}
572	cg, ok := cgx.(cgroups.Cgroup)
573	if !ok {
574		return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "cgroup v2 not implemented for Stats")
575	}
576	if cg == nil {
577		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
578	}
579	stats, err := cg.Stat(cgroups.IgnoreNotExist)
580	if err != nil {
581		return nil, err
582	}
583	data, err := typeurl.MarshalAny(stats)
584	if err != nil {
585		return nil, err
586	}
587	return &taskAPI.StatsResponse{
588		Stats: data,
589	}, nil
590}
591
592func (s *service) processExits() {
593	for e := range s.ec {
594		s.checkProcesses(e)
595	}
596}
597
598func (s *service) send(evt interface{}) {
599	s.events <- evt
600}
601
602func (s *service) sendL(evt interface{}) {
603	s.eventSendMu.Lock()
604	s.events <- evt
605	s.eventSendMu.Unlock()
606}
607
608func (s *service) checkProcesses(e runcC.Exit) {
609	container, err := s.getContainer()
610	if err != nil {
611		return
612	}
613
614	for _, p := range container.All() {
615		if p.Pid() == e.Pid {
616			if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
617				if ip, ok := p.(*process.Init); ok {
618					// Ensure all children are killed
619					if err := ip.KillAll(s.context); err != nil {
620						logrus.WithError(err).WithField("id", ip.ID()).
621							Error("failed to kill init's children")
622					}
623				}
624			}
625			p.SetExited(e.Status)
626			s.sendL(&eventstypes.TaskExit{
627				ContainerID: container.ID,
628				ID:          p.ID(),
629				Pid:         uint32(e.Pid),
630				ExitStatus:  uint32(e.Status),
631				ExitedAt:    p.ExitedAt(),
632			})
633			return
634		}
635	}
636}
637
638func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
639	p, err := s.container.Process("")
640	if err != nil {
641		return nil, errdefs.ToGRPC(err)
642	}
643	ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
644	if err != nil {
645		return nil, err
646	}
647	pids := make([]uint32, 0, len(ps))
648	for _, pid := range ps {
649		pids = append(pids, uint32(pid))
650	}
651	return pids, nil
652}
653
654func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
655	ns, _ := namespaces.Namespace(ctx)
656	ctx = namespaces.WithNamespace(context.Background(), ns)
657	for e := range s.events {
658		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
659		err := publisher.Publish(ctx, runc.GetTopic(e), e)
660		cancel()
661		if err != nil {
662			logrus.WithError(err).Error("post event")
663		}
664	}
665	publisher.Close()
666}
667
668func (s *service) getContainer() (*runc.Container, error) {
669	s.mu.Lock()
670	container := s.container
671	s.mu.Unlock()
672	if container == nil {
673		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
674	}
675	return container, nil
676}
677
678func (s *service) getProcess(execID string) (process.Process, error) {
679	container, err := s.getContainer()
680	if err != nil {
681		return nil, err
682	}
683	p, err := container.Process(execID)
684	if err != nil {
685		return nil, errdefs.ToGRPC(err)
686	}
687	return p, nil
688}
689
690// initialize a single epoll fd to manage our consoles. `initPlatform` should
691// only be called once.
692func (s *service) initPlatform() error {
693	if s.platform != nil {
694		return nil
695	}
696	p, err := runc.NewPlatform()
697	if err != nil {
698		return err
699	}
700	s.platform = p
701	return nil
702}
703