1// +build !windows
2
3/*
4   Copyright The containerd Authors.
5
6   Licensed under the Apache License, Version 2.0 (the "License");
7   you may not use this file except in compliance with the License.
8   You may obtain a copy of the License at
9
10       http://www.apache.org/licenses/LICENSE-2.0
11
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17*/
18
19package client
20
21import (
22	"context"
23	"fmt"
24	"io"
25	"io/ioutil"
26	"net"
27	"os"
28	"os/exec"
29	"path/filepath"
30	"strconv"
31	"strings"
32	"sync"
33	"syscall"
34	"time"
35
36	"golang.org/x/sys/unix"
37
38	"github.com/containerd/ttrpc"
39	"github.com/pkg/errors"
40	"github.com/sirupsen/logrus"
41
42	"github.com/containerd/containerd/events"
43	"github.com/containerd/containerd/log"
44	"github.com/containerd/containerd/pkg/dialer"
45	v1 "github.com/containerd/containerd/runtime/v1"
46	"github.com/containerd/containerd/runtime/v1/shim"
47	shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
48	"github.com/containerd/containerd/sys"
49	ptypes "github.com/gogo/protobuf/types"
50)
51
// empty is a shared protobuf Empty message passed to shim RPCs in this file
// that take no arguments (for example ShimInfo).
var empty = &ptypes.Empty{}
53
// Opt is an option for a shim client configuration.
//
// Given a context and the shim config, an Opt yields the shim service to talk
// to, an io.Closer for the underlying resource (nil when there is nothing to
// close, as with WithLocal), or an error.
type Opt func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error)
56
57// WithStart executes a new shim process
58func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHandler func()) Opt {
59	return func(ctx context.Context, config shim.Config) (_ shimapi.ShimService, _ io.Closer, err error) {
60		socket, err := newSocket(address)
61		if err != nil {
62			if !eaddrinuse(err) {
63				return nil, nil, err
64			}
65			if err := RemoveSocket(address); err != nil {
66				return nil, nil, errors.Wrap(err, "remove already used socket")
67			}
68			if socket, err = newSocket(address); err != nil {
69				return nil, nil, err
70			}
71		}
72
73		f, err := socket.File()
74		if err != nil {
75			return nil, nil, errors.Wrapf(err, "failed to get fd for socket %s", address)
76		}
77		defer f.Close()
78
79		stdoutCopy := ioutil.Discard
80		stderrCopy := ioutil.Discard
81		stdoutLog, err := v1.OpenShimStdoutLog(ctx, config.WorkDir)
82		if err != nil {
83			return nil, nil, errors.Wrapf(err, "failed to create stdout log")
84		}
85
86		stderrLog, err := v1.OpenShimStderrLog(ctx, config.WorkDir)
87		if err != nil {
88			return nil, nil, errors.Wrapf(err, "failed to create stderr log")
89		}
90		if debug {
91			stdoutCopy = os.Stdout
92			stderrCopy = os.Stderr
93		}
94
95		go io.Copy(stdoutCopy, stdoutLog)
96		go io.Copy(stderrCopy, stderrLog)
97
98		cmd, err := newCommand(binary, daemonAddress, debug, config, f, stdoutLog, stderrLog)
99		if err != nil {
100			return nil, nil, err
101		}
102		if err := cmd.Start(); err != nil {
103			return nil, nil, errors.Wrapf(err, "failed to start shim")
104		}
105		defer func() {
106			if err != nil {
107				cmd.Process.Kill()
108			}
109		}()
110		go func() {
111			cmd.Wait()
112			exitHandler()
113			if stdoutLog != nil {
114				stdoutLog.Close()
115			}
116			if stderrLog != nil {
117				stderrLog.Close()
118			}
119			socket.Close()
120			RemoveSocket(address)
121		}()
122		log.G(ctx).WithFields(logrus.Fields{
123			"pid":     cmd.Process.Pid,
124			"address": address,
125			"debug":   debug,
126		}).Infof("shim %s started", binary)
127
128		if err := writeFile(filepath.Join(config.Path, "address"), address); err != nil {
129			return nil, nil, err
130		}
131		if err := writeFile(filepath.Join(config.Path, "shim.pid"), strconv.Itoa(cmd.Process.Pid)); err != nil {
132			return nil, nil, err
133		}
134		// set shim in cgroup if it is provided
135		if cgroup != "" {
136			if err := setCgroup(cgroup, cmd); err != nil {
137				return nil, nil, err
138			}
139			log.G(ctx).WithFields(logrus.Fields{
140				"pid":     cmd.Process.Pid,
141				"address": address,
142			}).Infof("shim placed in cgroup %s", cgroup)
143		}
144		if err = setupOOMScore(cmd.Process.Pid); err != nil {
145			return nil, nil, err
146		}
147		c, clo, err := WithConnect(address, func() {})(ctx, config)
148		if err != nil {
149			return nil, nil, errors.Wrap(err, "failed to connect")
150		}
151		return c, clo, nil
152	}
153}
154
155func eaddrinuse(err error) bool {
156	cause := errors.Cause(err)
157	netErr, ok := cause.(*net.OpError)
158	if !ok {
159		return false
160	}
161	if netErr.Op != "listen" {
162		return false
163	}
164	syscallErr, ok := netErr.Err.(*os.SyscallError)
165	if !ok {
166		return false
167	}
168	errno, ok := syscallErr.Err.(syscall.Errno)
169	if !ok {
170		return false
171	}
172	return errno == syscall.EADDRINUSE
173}
174
175// setupOOMScore gets containerd's oom score and adds +1 to it
176// to ensure a shim has a lower* score than the daemons
177func setupOOMScore(shimPid int) error {
178	pid := os.Getpid()
179	score, err := sys.GetOOMScoreAdj(pid)
180	if err != nil {
181		return errors.Wrap(err, "get daemon OOM score")
182	}
183	shimScore := score + 1
184	if err := sys.SetOOMScore(shimPid, shimScore); err != nil {
185		return errors.Wrap(err, "set shim OOM score")
186	}
187	return nil
188}
189
190func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File, stdout, stderr io.Writer) (*exec.Cmd, error) {
191	selfExe, err := os.Executable()
192	if err != nil {
193		return nil, err
194	}
195	args := []string{
196		"-namespace", config.Namespace,
197		"-workdir", config.WorkDir,
198		"-address", daemonAddress,
199		"-containerd-binary", selfExe,
200	}
201
202	if config.Criu != "" {
203		args = append(args, "-criu-path", config.Criu)
204	}
205	if config.RuntimeRoot != "" {
206		args = append(args, "-runtime-root", config.RuntimeRoot)
207	}
208	if config.SystemdCgroup {
209		args = append(args, "-systemd-cgroup")
210	}
211	if debug {
212		args = append(args, "-debug")
213	}
214
215	cmd := exec.Command(binary, args...)
216	cmd.Dir = config.Path
217	// make sure the shim can be re-parented to system init
218	// and is cloned in a new mount namespace because the overlay/filesystems
219	// will be mounted by the shim
220	cmd.SysProcAttr = getSysProcAttr()
221	cmd.ExtraFiles = append(cmd.ExtraFiles, socket)
222	cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
223	cmd.Stdout = stdout
224	cmd.Stderr = stderr
225	return cmd, nil
226}
227
// writeFile atomically writes content (passed as address) to the file at path.
//
// The data is first written to a hidden sibling file (".<base name>") opened
// with O_EXCL and O_SYNC, which is then renamed over path so readers never see
// a partially written file. If writing, closing or renaming fails, the
// temporary file is removed again so that it cannot block later calls, and the
// first error is returned. A temporary file that already exists is left alone
// and reported as an error.
func writeFile(path, address string) error {
	path, err := filepath.Abs(path)
	if err != nil {
		return err
	}
	tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
	f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
	if err != nil {
		return err
	}
	_, err = f.WriteString(address)
	// always close, but keep the write error if there was one
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	if err == nil {
		err = os.Rename(tempPath, path)
	}
	if err != nil {
		// best effort: the original failure is what the caller needs to see
		os.Remove(tempPath)
		return err
	}
	return nil
}
246
// abstractSocketPrefix is the leading NUL byte prepended to addresses that are
// treated as abstract sockets; socketPathLimit is the longest address
// newSocket accepts.
const (
	abstractSocketPrefix = "\x00"
	socketPathLimit      = 106
)

