1// +build linux
2
3/*
4   Copyright The containerd Authors.
5
6   Licensed under the Apache License, Version 2.0 (the "License");
7   you may not use this file except in compliance with the License.
8   You may obtain a copy of the License at
9
10       http://www.apache.org/licenses/LICENSE-2.0
11
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17*/
18
19package linux
20
21import (
22	"context"
23	"sync"
24
25	"github.com/containerd/cgroups"
26	eventstypes "github.com/containerd/containerd/api/events"
27	"github.com/containerd/containerd/api/types/task"
28	"github.com/containerd/containerd/errdefs"
29	"github.com/containerd/containerd/events/exchange"
30	"github.com/containerd/containerd/identifiers"
31	"github.com/containerd/containerd/log"
32	"github.com/containerd/containerd/runtime"
33	"github.com/containerd/containerd/runtime/v1/shim/client"
34	"github.com/containerd/containerd/runtime/v1/shim/v1"
35	"github.com/containerd/ttrpc"
36	"github.com/containerd/typeurl"
37	"github.com/gogo/protobuf/types"
38	"github.com/pkg/errors"
39)
40
41// Task on a linux based system
42type Task struct {
43	mu        sync.Mutex
44	id        string
45	pid       int
46	shim      *client.Client
47	namespace string
48	cg        cgroups.Cgroup
49	events    *exchange.Exchange
50	tasks     *runtime.TaskList
51	bundle    *bundle
52}
53
54func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.TaskList, bundle *bundle) (*Task, error) {
55	var (
56		err error
57		cg  cgroups.Cgroup
58	)
59	if pid > 0 {
60		cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
61		if err != nil && err != cgroups.ErrCgroupDeleted {
62			return nil, err
63		}
64	}
65	return &Task{
66		id:        id,
67		pid:       pid,
68		shim:      shim,
69		namespace: namespace,
70		cg:        cg,
71		events:    events,
72		tasks:     list,
73		bundle:    bundle,
74	}, nil
75}
76
77// ID of the task
78func (t *Task) ID() string {
79	return t.id
80}
81
82// Namespace of the task
83func (t *Task) Namespace() string {
84	return t.namespace
85}
86
87// Delete the task and return the exit status
88func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
89	rsp, err := t.shim.Delete(ctx, empty)
90	if err != nil {
91		return nil, errdefs.FromGRPC(err)
92	}
93	t.tasks.Delete(ctx, t.id)
94	if err := t.shim.KillShim(ctx); err != nil {
95		log.G(ctx).WithError(err).Error("failed to kill shim")
96	}
97	if err := t.bundle.Delete(); err != nil {
98		log.G(ctx).WithError(err).Error("failed to delete bundle")
99	}
100	t.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
101		ContainerID: t.id,
102		ExitStatus:  rsp.ExitStatus,
103		ExitedAt:    rsp.ExitedAt,
104		Pid:         rsp.Pid,
105	})
106	return &runtime.Exit{
107		Status:    rsp.ExitStatus,
108		Timestamp: rsp.ExitedAt,
109		Pid:       rsp.Pid,
110	}, nil
111}
112
113// Start the task
114func (t *Task) Start(ctx context.Context) error {
115	t.mu.Lock()
116	hasCgroup := t.cg != nil
117	t.mu.Unlock()
118	r, err := t.shim.Start(ctx, &shim.StartRequest{
119		ID: t.id,
120	})
121	if err != nil {
122		return errdefs.FromGRPC(err)
123	}
124	t.pid = int(r.Pid)
125	if !hasCgroup {
126		cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(t.pid))
127		if err != nil {
128			return err
129		}
130		t.mu.Lock()
131		t.cg = cg
132		t.mu.Unlock()
133	}
134	t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{
135		ContainerID: t.id,
136		Pid:         uint32(t.pid),
137	})
138	return nil
139}
140
141// State returns runtime information for the task
142func (t *Task) State(ctx context.Context) (runtime.State, error) {
143	response, err := t.shim.State(ctx, &shim.StateRequest{
144		ID: t.id,
145	})
146	if err != nil {
147		if errors.Cause(err) != ttrpc.ErrClosed {
148			return runtime.State{}, errdefs.FromGRPC(err)
149		}
150		return runtime.State{}, errdefs.ErrNotFound
151	}
152	var status runtime.Status
153	switch response.Status {
154	case task.StatusCreated:
155		status = runtime.CreatedStatus
156	case task.StatusRunning:
157		status = runtime.RunningStatus
158	case task.StatusStopped:
159		status = runtime.StoppedStatus
160	case task.StatusPaused:
161		status = runtime.PausedStatus
162	case task.StatusPausing:
163		status = runtime.PausingStatus
164	}
165	return runtime.State{
166		Pid:        response.Pid,
167		Status:     status,
168		Stdin:      response.Stdin,
169		Stdout:     response.Stdout,
170		Stderr:     response.Stderr,
171		Terminal:   response.Terminal,
172		ExitStatus: response.ExitStatus,
173		ExitedAt:   response.ExitedAt,
174	}, nil
175}
176
177// Pause the task and all processes
178func (t *Task) Pause(ctx context.Context) error {
179	if _, err := t.shim.Pause(ctx, empty); err != nil {
180		return errdefs.FromGRPC(err)
181	}
182	t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
183		ContainerID: t.id,
184	})
185	return nil
186}
187
188// Resume the task and all processes
189func (t *Task) Resume(ctx context.Context) error {
190	if _, err := t.shim.Resume(ctx, empty); err != nil {
191		return errdefs.FromGRPC(err)
192	}
193	t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
194		ContainerID: t.id,
195	})
196	return nil
197}
198
199// Kill the task using the provided signal
200//
201// Optionally send the signal to all processes that are a child of the task
202func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
203	if _, err := t.shim.Kill(ctx, &shim.KillRequest{
204		ID:     t.id,
205		Signal: signal,
206		All:    all,
207	}); err != nil {
208		return errdefs.FromGRPC(err)
209	}
210	return nil
211}
212
213// Exec creates a new process inside the task
214func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
215	if err := identifiers.Validate(id); err != nil {
216		return nil, errors.Wrapf(err, "invalid exec id")
217	}
218	request := &shim.ExecProcessRequest{
219		ID:       id,
220		Stdin:    opts.IO.Stdin,
221		Stdout:   opts.IO.Stdout,
222		Stderr:   opts.IO.Stderr,
223		Terminal: opts.IO.Terminal,
224		Spec:     opts.Spec,
225	}
226	if _, err := t.shim.Exec(ctx, request); err != nil {
227		return nil, errdefs.FromGRPC(err)
228	}
229	return &Process{
230		id: id,
231		t:  t,
232	}, nil
233}
234
235// Pids returns all system level process ids running inside the task
236func (t *Task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
237	resp, err := t.shim.ListPids(ctx, &shim.ListPidsRequest{
238		ID: t.id,
239	})
240	if err != nil {
241		return nil, errdefs.FromGRPC(err)
242	}
243	var processList []runtime.ProcessInfo
244	for _, p := range resp.Processes {
245		processList = append(processList, runtime.ProcessInfo{
246			Pid:  p.Pid,
247			Info: p.Info,
248		})
249	}
250	return processList, nil
251}
252
253// ResizePty changes the side of the task's PTY to the provided width and height
254func (t *Task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
255	_, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{
256		ID:     t.id,
257		Width:  size.Width,
258		Height: size.Height,
259	})
260	if err != nil {
261		err = errdefs.FromGRPC(err)
262	}
263	return err
264}
265
266// CloseIO closes the provided IO on the task
267func (t *Task) CloseIO(ctx context.Context) error {
268	_, err := t.shim.CloseIO(ctx, &shim.CloseIORequest{
269		ID:    t.id,
270		Stdin: true,
271	})
272	if err != nil {
273		err = errdefs.FromGRPC(err)
274	}
275	return err
276}
277
278// Checkpoint creates a system level dump of the task and process information that can be later restored
279func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) error {
280	r := &shim.CheckpointTaskRequest{
281		Path:    path,
282		Options: options,
283	}
284	if _, err := t.shim.Checkpoint(ctx, r); err != nil {
285		return errdefs.FromGRPC(err)
286	}
287	t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{
288		ContainerID: t.id,
289	})
290	return nil
291}
292
293// Update changes runtime information of a running task
294func (t *Task) Update(ctx context.Context, resources *types.Any) error {
295	if _, err := t.shim.Update(ctx, &shim.UpdateTaskRequest{
296		Resources: resources,
297	}); err != nil {
298		return errdefs.FromGRPC(err)
299	}
300	return nil
301}
302
303// Process returns a specific process inside the task by the process id
304func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) {
305	p := &Process{
306		id: id,
307		t:  t,
308	}
309	if _, err := p.State(ctx); err != nil {
310		return nil, err
311	}
312	return p, nil
313}
314
315// Stats returns runtime specific system level metric information for the task
316func (t *Task) Stats(ctx context.Context) (*types.Any, error) {
317	t.mu.Lock()
318	defer t.mu.Unlock()
319	if t.cg == nil {
320		return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
321	}
322	stats, err := t.cg.Stat(cgroups.IgnoreNotExist)
323	if err != nil {
324		return nil, err
325	}
326	return typeurl.MarshalAny(stats)
327}
328
329// Cgroup returns the underlying cgroup for a linux task
330func (t *Task) Cgroup() (cgroups.Cgroup, error) {
331	t.mu.Lock()
332	defer t.mu.Unlock()
333	if t.cg == nil {
334		return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
335	}
336	return t.cg, nil
337}
338
339// Wait for the task to exit returning the status and timestamp
340func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) {
341	r, err := t.shim.Wait(ctx, &shim.WaitRequest{
342		ID: t.id,
343	})
344	if err != nil {
345		return nil, err
346	}
347	return &runtime.Exit{
348		Timestamp: r.ExitedAt,
349		Status:    r.ExitStatus,
350	}, nil
351}
352