1// +build linux 2 3/* 4 Copyright The containerd Authors. 5 6 Licensed under the Apache License, Version 2.0 (the "License"); 7 you may not use this file except in compliance with the License. 8 You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12 Unless required by applicable law or agreed to in writing, software 13 distributed under the License is distributed on an "AS IS" BASIS, 14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 See the License for the specific language governing permissions and 16 limitations under the License. 17*/ 18 19package linux 20 21import ( 22 "context" 23 "sync" 24 25 "github.com/containerd/cgroups" 26 eventstypes "github.com/containerd/containerd/api/events" 27 "github.com/containerd/containerd/api/types/task" 28 "github.com/containerd/containerd/errdefs" 29 "github.com/containerd/containerd/events/exchange" 30 "github.com/containerd/containerd/identifiers" 31 "github.com/containerd/containerd/log" 32 "github.com/containerd/containerd/runtime" 33 "github.com/containerd/containerd/runtime/v1/shim/client" 34 "github.com/containerd/containerd/runtime/v1/shim/v1" 35 "github.com/containerd/ttrpc" 36 "github.com/containerd/typeurl" 37 "github.com/gogo/protobuf/types" 38 "github.com/pkg/errors" 39) 40 41// Task on a linux based system 42type Task struct { 43 mu sync.Mutex 44 id string 45 pid int 46 shim *client.Client 47 namespace string 48 cg cgroups.Cgroup 49 events *exchange.Exchange 50 tasks *runtime.TaskList 51 bundle *bundle 52} 53 54func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.TaskList, bundle *bundle) (*Task, error) { 55 var ( 56 err error 57 cg cgroups.Cgroup 58 ) 59 if pid > 0 { 60 cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid)) 61 if err != nil && err != cgroups.ErrCgroupDeleted { 62 return nil, err 63 } 64 } 65 return &Task{ 66 id: id, 67 pid: pid, 68 shim: shim, 69 namespace: namespace, 70 cg: cg, 71 events: events, 72 tasks: list, 73 bundle: bundle, 74 }, nil 75} 76 77// ID of the task 78func (t *Task) ID() string { 79 return t.id 80} 81 82// Namespace of the task 83func (t *Task) Namespace() string { 84 return t.namespace 85} 86 87// Delete the task and return the exit status 88func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) { 89 rsp, err := t.shim.Delete(ctx, empty) 90 if err != nil { 91 return nil, errdefs.FromGRPC(err) 92 } 93 t.tasks.Delete(ctx, t.id) 94 if err := t.shim.KillShim(ctx); err != nil { 95 log.G(ctx).WithError(err).Error("failed to kill shim") 96 } 97 if err := t.bundle.Delete(); err != nil { 98 log.G(ctx).WithError(err).Error("failed to delete bundle") 99 } 100 t.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{ 101 ContainerID: t.id, 102 ExitStatus: rsp.ExitStatus, 103 ExitedAt: rsp.ExitedAt, 104 Pid: rsp.Pid, 105 }) 106 return &runtime.Exit{ 107 Status: rsp.ExitStatus, 108 Timestamp: rsp.ExitedAt, 109 Pid: rsp.Pid, 110 }, nil 111} 112 113// Start the task 114func (t *Task) Start(ctx context.Context) error { 115 t.mu.Lock() 116 hasCgroup := t.cg != nil 117 t.mu.Unlock() 118 r, err := t.shim.Start(ctx, &shim.StartRequest{ 119 ID: t.id, 120 }) 121 if err != nil { 122 return errdefs.FromGRPC(err) 123 } 124 t.pid = int(r.Pid) 125 if !hasCgroup { 126 cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(t.pid)) 127 if err != nil { 128 return err 129 } 130 t.mu.Lock() 131 t.cg = cg 132 t.mu.Unlock() 133 } 134 t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{ 135 ContainerID: t.id, 136 Pid: uint32(t.pid), 137 }) 138 return nil 139} 140 141// State returns runtime information for the task 142func (t *Task) State(ctx context.Context) (runtime.State, error) { 143 response, err := t.shim.State(ctx, &shim.StateRequest{ 144 ID: t.id, 145 }) 146 if err != nil { 147 if errors.Cause(err) != ttrpc.ErrClosed { 148 return runtime.State{}, errdefs.FromGRPC(err) 149 } 150 return runtime.State{}, errdefs.ErrNotFound 151 } 152 var status runtime.Status 153 switch response.Status { 154 case task.StatusCreated: 155 status = runtime.CreatedStatus 156 case task.StatusRunning: 157 status = runtime.RunningStatus 158 case task.StatusStopped: 159 status = runtime.StoppedStatus 160 case task.StatusPaused: 161 status = runtime.PausedStatus 162 case task.StatusPausing: 163 status = runtime.PausingStatus 164 } 165 return runtime.State{ 166 Pid: response.Pid, 167 Status: status, 168 Stdin: response.Stdin, 169 Stdout: response.Stdout, 170 Stderr: response.Stderr, 171 Terminal: response.Terminal, 172 ExitStatus: response.ExitStatus, 173 ExitedAt: response.ExitedAt, 174 }, nil 175} 176 177// Pause the task and all processes 178func (t *Task) Pause(ctx context.Context) error { 179 if _, err := t.shim.Pause(ctx, empty); err != nil { 180 return errdefs.FromGRPC(err) 181 } 182 t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{ 183 ContainerID: t.id, 184 }) 185 return nil 186} 187 188// Resume the task and all processes 189func (t *Task) Resume(ctx context.Context) error { 190 if _, err := t.shim.Resume(ctx, empty); err != nil { 191 return errdefs.FromGRPC(err) 192 } 193 t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{ 194 ContainerID: t.id, 195 }) 196 return nil 197} 198 199// Kill the task using the provided signal 200// 201// Optionally send the signal to all processes that are a child of the task 202func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error { 203 if _, err := t.shim.Kill(ctx, &shim.KillRequest{ 204 ID: t.id, 205 Signal: signal, 206 All: all, 207 }); err != nil { 208 return errdefs.FromGRPC(err) 209 } 210 return nil 211} 212 213// Exec creates a new process inside the task 214func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { 215 if err := identifiers.Validate(id); err != nil { 216 return nil, errors.Wrapf(err, "invalid exec id") 217 } 218 request := &shim.ExecProcessRequest{ 219 ID: id, 220 Stdin: opts.IO.Stdin, 221 Stdout: opts.IO.Stdout, 222 Stderr: opts.IO.Stderr, 223 Terminal: opts.IO.Terminal, 224 Spec: opts.Spec, 225 } 226 if _, err := t.shim.Exec(ctx, request); err != nil { 227 return nil, errdefs.FromGRPC(err) 228 } 229 return &Process{ 230 id: id, 231 t: t, 232 }, nil 233} 234 235// Pids returns all system level process ids running inside the task 236func (t *Task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { 237 resp, err := t.shim.ListPids(ctx, &shim.ListPidsRequest{ 238 ID: t.id, 239 }) 240 if err != nil { 241 return nil, errdefs.FromGRPC(err) 242 } 243 var processList []runtime.ProcessInfo 244 for _, p := range resp.Processes { 245 processList = append(processList, runtime.ProcessInfo{ 246 Pid: p.Pid, 247 Info: p.Info, 248 }) 249 } 250 return processList, nil 251} 252 253// ResizePty changes the side of the task's PTY to the provided width and height 254func (t *Task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { 255 _, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{ 256 ID: t.id, 257 Width: size.Width, 258 Height: size.Height, 259 }) 260 if err != nil { 261 err = errdefs.FromGRPC(err) 262 } 263 return err 264} 265 266// CloseIO closes the provided IO on the task 267func (t *Task) CloseIO(ctx context.Context) error { 268 _, err := t.shim.CloseIO(ctx, &shim.CloseIORequest{ 269 ID: t.id, 270 Stdin: true, 271 }) 272 if err != nil { 273 err = errdefs.FromGRPC(err) 274 } 275 return err 276} 277 278// Checkpoint creates a system level dump of the task and process information that can be later restored 279func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) error { 280 r := &shim.CheckpointTaskRequest{ 281 Path: path, 282 Options: options, 283 } 284 if _, err := t.shim.Checkpoint(ctx, r); err != nil { 285 return errdefs.FromGRPC(err) 286 } 287 t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{ 288 ContainerID: t.id, 289 }) 290 return nil 291} 292 293// Update changes runtime information of a running task 294func (t *Task) Update(ctx context.Context, resources *types.Any) error { 295 if _, err := t.shim.Update(ctx, &shim.UpdateTaskRequest{ 296 Resources: resources, 297 }); err != nil { 298 return errdefs.FromGRPC(err) 299 } 300 return nil 301} 302 303// Process returns a specific process inside the task by the process id 304func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) { 305 p := &Process{ 306 id: id, 307 t: t, 308 } 309 if _, err := p.State(ctx); err != nil { 310 return nil, err 311 } 312 return p, nil 313} 314 315// Stats returns runtime specific system level metric information for the task 316func (t *Task) Stats(ctx context.Context) (*types.Any, error) { 317 t.mu.Lock() 318 defer t.mu.Unlock() 319 if t.cg == nil { 320 return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist") 321 } 322 stats, err := t.cg.Stat(cgroups.IgnoreNotExist) 323 if err != nil { 324 return nil, err 325 } 326 return typeurl.MarshalAny(stats) 327} 328 329// Cgroup returns the underlying cgroup for a linux task 330func (t *Task) Cgroup() (cgroups.Cgroup, error) { 331 t.mu.Lock() 332 defer t.mu.Unlock() 333 if t.cg == nil { 334 return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist") 335 } 336 return t.cg, nil 337} 338 339// Wait for the task to exit returning the status and timestamp 340func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) { 341 r, err := t.shim.Wait(ctx, &shim.WaitRequest{ 342 ID: t.id, 343 }) 344 if err != nil { 345 return nil, err 346 } 347 return &runtime.Exit{ 348 Timestamp: r.ExitedAt, 349 Status: r.ExitStatus, 350 }, nil 351} 352