1// +build linux 2 3/* 4 Copyright The containerd Authors. 5 6 Licensed under the Apache License, Version 2.0 (the "License"); 7 you may not use this file except in compliance with the License. 8 You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12 Unless required by applicable law or agreed to in writing, software 13 distributed under the License is distributed on an "AS IS" BASIS, 14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 See the License for the specific language governing permissions and 16 limitations under the License. 17*/ 18 19package linux 20 21import ( 22 "context" 23 "sync" 24 25 "github.com/containerd/cgroups" 26 eventstypes "github.com/containerd/containerd/api/events" 27 "github.com/containerd/containerd/api/types/task" 28 "github.com/containerd/containerd/errdefs" 29 "github.com/containerd/containerd/events/exchange" 30 "github.com/containerd/containerd/identifiers" 31 "github.com/containerd/containerd/log" 32 "github.com/containerd/containerd/runtime" 33 "github.com/containerd/containerd/runtime/v1/shim/client" 34 shim "github.com/containerd/containerd/runtime/v1/shim/v1" 35 runc "github.com/containerd/go-runc" 36 "github.com/containerd/ttrpc" 37 "github.com/containerd/typeurl" 38 "github.com/gogo/protobuf/types" 39 "github.com/pkg/errors" 40) 41 42// Task on a linux based system 43type Task struct { 44 mu sync.Mutex 45 id string 46 pid int 47 shim *client.Client 48 namespace string 49 cg cgroups.Cgroup 50 events *exchange.Exchange 51 tasks *runtime.TaskList 52 bundle *bundle 53} 54 55func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, runtime *runc.Runc, list *runtime.TaskList, bundle *bundle) (*Task, error) { 56 var ( 57 err error 58 cg cgroups.Cgroup 59 ) 60 if pid > 0 { 61 cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid)) 62 if err != nil && err != cgroups.ErrCgroupDeleted { 63 return nil, err 64 } 65 } 66 return &Task{ 67 id: id, 68 pid: pid, 69 shim: shim, 70 namespace: namespace, 71 cg: cg, 72 events: events, 73 tasks: list, 74 bundle: bundle, 75 }, nil 76} 77 78// ID of the task 79func (t *Task) ID() string { 80 return t.id 81} 82 83// Namespace of the task 84func (t *Task) Namespace() string { 85 return t.namespace 86} 87 88// Delete the task and return the exit status 89func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) { 90 rsp, err := t.shim.Delete(ctx, empty) 91 if err != nil { 92 return nil, errdefs.FromGRPC(err) 93 } 94 t.tasks.Delete(ctx, t.id) 95 if err := t.shim.KillShim(ctx); err != nil { 96 log.G(ctx).WithError(err).Error("failed to kill shim") 97 } 98 if err := t.bundle.Delete(); err != nil { 99 log.G(ctx).WithError(err).Error("failed to delete bundle") 100 } 101 t.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{ 102 ContainerID: t.id, 103 ExitStatus: rsp.ExitStatus, 104 ExitedAt: rsp.ExitedAt, 105 Pid: rsp.Pid, 106 }) 107 return &runtime.Exit{ 108 Status: rsp.ExitStatus, 109 Timestamp: rsp.ExitedAt, 110 Pid: rsp.Pid, 111 }, nil 112} 113 114// Start the task 115func (t *Task) Start(ctx context.Context) error { 116 t.mu.Lock() 117 hasCgroup := t.cg != nil 118 t.mu.Unlock() 119 r, err := t.shim.Start(ctx, &shim.StartRequest{ 120 ID: t.id, 121 }) 122 if err != nil { 123 return errdefs.FromGRPC(err) 124 } 125 t.pid = int(r.Pid) 126 if !hasCgroup { 127 cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(t.pid)) 128 if err != nil { 129 return err 130 } 131 t.mu.Lock() 132 t.cg = cg 133 t.mu.Unlock() 134 } 135 t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{ 136 ContainerID: t.id, 137 Pid: uint32(t.pid), 138 }) 139 return nil 140} 141 142// State returns runtime information for the task 143func (t *Task) State(ctx context.Context) (runtime.State, error) { 144 response, err := t.shim.State(ctx, &shim.StateRequest{ 145 ID: t.id, 146 }) 147 if err != nil { 148 if errors.Cause(err) != ttrpc.ErrClosed { 149 return runtime.State{}, errdefs.FromGRPC(err) 150 } 151 return runtime.State{}, errdefs.ErrNotFound 152 } 153 var status runtime.Status 154 switch response.Status { 155 case task.StatusCreated: 156 status = runtime.CreatedStatus 157 case task.StatusRunning: 158 status = runtime.RunningStatus 159 case task.StatusStopped: 160 status = runtime.StoppedStatus 161 case task.StatusPaused: 162 status = runtime.PausedStatus 163 case task.StatusPausing: 164 status = runtime.PausingStatus 165 } 166 return runtime.State{ 167 Pid: response.Pid, 168 Status: status, 169 Stdin: response.Stdin, 170 Stdout: response.Stdout, 171 Stderr: response.Stderr, 172 Terminal: response.Terminal, 173 ExitStatus: response.ExitStatus, 174 ExitedAt: response.ExitedAt, 175 }, nil 176} 177 178// Pause the task and all processes 179func (t *Task) Pause(ctx context.Context) error { 180 if _, err := t.shim.Pause(ctx, empty); err != nil { 181 return errdefs.FromGRPC(err) 182 } 183 t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{ 184 ContainerID: t.id, 185 }) 186 return nil 187} 188 189// Resume the task and all processes 190func (t *Task) Resume(ctx context.Context) error { 191 if _, err := t.shim.Resume(ctx, empty); err != nil { 192 return errdefs.FromGRPC(err) 193 } 194 t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{ 195 ContainerID: t.id, 196 }) 197 return nil 198} 199 200// Kill the task using the provided signal 201// 202// Optionally send the signal to all processes that are a child of the task 203func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error { 204 if _, err := t.shim.Kill(ctx, &shim.KillRequest{ 205 ID: t.id, 206 Signal: signal, 207 All: all, 208 }); err != nil { 209 return errdefs.FromGRPC(err) 210 } 211 return nil 212} 213 214// Exec creates a new process inside the task 215func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { 216 if err := identifiers.Validate(id); err != nil { 217 return nil, errors.Wrapf(err, "invalid exec id") 218 } 219 request := &shim.ExecProcessRequest{ 220 ID: id, 221 Stdin: opts.IO.Stdin, 222 Stdout: opts.IO.Stdout, 223 Stderr: opts.IO.Stderr, 224 Terminal: opts.IO.Terminal, 225 Spec: opts.Spec, 226 } 227 if _, err := t.shim.Exec(ctx, request); err != nil { 228 return nil, errdefs.FromGRPC(err) 229 } 230 return &Process{ 231 id: id, 232 t: t, 233 }, nil 234} 235 236// Pids returns all system level process ids running inside the task 237func (t *Task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { 238 resp, err := t.shim.ListPids(ctx, &shim.ListPidsRequest{ 239 ID: t.id, 240 }) 241 if err != nil { 242 return nil, errdefs.FromGRPC(err) 243 } 244 var processList []runtime.ProcessInfo 245 for _, p := range resp.Processes { 246 processList = append(processList, runtime.ProcessInfo{ 247 Pid: p.Pid, 248 Info: p.Info, 249 }) 250 } 251 return processList, nil 252} 253 254// ResizePty changes the side of the task's PTY to the provided width and height 255func (t *Task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { 256 _, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{ 257 ID: t.id, 258 Width: size.Width, 259 Height: size.Height, 260 }) 261 if err != nil { 262 err = errdefs.FromGRPC(err) 263 } 264 return err 265} 266 267// CloseIO closes the provided IO on the task 268func (t *Task) CloseIO(ctx context.Context) error { 269 _, err := t.shim.CloseIO(ctx, &shim.CloseIORequest{ 270 ID: t.id, 271 Stdin: true, 272 }) 273 if err != nil { 274 err = errdefs.FromGRPC(err) 275 } 276 return err 277} 278 279// Checkpoint creates a system level dump of the task and process information that can be later restored 280func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) error { 281 r := &shim.CheckpointTaskRequest{ 282 Path: path, 283 Options: options, 284 } 285 if _, err := t.shim.Checkpoint(ctx, r); err != nil { 286 return errdefs.FromGRPC(err) 287 } 288 t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{ 289 ContainerID: t.id, 290 }) 291 return nil 292} 293 294// Update changes runtime information of a running task 295func (t *Task) Update(ctx context.Context, resources *types.Any) error { 296 if _, err := t.shim.Update(ctx, &shim.UpdateTaskRequest{ 297 Resources: resources, 298 }); err != nil { 299 return errdefs.FromGRPC(err) 300 } 301 return nil 302} 303 304// Process returns a specific process inside the task by the process id 305func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) { 306 p := &Process{ 307 id: id, 308 t: t, 309 } 310 if _, err := p.State(ctx); err != nil { 311 return nil, err 312 } 313 return p, nil 314} 315 316// Stats returns runtime specific system level metric information for the task 317func (t *Task) Stats(ctx context.Context) (*types.Any, error) { 318 t.mu.Lock() 319 defer t.mu.Unlock() 320 if t.cg == nil { 321 return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist") 322 } 323 stats, err := t.cg.Stat(cgroups.IgnoreNotExist) 324 if err != nil { 325 return nil, err 326 } 327 return typeurl.MarshalAny(stats) 328} 329 330// Cgroup returns the underlying cgroup for a linux task 331func (t *Task) Cgroup() (cgroups.Cgroup, error) { 332 t.mu.Lock() 333 defer t.mu.Unlock() 334 if t.cg == nil { 335 return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist") 336 } 337 return t.cg, nil 338} 339 340// Wait for the task to exit returning the status and timestamp 341func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) { 342 r, err := t.shim.Wait(ctx, &shim.WaitRequest{ 343 ID: t.id, 344 }) 345 if err != nil { 346 return nil, err 347 } 348 return &runtime.Exit{ 349 Timestamp: r.ExitedAt, 350 Status: r.ExitStatus, 351 }, nil 352} 353