1// +build !windows 2 3/* 4 Copyright The containerd Authors. 5 6 Licensed under the Apache License, Version 2.0 (the "License"); 7 you may not use this file except in compliance with the License. 8 You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12 Unless required by applicable law or agreed to in writing, software 13 distributed under the License is distributed on an "AS IS" BASIS, 14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 See the License for the specific language governing permissions and 16 limitations under the License. 17*/ 18 19package client 20 21import ( 22 "context" 23 "fmt" 24 "io" 25 "io/ioutil" 26 "net" 27 "os" 28 "os/exec" 29 "path/filepath" 30 "strconv" 31 "strings" 32 "sync" 33 "syscall" 34 "time" 35 36 "golang.org/x/sys/unix" 37 38 "github.com/containerd/ttrpc" 39 "github.com/pkg/errors" 40 "github.com/sirupsen/logrus" 41 42 "github.com/containerd/containerd/events" 43 "github.com/containerd/containerd/log" 44 "github.com/containerd/containerd/pkg/dialer" 45 v1 "github.com/containerd/containerd/runtime/v1" 46 "github.com/containerd/containerd/runtime/v1/shim" 47 shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" 48 "github.com/containerd/containerd/sys" 49 ptypes "github.com/gogo/protobuf/types" 50) 51 52var empty = &ptypes.Empty{} 53 54// Opt is an option for a shim client configuration 55type Opt func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error) 56 57// WithStart executes a new shim process 58func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHandler func()) Opt { 59 return func(ctx context.Context, config shim.Config) (_ shimapi.ShimService, _ io.Closer, err error) { 60 socket, err := newSocket(address) 61 if err != nil { 62 if !eaddrinuse(err) { 63 return nil, nil, err 64 } 65 if err := RemoveSocket(address); err != nil { 66 return nil, nil, errors.Wrap(err, "remove already used socket") 67 } 68 if socket, err = newSocket(address); err != nil { 69 return nil, nil, err 70 } 71 } 72 73 f, err := socket.File() 74 if err != nil { 75 return nil, nil, errors.Wrapf(err, "failed to get fd for socket %s", address) 76 } 77 defer f.Close() 78 79 stdoutCopy := ioutil.Discard 80 stderrCopy := ioutil.Discard 81 stdoutLog, err := v1.OpenShimStdoutLog(ctx, config.WorkDir) 82 if err != nil { 83 return nil, nil, errors.Wrapf(err, "failed to create stdout log") 84 } 85 86 stderrLog, err := v1.OpenShimStderrLog(ctx, config.WorkDir) 87 if err != nil { 88 return nil, nil, errors.Wrapf(err, "failed to create stderr log") 89 } 90 if debug { 91 stdoutCopy = os.Stdout 92 stderrCopy = os.Stderr 93 } 94 95 go io.Copy(stdoutCopy, stdoutLog) 96 go io.Copy(stderrCopy, stderrLog) 97 98 cmd, err := newCommand(binary, daemonAddress, debug, config, f, stdoutLog, stderrLog) 99 if err != nil { 100 return nil, nil, err 101 } 102 if err := cmd.Start(); err != nil { 103 return nil, nil, errors.Wrapf(err, "failed to start shim") 104 } 105 defer func() { 106 if err != nil { 107 cmd.Process.Kill() 108 } 109 }() 110 go func() { 111 cmd.Wait() 112 exitHandler() 113 if stdoutLog != nil { 114 stdoutLog.Close() 115 } 116 if stderrLog != nil { 117 stderrLog.Close() 118 } 119 socket.Close() 120 RemoveSocket(address) 121 }() 122 log.G(ctx).WithFields(logrus.Fields{ 123 "pid": cmd.Process.Pid, 124 "address": address, 125 "debug": debug, 126 }).Infof("shim %s started", binary) 127 128 if err := writeFile(filepath.Join(config.Path, "address"), address); err != nil { 129 return nil, nil, err 130 } 131 if err := writeFile(filepath.Join(config.Path, "shim.pid"), strconv.Itoa(cmd.Process.Pid)); err != nil { 132 return nil, nil, err 133 } 134 // set shim in cgroup if it is provided 135 if cgroup != "" { 136 if err := setCgroup(cgroup, cmd); err != nil { 137 return nil, nil, err 138 } 139 log.G(ctx).WithFields(logrus.Fields{ 140 "pid": cmd.Process.Pid, 141 "address": address, 142 }).Infof("shim placed in cgroup %s", cgroup) 143 } 144 if err = setupOOMScore(cmd.Process.Pid); err != nil { 145 return nil, nil, err 146 } 147 c, clo, err := WithConnect(address, func() {})(ctx, config) 148 if err != nil { 149 return nil, nil, errors.Wrap(err, "failed to connect") 150 } 151 return c, clo, nil 152 } 153} 154 155func eaddrinuse(err error) bool { 156 cause := errors.Cause(err) 157 netErr, ok := cause.(*net.OpError) 158 if !ok { 159 return false 160 } 161 if netErr.Op != "listen" { 162 return false 163 } 164 syscallErr, ok := netErr.Err.(*os.SyscallError) 165 if !ok { 166 return false 167 } 168 errno, ok := syscallErr.Err.(syscall.Errno) 169 if !ok { 170 return false 171 } 172 return errno == syscall.EADDRINUSE 173} 174 175// setupOOMScore gets containerd's oom score and adds +1 to it 176// to ensure a shim has a lower* score than the daemons 177func setupOOMScore(shimPid int) error { 178 pid := os.Getpid() 179 score, err := sys.GetOOMScoreAdj(pid) 180 if err != nil { 181 return errors.Wrap(err, "get daemon OOM score") 182 } 183 shimScore := score + 1 184 if err := sys.SetOOMScore(shimPid, shimScore); err != nil { 185 return errors.Wrap(err, "set shim OOM score") 186 } 187 return nil 188} 189 190func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File, stdout, stderr io.Writer) (*exec.Cmd, error) { 191 selfExe, err := os.Executable() 192 if err != nil { 193 return nil, err 194 } 195 args := []string{ 196 "-namespace", config.Namespace, 197 "-workdir", config.WorkDir, 198 "-address", daemonAddress, 199 "-containerd-binary", selfExe, 200 } 201 202 if config.Criu != "" { 203 args = append(args, "-criu-path", config.Criu) 204 } 205 if config.RuntimeRoot != "" { 206 args = append(args, "-runtime-root", config.RuntimeRoot) 207 } 208 if config.SystemdCgroup { 209 args = append(args, "-systemd-cgroup") 210 } 211 if debug { 212 args = append(args, "-debug") 213 } 214 215 cmd := exec.Command(binary, args...) 216 cmd.Dir = config.Path 217 // make sure the shim can be re-parented to system init 218 // and is cloned in a new mount namespace because the overlay/filesystems 219 // will be mounted by the shim 220 cmd.SysProcAttr = getSysProcAttr() 221 cmd.ExtraFiles = append(cmd.ExtraFiles, socket) 222 cmd.Env = append(os.Environ(), "GOMAXPROCS=2") 223 cmd.Stdout = stdout 224 cmd.Stderr = stderr 225 return cmd, nil 226} 227 228// writeFile writes a address file atomically 229func writeFile(path, address string) error { 230 path, err := filepath.Abs(path) 231 if err != nil { 232 return err 233 } 234 tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path))) 235 f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666) 236 if err != nil { 237 return err 238 } 239 _, err = f.WriteString(address) 240 f.Close() 241 if err != nil { 242 return err 243 } 244 return os.Rename(tempPath, path) 245} 246 247const ( 248 abstractSocketPrefix = "\x00" 249 socketPathLimit = 106 250) 251 252type socket string 253 254func (s socket) isAbstract() bool { 255 return !strings.HasPrefix(string(s), "unix://") 256} 257 258func (s socket) path() string { 259 path := strings.TrimPrefix(string(s), "unix://") 260 // if there was no trim performed, we assume an abstract socket 261 if len(path) == len(s) { 262 path = abstractSocketPrefix + path 263 } 264 return path 265} 266 267func newSocket(address string) (*net.UnixListener, error) { 268 if len(address) > socketPathLimit { 269 return nil, errors.Errorf("%q: unix socket path too long (> %d)", address, socketPathLimit) 270 } 271 var ( 272 sock = socket(address) 273 path = sock.path() 274 ) 275 if !sock.isAbstract() { 276 if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil { 277 return nil, errors.Wrapf(err, "%s", path) 278 } 279 } 280 l, err := net.Listen("unix", path) 281 if err != nil { 282 return nil, errors.Wrapf(err, "failed to listen to unix socket %q (abstract: %t)", address, sock.isAbstract()) 283 } 284 if err := os.Chmod(path, 0600); err != nil { 285 l.Close() 286 return nil, err 287 } 288 289 return l.(*net.UnixListener), nil 290} 291 292// RemoveSocket removes the socket at the specified address if 293// it exists on the filesystem 294func RemoveSocket(address string) error { 295 sock := socket(address) 296 if !sock.isAbstract() { 297 return os.Remove(sock.path()) 298 } 299 return nil 300} 301 302func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) { 303 return d(address, 100*time.Second) 304} 305 306func anonDialer(address string, timeout time.Duration) (net.Conn, error) { 307 return dialer.Dialer(socket(address).path(), timeout) 308} 309 310// WithConnect connects to an existing shim 311func WithConnect(address string, onClose func()) Opt { 312 return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) { 313 conn, err := connect(address, anonDialer) 314 if err != nil { 315 return nil, nil, err 316 } 317 client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)) 318 return shimapi.NewShimClient(client), conn, nil 319 } 320} 321 322// WithLocal uses an in process shim 323func WithLocal(publisher events.Publisher) func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error) { 324 return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) { 325 service, err := shim.NewService(config, publisher) 326 if err != nil { 327 return nil, nil, err 328 } 329 return shim.NewLocal(service), nil, nil 330 } 331} 332 333// New returns a new shim client 334func New(ctx context.Context, config shim.Config, opt Opt) (*Client, error) { 335 s, c, err := opt(ctx, config) 336 if err != nil { 337 return nil, err 338 } 339 return &Client{ 340 ShimService: s, 341 c: c, 342 exitCh: make(chan struct{}), 343 }, nil 344} 345 346// Client is a shim client containing the connection to a shim 347type Client struct { 348 shimapi.ShimService 349 350 c io.Closer 351 exitCh chan struct{} 352 exitOnce sync.Once 353} 354 355// IsAlive returns true if the shim can be contacted. 356// NOTE: a negative answer doesn't mean that the process is gone. 357func (c *Client) IsAlive(ctx context.Context) (bool, error) { 358 _, err := c.ShimInfo(ctx, empty) 359 if err != nil { 360 // TODO(stevvooe): There are some error conditions that need to be 361 // handle with unix sockets existence to give the right answer here. 362 return false, err 363 } 364 return true, nil 365} 366 367// StopShim signals the shim to exit and wait for the process to disappear 368func (c *Client) StopShim(ctx context.Context) error { 369 return c.signalShim(ctx, unix.SIGTERM) 370} 371 372// KillShim kills the shim forcefully and wait for the process to disappear 373func (c *Client) KillShim(ctx context.Context) error { 374 return c.signalShim(ctx, unix.SIGKILL) 375} 376 377// Close the client connection 378func (c *Client) Close() error { 379 if c.c == nil { 380 return nil 381 } 382 return c.c.Close() 383} 384 385func (c *Client) signalShim(ctx context.Context, sig syscall.Signal) error { 386 info, err := c.ShimInfo(ctx, empty) 387 if err != nil { 388 return err 389 } 390 pid := int(info.ShimPid) 391 // make sure we don't kill ourselves if we are running a local shim 392 if os.Getpid() == pid { 393 return nil 394 } 395 if err := unix.Kill(pid, sig); err != nil && err != unix.ESRCH { 396 return err 397 } 398 // wait for shim to die after being signaled 399 select { 400 case <-ctx.Done(): 401 return ctx.Err() 402 case <-c.waitForExit(ctx, pid): 403 return nil 404 } 405} 406 407func (c *Client) waitForExit(ctx context.Context, pid int) <-chan struct{} { 408 go c.exitOnce.Do(func() { 409 defer close(c.exitCh) 410 411 ticker := time.NewTicker(10 * time.Millisecond) 412 defer ticker.Stop() 413 414 for { 415 // use kill(pid, 0) here because the shim could have been reparented 416 // and we are no longer able to waitpid(pid, ...) on the shim 417 if err := unix.Kill(pid, 0); err == unix.ESRCH { 418 return 419 } 420 421 select { 422 case <-ticker.C: 423 case <-ctx.Done(): 424 log.G(ctx).WithField("pid", pid).Warn("timed out while waiting for shim to exit") 425 return 426 } 427 } 428 }) 429 return c.exitCh 430} 431