1// +build linux 2 3/* 4 Copyright The containerd Authors. 5 6 Licensed under the Apache License, Version 2.0 (the "License"); 7 you may not use this file except in compliance with the License. 8 You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12 Unless required by applicable law or agreed to in writing, software 13 distributed under the License is distributed on an "AS IS" BASIS, 14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 See the License for the specific language governing permissions and 16 limitations under the License. 17*/ 18 19package v1 20 21import ( 22 "context" 23 "io/ioutil" 24 "os" 25 "os/exec" 26 "path/filepath" 27 "sync" 28 "syscall" 29 "time" 30 31 "github.com/containerd/cgroups" 32 eventstypes "github.com/containerd/containerd/api/events" 33 "github.com/containerd/containerd/api/types/task" 34 "github.com/containerd/containerd/errdefs" 35 "github.com/containerd/containerd/mount" 36 "github.com/containerd/containerd/namespaces" 37 "github.com/containerd/containerd/pkg/oom" 38 oomv1 "github.com/containerd/containerd/pkg/oom/v1" 39 "github.com/containerd/containerd/pkg/process" 40 "github.com/containerd/containerd/pkg/stdio" 41 "github.com/containerd/containerd/runtime/v2/runc" 42 "github.com/containerd/containerd/runtime/v2/runc/options" 43 "github.com/containerd/containerd/runtime/v2/shim" 44 taskAPI "github.com/containerd/containerd/runtime/v2/task" 45 "github.com/containerd/containerd/sys/reaper" 46 runcC "github.com/containerd/go-runc" 47 "github.com/containerd/typeurl" 48 "github.com/gogo/protobuf/proto" 49 ptypes "github.com/gogo/protobuf/types" 50 "github.com/pkg/errors" 51 "github.com/sirupsen/logrus" 52 "golang.org/x/sys/unix" 53) 54 55var ( 56 _ = (taskAPI.TaskService)(&service{}) 57 empty = &ptypes.Empty{} 58) 59 60// New returns a new shim service that can be used via GRPC 61func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { 62 ep, err := oomv1.New(publisher) 63 if err != nil { 64 return nil, err 65 } 66 go ep.Run(ctx) 67 s := &service{ 68 id: id, 69 context: ctx, 70 events: make(chan interface{}, 128), 71 ec: reaper.Default.Subscribe(), 72 ep: ep, 73 cancel: shutdown, 74 } 75 go s.processExits() 76 runcC.Monitor = reaper.Default 77 if err := s.initPlatform(); err != nil { 78 shutdown() 79 return nil, errors.Wrap(err, "failed to initialized platform behavior") 80 } 81 go s.forward(ctx, publisher) 82 return s, nil 83} 84 85// service is the shim implementation of a remote shim over GRPC 86type service struct { 87 mu sync.Mutex 88 eventSendMu sync.Mutex 89 90 context context.Context 91 events chan interface{} 92 platform stdio.Platform 93 ec chan runcC.Exit 94 ep oom.Watcher 95 96 id string 97 container *runc.Container 98 99 cancel func() 100} 101 102func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { 103 ns, err := namespaces.NamespaceRequired(ctx) 104 if err != nil { 105 return nil, err 106 } 107 self, err := os.Executable() 108 if err != nil { 109 return nil, err 110 } 111 cwd, err := os.Getwd() 112 if err != nil { 113 return nil, err 114 } 115 args := []string{ 116 "-namespace", ns, 117 "-id", id, 118 "-address", containerdAddress, 119 } 120 cmd := exec.Command(self, args...) 121 cmd.Dir = cwd 122 cmd.Env = append(os.Environ(), "GOMAXPROCS=2") 123 cmd.SysProcAttr = &syscall.SysProcAttr{ 124 Setpgid: true, 125 } 126 return cmd, nil 127} 128 129func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) { 130 cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress) 131 if err != nil { 132 return "", err 133 } 134 address, err := shim.SocketAddress(ctx, containerdAddress, id) 135 if err != nil { 136 return "", err 137 } 138 socket, err := shim.NewSocket(address) 139 if err != nil { 140 if !shim.SocketEaddrinuse(err) { 141 return "", err 142 } 143 if err := shim.RemoveSocket(address); err != nil { 144 return "", errors.Wrap(err, "remove already used socket") 145 } 146 if socket, err = shim.NewSocket(address); err != nil { 147 return "", err 148 } 149 } 150 f, err := socket.File() 151 if err != nil { 152 return "", err 153 } 154 155 cmd.ExtraFiles = append(cmd.ExtraFiles, f) 156 157 if err := cmd.Start(); err != nil { 158 return "", err 159 } 160 defer func() { 161 if err != nil { 162 _ = shim.RemoveSocket(address) 163 cmd.Process.Kill() 164 } 165 }() 166 // make sure to wait after start 167 go cmd.Wait() 168 if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil { 169 return "", err 170 } 171 if err := shim.WriteAddress("address", address); err != nil { 172 return "", err 173 } 174 if data, err := ioutil.ReadAll(os.Stdin); err == nil { 175 if len(data) > 0 { 176 var any ptypes.Any 177 if err := proto.Unmarshal(data, &any); err != nil { 178 return "", err 179 } 180 v, err := typeurl.UnmarshalAny(&any) 181 if err != nil { 182 return "", err 183 } 184 if opts, ok := v.(*options.Options); ok { 185 if opts.ShimCgroup != "" { 186 cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup)) 187 if err != nil { 188 return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) 189 } 190 if err := cg.Add(cgroups.Process{ 191 Pid: cmd.Process.Pid, 192 }); err != nil { 193 return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) 194 } 195 } 196 } 197 } 198 } 199 if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { 200 return "", errors.Wrap(err, "failed to adjust OOM score for shim") 201 } 202 return address, nil 203} 204 205func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { 206 path, err := os.Getwd() 207 if err != nil { 208 return nil, err 209 } 210 ns, err := namespaces.NamespaceRequired(ctx) 211 if err != nil { 212 return nil, err 213 } 214 runtime, err := runc.ReadRuntime(path) 215 if err != nil { 216 return nil, err 217 } 218 opts, err := runc.ReadOptions(path) 219 if err != nil { 220 return nil, err 221 } 222 root := process.RuncRoot 223 if opts != nil && opts.Root != "" { 224 root = opts.Root 225 } 226 227 r := process.NewRunc(root, path, ns, runtime, "", false) 228 if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ 229 Force: true, 230 }); err != nil { 231 logrus.WithError(err).Warn("failed to remove runc container") 232 } 233 if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { 234 logrus.WithError(err).Warn("failed to cleanup rootfs mount") 235 } 236 return &taskAPI.DeleteResponse{ 237 ExitedAt: time.Now(), 238 ExitStatus: 128 + uint32(unix.SIGKILL), 239 }, nil 240} 241 242// Create a new initial process and container with the underlying OCI runtime 243func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { 244 s.mu.Lock() 245 defer s.mu.Unlock() 246 247 container, err := runc.NewContainer(ctx, s.platform, r) 248 if err != nil { 249 return nil, err 250 } 251 252 s.container = container 253 254 s.send(&eventstypes.TaskCreate{ 255 ContainerID: r.ID, 256 Bundle: r.Bundle, 257 Rootfs: r.Rootfs, 258 IO: &eventstypes.TaskIO{ 259 Stdin: r.Stdin, 260 Stdout: r.Stdout, 261 Stderr: r.Stderr, 262 Terminal: r.Terminal, 263 }, 264 Checkpoint: r.Checkpoint, 265 Pid: uint32(container.Pid()), 266 }) 267 268 return &taskAPI.CreateTaskResponse{ 269 Pid: uint32(container.Pid()), 270 }, nil 271} 272 273// Start a process 274func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { 275 container, err := s.getContainer() 276 if err != nil { 277 return nil, err 278 } 279 280 // hold the send lock so that the start events are sent before any exit events in the error case 281 s.eventSendMu.Lock() 282 p, err := container.Start(ctx, r) 283 if err != nil { 284 s.eventSendMu.Unlock() 285 return nil, errdefs.ToGRPC(err) 286 } 287 switch r.ExecID { 288 case "": 289 if cg, ok := container.Cgroup().(cgroups.Cgroup); ok { 290 if err := s.ep.Add(container.ID, cg); err != nil { 291 logrus.WithError(err).Error("add cg to OOM monitor") 292 } 293 } else { 294 logrus.WithError(errdefs.ErrNotImplemented).Error("add cg to OOM monitor") 295 } 296 s.send(&eventstypes.TaskStart{ 297 ContainerID: container.ID, 298 Pid: uint32(p.Pid()), 299 }) 300 default: 301 s.send(&eventstypes.TaskExecStarted{ 302 ContainerID: container.ID, 303 ExecID: r.ExecID, 304 Pid: uint32(p.Pid()), 305 }) 306 } 307 s.eventSendMu.Unlock() 308 return &taskAPI.StartResponse{ 309 Pid: uint32(p.Pid()), 310 }, nil 311} 312 313// Delete the initial process and container 314func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { 315 container, err := s.getContainer() 316 if err != nil { 317 return nil, err 318 } 319 p, err := container.Delete(ctx, r) 320 if err != nil { 321 return nil, errdefs.ToGRPC(err) 322 } 323 // if we deleted our init task, close the platform and send the task delete event 324 if r.ExecID == "" { 325 if s.platform != nil { 326 s.platform.Close() 327 } 328 s.send(&eventstypes.TaskDelete{ 329 ContainerID: container.ID, 330 Pid: uint32(p.Pid()), 331 ExitStatus: uint32(p.ExitStatus()), 332 ExitedAt: p.ExitedAt(), 333 }) 334 } 335 return &taskAPI.DeleteResponse{ 336 ExitStatus: uint32(p.ExitStatus()), 337 ExitedAt: p.ExitedAt(), 338 Pid: uint32(p.Pid()), 339 }, nil 340} 341 342// Exec an additional process inside the container 343func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { 344 container, err := s.getContainer() 345 if err != nil { 346 return nil, err 347 } 348 ok, cancel := container.ReserveProcess(r.ExecID) 349 if !ok { 350 return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) 351 } 352 process, err := container.Exec(ctx, r) 353 if err != nil { 354 cancel() 355 return nil, errdefs.ToGRPC(err) 356 } 357 358 s.send(&eventstypes.TaskExecAdded{ 359 ContainerID: s.container.ID, 360 ExecID: process.ID(), 361 }) 362 return empty, nil 363} 364 365// ResizePty of a process 366func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { 367 container, err := s.getContainer() 368 if err != nil { 369 return nil, err 370 } 371 if err := container.ResizePty(ctx, r); err != nil { 372 return nil, errdefs.ToGRPC(err) 373 } 374 return empty, nil 375} 376 377// State returns runtime state information for a process 378func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { 379 p, err := s.getProcess(r.ExecID) 380 if err != nil { 381 return nil, err 382 } 383 st, err := p.Status(ctx) 384 if err != nil { 385 return nil, err 386 } 387 status := task.StatusUnknown 388 switch st { 389 case "created": 390 status = task.StatusCreated 391 case "running": 392 status = task.StatusRunning 393 case "stopped": 394 status = task.StatusStopped 395 case "paused": 396 status = task.StatusPaused 397 case "pausing": 398 status = task.StatusPausing 399 } 400 sio := p.Stdio() 401 return &taskAPI.StateResponse{ 402 ID: p.ID(), 403 Bundle: s.container.Bundle, 404 Pid: uint32(p.Pid()), 405 Status: status, 406 Stdin: sio.Stdin, 407 Stdout: sio.Stdout, 408 Stderr: sio.Stderr, 409 Terminal: sio.Terminal, 410 ExitStatus: uint32(p.ExitStatus()), 411 ExitedAt: p.ExitedAt(), 412 }, nil 413} 414 415// Pause the container 416func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { 417 container, err := s.getContainer() 418 if err != nil { 419 return nil, err 420 } 421 if err := container.Pause(ctx); err != nil { 422 return nil, errdefs.ToGRPC(err) 423 } 424 s.send(&eventstypes.TaskPaused{ 425 ContainerID: container.ID, 426 }) 427 return empty, nil 428} 429 430// Resume the container 431func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { 432 container, err := s.getContainer() 433 if err != nil { 434 return nil, err 435 } 436 if err := container.Resume(ctx); err != nil { 437 return nil, errdefs.ToGRPC(err) 438 } 439 s.send(&eventstypes.TaskResumed{ 440 ContainerID: container.ID, 441 }) 442 return empty, nil 443} 444 445// Kill a process with the provided signal 446func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { 447 container, err := s.getContainer() 448 if err != nil { 449 return nil, err 450 } 451 if err := container.Kill(ctx, r); err != nil { 452 return nil, errdefs.ToGRPC(err) 453 } 454 return empty, nil 455} 456 457// Pids returns all pids inside the container 458func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { 459 container, err := s.getContainer() 460 if err != nil { 461 return nil, err 462 } 463 pids, err := s.getContainerPids(ctx, r.ID) 464 if err != nil { 465 return nil, errdefs.ToGRPC(err) 466 } 467 var processes []*task.ProcessInfo 468 for _, pid := range pids { 469 pInfo := task.ProcessInfo{ 470 Pid: pid, 471 } 472 for _, p := range container.ExecdProcesses() { 473 if p.Pid() == int(pid) { 474 d := &options.ProcessDetails{ 475 ExecID: p.ID(), 476 } 477 a, err := typeurl.MarshalAny(d) 478 if err != nil { 479 return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) 480 } 481 pInfo.Info = a 482 break 483 } 484 } 485 processes = append(processes, &pInfo) 486 } 487 return &taskAPI.PidsResponse{ 488 Processes: processes, 489 }, nil 490} 491 492// CloseIO of a process 493func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { 494 container, err := s.getContainer() 495 if err != nil { 496 return nil, err 497 } 498 if err := container.CloseIO(ctx, r); err != nil { 499 return nil, err 500 } 501 return empty, nil 502} 503 504// Checkpoint the container 505func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { 506 container, err := s.getContainer() 507 if err != nil { 508 return nil, err 509 } 510 if err := container.Checkpoint(ctx, r); err != nil { 511 return nil, errdefs.ToGRPC(err) 512 } 513 return empty, nil 514} 515 516// Update a running container 517func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { 518 container, err := s.getContainer() 519 if err != nil { 520 return nil, err 521 } 522 if err := container.Update(ctx, r); err != nil { 523 return nil, errdefs.ToGRPC(err) 524 } 525 return empty, nil 526} 527 528// Wait for a process to exit 529func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { 530 container, err := s.getContainer() 531 if err != nil { 532 return nil, err 533 } 534 p, err := container.Process(r.ExecID) 535 if err != nil { 536 return nil, errdefs.ToGRPC(err) 537 } 538 p.Wait() 539 540 return &taskAPI.WaitResponse{ 541 ExitStatus: uint32(p.ExitStatus()), 542 ExitedAt: p.ExitedAt(), 543 }, nil 544} 545 546// Connect returns shim information such as the shim's pid 547func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { 548 var pid int 549 if s.container != nil { 550 pid = s.container.Pid() 551 } 552 return &taskAPI.ConnectResponse{ 553 ShimPid: uint32(os.Getpid()), 554 TaskPid: uint32(pid), 555 }, nil 556} 557 558func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { 559 s.cancel() 560 close(s.events) 561 if address, err := shim.ReadAddress("address"); err == nil { 562 _ = shim.RemoveSocket(address) 563 } 564 return empty, nil 565} 566 567func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { 568 cgx := s.container.Cgroup() 569 if cgx == nil { 570 return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") 571 } 572 cg, ok := cgx.(cgroups.Cgroup) 573 if !ok { 574 return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "cgroup v2 not implemented for Stats") 575 } 576 if cg == nil { 577 return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") 578 } 579 stats, err := cg.Stat(cgroups.IgnoreNotExist) 580 if err != nil { 581 return nil, err 582 } 583 data, err := typeurl.MarshalAny(stats) 584 if err != nil { 585 return nil, err 586 } 587 return &taskAPI.StatsResponse{ 588 Stats: data, 589 }, nil 590} 591 592func (s *service) processExits() { 593 for e := range s.ec { 594 s.checkProcesses(e) 595 } 596} 597 598func (s *service) send(evt interface{}) { 599 s.events <- evt 600} 601 602func (s *service) sendL(evt interface{}) { 603 s.eventSendMu.Lock() 604 s.events <- evt 605 s.eventSendMu.Unlock() 606} 607 608func (s *service) checkProcesses(e runcC.Exit) { 609 container, err := s.getContainer() 610 if err != nil { 611 return 612 } 613 614 for _, p := range container.All() { 615 if p.Pid() == e.Pid { 616 if runc.ShouldKillAllOnExit(s.context, container.Bundle) { 617 if ip, ok := p.(*process.Init); ok { 618 // Ensure all children are killed 619 if err := ip.KillAll(s.context); err != nil { 620 logrus.WithError(err).WithField("id", ip.ID()). 621 Error("failed to kill init's children") 622 } 623 } 624 } 625 p.SetExited(e.Status) 626 s.sendL(&eventstypes.TaskExit{ 627 ContainerID: container.ID, 628 ID: p.ID(), 629 Pid: uint32(e.Pid), 630 ExitStatus: uint32(e.Status), 631 ExitedAt: p.ExitedAt(), 632 }) 633 return 634 } 635 } 636} 637 638func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { 639 p, err := s.container.Process("") 640 if err != nil { 641 return nil, errdefs.ToGRPC(err) 642 } 643 ps, err := p.(*process.Init).Runtime().Ps(ctx, id) 644 if err != nil { 645 return nil, err 646 } 647 pids := make([]uint32, 0, len(ps)) 648 for _, pid := range ps { 649 pids = append(pids, uint32(pid)) 650 } 651 return pids, nil 652} 653 654func (s *service) forward(ctx context.Context, publisher shim.Publisher) { 655 ns, _ := namespaces.Namespace(ctx) 656 ctx = namespaces.WithNamespace(context.Background(), ns) 657 for e := range s.events { 658 ctx, cancel := context.WithTimeout(ctx, 5*time.Second) 659 err := publisher.Publish(ctx, runc.GetTopic(e), e) 660 cancel() 661 if err != nil { 662 logrus.WithError(err).Error("post event") 663 } 664 } 665 publisher.Close() 666} 667 668func (s *service) getContainer() (*runc.Container, error) { 669 s.mu.Lock() 670 container := s.container 671 s.mu.Unlock() 672 if container == nil { 673 return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") 674 } 675 return container, nil 676} 677 678func (s *service) getProcess(execID string) (process.Process, error) { 679 container, err := s.getContainer() 680 if err != nil { 681 return nil, err 682 } 683 p, err := container.Process(execID) 684 if err != nil { 685 return nil, errdefs.ToGRPC(err) 686 } 687 return p, nil 688} 689 690// initialize a single epoll fd to manage our consoles. `initPlatform` should 691// only be called once. 692func (s *service) initPlatform() error { 693 if s.platform != nil { 694 return nil 695 } 696 p, err := runc.NewPlatform() 697 if err != nil { 698 return err 699 } 700 s.platform = p 701 return nil 702} 703