1// +build linux 2 3package linux 4 5import ( 6 "context" 7 "sync" 8 9 "github.com/pkg/errors" 10 "google.golang.org/grpc" 11 12 "github.com/containerd/cgroups" 13 eventstypes "github.com/containerd/containerd/api/events" 14 "github.com/containerd/containerd/api/types/task" 15 "github.com/containerd/containerd/errdefs" 16 "github.com/containerd/containerd/events/exchange" 17 "github.com/containerd/containerd/identifiers" 18 "github.com/containerd/containerd/linux/shim/client" 19 shim "github.com/containerd/containerd/linux/shim/v1" 20 "github.com/containerd/containerd/runtime" 21 runc "github.com/containerd/go-runc" 22 "github.com/gogo/protobuf/types" 23) 24 25// Task on a linux based system 26type Task struct { 27 mu sync.Mutex 28 id string 29 pid int 30 shim *client.Client 31 namespace string 32 cg cgroups.Cgroup 33 monitor runtime.TaskMonitor 34 events *exchange.Exchange 35 runtime *runc.Runc 36} 37 38func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor, events *exchange.Exchange, runtime *runc.Runc) (*Task, error) { 39 var ( 40 err error 41 cg cgroups.Cgroup 42 ) 43 if pid > 0 { 44 cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid)) 45 if err != nil && err != cgroups.ErrCgroupDeleted { 46 return nil, err 47 } 48 } 49 return &Task{ 50 id: id, 51 pid: pid, 52 shim: shim, 53 namespace: namespace, 54 cg: cg, 55 monitor: monitor, 56 events: events, 57 runtime: runtime, 58 }, nil 59} 60 61// ID of the task 62func (t *Task) ID() string { 63 return t.id 64} 65 66// Info returns task information about the runtime and namespace 67func (t *Task) Info() runtime.TaskInfo { 68 return runtime.TaskInfo{ 69 ID: t.id, 70 Runtime: pluginID, 71 Namespace: t.namespace, 72 } 73} 74 75// Start the task 76func (t *Task) Start(ctx context.Context) error { 77 t.mu.Lock() 78 hasCgroup := t.cg != nil 79 t.mu.Unlock() 80 r, err := t.shim.Start(ctx, &shim.StartRequest{ 81 ID: t.id, 82 }) 83 if err != nil { 84 return errdefs.FromGRPC(err) 85 } 86 t.pid = int(r.Pid) 87 if !hasCgroup { 88 cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(t.pid)) 89 if err != nil { 90 return err 91 } 92 t.mu.Lock() 93 t.cg = cg 94 t.mu.Unlock() 95 if err := t.monitor.Monitor(t); err != nil { 96 return err 97 } 98 } 99 t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{ 100 ContainerID: t.id, 101 Pid: uint32(t.pid), 102 }) 103 return nil 104} 105 106// State returns runtime information for the task 107func (t *Task) State(ctx context.Context) (runtime.State, error) { 108 response, err := t.shim.State(ctx, &shim.StateRequest{ 109 ID: t.id, 110 }) 111 if err != nil { 112 if err != grpc.ErrServerStopped { 113 return runtime.State{}, errdefs.FromGRPC(err) 114 } 115 return runtime.State{}, errdefs.ErrNotFound 116 } 117 var status runtime.Status 118 switch response.Status { 119 case task.StatusCreated: 120 status = runtime.CreatedStatus 121 case task.StatusRunning: 122 status = runtime.RunningStatus 123 case task.StatusStopped: 124 status = runtime.StoppedStatus 125 case task.StatusPaused: 126 status = runtime.PausedStatus 127 case task.StatusPausing: 128 status = runtime.PausingStatus 129 } 130 return runtime.State{ 131 Pid: response.Pid, 132 Status: status, 133 Stdin: response.Stdin, 134 Stdout: response.Stdout, 135 Stderr: response.Stderr, 136 Terminal: response.Terminal, 137 ExitStatus: response.ExitStatus, 138 ExitedAt: response.ExitedAt, 139 }, nil 140} 141 142// Pause the task and all processes 143func (t *Task) Pause(ctx context.Context) error { 144 if _, err := t.shim.Pause(ctx, empty); err != nil { 145 return errdefs.FromGRPC(err) 146 } 147 t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{ 148 ContainerID: t.id, 149 }) 150 return nil 151} 152 153// Resume the task and all processes 154func (t *Task) Resume(ctx context.Context) error { 155 if _, err := t.shim.Resume(ctx, empty); err != nil { 156 return errdefs.FromGRPC(err) 157 } 158 t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{ 159 ContainerID: t.id, 160 }) 161 return nil 162} 163 164// Kill the task using the provided signal 165// 166// Optionally send the signal to all processes that are a child of the task 167func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error { 168 if _, err := t.shim.Kill(ctx, &shim.KillRequest{ 169 ID: t.id, 170 Signal: signal, 171 All: all, 172 }); err != nil { 173 return errdefs.FromGRPC(err) 174 } 175 return nil 176} 177 178// Exec creates a new process inside the task 179func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { 180 if err := identifiers.Validate(id); err != nil { 181 return nil, errors.Wrapf(err, "invalid exec id") 182 } 183 request := &shim.ExecProcessRequest{ 184 ID: id, 185 Stdin: opts.IO.Stdin, 186 Stdout: opts.IO.Stdout, 187 Stderr: opts.IO.Stderr, 188 Terminal: opts.IO.Terminal, 189 Spec: opts.Spec, 190 } 191 if _, err := t.shim.Exec(ctx, request); err != nil { 192 return nil, errdefs.FromGRPC(err) 193 } 194 return &Process{ 195 id: id, 196 t: t, 197 }, nil 198} 199 200// Pids returns all system level process ids running inside the task 201func (t *Task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { 202 resp, err := t.shim.ListPids(ctx, &shim.ListPidsRequest{ 203 ID: t.id, 204 }) 205 if err != nil { 206 return nil, errdefs.FromGRPC(err) 207 } 208 var processList []runtime.ProcessInfo 209 for _, p := range resp.Processes { 210 processList = append(processList, runtime.ProcessInfo{ 211 Pid: p.Pid, 212 Info: p.Info, 213 }) 214 } 215 return processList, nil 216} 217 218// ResizePty changes the side of the task's PTY to the provided width and height 219func (t *Task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { 220 _, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{ 221 ID: t.id, 222 Width: size.Width, 223 Height: size.Height, 224 }) 225 if err != nil { 226 err = errdefs.FromGRPC(err) 227 } 228 return err 229} 230 231// CloseIO closes the provided IO on the task 232func (t *Task) CloseIO(ctx context.Context) error { 233 _, err := t.shim.CloseIO(ctx, &shim.CloseIORequest{ 234 ID: t.id, 235 Stdin: true, 236 }) 237 if err != nil { 238 err = errdefs.FromGRPC(err) 239 } 240 return err 241} 242 243// Checkpoint creates a system level dump of the task and process information that can be later restored 244func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) error { 245 r := &shim.CheckpointTaskRequest{ 246 Path: path, 247 Options: options, 248 } 249 if _, err := t.shim.Checkpoint(ctx, r); err != nil { 250 return errdefs.FromGRPC(err) 251 } 252 t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{ 253 ContainerID: t.id, 254 }) 255 return nil 256} 257 258// DeleteProcess removes the provided process from the task and deletes all on disk state 259func (t *Task) DeleteProcess(ctx context.Context, id string) (*runtime.Exit, error) { 260 r, err := t.shim.DeleteProcess(ctx, &shim.DeleteProcessRequest{ 261 ID: id, 262 }) 263 if err != nil { 264 return nil, errdefs.FromGRPC(err) 265 } 266 return &runtime.Exit{ 267 Status: r.ExitStatus, 268 Timestamp: r.ExitedAt, 269 Pid: r.Pid, 270 }, nil 271} 272 273// Update changes runtime information of a running task 274func (t *Task) Update(ctx context.Context, resources *types.Any) error { 275 if _, err := t.shim.Update(ctx, &shim.UpdateTaskRequest{ 276 Resources: resources, 277 }); err != nil { 278 return errdefs.FromGRPC(err) 279 } 280 return nil 281} 282 283// Process returns a specific process inside the task by the process id 284func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) { 285 // TODO: verify process exists for container 286 return &Process{ 287 id: id, 288 t: t, 289 }, nil 290} 291 292// Metrics returns runtime specific system level metric information for the task 293func (t *Task) Metrics(ctx context.Context) (interface{}, error) { 294 t.mu.Lock() 295 defer t.mu.Unlock() 296 if t.cg == nil { 297 return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist") 298 } 299 stats, err := t.cg.Stat(cgroups.IgnoreNotExist) 300 if err != nil { 301 return nil, err 302 } 303 return stats, nil 304} 305 306// Cgroup returns the underlying cgroup for a linux task 307func (t *Task) Cgroup() (cgroups.Cgroup, error) { 308 t.mu.Lock() 309 defer t.mu.Unlock() 310 if t.cg == nil { 311 return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist") 312 } 313 return t.cg, nil 314} 315 316// Wait for the task to exit returning the status and timestamp 317func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) { 318 r, err := t.shim.Wait(ctx, &shim.WaitRequest{ 319 ID: t.id, 320 }) 321 if err != nil { 322 return nil, err 323 } 324 return &runtime.Exit{ 325 Timestamp: r.ExitedAt, 326 Status: r.ExitStatus, 327 }, nil 328} 329