1// +build linux 2 3/* 4 Copyright The containerd Authors. 5 6 Licensed under the Apache License, Version 2.0 (the "License"); 7 you may not use this file except in compliance with the License. 8 You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12 Unless required by applicable law or agreed to in writing, software 13 distributed under the License is distributed on an "AS IS" BASIS, 14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 See the License for the specific language governing permissions and 16 limitations under the License. 17*/ 18 19package v2 20 21import ( 22 "context" 23 "encoding/json" 24 "io/ioutil" 25 "os" 26 "os/exec" 27 "path/filepath" 28 "sync" 29 "syscall" 30 "time" 31 32 "github.com/containerd/cgroups" 33 cgroupsv2 "github.com/containerd/cgroups/v2" 34 eventstypes "github.com/containerd/containerd/api/events" 35 "github.com/containerd/containerd/api/types/task" 36 "github.com/containerd/containerd/errdefs" 37 "github.com/containerd/containerd/mount" 38 "github.com/containerd/containerd/namespaces" 39 "github.com/containerd/containerd/pkg/oom" 40 oomv1 "github.com/containerd/containerd/pkg/oom/v1" 41 oomv2 "github.com/containerd/containerd/pkg/oom/v2" 42 "github.com/containerd/containerd/pkg/process" 43 "github.com/containerd/containerd/pkg/stdio" 44 "github.com/containerd/containerd/pkg/userns" 45 "github.com/containerd/containerd/runtime/v2/runc" 46 "github.com/containerd/containerd/runtime/v2/runc/options" 47 "github.com/containerd/containerd/runtime/v2/shim" 48 taskAPI "github.com/containerd/containerd/runtime/v2/task" 49 "github.com/containerd/containerd/sys/reaper" 50 runcC "github.com/containerd/go-runc" 51 "github.com/containerd/typeurl" 52 "github.com/gogo/protobuf/proto" 53 ptypes "github.com/gogo/protobuf/types" 54 "github.com/pkg/errors" 55 "github.com/sirupsen/logrus" 56 "golang.org/x/sys/unix" 57) 58 59var ( 60 _ = (taskAPI.TaskService)(&service{}) 61 empty = &ptypes.Empty{} 62) 63 64// group labels specifies how the shim groups services. 65// currently supports a runc.v2 specific .group label and the 66// standard k8s pod label. Order matters in this list 67var groupLabels = []string{ 68 "io.containerd.runc.v2.group", 69 "io.kubernetes.cri.sandbox-id", 70} 71 72type spec struct { 73 Annotations map[string]string `json:"annotations,omitempty"` 74} 75 76// New returns a new shim service that can be used via GRPC 77func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { 78 var ( 79 ep oom.Watcher 80 err error 81 ) 82 if cgroups.Mode() == cgroups.Unified { 83 ep, err = oomv2.New(publisher) 84 } else { 85 ep, err = oomv1.New(publisher) 86 } 87 if err != nil { 88 return nil, err 89 } 90 go ep.Run(ctx) 91 s := &service{ 92 id: id, 93 context: ctx, 94 events: make(chan interface{}, 128), 95 ec: reaper.Default.Subscribe(), 96 ep: ep, 97 cancel: shutdown, 98 containers: make(map[string]*runc.Container), 99 } 100 go s.processExits() 101 runcC.Monitor = reaper.Default 102 if err := s.initPlatform(); err != nil { 103 shutdown() 104 return nil, errors.Wrap(err, "failed to initialized platform behavior") 105 } 106 go s.forward(ctx, publisher) 107 108 if address, err := shim.ReadAddress("address"); err == nil { 109 s.shimAddress = address 110 } 111 return s, nil 112} 113 114// service is the shim implementation of a remote shim over GRPC 115type service struct { 116 mu sync.Mutex 117 eventSendMu sync.Mutex 118 119 context context.Context 120 events chan interface{} 121 platform stdio.Platform 122 ec chan runcC.Exit 123 ep oom.Watcher 124 125 // id only used in cleanup case 126 id string 127 128 containers map[string]*runc.Container 129 130 shimAddress string 131 cancel func() 132} 133 134func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { 135 ns, err := namespaces.NamespaceRequired(ctx) 136 if err != nil { 137 return nil, err 138 } 139 self, err := os.Executable() 140 if err != nil { 141 return nil, err 142 } 143 cwd, err := os.Getwd() 144 if err != nil { 145 return nil, err 146 } 147 args := []string{ 148 "-namespace", ns, 149 "-id", id, 150 "-address", containerdAddress, 151 } 152 cmd := exec.Command(self, args...) 153 cmd.Dir = cwd 154 cmd.Env = append(os.Environ(), "GOMAXPROCS=4") 155 cmd.SysProcAttr = &syscall.SysProcAttr{ 156 Setpgid: true, 157 } 158 return cmd, nil 159} 160 161func readSpec() (*spec, error) { 162 f, err := os.Open("config.json") 163 if err != nil { 164 return nil, err 165 } 166 defer f.Close() 167 var s spec 168 if err := json.NewDecoder(f).Decode(&s); err != nil { 169 return nil, err 170 } 171 return &s, nil 172} 173 174func (s *service) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) { 175 cmd, err := newCommand(ctx, opts.ID, opts.ContainerdBinary, opts.Address, opts.TTRPCAddress) 176 if err != nil { 177 return "", err 178 } 179 grouping := opts.ID 180 spec, err := readSpec() 181 if err != nil { 182 return "", err 183 } 184 for _, group := range groupLabels { 185 if groupID, ok := spec.Annotations[group]; ok { 186 grouping = groupID 187 break 188 } 189 } 190 address, err := shim.SocketAddress(ctx, opts.Address, grouping) 191 if err != nil { 192 return "", err 193 } 194 195 socket, err := shim.NewSocket(address) 196 if err != nil { 197 // the only time where this would happen is if there is a bug and the socket 198 // was not cleaned up in the cleanup method of the shim or we are using the 199 // grouping functionality where the new process should be run with the same 200 // shim as an existing container 201 if !shim.SocketEaddrinuse(err) { 202 return "", errors.Wrap(err, "create new shim socket") 203 } 204 if shim.CanConnect(address) { 205 if err := shim.WriteAddress("address", address); err != nil { 206 return "", errors.Wrap(err, "write existing socket for shim") 207 } 208 return address, nil 209 } 210 if err := shim.RemoveSocket(address); err != nil { 211 return "", errors.Wrap(err, "remove pre-existing socket") 212 } 213 if socket, err = shim.NewSocket(address); err != nil { 214 return "", errors.Wrap(err, "try create new shim socket 2x") 215 } 216 } 217 defer func() { 218 if retErr != nil { 219 socket.Close() 220 _ = shim.RemoveSocket(address) 221 } 222 }() 223 224 // make sure that reexec shim-v2 binary use the value if need 225 if err := shim.WriteAddress("address", address); err != nil { 226 return "", err 227 } 228 229 f, err := socket.File() 230 if err != nil { 231 return "", err 232 } 233 234 cmd.ExtraFiles = append(cmd.ExtraFiles, f) 235 236 if err := cmd.Start(); err != nil { 237 f.Close() 238 return "", err 239 } 240 defer func() { 241 if retErr != nil { 242 cmd.Process.Kill() 243 } 244 }() 245 // make sure to wait after start 246 go cmd.Wait() 247 if data, err := ioutil.ReadAll(os.Stdin); err == nil { 248 if len(data) > 0 { 249 var any ptypes.Any 250 if err := proto.Unmarshal(data, &any); err != nil { 251 return "", err 252 } 253 v, err := typeurl.UnmarshalAny(&any) 254 if err != nil { 255 return "", err 256 } 257 if opts, ok := v.(*options.Options); ok { 258 if opts.ShimCgroup != "" { 259 if cgroups.Mode() == cgroups.Unified { 260 if err := cgroupsv2.VerifyGroupPath(opts.ShimCgroup); err != nil { 261 return "", errors.Wrapf(err, "failed to verify cgroup path %q", opts.ShimCgroup) 262 } 263 cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup) 264 if err != nil { 265 return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) 266 } 267 if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil { 268 return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) 269 } 270 } else { 271 cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup)) 272 if err != nil { 273 return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) 274 } 275 if err := cg.Add(cgroups.Process{ 276 Pid: cmd.Process.Pid, 277 }); err != nil { 278 return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) 279 } 280 } 281 } 282 } 283 } 284 } 285 if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { 286 return "", errors.Wrap(err, "failed to adjust OOM score for shim") 287 } 288 return address, nil 289} 290 291func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { 292 cwd, err := os.Getwd() 293 if err != nil { 294 return nil, err 295 } 296 297 path := filepath.Join(filepath.Dir(cwd), s.id) 298 ns, err := namespaces.NamespaceRequired(ctx) 299 if err != nil { 300 return nil, err 301 } 302 runtime, err := runc.ReadRuntime(path) 303 if err != nil { 304 return nil, err 305 } 306 opts, err := runc.ReadOptions(path) 307 if err != nil { 308 return nil, err 309 } 310 root := process.RuncRoot 311 if opts != nil && opts.Root != "" { 312 root = opts.Root 313 } 314 315 r := process.NewRunc(root, path, ns, runtime, "", false) 316 if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ 317 Force: true, 318 }); err != nil { 319 logrus.WithError(err).Warn("failed to remove runc container") 320 } 321 if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { 322 logrus.WithError(err).Warn("failed to cleanup rootfs mount") 323 } 324 return &taskAPI.DeleteResponse{ 325 ExitedAt: time.Now(), 326 ExitStatus: 128 + uint32(unix.SIGKILL), 327 }, nil 328} 329 330// Create a new initial process and container with the underlying OCI runtime 331func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { 332 s.mu.Lock() 333 defer s.mu.Unlock() 334 335 container, err := runc.NewContainer(ctx, s.platform, r) 336 if err != nil { 337 return nil, err 338 } 339 340 s.containers[r.ID] = container 341 342 s.send(&eventstypes.TaskCreate{ 343 ContainerID: r.ID, 344 Bundle: r.Bundle, 345 Rootfs: r.Rootfs, 346 IO: &eventstypes.TaskIO{ 347 Stdin: r.Stdin, 348 Stdout: r.Stdout, 349 Stderr: r.Stderr, 350 Terminal: r.Terminal, 351 }, 352 Checkpoint: r.Checkpoint, 353 Pid: uint32(container.Pid()), 354 }) 355 356 return &taskAPI.CreateTaskResponse{ 357 Pid: uint32(container.Pid()), 358 }, nil 359} 360 361// Start a process 362func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { 363 container, err := s.getContainer(r.ID) 364 if err != nil { 365 return nil, err 366 } 367 368 // hold the send lock so that the start events are sent before any exit events in the error case 369 s.eventSendMu.Lock() 370 p, err := container.Start(ctx, r) 371 if err != nil { 372 s.eventSendMu.Unlock() 373 return nil, errdefs.ToGRPC(err) 374 } 375 376 switch r.ExecID { 377 case "": 378 switch cg := container.Cgroup().(type) { 379 case cgroups.Cgroup: 380 if err := s.ep.Add(container.ID, cg); err != nil { 381 logrus.WithError(err).Error("add cg to OOM monitor") 382 } 383 case *cgroupsv2.Manager: 384 allControllers, err := cg.RootControllers() 385 if err != nil { 386 logrus.WithError(err).Error("failed to get root controllers") 387 } else { 388 if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil { 389 if userns.RunningInUserNS() { 390 logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers) 391 } else { 392 logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers) 393 } 394 } 395 } 396 if err := s.ep.Add(container.ID, cg); err != nil { 397 logrus.WithError(err).Error("add cg to OOM monitor") 398 } 399 } 400 401 s.send(&eventstypes.TaskStart{ 402 ContainerID: container.ID, 403 Pid: uint32(p.Pid()), 404 }) 405 default: 406 s.send(&eventstypes.TaskExecStarted{ 407 ContainerID: container.ID, 408 ExecID: r.ExecID, 409 Pid: uint32(p.Pid()), 410 }) 411 } 412 s.eventSendMu.Unlock() 413 return &taskAPI.StartResponse{ 414 Pid: uint32(p.Pid()), 415 }, nil 416} 417 418// Delete the initial process and container 419func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { 420 container, err := s.getContainer(r.ID) 421 if err != nil { 422 return nil, err 423 } 424 p, err := container.Delete(ctx, r) 425 if err != nil { 426 return nil, errdefs.ToGRPC(err) 427 } 428 // if we deleted an init task, send the task delete event 429 if r.ExecID == "" { 430 s.mu.Lock() 431 delete(s.containers, r.ID) 432 s.mu.Unlock() 433 s.send(&eventstypes.TaskDelete{ 434 ContainerID: container.ID, 435 Pid: uint32(p.Pid()), 436 ExitStatus: uint32(p.ExitStatus()), 437 ExitedAt: p.ExitedAt(), 438 }) 439 } 440 return &taskAPI.DeleteResponse{ 441 ExitStatus: uint32(p.ExitStatus()), 442 ExitedAt: p.ExitedAt(), 443 Pid: uint32(p.Pid()), 444 }, nil 445} 446 447// Exec an additional process inside the container 448func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { 449 container, err := s.getContainer(r.ID) 450 if err != nil { 451 return nil, err 452 } 453 ok, cancel := container.ReserveProcess(r.ExecID) 454 if !ok { 455 return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) 456 } 457 process, err := container.Exec(ctx, r) 458 if err != nil { 459 cancel() 460 return nil, errdefs.ToGRPC(err) 461 } 462 463 s.send(&eventstypes.TaskExecAdded{ 464 ContainerID: container.ID, 465 ExecID: process.ID(), 466 }) 467 return empty, nil 468} 469 470// ResizePty of a process 471func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { 472 container, err := s.getContainer(r.ID) 473 if err != nil { 474 return nil, err 475 } 476 if err := container.ResizePty(ctx, r); err != nil { 477 return nil, errdefs.ToGRPC(err) 478 } 479 return empty, nil 480} 481 482// State returns runtime state information for a process 483func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { 484 container, err := s.getContainer(r.ID) 485 if err != nil { 486 return nil, err 487 } 488 p, err := container.Process(r.ExecID) 489 if err != nil { 490 return nil, errdefs.ToGRPC(err) 491 } 492 st, err := p.Status(ctx) 493 if err != nil { 494 return nil, err 495 } 496 status := task.StatusUnknown 497 switch st { 498 case "created": 499 status = task.StatusCreated 500 case "running": 501 status = task.StatusRunning 502 case "stopped": 503 status = task.StatusStopped 504 case "paused": 505 status = task.StatusPaused 506 case "pausing": 507 status = task.StatusPausing 508 } 509 sio := p.Stdio() 510 return &taskAPI.StateResponse{ 511 ID: p.ID(), 512 Bundle: container.Bundle, 513 Pid: uint32(p.Pid()), 514 Status: status, 515 Stdin: sio.Stdin, 516 Stdout: sio.Stdout, 517 Stderr: sio.Stderr, 518 Terminal: sio.Terminal, 519 ExitStatus: uint32(p.ExitStatus()), 520 ExitedAt: p.ExitedAt(), 521 }, nil 522} 523 524// Pause the container 525func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { 526 container, err := s.getContainer(r.ID) 527 if err != nil { 528 return nil, err 529 } 530 if err := container.Pause(ctx); err != nil { 531 return nil, errdefs.ToGRPC(err) 532 } 533 s.send(&eventstypes.TaskPaused{ 534 ContainerID: container.ID, 535 }) 536 return empty, nil 537} 538 539// Resume the container 540func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { 541 container, err := s.getContainer(r.ID) 542 if err != nil { 543 return nil, err 544 } 545 if err := container.Resume(ctx); err != nil { 546 return nil, errdefs.ToGRPC(err) 547 } 548 s.send(&eventstypes.TaskResumed{ 549 ContainerID: container.ID, 550 }) 551 return empty, nil 552} 553 554// Kill a process with the provided signal 555func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { 556 container, err := s.getContainer(r.ID) 557 if err != nil { 558 return nil, err 559 } 560 if err := container.Kill(ctx, r); err != nil { 561 return nil, errdefs.ToGRPC(err) 562 } 563 return empty, nil 564} 565 566// Pids returns all pids inside the container 567func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { 568 container, err := s.getContainer(r.ID) 569 if err != nil { 570 return nil, err 571 } 572 pids, err := s.getContainerPids(ctx, r.ID) 573 if err != nil { 574 return nil, errdefs.ToGRPC(err) 575 } 576 var processes []*task.ProcessInfo 577 for _, pid := range pids { 578 pInfo := task.ProcessInfo{ 579 Pid: pid, 580 } 581 for _, p := range container.ExecdProcesses() { 582 if p.Pid() == int(pid) { 583 d := &options.ProcessDetails{ 584 ExecID: p.ID(), 585 } 586 a, err := typeurl.MarshalAny(d) 587 if err != nil { 588 return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) 589 } 590 pInfo.Info = a 591 break 592 } 593 } 594 processes = append(processes, &pInfo) 595 } 596 return &taskAPI.PidsResponse{ 597 Processes: processes, 598 }, nil 599} 600 601// CloseIO of a process 602func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { 603 container, err := s.getContainer(r.ID) 604 if err != nil { 605 return nil, err 606 } 607 if err := container.CloseIO(ctx, r); err != nil { 608 return nil, err 609 } 610 return empty, nil 611} 612 613// Checkpoint the container 614func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { 615 container, err := s.getContainer(r.ID) 616 if err != nil { 617 return nil, err 618 } 619 if err := container.Checkpoint(ctx, r); err != nil { 620 return nil, errdefs.ToGRPC(err) 621 } 622 return empty, nil 623} 624 625// Update a running container 626func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { 627 container, err := s.getContainer(r.ID) 628 if err != nil { 629 return nil, err 630 } 631 if err := container.Update(ctx, r); err != nil { 632 return nil, errdefs.ToGRPC(err) 633 } 634 return empty, nil 635} 636 637// Wait for a process to exit 638func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { 639 container, err := s.getContainer(r.ID) 640 if err != nil { 641 return nil, err 642 } 643 p, err := container.Process(r.ExecID) 644 if err != nil { 645 return nil, errdefs.ToGRPC(err) 646 } 647 p.Wait() 648 649 return &taskAPI.WaitResponse{ 650 ExitStatus: uint32(p.ExitStatus()), 651 ExitedAt: p.ExitedAt(), 652 }, nil 653} 654 655// Connect returns shim information such as the shim's pid 656func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { 657 var pid int 658 if container, err := s.getContainer(r.ID); err == nil { 659 pid = container.Pid() 660 } 661 return &taskAPI.ConnectResponse{ 662 ShimPid: uint32(os.Getpid()), 663 TaskPid: uint32(pid), 664 }, nil 665} 666 667func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { 668 s.mu.Lock() 669 defer s.mu.Unlock() 670 671 // return out if the shim is still servicing containers 672 if len(s.containers) > 0 { 673 return empty, nil 674 } 675 676 if s.platform != nil { 677 s.platform.Close() 678 } 679 680 if s.shimAddress != "" { 681 _ = shim.RemoveSocket(s.shimAddress) 682 } 683 684 // please make sure that temporary resource has been cleanup 685 // before shutdown service. 686 s.cancel() 687 close(s.events) 688 return empty, nil 689} 690 691func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { 692 container, err := s.getContainer(r.ID) 693 if err != nil { 694 return nil, err 695 } 696 cgx := container.Cgroup() 697 if cgx == nil { 698 return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") 699 } 700 var statsx interface{} 701 switch cg := cgx.(type) { 702 case cgroups.Cgroup: 703 stats, err := cg.Stat(cgroups.IgnoreNotExist) 704 if err != nil { 705 return nil, err 706 } 707 statsx = stats 708 case *cgroupsv2.Manager: 709 stats, err := cg.Stat() 710 if err != nil { 711 return nil, err 712 } 713 statsx = stats 714 default: 715 return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg) 716 } 717 data, err := typeurl.MarshalAny(statsx) 718 if err != nil { 719 return nil, err 720 } 721 return &taskAPI.StatsResponse{ 722 Stats: data, 723 }, nil 724} 725 726func (s *service) processExits() { 727 for e := range s.ec { 728 s.checkProcesses(e) 729 } 730} 731 732func (s *service) send(evt interface{}) { 733 s.events <- evt 734} 735 736func (s *service) sendL(evt interface{}) { 737 s.eventSendMu.Lock() 738 s.events <- evt 739 s.eventSendMu.Unlock() 740} 741 742func (s *service) checkProcesses(e runcC.Exit) { 743 s.mu.Lock() 744 defer s.mu.Unlock() 745 746 for _, container := range s.containers { 747 if !container.HasPid(e.Pid) { 748 continue 749 } 750 751 for _, p := range container.All() { 752 if p.Pid() != e.Pid { 753 continue 754 } 755 756 if ip, ok := p.(*process.Init); ok { 757 // Ensure all children are killed 758 if runc.ShouldKillAllOnExit(s.context, container.Bundle) { 759 if err := ip.KillAll(s.context); err != nil { 760 logrus.WithError(err).WithField("id", ip.ID()). 761 Error("failed to kill init's children") 762 } 763 } 764 } 765 766 p.SetExited(e.Status) 767 s.sendL(&eventstypes.TaskExit{ 768 ContainerID: container.ID, 769 ID: p.ID(), 770 Pid: uint32(e.Pid), 771 ExitStatus: uint32(e.Status), 772 ExitedAt: p.ExitedAt(), 773 }) 774 return 775 } 776 return 777 } 778} 779 780func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { 781 container, err := s.getContainer(id) 782 if err != nil { 783 return nil, err 784 } 785 p, err := container.Process("") 786 if err != nil { 787 return nil, errdefs.ToGRPC(err) 788 } 789 ps, err := p.(*process.Init).Runtime().Ps(ctx, id) 790 if err != nil { 791 return nil, err 792 } 793 pids := make([]uint32, 0, len(ps)) 794 for _, pid := range ps { 795 pids = append(pids, uint32(pid)) 796 } 797 return pids, nil 798} 799 800func (s *service) forward(ctx context.Context, publisher shim.Publisher) { 801 ns, _ := namespaces.Namespace(ctx) 802 ctx = namespaces.WithNamespace(context.Background(), ns) 803 for e := range s.events { 804 err := publisher.Publish(ctx, runc.GetTopic(e), e) 805 if err != nil { 806 logrus.WithError(err).Error("post event") 807 } 808 } 809 publisher.Close() 810} 811 812func (s *service) getContainer(id string) (*runc.Container, error) { 813 s.mu.Lock() 814 container := s.containers[id] 815 s.mu.Unlock() 816 if container == nil { 817 return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") 818 } 819 return container, nil 820} 821 822// initialize a single epoll fd to manage our consoles. `initPlatform` should 823// only be called once. 824func (s *service) initPlatform() error { 825 if s.platform != nil { 826 return nil 827 } 828 p, err := runc.NewPlatform() 829 if err != nil { 830 return err 831 } 832 s.platform = p 833 return nil 834} 835