1// +build linux 2 3/* 4 Copyright The containerd Authors. 5 6 Licensed under the Apache License, Version 2.0 (the "License"); 7 you may not use this file except in compliance with the License. 8 You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12 Unless required by applicable law or agreed to in writing, software 13 distributed under the License is distributed on an "AS IS" BASIS, 14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 See the License for the specific language governing permissions and 16 limitations under the License. 17*/ 18 19package v2 20 21import ( 22 "context" 23 "encoding/json" 24 "io/ioutil" 25 "os" 26 "os/exec" 27 "path/filepath" 28 "strings" 29 "sync" 30 "syscall" 31 "time" 32 33 "github.com/containerd/cgroups" 34 eventstypes "github.com/containerd/containerd/api/events" 35 "github.com/containerd/containerd/api/types/task" 36 "github.com/containerd/containerd/errdefs" 37 "github.com/containerd/containerd/log" 38 "github.com/containerd/containerd/mount" 39 "github.com/containerd/containerd/namespaces" 40 "github.com/containerd/containerd/pkg/oom" 41 "github.com/containerd/containerd/pkg/process" 42 "github.com/containerd/containerd/pkg/stdio" 43 "github.com/containerd/containerd/runtime/v2/runc" 44 "github.com/containerd/containerd/runtime/v2/runc/options" 45 "github.com/containerd/containerd/runtime/v2/shim" 46 taskAPI "github.com/containerd/containerd/runtime/v2/task" 47 "github.com/containerd/containerd/sys/reaper" 48 runcC "github.com/containerd/go-runc" 49 "github.com/containerd/typeurl" 50 "github.com/gogo/protobuf/proto" 51 "github.com/gogo/protobuf/types" 52 ptypes "github.com/gogo/protobuf/types" 53 specs "github.com/opencontainers/runtime-spec/specs-go" 54 "github.com/pkg/errors" 55 "github.com/sirupsen/logrus" 56 "golang.org/x/sys/unix" 57) 58 59var ( 60 _ = (taskAPI.TaskService)(&service{}) 61 empty = &ptypes.Empty{} 62) 63 64// group labels specifies how the shim groups services. 65// currently supports a runc.v2 specific .group label and the 66// standard k8s pod label. Order matters in this list 67var groupLabels = []string{ 68 "io.containerd.runc.v2.group", 69 "io.kubernetes.cri.sandbox-id", 70} 71 72type spec struct { 73 Annotations map[string]string `json:"annotations,omitempty"` 74} 75 76// New returns a new shim service that can be used via GRPC 77func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { 78 ep, err := oom.New(publisher) 79 if err != nil { 80 return nil, err 81 } 82 go ep.Run(ctx) 83 s := &service{ 84 id: id, 85 context: ctx, 86 events: make(chan interface{}, 128), 87 ec: reaper.Default.Subscribe(), 88 ep: ep, 89 cancel: shutdown, 90 containers: make(map[string]*runc.Container), 91 } 92 go s.processExits() 93 runcC.Monitor = reaper.Default 94 if err := s.initPlatform(); err != nil { 95 shutdown() 96 return nil, errors.Wrap(err, "failed to initialized platform behavior") 97 } 98 go s.forward(ctx, publisher) 99 return s, nil 100} 101 102// service is the shim implementation of a remote shim over GRPC 103type service struct { 104 mu sync.Mutex 105 eventSendMu sync.Mutex 106 107 context context.Context 108 events chan interface{} 109 platform stdio.Platform 110 ec chan runcC.Exit 111 ep *oom.Epoller 112 113 // id only used in cleanup case 114 id string 115 116 containers map[string]*runc.Container 117 118 cancel func() 119} 120 121func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { 122 ns, err := namespaces.NamespaceRequired(ctx) 123 if err != nil { 124 return nil, err 125 } 126 self, err := os.Executable() 127 if err != nil { 128 return nil, err 129 } 130 cwd, err := os.Getwd() 131 if err != nil { 132 return nil, err 133 } 134 args := []string{ 135 "-namespace", ns, 136 "-id", id, 137 "-address", containerdAddress, 138 } 139 cmd := exec.Command(self, args...) 140 cmd.Dir = cwd 141 cmd.Env = append(os.Environ(), "GOMAXPROCS=4") 142 cmd.SysProcAttr = &syscall.SysProcAttr{ 143 Setpgid: true, 144 } 145 return cmd, nil 146} 147 148func readSpec() (*spec, error) { 149 f, err := os.Open("config.json") 150 if err != nil { 151 return nil, err 152 } 153 defer f.Close() 154 var s spec 155 if err := json.NewDecoder(f).Decode(&s); err != nil { 156 return nil, err 157 } 158 return &s, nil 159} 160 161func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) { 162 cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress) 163 if err != nil { 164 return "", err 165 } 166 grouping := id 167 spec, err := readSpec() 168 if err != nil { 169 return "", err 170 } 171 for _, group := range groupLabels { 172 if groupID, ok := spec.Annotations[group]; ok { 173 grouping = groupID 174 break 175 } 176 } 177 address, err := shim.SocketAddress(ctx, grouping) 178 if err != nil { 179 return "", err 180 } 181 socket, err := shim.NewSocket(address) 182 if err != nil { 183 if strings.Contains(err.Error(), "address already in use") { 184 if err := shim.WriteAddress("address", address); err != nil { 185 return "", err 186 } 187 return address, nil 188 } 189 return "", err 190 } 191 defer socket.Close() 192 f, err := socket.File() 193 if err != nil { 194 return "", err 195 } 196 defer f.Close() 197 198 cmd.ExtraFiles = append(cmd.ExtraFiles, f) 199 200 if err := cmd.Start(); err != nil { 201 return "", err 202 } 203 defer func() { 204 if err != nil { 205 cmd.Process.Kill() 206 } 207 }() 208 // make sure to wait after start 209 go cmd.Wait() 210 if err := shim.WriteAddress("address", address); err != nil { 211 return "", err 212 } 213 if data, err := ioutil.ReadAll(os.Stdin); err == nil { 214 if len(data) > 0 { 215 var any types.Any 216 if err := proto.Unmarshal(data, &any); err != nil { 217 return "", err 218 } 219 v, err := typeurl.UnmarshalAny(&any) 220 if err != nil { 221 return "", err 222 } 223 if opts, ok := v.(*options.Options); ok { 224 if opts.ShimCgroup != "" { 225 cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup)) 226 if err != nil { 227 return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) 228 } 229 if err := cg.Add(cgroups.Process{ 230 Pid: cmd.Process.Pid, 231 }); err != nil { 232 return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) 233 } 234 } 235 } 236 } 237 } 238 if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { 239 return "", errors.Wrap(err, "failed to adjust OOM score for shim") 240 } 241 return address, nil 242} 243 244func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { 245 cwd, err := os.Getwd() 246 if err != nil { 247 return nil, err 248 } 249 path := filepath.Join(filepath.Dir(cwd), s.id) 250 ns, err := namespaces.NamespaceRequired(ctx) 251 if err != nil { 252 return nil, err 253 } 254 255 runtime, err := runc.ReadRuntime(path) 256 if err != nil { 257 return nil, err 258 } 259 r := process.NewRunc(process.RuncRoot, path, ns, runtime, "", false) 260 if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ 261 Force: true, 262 }); err != nil { 263 logrus.WithError(err).Warn("failed to remove runc container") 264 } 265 if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { 266 logrus.WithError(err).Warn("failed to cleanup rootfs mount") 267 } 268 return &taskAPI.DeleteResponse{ 269 ExitedAt: time.Now(), 270 ExitStatus: 128 + uint32(unix.SIGKILL), 271 }, nil 272} 273 274// Create a new initial process and container with the underlying OCI runtime 275func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { 276 s.mu.Lock() 277 defer s.mu.Unlock() 278 279 container, err := runc.NewContainer(ctx, s.platform, r) 280 if err != nil { 281 return nil, err 282 } 283 284 s.containers[r.ID] = container 285 286 s.send(&eventstypes.TaskCreate{ 287 ContainerID: r.ID, 288 Bundle: r.Bundle, 289 Rootfs: r.Rootfs, 290 IO: &eventstypes.TaskIO{ 291 Stdin: r.Stdin, 292 Stdout: r.Stdout, 293 Stderr: r.Stderr, 294 Terminal: r.Terminal, 295 }, 296 Checkpoint: r.Checkpoint, 297 Pid: uint32(container.Pid()), 298 }) 299 300 return &taskAPI.CreateTaskResponse{ 301 Pid: uint32(container.Pid()), 302 }, nil 303} 304 305// Start a process 306func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { 307 container, err := s.getContainer(r.ID) 308 if err != nil { 309 return nil, err 310 } 311 312 // hold the send lock so that the start events are sent before any exit events in the error case 313 s.eventSendMu.Lock() 314 p, err := container.Start(ctx, r) 315 if err != nil { 316 s.eventSendMu.Unlock() 317 return nil, errdefs.ToGRPC(err) 318 } 319 if err := s.ep.Add(container.ID, container.Cgroup()); err != nil { 320 logrus.WithError(err).Error("add cg to OOM monitor") 321 } 322 switch r.ExecID { 323 case "": 324 s.send(&eventstypes.TaskStart{ 325 ContainerID: container.ID, 326 Pid: uint32(p.Pid()), 327 }) 328 default: 329 s.send(&eventstypes.TaskExecStarted{ 330 ContainerID: container.ID, 331 ExecID: r.ExecID, 332 Pid: uint32(p.Pid()), 333 }) 334 } 335 s.eventSendMu.Unlock() 336 return &taskAPI.StartResponse{ 337 Pid: uint32(p.Pid()), 338 }, nil 339} 340 341// Delete the initial process and container 342func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { 343 container, err := s.getContainer(r.ID) 344 if err != nil { 345 return nil, err 346 } 347 p, err := container.Delete(ctx, r) 348 if err != nil { 349 return nil, errdefs.ToGRPC(err) 350 } 351 // if we deleted our init task, close the platform and send the task delete event 352 if r.ExecID == "" { 353 s.mu.Lock() 354 delete(s.containers, r.ID) 355 hasContainers := len(s.containers) > 0 356 s.mu.Unlock() 357 if s.platform != nil && !hasContainers { 358 s.platform.Close() 359 } 360 s.send(&eventstypes.TaskDelete{ 361 ContainerID: container.ID, 362 Pid: uint32(p.Pid()), 363 ExitStatus: uint32(p.ExitStatus()), 364 ExitedAt: p.ExitedAt(), 365 }) 366 } 367 return &taskAPI.DeleteResponse{ 368 ExitStatus: uint32(p.ExitStatus()), 369 ExitedAt: p.ExitedAt(), 370 Pid: uint32(p.Pid()), 371 }, nil 372} 373 374// Exec an additional process inside the container 375func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { 376 container, err := s.getContainer(r.ID) 377 if err != nil { 378 return nil, err 379 } 380 ok, cancel := container.ReserveProcess(r.ExecID) 381 if !ok { 382 return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) 383 } 384 process, err := container.Exec(ctx, r) 385 if err != nil { 386 cancel() 387 return nil, errdefs.ToGRPC(err) 388 } 389 390 s.send(&eventstypes.TaskExecAdded{ 391 ContainerID: container.ID, 392 ExecID: process.ID(), 393 }) 394 return empty, nil 395} 396 397// ResizePty of a process 398func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { 399 container, err := s.getContainer(r.ID) 400 if err != nil { 401 return nil, err 402 } 403 if err := container.ResizePty(ctx, r); err != nil { 404 return nil, errdefs.ToGRPC(err) 405 } 406 return empty, nil 407} 408 409// State returns runtime state information for a process 410func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { 411 container, err := s.getContainer(r.ID) 412 if err != nil { 413 return nil, err 414 } 415 p, err := container.Process(r.ExecID) 416 if err != nil { 417 return nil, err 418 } 419 st, err := p.Status(ctx) 420 if err != nil { 421 return nil, err 422 } 423 status := task.StatusUnknown 424 switch st { 425 case "created": 426 status = task.StatusCreated 427 case "running": 428 status = task.StatusRunning 429 case "stopped": 430 status = task.StatusStopped 431 case "paused": 432 status = task.StatusPaused 433 case "pausing": 434 status = task.StatusPausing 435 } 436 sio := p.Stdio() 437 return &taskAPI.StateResponse{ 438 ID: p.ID(), 439 Bundle: container.Bundle, 440 Pid: uint32(p.Pid()), 441 Status: status, 442 Stdin: sio.Stdin, 443 Stdout: sio.Stdout, 444 Stderr: sio.Stderr, 445 Terminal: sio.Terminal, 446 ExitStatus: uint32(p.ExitStatus()), 447 ExitedAt: p.ExitedAt(), 448 }, nil 449} 450 451// Pause the container 452func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { 453 container, err := s.getContainer(r.ID) 454 if err != nil { 455 return nil, err 456 } 457 if err := container.Pause(ctx); err != nil { 458 return nil, errdefs.ToGRPC(err) 459 } 460 s.send(&eventstypes.TaskPaused{ 461 ContainerID: container.ID, 462 }) 463 return empty, nil 464} 465 466// Resume the container 467func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { 468 container, err := s.getContainer(r.ID) 469 if err != nil { 470 return nil, err 471 } 472 if err := container.Resume(ctx); err != nil { 473 return nil, errdefs.ToGRPC(err) 474 } 475 s.send(&eventstypes.TaskResumed{ 476 ContainerID: container.ID, 477 }) 478 return empty, nil 479} 480 481// Kill a process with the provided signal 482func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { 483 container, err := s.getContainer(r.ID) 484 if err != nil { 485 return nil, err 486 } 487 if err := container.Kill(ctx, r); err != nil { 488 return nil, errdefs.ToGRPC(err) 489 } 490 return empty, nil 491} 492 493// Pids returns all pids inside the container 494func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { 495 container, err := s.getContainer(r.ID) 496 if err != nil { 497 return nil, err 498 } 499 pids, err := s.getContainerPids(ctx, r.ID) 500 if err != nil { 501 return nil, errdefs.ToGRPC(err) 502 } 503 var processes []*task.ProcessInfo 504 for _, pid := range pids { 505 pInfo := task.ProcessInfo{ 506 Pid: pid, 507 } 508 for _, p := range container.ExecdProcesses() { 509 if p.Pid() == int(pid) { 510 d := &options.ProcessDetails{ 511 ExecID: p.ID(), 512 } 513 a, err := typeurl.MarshalAny(d) 514 if err != nil { 515 return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) 516 } 517 pInfo.Info = a 518 break 519 } 520 } 521 processes = append(processes, &pInfo) 522 } 523 return &taskAPI.PidsResponse{ 524 Processes: processes, 525 }, nil 526} 527 528// CloseIO of a process 529func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { 530 container, err := s.getContainer(r.ID) 531 if err != nil { 532 return nil, err 533 } 534 if err := container.CloseIO(ctx, r); err != nil { 535 return nil, err 536 } 537 return empty, nil 538} 539 540// Checkpoint the container 541func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { 542 container, err := s.getContainer(r.ID) 543 if err != nil { 544 return nil, err 545 } 546 if err := container.Checkpoint(ctx, r); err != nil { 547 return nil, errdefs.ToGRPC(err) 548 } 549 return empty, nil 550} 551 552// Update a running container 553func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { 554 container, err := s.getContainer(r.ID) 555 if err != nil { 556 return nil, err 557 } 558 if err := container.Update(ctx, r); err != nil { 559 return nil, errdefs.ToGRPC(err) 560 } 561 return empty, nil 562} 563 564// Wait for a process to exit 565func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { 566 container, err := s.getContainer(r.ID) 567 if err != nil { 568 return nil, err 569 } 570 p, err := container.Process(r.ExecID) 571 if err != nil { 572 return nil, errdefs.ToGRPC(err) 573 } 574 p.Wait() 575 576 return &taskAPI.WaitResponse{ 577 ExitStatus: uint32(p.ExitStatus()), 578 ExitedAt: p.ExitedAt(), 579 }, nil 580} 581 582// Connect returns shim information such as the shim's pid 583func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { 584 var pid int 585 if container, err := s.getContainer(r.ID); err == nil { 586 pid = container.Pid() 587 } 588 return &taskAPI.ConnectResponse{ 589 ShimPid: uint32(os.Getpid()), 590 TaskPid: uint32(pid), 591 }, nil 592} 593 594func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { 595 s.mu.Lock() 596 // return out if the shim is still servicing containers 597 if len(s.containers) > 0 { 598 s.mu.Unlock() 599 return empty, nil 600 } 601 s.cancel() 602 close(s.events) 603 return empty, nil 604} 605 606func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { 607 container, err := s.getContainer(r.ID) 608 if err != nil { 609 return nil, err 610 } 611 cg := container.Cgroup() 612 if cg == nil { 613 return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") 614 } 615 stats, err := cg.Stat(cgroups.IgnoreNotExist) 616 if err != nil { 617 return nil, err 618 } 619 data, err := typeurl.MarshalAny(stats) 620 if err != nil { 621 return nil, err 622 } 623 return &taskAPI.StatsResponse{ 624 Stats: data, 625 }, nil 626} 627 628func (s *service) processExits() { 629 for e := range s.ec { 630 s.checkProcesses(e) 631 } 632} 633 634func (s *service) send(evt interface{}) { 635 s.events <- evt 636} 637 638func (s *service) sendL(evt interface{}) { 639 s.eventSendMu.Lock() 640 s.events <- evt 641 s.eventSendMu.Unlock() 642} 643 644func (s *service) checkProcesses(e runcC.Exit) { 645 s.mu.Lock() 646 defer s.mu.Unlock() 647 648 for _, container := range s.containers { 649 if !container.HasPid(e.Pid) { 650 continue 651 } 652 653 for _, p := range container.All() { 654 if p.Pid() != e.Pid { 655 continue 656 } 657 658 if ip, ok := p.(*process.Init); ok { 659 shouldKillAll, err := shouldKillAllOnExit(container.Bundle) 660 if err != nil { 661 log.G(s.context).WithError(err).Error("failed to check shouldKillAll") 662 } 663 664 // Ensure all children are killed 665 if shouldKillAll { 666 if err := ip.KillAll(s.context); err != nil { 667 logrus.WithError(err).WithField("id", ip.ID()). 668 Error("failed to kill init's children") 669 } 670 } 671 } 672 673 p.SetExited(e.Status) 674 s.sendL(&eventstypes.TaskExit{ 675 ContainerID: container.ID, 676 ID: p.ID(), 677 Pid: uint32(e.Pid), 678 ExitStatus: uint32(e.Status), 679 ExitedAt: p.ExitedAt(), 680 }) 681 return 682 } 683 return 684 } 685} 686 687func shouldKillAllOnExit(bundlePath string) (bool, error) { 688 var bundleSpec specs.Spec 689 bundleConfigContents, err := ioutil.ReadFile(filepath.Join(bundlePath, "config.json")) 690 if err != nil { 691 return false, err 692 } 693 json.Unmarshal(bundleConfigContents, &bundleSpec) 694 695 if bundleSpec.Linux != nil { 696 for _, ns := range bundleSpec.Linux.Namespaces { 697 if ns.Type == specs.PIDNamespace && ns.Path == "" { 698 return false, nil 699 } 700 } 701 } 702 703 return true, nil 704} 705 706func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { 707 container, err := s.getContainer(id) 708 if err != nil { 709 return nil, err 710 } 711 p, err := container.Process("") 712 if err != nil { 713 return nil, errdefs.ToGRPC(err) 714 } 715 ps, err := p.(*process.Init).Runtime().Ps(ctx, id) 716 if err != nil { 717 return nil, err 718 } 719 pids := make([]uint32, 0, len(ps)) 720 for _, pid := range ps { 721 pids = append(pids, uint32(pid)) 722 } 723 return pids, nil 724} 725 726func (s *service) forward(ctx context.Context, publisher shim.Publisher) { 727 ns, _ := namespaces.Namespace(ctx) 728 ctx = namespaces.WithNamespace(context.Background(), ns) 729 for e := range s.events { 730 ctx, cancel := context.WithTimeout(ctx, 5*time.Second) 731 err := publisher.Publish(ctx, runc.GetTopic(e), e) 732 cancel() 733 if err != nil { 734 logrus.WithError(err).Error("post event") 735 } 736 } 737 publisher.Close() 738} 739 740func (s *service) getContainer(id string) (*runc.Container, error) { 741 s.mu.Lock() 742 container := s.containers[id] 743 s.mu.Unlock() 744 if container == nil { 745 return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") 746 } 747 return container, nil 748} 749 750// initialize a single epoll fd to manage our consoles. `initPlatform` should 751// only be called once. 752func (s *service) initPlatform() error { 753 if s.platform != nil { 754 return nil 755 } 756 p, err := runc.NewPlatform() 757 if err != nil { 758 return err 759 } 760 s.platform = p 761 return nil 762} 763