1// +build !windows
2
3/*
4   Copyright The containerd Authors.
5
6   Licensed under the Apache License, Version 2.0 (the "License");
7   you may not use this file except in compliance with the License.
8   You may obtain a copy of the License at
9
10       http://www.apache.org/licenses/LICENSE-2.0
11
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17*/
18
19package shim
20
21import (
22	"context"
23	"encoding/json"
24	"fmt"
25	"io/ioutil"
26	"os"
27	"path/filepath"
28	"sync"
29
30	"github.com/containerd/console"
31	eventstypes "github.com/containerd/containerd/api/events"
32	"github.com/containerd/containerd/api/types/task"
33	"github.com/containerd/containerd/errdefs"
34	"github.com/containerd/containerd/events"
35	"github.com/containerd/containerd/log"
36	"github.com/containerd/containerd/mount"
37	"github.com/containerd/containerd/namespaces"
38	"github.com/containerd/containerd/pkg/process"
39	"github.com/containerd/containerd/pkg/stdio"
40	"github.com/containerd/containerd/runtime"
41	"github.com/containerd/containerd/runtime/linux/runctypes"
42	shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
43	"github.com/containerd/containerd/sys/reaper"
44	runc "github.com/containerd/go-runc"
45	"github.com/containerd/typeurl"
46	ptypes "github.com/gogo/protobuf/types"
47	specs "github.com/opencontainers/runtime-spec/specs-go"
48	"github.com/pkg/errors"
49	"github.com/sirupsen/logrus"
50	"google.golang.org/grpc/codes"
51	"google.golang.org/grpc/status"
52)
53
54var (
55	empty   = &ptypes.Empty{}
56	bufPool = sync.Pool{
57		New: func() interface{} {
58			buffer := make([]byte, 4096)
59			return &buffer
60		},
61	}
62)
63
// Config contains shim specific configuration
//
// All fields are supplied by the caller of NewService and copied into the
// Service; they are read when creating the init process (see Create/newInit).
type Config struct {
	// Path is forwarded to process.NewRunc via newInit and passed to
	// Init.Exec; NewService also logs it as "path". NOTE(review): presumably
	// the bundle/working path of the shim — confirm.
	Path          string
	// Namespace is the containerd namespace. NewService rejects an empty value
	// and attaches it to the service context.
	Namespace     string
	// WorkDir becomes the init process's WorkDir and is the default CRIU work
	// path when the create options do not set one.
	WorkDir       string
	// Criu is forwarded to process.NewRunc; presumably the criu binary path.
	Criu          string
	// RuntimeRoot is forwarded to process.NewRunc as its first argument;
	// presumably the OCI runtime's state root directory.
	RuntimeRoot   string
	// SystemdCgroup is forwarded to process.NewRunc; presumably selects the
	// systemd cgroup driver.
	SystemdCgroup bool
}
73
74// NewService returns a new shim service that can be used via GRPC
75func NewService(config Config, publisher events.Publisher) (*Service, error) {
76	if config.Namespace == "" {
77		return nil, fmt.Errorf("shim namespace cannot be empty")
78	}
79	ctx := namespaces.WithNamespace(context.Background(), config.Namespace)
80	ctx = log.WithLogger(ctx, logrus.WithFields(logrus.Fields{
81		"namespace": config.Namespace,
82		"path":      config.Path,
83		"pid":       os.Getpid(),
84	}))
85	s := &Service{
86		config:    config,
87		context:   ctx,
88		processes: make(map[string]process.Process),
89		events:    make(chan interface{}, 128),
90		ec:        reaper.Default.Subscribe(),
91	}
92	go s.processExits()
93	if err := s.initPlatform(); err != nil {
94		return nil, errors.Wrap(err, "failed to initialized platform behavior")
95	}
96	go s.forward(publisher)
97	return s, nil
98}
99
// Service is the shim implementation of a remote shim over GRPC
//
// It tracks the container's init process and any exec'd processes by ID,
// forwards task events to a publisher, and reacts to process exits reported
// by the reaper.
type Service struct {
	// mu guards access to processes; Create also assigns id and bundle while
	// holding it.
	mu sync.Mutex

	// config is the configuration passed to NewService.
	config    Config
	// context is the namespaced, logger-carrying context built in NewService;
	// used for background work such as exit handling and event publishing.
	context   context.Context
	// processes maps process IDs (the container ID for init, exec IDs
	// otherwise) to their process.Process.
	processes map[string]process.Process
	// events is a buffered queue of event messages drained by forward.
	events    chan interface{}
	// platform is handed to the init process and closed by Delete.
	// NOTE(review): presumably populated by initPlatform, which is defined
	// outside this file — confirm.
	platform  stdio.Platform
	// ec receives process exit notifications from reaper.Default; consumed by
	// processExits.
	ec        chan runc.Exit

	// Filled by Create()
	// id is the container (and init process) ID.
	id     string
	// bundle is the OCI bundle directory of the container.
	bundle string
}
115
116// Create a new initial process and container with the underlying OCI runtime
117func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) {
118	var mounts []process.Mount
119	for _, m := range r.Rootfs {
120		mounts = append(mounts, process.Mount{
121			Type:    m.Type,
122			Source:  m.Source,
123			Target:  m.Target,
124			Options: m.Options,
125		})
126	}
127
128	rootfs := ""
129	if len(mounts) > 0 {
130		rootfs = filepath.Join(r.Bundle, "rootfs")
131		if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
132			return nil, err
133		}
134	}
135
136	config := &process.CreateConfig{
137		ID:               r.ID,
138		Bundle:           r.Bundle,
139		Runtime:          r.Runtime,
140		Rootfs:           mounts,
141		Terminal:         r.Terminal,
142		Stdin:            r.Stdin,
143		Stdout:           r.Stdout,
144		Stderr:           r.Stderr,
145		Checkpoint:       r.Checkpoint,
146		ParentCheckpoint: r.ParentCheckpoint,
147		Options:          r.Options,
148	}
149	defer func() {
150		if err != nil {
151			if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
152				log.G(ctx).WithError(err2).Warn("Failed to cleanup rootfs mount")
153			}
154		}
155	}()
156	for _, rm := range mounts {
157		m := &mount.Mount{
158			Type:    rm.Type,
159			Source:  rm.Source,
160			Options: rm.Options,
161		}
162		if err := m.Mount(rootfs); err != nil {
163			return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
164		}
165	}
166
167	s.mu.Lock()
168	defer s.mu.Unlock()
169
170	process, err := newInit(
171		ctx,
172		s.config.Path,
173		s.config.WorkDir,
174		s.config.RuntimeRoot,
175		s.config.Namespace,
176		s.config.Criu,
177		s.config.SystemdCgroup,
178		s.platform,
179		config,
180		rootfs,
181	)
182	if err != nil {
183		return nil, errdefs.ToGRPC(err)
184	}
185	if err := process.Create(ctx, config); err != nil {
186		return nil, errdefs.ToGRPC(err)
187	}
188	// save the main task id and bundle to the shim for additional requests
189	s.id = r.ID
190	s.bundle = r.Bundle
191	pid := process.Pid()
192	s.processes[r.ID] = process
193	return &shimapi.CreateTaskResponse{
194		Pid: uint32(pid),
195	}, nil
196}
197
198// Start a process
199func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) {
200	p, err := s.getExecProcess(r.ID)
201	if err != nil {
202		return nil, err
203	}
204	if err := p.Start(ctx); err != nil {
205		return nil, err
206	}
207	return &shimapi.StartResponse{
208		ID:  p.ID(),
209		Pid: uint32(p.Pid()),
210	}, nil
211}
212
213// Delete the initial process and container
214func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) {
215	p, err := s.getInitProcess()
216	if err != nil {
217		return nil, err
218	}
219	if err := p.Delete(ctx); err != nil {
220		return nil, errdefs.ToGRPC(err)
221	}
222	s.mu.Lock()
223	delete(s.processes, s.id)
224	s.mu.Unlock()
225	s.platform.Close()
226	return &shimapi.DeleteResponse{
227		ExitStatus: uint32(p.ExitStatus()),
228		ExitedAt:   p.ExitedAt(),
229		Pid:        uint32(p.Pid()),
230	}, nil
231}
232
233// DeleteProcess deletes an exec'd process
234func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) {
235	if r.ID == s.id {
236		return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess")
237	}
238	p, err := s.getExecProcess(r.ID)
239	if err != nil {
240		return nil, err
241	}
242	if err := p.Delete(ctx); err != nil {
243		return nil, errdefs.ToGRPC(err)
244	}
245	s.mu.Lock()
246	delete(s.processes, r.ID)
247	s.mu.Unlock()
248	return &shimapi.DeleteResponse{
249		ExitStatus: uint32(p.ExitStatus()),
250		ExitedAt:   p.ExitedAt(),
251		Pid:        uint32(p.Pid()),
252	}, nil
253}
254
255// Exec an additional process inside the container
256func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*ptypes.Empty, error) {
257	s.mu.Lock()
258
259	if p := s.processes[r.ID]; p != nil {
260		s.mu.Unlock()
261		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ID)
262	}
263
264	p := s.processes[s.id]
265	s.mu.Unlock()
266	if p == nil {
267		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
268	}
269
270	process, err := p.(*process.Init).Exec(ctx, s.config.Path, &process.ExecConfig{
271		ID:       r.ID,
272		Terminal: r.Terminal,
273		Stdin:    r.Stdin,
274		Stdout:   r.Stdout,
275		Stderr:   r.Stderr,
276		Spec:     r.Spec,
277	})
278	if err != nil {
279		return nil, errdefs.ToGRPC(err)
280	}
281	s.mu.Lock()
282	s.processes[r.ID] = process
283	s.mu.Unlock()
284	return empty, nil
285}
286
287// ResizePty of a process
288func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*ptypes.Empty, error) {
289	if r.ID == "" {
290		return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided")
291	}
292	ws := console.WinSize{
293		Width:  uint16(r.Width),
294		Height: uint16(r.Height),
295	}
296	s.mu.Lock()
297	p := s.processes[r.ID]
298	s.mu.Unlock()
299	if p == nil {
300		return nil, errors.Errorf("process does not exist %s", r.ID)
301	}
302	if err := p.Resize(ws); err != nil {
303		return nil, errdefs.ToGRPC(err)
304	}
305	return empty, nil
306}
307
308// State returns runtime state information for a process
309func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
310	p, err := s.getExecProcess(r.ID)
311	if err != nil {
312		return nil, err
313	}
314	st, err := p.Status(ctx)
315	if err != nil {
316		return nil, err
317	}
318	status := task.StatusUnknown
319	switch st {
320	case "created":
321		status = task.StatusCreated
322	case "running":
323		status = task.StatusRunning
324	case "stopped":
325		status = task.StatusStopped
326	case "paused":
327		status = task.StatusPaused
328	case "pausing":
329		status = task.StatusPausing
330	}
331	sio := p.Stdio()
332	return &shimapi.StateResponse{
333		ID:         p.ID(),
334		Bundle:     s.bundle,
335		Pid:        uint32(p.Pid()),
336		Status:     status,
337		Stdin:      sio.Stdin,
338		Stdout:     sio.Stdout,
339		Stderr:     sio.Stderr,
340		Terminal:   sio.Terminal,
341		ExitStatus: uint32(p.ExitStatus()),
342		ExitedAt:   p.ExitedAt(),
343	}, nil
344}
345
346// Pause the container
347func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
348	p, err := s.getInitProcess()
349	if err != nil {
350		return nil, err
351	}
352	if err := p.(*process.Init).Pause(ctx); err != nil {
353		return nil, err
354	}
355	return empty, nil
356}
357
358// Resume the container
359func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
360	p, err := s.getInitProcess()
361	if err != nil {
362		return nil, err
363	}
364	if err := p.(*process.Init).Resume(ctx); err != nil {
365		return nil, err
366	}
367	return empty, nil
368}
369
370// Kill a process with the provided signal
371func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Empty, error) {
372	if r.ID == "" {
373		p, err := s.getInitProcess()
374		if err != nil {
375			return nil, err
376		}
377		if err := p.Kill(ctx, r.Signal, r.All); err != nil {
378			return nil, errdefs.ToGRPC(err)
379		}
380		return empty, nil
381	}
382
383	p, err := s.getExecProcess(r.ID)
384	if err != nil {
385		return nil, err
386	}
387	if err := p.Kill(ctx, r.Signal, r.All); err != nil {
388		return nil, errdefs.ToGRPC(err)
389	}
390	return empty, nil
391}
392
393// ListPids returns all pids inside the container
394func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) {
395	pids, err := s.getContainerPids(ctx, r.ID)
396	if err != nil {
397		return nil, errdefs.ToGRPC(err)
398	}
399	var processes []*task.ProcessInfo
400	for _, pid := range pids {
401		pInfo := task.ProcessInfo{
402			Pid: pid,
403		}
404		for _, p := range s.processes {
405			if p.Pid() == int(pid) {
406				d := &runctypes.ProcessDetails{
407					ExecID: p.ID(),
408				}
409				a, err := typeurl.MarshalAny(d)
410				if err != nil {
411					return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
412				}
413				pInfo.Info = a
414				break
415			}
416		}
417		processes = append(processes, &pInfo)
418	}
419	return &shimapi.ListPidsResponse{
420		Processes: processes,
421	}, nil
422}
423
424// CloseIO of a process
425func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptypes.Empty, error) {
426	p, err := s.getExecProcess(r.ID)
427	if err != nil {
428		return nil, err
429	}
430	if stdin := p.Stdin(); stdin != nil {
431		if err := stdin.Close(); err != nil {
432			return nil, errors.Wrap(err, "close stdin")
433		}
434	}
435	return empty, nil
436}
437
438// Checkpoint the container
439func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) {
440	p, err := s.getInitProcess()
441	if err != nil {
442		return nil, err
443	}
444	var options runctypes.CheckpointOptions
445	if r.Options != nil {
446		v, err := typeurl.UnmarshalAny(r.Options)
447		if err != nil {
448			return nil, err
449		}
450		options = *v.(*runctypes.CheckpointOptions)
451	}
452	if err := p.(*process.Init).Checkpoint(ctx, &process.CheckpointConfig{
453		Path:                     r.Path,
454		Exit:                     options.Exit,
455		AllowOpenTCP:             options.OpenTcp,
456		AllowExternalUnixSockets: options.ExternalUnixSockets,
457		AllowTerminal:            options.Terminal,
458		FileLocks:                options.FileLocks,
459		EmptyNamespaces:          options.EmptyNamespaces,
460		WorkDir:                  options.WorkPath,
461	}); err != nil {
462		return nil, errdefs.ToGRPC(err)
463	}
464	return empty, nil
465}
466
467// ShimInfo returns shim information such as the shim's pid
468func (s *Service) ShimInfo(ctx context.Context, r *ptypes.Empty) (*shimapi.ShimInfoResponse, error) {
469	return &shimapi.ShimInfoResponse{
470		ShimPid: uint32(os.Getpid()),
471	}, nil
472}
473
474// Update a running container
475func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) {
476	p, err := s.getInitProcess()
477	if err != nil {
478		return nil, err
479	}
480	if err := p.(*process.Init).Update(ctx, r.Resources); err != nil {
481		return nil, errdefs.ToGRPC(err)
482	}
483	return empty, nil
484}
485
486// Wait for a process to exit
487func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) {
488	p, err := s.getExecProcess(r.ID)
489	if err != nil {
490		return nil, err
491	}
492	p.Wait()
493
494	return &shimapi.WaitResponse{
495		ExitStatus: uint32(p.ExitStatus()),
496		ExitedAt:   p.ExitedAt(),
497	}, nil
498}
499
500func (s *Service) processExits() {
501	for e := range s.ec {
502		s.checkProcesses(e)
503	}
504}
505
506func (s *Service) allProcesses() []process.Process {
507	s.mu.Lock()
508	defer s.mu.Unlock()
509
510	res := make([]process.Process, 0, len(s.processes))
511	for _, p := range s.processes {
512		res = append(res, p)
513	}
514	return res
515}
516
517func (s *Service) checkProcesses(e runc.Exit) {
518	for _, p := range s.allProcesses() {
519		if p.Pid() != e.Pid {
520			continue
521		}
522
523		if ip, ok := p.(*process.Init); ok {
524			shouldKillAll, err := shouldKillAllOnExit(s.bundle)
525			if err != nil {
526				log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
527			}
528
529			// Ensure all children are killed
530			if shouldKillAll {
531				if err := ip.KillAll(s.context); err != nil {
532					log.G(s.context).WithError(err).WithField("id", ip.ID()).
533						Error("failed to kill init's children")
534				}
535			}
536		}
537
538		p.SetExited(e.Status)
539		s.events <- &eventstypes.TaskExit{
540			ContainerID: s.id,
541			ID:          p.ID(),
542			Pid:         uint32(e.Pid),
543			ExitStatus:  uint32(e.Status),
544			ExitedAt:    p.ExitedAt(),
545		}
546		return
547	}
548}
549
550func shouldKillAllOnExit(bundlePath string) (bool, error) {
551	var bundleSpec specs.Spec
552	bundleConfigContents, err := ioutil.ReadFile(filepath.Join(bundlePath, "config.json"))
553	if err != nil {
554		return false, err
555	}
556	json.Unmarshal(bundleConfigContents, &bundleSpec)
557
558	if bundleSpec.Linux != nil {
559		for _, ns := range bundleSpec.Linux.Namespaces {
560			if ns.Type == specs.PIDNamespace && ns.Path == "" {
561				return false, nil
562			}
563		}
564	}
565
566	return true, nil
567}
568
569func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
570	p, err := s.getInitProcess()
571	if err != nil {
572		return nil, err
573	}
574
575	ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
576	if err != nil {
577		return nil, err
578	}
579	pids := make([]uint32, 0, len(ps))
580	for _, pid := range ps {
581		pids = append(pids, uint32(pid))
582	}
583	return pids, nil
584}
585
586func (s *Service) forward(publisher events.Publisher) {
587	for e := range s.events {
588		if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
589			log.G(s.context).WithError(err).Error("post event")
590		}
591	}
592}
593
594// getInitProcess returns initial process
595func (s *Service) getInitProcess() (process.Process, error) {
596	s.mu.Lock()
597	defer s.mu.Unlock()
598
599	p := s.processes[s.id]
600	if p == nil {
601		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
602	}
603	return p, nil
604}
605
606// getExecProcess returns exec process
607func (s *Service) getExecProcess(id string) (process.Process, error) {
608	s.mu.Lock()
609	defer s.mu.Unlock()
610
611	p := s.processes[id]
612	if p == nil {
613		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s does not exist", id)
614	}
615	return p, nil
616}
617
618func getTopic(ctx context.Context, e interface{}) string {
619	switch e.(type) {
620	case *eventstypes.TaskCreate:
621		return runtime.TaskCreateEventTopic
622	case *eventstypes.TaskStart:
623		return runtime.TaskStartEventTopic
624	case *eventstypes.TaskOOM:
625		return runtime.TaskOOMEventTopic
626	case *eventstypes.TaskExit:
627		return runtime.TaskExitEventTopic
628	case *eventstypes.TaskDelete:
629		return runtime.TaskDeleteEventTopic
630	case *eventstypes.TaskExecAdded:
631		return runtime.TaskExecAddedEventTopic
632	case *eventstypes.TaskExecStarted:
633		return runtime.TaskExecStartedEventTopic
634	case *eventstypes.TaskPaused:
635		return runtime.TaskPausedEventTopic
636	case *eventstypes.TaskResumed:
637		return runtime.TaskResumedEventTopic
638	case *eventstypes.TaskCheckpointed:
639		return runtime.TaskCheckpointedEventTopic
640	default:
641		logrus.Warnf("no topic for type %#v", e)
642	}
643	return runtime.TaskUnknownTopic
644}
645
646func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform stdio.Platform, r *process.CreateConfig, rootfs string) (*process.Init, error) {
647	var options runctypes.CreateOptions
648	if r.Options != nil {
649		v, err := typeurl.UnmarshalAny(r.Options)
650		if err != nil {
651			return nil, err
652		}
653		options = *v.(*runctypes.CreateOptions)
654	}
655
656	runtime := process.NewRunc(runtimeRoot, path, namespace, r.Runtime, criu, systemdCgroup)
657	p := process.New(r.ID, runtime, stdio.Stdio{
658		Stdin:    r.Stdin,
659		Stdout:   r.Stdout,
660		Stderr:   r.Stderr,
661		Terminal: r.Terminal,
662	})
663	p.Bundle = r.Bundle
664	p.Platform = platform
665	p.Rootfs = rootfs
666	p.WorkDir = workDir
667	p.IoUID = int(options.IoUid)
668	p.IoGID = int(options.IoGid)
669	p.NoPivotRoot = options.NoPivotRoot
670	p.NoNewKeyring = options.NoNewKeyring
671	p.CriuWorkPath = options.CriuWorkPath
672	if p.CriuWorkPath == "" {
673		// if criu work path not set, use container WorkDir
674		p.CriuWorkPath = p.WorkDir
675	}
676
677	return p, nil
678}
679