// socket is a shim address: either "unix://<path>" for a socket on the
// filesystem, or any other string, which is treated as an abstract socket
// name.
type socket string

// isAbstract reports whether the address lacks the "unix://" scheme and is
// therefore treated as an abstract socket.
func (s socket) isAbstract() bool {
	const scheme = "unix://"
	return !strings.HasPrefix(string(s), scheme)
}

// path returns the name to listen on or dial: the address with "unix://"
// stripped for filesystem sockets, or the address prefixed with
// abstractSocketPrefix for abstract ones.
func (s socket) path() string {
	raw := string(s)
	if s.isAbstract() {
		return abstractSocketPrefix + raw
	}
	return strings.TrimPrefix(raw, "unix://")
}
266
267func newSocket(address string) (*net.UnixListener, error) {
268	if len(address) > socketPathLimit {
269		return nil, errors.Errorf("%q: unix socket path too long (> %d)", address, socketPathLimit)
270	}
271	var (
272		sock = socket(address)
273		path = sock.path()
274	)
275	if !sock.isAbstract() {
276		if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
277			return nil, errors.Wrapf(err, "%s", path)
278		}
279	}
280	l, err := net.Listen("unix", path)
281	if err != nil {
282		return nil, errors.Wrapf(err, "failed to listen to unix socket %q (abstract: %t)", address, sock.isAbstract())
283	}
284	if err := os.Chmod(path, 0600); err != nil {
285		l.Close()
286		return nil, err
287	}
288
289	return l.(*net.UnixListener), nil
290}
291
292// RemoveSocket removes the socket at the specified address if
293// it exists on the filesystem
294func RemoveSocket(address string) error {
295	sock := socket(address)
296	if !sock.isAbstract() {
297		return os.Remove(sock.path())
298	}
299	return nil
300}
301
// connect dials address through the supplied dialer d using a fixed
// 100-second timeout and returns whatever d returns.
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
	const dialTimeout = 100 * time.Second
	conn, err := d(address, dialTimeout)
	return conn, err
}
305
306func anonDialer(address string, timeout time.Duration) (net.Conn, error) {
307	return dialer.Dialer(socket(address).path(), timeout)
308}
309
310// WithConnect connects to an existing shim
311func WithConnect(address string, onClose func()) Opt {
312	return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
313		conn, err := connect(address, anonDialer)
314		if err != nil {
315			return nil, nil, err
316		}
317		client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
318		return shimapi.NewShimClient(client), conn, nil
319	}
320}
321
322// WithLocal uses an in process shim
323func WithLocal(publisher events.Publisher) func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error) {
324	return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
325		service, err := shim.NewService(config, publisher)
326		if err != nil {
327			return nil, nil, err
328		}
329		return shim.NewLocal(service), nil, nil
330	}
331}
332
333// New returns a new shim client
334func New(ctx context.Context, config shim.Config, opt Opt) (*Client, error) {
335	s, c, err := opt(ctx, config)
336	if err != nil {
337		return nil, err
338	}
339	return &Client{
340		ShimService: s,
341		c:           c,
342		exitCh:      make(chan struct{}),
343	}, nil
344}
345
// Client is a shim client containing the connection to a shim
type Client struct {
	// ShimService is embedded so the shim API methods can be called directly
	// on the Client.
	shimapi.ShimService

	// c closes the underlying connection; it is nil when the Opt that built
	// the client returned no closer.
	c io.Closer
	// exitCh is closed by waitForExit when its polling loop ends.
	exitCh chan struct{}
	// exitOnce ensures the polling loop in waitForExit runs at most once.
	exitOnce sync.Once
}
354
355// IsAlive returns true if the shim can be contacted.
356// NOTE: a negative answer doesn't mean that the process is gone.
357func (c *Client) IsAlive(ctx context.Context) (bool, error) {
358	_, err := c.ShimInfo(ctx, empty)
359	if err != nil {
360		// TODO(stevvooe): There are some error conditions that need to be
361		// handle with unix sockets existence to give the right answer here.
362		return false, err
363	}
364	return true, nil
365}
366
367// StopShim signals the shim to exit and wait for the process to disappear
368func (c *Client) StopShim(ctx context.Context) error {
369	return c.signalShim(ctx, unix.SIGTERM)
370}
371
372// KillShim kills the shim forcefully and wait for the process to disappear
373func (c *Client) KillShim(ctx context.Context) error {
374	return c.signalShim(ctx, unix.SIGKILL)
375}
376
377// Close the client connection
378func (c *Client) Close() error {
379	if c.c == nil {
380		return nil
381	}
382	return c.c.Close()
383}
384
385func (c *Client) signalShim(ctx context.Context, sig syscall.Signal) error {
386	info, err := c.ShimInfo(ctx, empty)
387	if err != nil {
388		return err
389	}
390	pid := int(info.ShimPid)
391	// make sure we don't kill ourselves if we are running a local shim
392	if os.Getpid() == pid {
393		return nil
394	}
395	if err := unix.Kill(pid, sig); err != nil && err != unix.ESRCH {
396		return err
397	}
398	// wait for shim to die after being signaled
399	select {
400	case <-ctx.Done():
401		return ctx.Err()
402	case <-c.waitForExit(ctx, pid):
403		return nil
404	}
405}
406
// waitForExit returns a channel that is closed once polling for the process
// pid has finished.
//
// The first call starts a background goroutine (guarded by exitOnce) that
// probes the pid every 10ms and stops when the probe reports "no such
// process" or when ctx is done; in both cases exitCh is closed.
//
// NOTE(review): because the channel is also closed on ctx expiry and the
// once-guarded loop never runs again, a later call on the same Client gets an
// already-closed channel even if the process is still alive — confirm callers
// do not rely on a second wait.
func (c *Client) waitForExit(ctx context.Context, pid int) <-chan struct{} {
	// Do blocks until the loop ends, so it is run in its own goroutine
	go c.exitOnce.Do(func() {
		defer close(c.exitCh)

		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()

		for {
			// use kill(pid, 0) here because the shim could have been reparented
			// and we are no longer able to waitpid(pid, ...) on the shim
			if err := unix.Kill(pid, 0); err == unix.ESRCH {
				return
			}

			select {
			case <-ticker.C:
			case <-ctx.Done():
				log.G(ctx).WithField("pid", pid).Warn("timed out while waiting for shim to exit")
				return
			}
		}
	})
	return c.exitCh
}
431