1// +build linux 2 3/* 4 Copyright The containerd Authors. 5 6 Licensed under the Apache License, Version 2.0 (the "License"); 7 you may not use this file except in compliance with the License. 8 You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12 Unless required by applicable law or agreed to in writing, software 13 distributed under the License is distributed on an "AS IS" BASIS, 14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 See the License for the specific language governing permissions and 16 limitations under the License. 17*/ 18 19package v2 20 21import ( 22 "context" 23 "encoding/json" 24 "io/ioutil" 25 "os" 26 "os/exec" 27 "path/filepath" 28 "sync" 29 "syscall" 30 "time" 31 32 "github.com/containerd/cgroups" 33 cgroupsv2 "github.com/containerd/cgroups/v2" 34 eventstypes "github.com/containerd/containerd/api/events" 35 "github.com/containerd/containerd/api/types/task" 36 "github.com/containerd/containerd/errdefs" 37 "github.com/containerd/containerd/mount" 38 "github.com/containerd/containerd/namespaces" 39 "github.com/containerd/containerd/pkg/oom" 40 oomv1 "github.com/containerd/containerd/pkg/oom/v1" 41 oomv2 "github.com/containerd/containerd/pkg/oom/v2" 42 "github.com/containerd/containerd/pkg/process" 43 "github.com/containerd/containerd/pkg/stdio" 44 "github.com/containerd/containerd/runtime/v2/runc" 45 "github.com/containerd/containerd/runtime/v2/runc/options" 46 "github.com/containerd/containerd/runtime/v2/shim" 47 taskAPI "github.com/containerd/containerd/runtime/v2/task" 48 "github.com/containerd/containerd/sys" 49 "github.com/containerd/containerd/sys/reaper" 50 runcC "github.com/containerd/go-runc" 51 "github.com/containerd/typeurl" 52 "github.com/gogo/protobuf/proto" 53 ptypes "github.com/gogo/protobuf/types" 54 "github.com/pkg/errors" 55 "github.com/sirupsen/logrus" 56 "golang.org/x/sys/unix" 57) 58 59var ( 60 _ = (taskAPI.TaskService)(&service{}) 61 empty = &ptypes.Empty{} 62) 63 64// group labels specifies how the shim groups services. 65// currently supports a runc.v2 specific .group label and the 66// standard k8s pod label. Order matters in this list 67var groupLabels = []string{ 68 "io.containerd.runc.v2.group", 69 "io.kubernetes.cri.sandbox-id", 70} 71 72type spec struct { 73 Annotations map[string]string `json:"annotations,omitempty"` 74} 75 76// New returns a new shim service that can be used via GRPC 77func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { 78 var ( 79 ep oom.Watcher 80 err error 81 ) 82 if cgroups.Mode() == cgroups.Unified { 83 ep, err = oomv2.New(publisher) 84 } else { 85 ep, err = oomv1.New(publisher) 86 } 87 if err != nil { 88 return nil, err 89 } 90 go ep.Run(ctx) 91 s := &service{ 92 id: id, 93 context: ctx, 94 events: make(chan interface{}, 128), 95 ec: reaper.Default.Subscribe(), 96 ep: ep, 97 cancel: shutdown, 98 containers: make(map[string]*runc.Container), 99 } 100 go s.processExits() 101 runcC.Monitor = reaper.Default 102 if err := s.initPlatform(); err != nil { 103 shutdown() 104 return nil, errors.Wrap(err, "failed to initialized platform behavior") 105 } 106 go s.forward(ctx, publisher) 107 108 if address, err := shim.ReadAddress("address"); err == nil { 109 s.shimAddress = address 110 } 111 return s, nil 112} 113 114// service is the shim implementation of a remote shim over GRPC 115type service struct { 116 mu sync.Mutex 117 eventSendMu sync.Mutex 118 119 context context.Context 120 events chan interface{} 121 platform stdio.Platform 122 ec chan runcC.Exit 123 ep oom.Watcher 124 125 // id only used in cleanup case 126 id string 127 128 containers map[string]*runc.Container 129 130 shimAddress string 131 cancel func() 132} 133 134func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) { 135 ns, err := namespaces.NamespaceRequired(ctx) 136 if err != nil { 137 return nil, err 138 } 139 self, err := os.Executable() 140 if err != nil { 141 return nil, err 142 } 143 cwd, err := os.Getwd() 144 if err != nil { 145 return nil, err 146 } 147 args := []string{ 148 "-namespace", ns, 149 "-id", id, 150 "-address", containerdAddress, 151 } 152 cmd := exec.Command(self, args...) 153 cmd.Dir = cwd 154 cmd.Env = append(os.Environ(), "GOMAXPROCS=4") 155 cmd.SysProcAttr = &syscall.SysProcAttr{ 156 Setpgid: true, 157 } 158 return cmd, nil 159} 160 161func readSpec() (*spec, error) { 162 f, err := os.Open("config.json") 163 if err != nil { 164 return nil, err 165 } 166 defer f.Close() 167 var s spec 168 if err := json.NewDecoder(f).Decode(&s); err != nil { 169 return nil, err 170 } 171 return &s, nil 172} 173 174func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (_ string, retErr error) { 175 cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress) 176 if err != nil { 177 return "", err 178 } 179 grouping := id 180 spec, err := readSpec() 181 if err != nil { 182 return "", err 183 } 184 for _, group := range groupLabels { 185 if groupID, ok := spec.Annotations[group]; ok { 186 grouping = groupID 187 break 188 } 189 } 190 address, err := shim.SocketAddress(ctx, containerdAddress, grouping) 191 if err != nil { 192 return "", err 193 } 194 195 socket, err := shim.NewSocket(address) 196 if err != nil { 197 // the only time where this would happen is if there is a bug and the socket 198 // was not cleaned up in the cleanup method of the shim or we are using the 199 // grouping functionality where the new process should be run with the same 200 // shim as an existing container 201 if !shim.SocketEaddrinuse(err) { 202 return "", errors.Wrap(err, "create new shim socket") 203 } 204 if shim.CanConnect(address) { 205 if err := shim.WriteAddress("address", address); err != nil { 206 return "", errors.Wrap(err, "write existing socket for shim") 207 } 208 return address, nil 209 } 210 if err := shim.RemoveSocket(address); err != nil { 211 return "", errors.Wrap(err, "remove pre-existing socket") 212 } 213 if socket, err = shim.NewSocket(address); err != nil { 214 return "", errors.Wrap(err, "try create new shim socket 2x") 215 } 216 } 217 defer func() { 218 if retErr != nil { 219 socket.Close() 220 _ = shim.RemoveSocket(address) 221 } 222 }() 223 f, err := socket.File() 224 if err != nil { 225 return "", err 226 } 227 228 cmd.ExtraFiles = append(cmd.ExtraFiles, f) 229 230 if err := cmd.Start(); err != nil { 231 f.Close() 232 return "", err 233 } 234 defer func() { 235 if retErr != nil { 236 cmd.Process.Kill() 237 } 238 }() 239 // make sure to wait after start 240 go cmd.Wait() 241 if err := shim.WriteAddress("address", address); err != nil { 242 return "", err 243 } 244 if data, err := ioutil.ReadAll(os.Stdin); err == nil { 245 if len(data) > 0 { 246 var any ptypes.Any 247 if err := proto.Unmarshal(data, &any); err != nil { 248 return "", err 249 } 250 v, err := typeurl.UnmarshalAny(&any) 251 if err != nil { 252 return "", err 253 } 254 if opts, ok := v.(*options.Options); ok { 255 if opts.ShimCgroup != "" { 256 if cgroups.Mode() == cgroups.Unified { 257 if err := cgroupsv2.VerifyGroupPath(opts.ShimCgroup); err != nil { 258 return "", errors.Wrapf(err, "failed to verify cgroup path %q", opts.ShimCgroup) 259 } 260 cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup) 261 if err != nil { 262 return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) 263 } 264 if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil { 265 return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) 266 } 267 } else { 268 cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup)) 269 if err != nil { 270 return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup) 271 } 272 if err := cg.Add(cgroups.Process{ 273 Pid: cmd.Process.Pid, 274 }); err != nil { 275 return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup) 276 } 277 } 278 } 279 } 280 } 281 } 282 if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { 283 return "", errors.Wrap(err, "failed to adjust OOM score for shim") 284 } 285 return address, nil 286} 287 288func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { 289 cwd, err := os.Getwd() 290 if err != nil { 291 return nil, err 292 } 293 path := filepath.Join(filepath.Dir(cwd), s.id) 294 ns, err := namespaces.NamespaceRequired(ctx) 295 if err != nil { 296 return nil, err 297 } 298 runtime, err := runc.ReadRuntime(path) 299 if err != nil { 300 return nil, err 301 } 302 opts, err := runc.ReadOptions(path) 303 if err != nil { 304 return nil, err 305 } 306 root := process.RuncRoot 307 if opts != nil && opts.Root != "" { 308 root = opts.Root 309 } 310 311 r := process.NewRunc(root, path, ns, runtime, "", false) 312 if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ 313 Force: true, 314 }); err != nil { 315 logrus.WithError(err).Warn("failed to remove runc container") 316 } 317 if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { 318 logrus.WithError(err).Warn("failed to cleanup rootfs mount") 319 } 320 return &taskAPI.DeleteResponse{ 321 ExitedAt: time.Now(), 322 ExitStatus: 128 + uint32(unix.SIGKILL), 323 }, nil 324} 325 326// Create a new initial process and container with the underlying OCI runtime 327func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { 328 s.mu.Lock() 329 defer s.mu.Unlock() 330 331 container, err := runc.NewContainer(ctx, s.platform, r) 332 if err != nil { 333 return nil, err 334 } 335 336 s.containers[r.ID] = container 337 338 s.send(&eventstypes.TaskCreate{ 339 ContainerID: r.ID, 340 Bundle: r.Bundle, 341 Rootfs: r.Rootfs, 342 IO: &eventstypes.TaskIO{ 343 Stdin: r.Stdin, 344 Stdout: r.Stdout, 345 Stderr: r.Stderr, 346 Terminal: r.Terminal, 347 }, 348 Checkpoint: r.Checkpoint, 349 Pid: uint32(container.Pid()), 350 }) 351 352 return &taskAPI.CreateTaskResponse{ 353 Pid: uint32(container.Pid()), 354 }, nil 355} 356 357// Start a process 358func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { 359 container, err := s.getContainer(r.ID) 360 if err != nil { 361 return nil, err 362 } 363 364 // hold the send lock so that the start events are sent before any exit events in the error case 365 s.eventSendMu.Lock() 366 p, err := container.Start(ctx, r) 367 if err != nil { 368 s.eventSendMu.Unlock() 369 return nil, errdefs.ToGRPC(err) 370 } 371 372 switch r.ExecID { 373 case "": 374 switch cg := container.Cgroup().(type) { 375 case cgroups.Cgroup: 376 if err := s.ep.Add(container.ID, cg); err != nil { 377 logrus.WithError(err).Error("add cg to OOM monitor") 378 } 379 case *cgroupsv2.Manager: 380 allControllers, err := cg.RootControllers() 381 if err != nil { 382 logrus.WithError(err).Error("failed to get root controllers") 383 } else { 384 if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil { 385 if sys.RunningInUserNS() { 386 logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers) 387 } else { 388 logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers) 389 } 390 } 391 } 392 if err := s.ep.Add(container.ID, cg); err != nil { 393 logrus.WithError(err).Error("add cg to OOM monitor") 394 } 395 } 396 397 s.send(&eventstypes.TaskStart{ 398 ContainerID: container.ID, 399 Pid: uint32(p.Pid()), 400 }) 401 default: 402 s.send(&eventstypes.TaskExecStarted{ 403 ContainerID: container.ID, 404 ExecID: r.ExecID, 405 Pid: uint32(p.Pid()), 406 }) 407 } 408 s.eventSendMu.Unlock() 409 return &taskAPI.StartResponse{ 410 Pid: uint32(p.Pid()), 411 }, nil 412} 413 414// Delete the initial process and container 415func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { 416 container, err := s.getContainer(r.ID) 417 if err != nil { 418 return nil, err 419 } 420 p, err := container.Delete(ctx, r) 421 if err != nil { 422 return nil, errdefs.ToGRPC(err) 423 } 424 // if we deleted an init task, send the task delete event 425 if r.ExecID == "" { 426 s.mu.Lock() 427 delete(s.containers, r.ID) 428 s.mu.Unlock() 429 s.send(&eventstypes.TaskDelete{ 430 ContainerID: container.ID, 431 Pid: uint32(p.Pid()), 432 ExitStatus: uint32(p.ExitStatus()), 433 ExitedAt: p.ExitedAt(), 434 }) 435 } 436 return &taskAPI.DeleteResponse{ 437 ExitStatus: uint32(p.ExitStatus()), 438 ExitedAt: p.ExitedAt(), 439 Pid: uint32(p.Pid()), 440 }, nil 441} 442 443// Exec an additional process inside the container 444func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { 445 container, err := s.getContainer(r.ID) 446 if err != nil { 447 return nil, err 448 } 449 ok, cancel := container.ReserveProcess(r.ExecID) 450 if !ok { 451 return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) 452 } 453 process, err := container.Exec(ctx, r) 454 if err != nil { 455 cancel() 456 return nil, errdefs.ToGRPC(err) 457 } 458 459 s.send(&eventstypes.TaskExecAdded{ 460 ContainerID: container.ID, 461 ExecID: process.ID(), 462 }) 463 return empty, nil 464} 465 466// ResizePty of a process 467func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { 468 container, err := s.getContainer(r.ID) 469 if err != nil { 470 return nil, err 471 } 472 if err := container.ResizePty(ctx, r); err != nil { 473 return nil, errdefs.ToGRPC(err) 474 } 475 return empty, nil 476} 477 478// State returns runtime state information for a process 479func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { 480 container, err := s.getContainer(r.ID) 481 if err != nil { 482 return nil, err 483 } 484 p, err := container.Process(r.ExecID) 485 if err != nil { 486 return nil, err 487 } 488 st, err := p.Status(ctx) 489 if err != nil { 490 return nil, err 491 } 492 status := task.StatusUnknown 493 switch st { 494 case "created": 495 status = task.StatusCreated 496 case "running": 497 status = task.StatusRunning 498 case "stopped": 499 status = task.StatusStopped 500 case "paused": 501 status = task.StatusPaused 502 case "pausing": 503 status = task.StatusPausing 504 } 505 sio := p.Stdio() 506 return &taskAPI.StateResponse{ 507 ID: p.ID(), 508 Bundle: container.Bundle, 509 Pid: uint32(p.Pid()), 510 Status: status, 511 Stdin: sio.Stdin, 512 Stdout: sio.Stdout, 513 Stderr: sio.Stderr, 514 Terminal: sio.Terminal, 515 ExitStatus: uint32(p.ExitStatus()), 516 ExitedAt: p.ExitedAt(), 517 }, nil 518} 519 520// Pause the container 521func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { 522 container, err := s.getContainer(r.ID) 523 if err != nil { 524 return nil, err 525 } 526 if err := container.Pause(ctx); err != nil { 527 return nil, errdefs.ToGRPC(err) 528 } 529 s.send(&eventstypes.TaskPaused{ 530 ContainerID: container.ID, 531 }) 532 return empty, nil 533} 534 535// Resume the container 536func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { 537 container, err := s.getContainer(r.ID) 538 if err != nil { 539 return nil, err 540 } 541 if err := container.Resume(ctx); err != nil { 542 return nil, errdefs.ToGRPC(err) 543 } 544 s.send(&eventstypes.TaskResumed{ 545 ContainerID: container.ID, 546 }) 547 return empty, nil 548} 549 550// Kill a process with the provided signal 551func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { 552 container, err := s.getContainer(r.ID) 553 if err != nil { 554 return nil, err 555 } 556 if err := container.Kill(ctx, r); err != nil { 557 return nil, errdefs.ToGRPC(err) 558 } 559 return empty, nil 560} 561 562// Pids returns all pids inside the container 563func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { 564 container, err := s.getContainer(r.ID) 565 if err != nil { 566 return nil, err 567 } 568 pids, err := s.getContainerPids(ctx, r.ID) 569 if err != nil { 570 return nil, errdefs.ToGRPC(err) 571 } 572 var processes []*task.ProcessInfo 573 for _, pid := range pids { 574 pInfo := task.ProcessInfo{ 575 Pid: pid, 576 } 577 for _, p := range container.ExecdProcesses() { 578 if p.Pid() == int(pid) { 579 d := &options.ProcessDetails{ 580 ExecID: p.ID(), 581 } 582 a, err := typeurl.MarshalAny(d) 583 if err != nil { 584 return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) 585 } 586 pInfo.Info = a 587 break 588 } 589 } 590 processes = append(processes, &pInfo) 591 } 592 return &taskAPI.PidsResponse{ 593 Processes: processes, 594 }, nil 595} 596 597// CloseIO of a process 598func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { 599 container, err := s.getContainer(r.ID) 600 if err != nil { 601 return nil, err 602 } 603 if err := container.CloseIO(ctx, r); err != nil { 604 return nil, err 605 } 606 return empty, nil 607} 608 609// Checkpoint the container 610func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { 611 container, err := s.getContainer(r.ID) 612 if err != nil { 613 return nil, err 614 } 615 if err := container.Checkpoint(ctx, r); err != nil { 616 return nil, errdefs.ToGRPC(err) 617 } 618 return empty, nil 619} 620 621// Update a running container 622func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { 623 container, err := s.getContainer(r.ID) 624 if err != nil { 625 return nil, err 626 } 627 if err := container.Update(ctx, r); err != nil { 628 return nil, errdefs.ToGRPC(err) 629 } 630 return empty, nil 631} 632 633// Wait for a process to exit 634func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { 635 container, err := s.getContainer(r.ID) 636 if err != nil { 637 return nil, err 638 } 639 p, err := container.Process(r.ExecID) 640 if err != nil { 641 return nil, errdefs.ToGRPC(err) 642 } 643 p.Wait() 644 645 return &taskAPI.WaitResponse{ 646 ExitStatus: uint32(p.ExitStatus()), 647 ExitedAt: p.ExitedAt(), 648 }, nil 649} 650 651// Connect returns shim information such as the shim's pid 652func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { 653 var pid int 654 if container, err := s.getContainer(r.ID); err == nil { 655 pid = container.Pid() 656 } 657 return &taskAPI.ConnectResponse{ 658 ShimPid: uint32(os.Getpid()), 659 TaskPid: uint32(pid), 660 }, nil 661} 662 663func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { 664 s.mu.Lock() 665 // return out if the shim is still servicing containers 666 if len(s.containers) > 0 { 667 s.mu.Unlock() 668 return empty, nil 669 } 670 s.cancel() 671 close(s.events) 672 673 if s.platform != nil { 674 s.platform.Close() 675 } 676 if s.shimAddress != "" { 677 _ = shim.RemoveSocket(s.shimAddress) 678 } 679 return empty, nil 680} 681 682func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { 683 container, err := s.getContainer(r.ID) 684 if err != nil { 685 return nil, err 686 } 687 cgx := container.Cgroup() 688 if cgx == nil { 689 return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") 690 } 691 var statsx interface{} 692 switch cg := cgx.(type) { 693 case cgroups.Cgroup: 694 stats, err := cg.Stat(cgroups.IgnoreNotExist) 695 if err != nil { 696 return nil, err 697 } 698 statsx = stats 699 case *cgroupsv2.Manager: 700 stats, err := cg.Stat() 701 if err != nil { 702 return nil, err 703 } 704 statsx = stats 705 default: 706 return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg) 707 } 708 data, err := typeurl.MarshalAny(statsx) 709 if err != nil { 710 return nil, err 711 } 712 return &taskAPI.StatsResponse{ 713 Stats: data, 714 }, nil 715} 716 717func (s *service) processExits() { 718 for e := range s.ec { 719 s.checkProcesses(e) 720 } 721} 722 723func (s *service) send(evt interface{}) { 724 s.events <- evt 725} 726 727func (s *service) sendL(evt interface{}) { 728 s.eventSendMu.Lock() 729 s.events <- evt 730 s.eventSendMu.Unlock() 731} 732 733func (s *service) checkProcesses(e runcC.Exit) { 734 s.mu.Lock() 735 defer s.mu.Unlock() 736 737 for _, container := range s.containers { 738 if !container.HasPid(e.Pid) { 739 continue 740 } 741 742 for _, p := range container.All() { 743 if p.Pid() != e.Pid { 744 continue 745 } 746 747 if ip, ok := p.(*process.Init); ok { 748 // Ensure all children are killed 749 if runc.ShouldKillAllOnExit(s.context, container.Bundle) { 750 if err := ip.KillAll(s.context); err != nil { 751 logrus.WithError(err).WithField("id", ip.ID()). 752 Error("failed to kill init's children") 753 } 754 } 755 } 756 757 p.SetExited(e.Status) 758 s.sendL(&eventstypes.TaskExit{ 759 ContainerID: container.ID, 760 ID: p.ID(), 761 Pid: uint32(e.Pid), 762 ExitStatus: uint32(e.Status), 763 ExitedAt: p.ExitedAt(), 764 }) 765 return 766 } 767 return 768 } 769} 770 771func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { 772 container, err := s.getContainer(id) 773 if err != nil { 774 return nil, err 775 } 776 p, err := container.Process("") 777 if err != nil { 778 return nil, errdefs.ToGRPC(err) 779 } 780 ps, err := p.(*process.Init).Runtime().Ps(ctx, id) 781 if err != nil { 782 return nil, err 783 } 784 pids := make([]uint32, 0, len(ps)) 785 for _, pid := range ps { 786 pids = append(pids, uint32(pid)) 787 } 788 return pids, nil 789} 790 791func (s *service) forward(ctx context.Context, publisher shim.Publisher) { 792 ns, _ := namespaces.Namespace(ctx) 793 ctx = namespaces.WithNamespace(context.Background(), ns) 794 for e := range s.events { 795 err := publisher.Publish(ctx, runc.GetTopic(e), e) 796 if err != nil { 797 logrus.WithError(err).Error("post event") 798 } 799 } 800 publisher.Close() 801} 802 803func (s *service) getContainer(id string) (*runc.Container, error) { 804 s.mu.Lock() 805 container := s.containers[id] 806 s.mu.Unlock() 807 if container == nil { 808 return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") 809 } 810 return container, nil 811} 812 813// initialize a single epoll fd to manage our consoles. `initPlatform` should 814// only be called once. 815func (s *service) initPlatform() error { 816 if s.platform != nil { 817 return nil 818 } 819 p, err := runc.NewPlatform() 820 if err != nil { 821 return err 822 } 823 s.platform = p 824 return nil 825} 826