1// +build linux
2
3/*
4   Copyright The containerd Authors.
5
6   Licensed under the Apache License, Version 2.0 (the "License");
7   you may not use this file except in compliance with the License.
8   You may obtain a copy of the License at
9
10       http://www.apache.org/licenses/LICENSE-2.0
11
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17*/
18
19package linux
20
21import (
22	"context"
23	"sync"
24
25	"github.com/containerd/cgroups"
26	eventstypes "github.com/containerd/containerd/api/events"
27	"github.com/containerd/containerd/api/types/task"
28	"github.com/containerd/containerd/errdefs"
29	"github.com/containerd/containerd/events/exchange"
30	"github.com/containerd/containerd/identifiers"
31	"github.com/containerd/containerd/log"
32	"github.com/containerd/containerd/runtime"
33	"github.com/containerd/containerd/runtime/v1/shim/client"
34	shim "github.com/containerd/containerd/runtime/v1/shim/v1"
35	runc "github.com/containerd/go-runc"
36	"github.com/containerd/ttrpc"
37	"github.com/containerd/typeurl"
38	"github.com/gogo/protobuf/types"
39	"github.com/pkg/errors"
40)
41
42// Task on a linux based system
43type Task struct {
44	mu        sync.Mutex
45	id        string
46	pid       int
47	shim      *client.Client
48	namespace string
49	cg        cgroups.Cgroup
50	events    *exchange.Exchange
51	tasks     *runtime.TaskList
52	bundle    *bundle
53}
54
55func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, runtime *runc.Runc, list *runtime.TaskList, bundle *bundle) (*Task, error) {
56	var (
57		err error
58		cg  cgroups.Cgroup
59	)
60	if pid > 0 {
61		cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
62		if err != nil && err != cgroups.ErrCgroupDeleted {
63			return nil, err
64		}
65	}
66	return &Task{
67		id:        id,
68		pid:       pid,
69		shim:      shim,
70		namespace: namespace,
71		cg:        cg,
72		events:    events,
73		tasks:     list,
74		bundle:    bundle,
75	}, nil
76}
77
78// ID of the task
79func (t *Task) ID() string {
80	return t.id
81}
82
83// Namespace of the task
84func (t *Task) Namespace() string {
85	return t.namespace
86}
87
88// Delete the task and return the exit status
89func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
90	rsp, err := t.shim.Delete(ctx, empty)
91	if err != nil {
92		return nil, errdefs.FromGRPC(err)
93	}
94	t.tasks.Delete(ctx, t.id)
95	if err := t.shim.KillShim(ctx); err != nil {
96		log.G(ctx).WithError(err).Error("failed to kill shim")
97	}
98	if err := t.bundle.Delete(); err != nil {
99		log.G(ctx).WithError(err).Error("failed to delete bundle")
100	}
101	t.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
102		ContainerID: t.id,
103		ExitStatus:  rsp.ExitStatus,
104		ExitedAt:    rsp.ExitedAt,
105		Pid:         rsp.Pid,
106	})
107	return &runtime.Exit{
108		Status:    rsp.ExitStatus,
109		Timestamp: rsp.ExitedAt,
110		Pid:       rsp.Pid,
111	}, nil
112}
113
114// Start the task
115func (t *Task) Start(ctx context.Context) error {
116	t.mu.Lock()
117	hasCgroup := t.cg != nil
118	t.mu.Unlock()
119	r, err := t.shim.Start(ctx, &shim.StartRequest{
120		ID: t.id,
121	})
122	if err != nil {
123		return errdefs.FromGRPC(err)
124	}
125	t.pid = int(r.Pid)
126	if !hasCgroup {
127		cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(t.pid))
128		if err != nil {
129			return err
130		}
131		t.mu.Lock()
132		t.cg = cg
133		t.mu.Unlock()
134	}
135	t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{
136		ContainerID: t.id,
137		Pid:         uint32(t.pid),
138	})
139	return nil
140}
141
142// State returns runtime information for the task
143func (t *Task) State(ctx context.Context) (runtime.State, error) {
144	response, err := t.shim.State(ctx, &shim.StateRequest{
145		ID: t.id,
146	})
147	if err != nil {
148		if errors.Cause(err) != ttrpc.ErrClosed {
149			return runtime.State{}, errdefs.FromGRPC(err)
150		}
151		return runtime.State{}, errdefs.ErrNotFound
152	}
153	var status runtime.Status
154	switch response.Status {
155	case task.StatusCreated:
156		status = runtime.CreatedStatus
157	case task.StatusRunning:
158		status = runtime.RunningStatus
159	case task.StatusStopped:
160		status = runtime.StoppedStatus
161	case task.StatusPaused:
162		status = runtime.PausedStatus
163	case task.StatusPausing:
164		status = runtime.PausingStatus
165	}
166	return runtime.State{
167		Pid:        response.Pid,
168		Status:     status,
169		Stdin:      response.Stdin,
170		Stdout:     response.Stdout,
171		Stderr:     response.Stderr,
172		Terminal:   response.Terminal,
173		ExitStatus: response.ExitStatus,
174		ExitedAt:   response.ExitedAt,
175	}, nil
176}
177
178// Pause the task and all processes
179func (t *Task) Pause(ctx context.Context) error {
180	if _, err := t.shim.Pause(ctx, empty); err != nil {
181		return errdefs.FromGRPC(err)
182	}
183	t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
184		ContainerID: t.id,
185	})
186	return nil
187}
188
189// Resume the task and all processes
190func (t *Task) Resume(ctx context.Context) error {
191	if _, err := t.shim.Resume(ctx, empty); err != nil {
192		return errdefs.FromGRPC(err)
193	}
194	t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
195		ContainerID: t.id,
196	})
197	return nil
198}
199
200// Kill the task using the provided signal
201//
202// Optionally send the signal to all processes that are a child of the task
203func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
204	if _, err := t.shim.Kill(ctx, &shim.KillRequest{
205		ID:     t.id,
206		Signal: signal,
207		All:    all,
208	}); err != nil {
209		return errdefs.FromGRPC(err)
210	}
211	return nil
212}
213
214// Exec creates a new process inside the task
215func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
216	if err := identifiers.Validate(id); err != nil {
217		return nil, errors.Wrapf(err, "invalid exec id")
218	}
219	request := &shim.ExecProcessRequest{
220		ID:       id,
221		Stdin:    opts.IO.Stdin,
222		Stdout:   opts.IO.Stdout,
223		Stderr:   opts.IO.Stderr,
224		Terminal: opts.IO.Terminal,
225		Spec:     opts.Spec,
226	}
227	if _, err := t.shim.Exec(ctx, request); err != nil {
228		return nil, errdefs.FromGRPC(err)
229	}
230	return &Process{
231		id: id,
232		t:  t,
233	}, nil
234}
235
236// Pids returns all system level process ids running inside the task
237func (t *Task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
238	resp, err := t.shim.ListPids(ctx, &shim.ListPidsRequest{
239		ID: t.id,
240	})
241	if err != nil {
242		return nil, errdefs.FromGRPC(err)
243	}
244	var processList []runtime.ProcessInfo
245	for _, p := range resp.Processes {
246		processList = append(processList, runtime.ProcessInfo{
247			Pid:  p.Pid,
248			Info: p.Info,
249		})
250	}
251	return processList, nil
252}
253
254// ResizePty changes the side of the task's PTY to the provided width and height
255func (t *Task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
256	_, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{
257		ID:     t.id,
258		Width:  size.Width,
259		Height: size.Height,
260	})
261	if err != nil {
262		err = errdefs.FromGRPC(err)
263	}
264	return err
265}
266
267// CloseIO closes the provided IO on the task
268func (t *Task) CloseIO(ctx context.Context) error {
269	_, err := t.shim.CloseIO(ctx, &shim.CloseIORequest{
270		ID:    t.id,
271		Stdin: true,
272	})
273	if err != nil {
274		err = errdefs.FromGRPC(err)
275	}
276	return err
277}
278
279// Checkpoint creates a system level dump of the task and process information that can be later restored
280func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) error {
281	r := &shim.CheckpointTaskRequest{
282		Path:    path,
283		Options: options,
284	}
285	if _, err := t.shim.Checkpoint(ctx, r); err != nil {
286		return errdefs.FromGRPC(err)
287	}
288	t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{
289		ContainerID: t.id,
290	})
291	return nil
292}
293
294// Update changes runtime information of a running task
295func (t *Task) Update(ctx context.Context, resources *types.Any) error {
296	if _, err := t.shim.Update(ctx, &shim.UpdateTaskRequest{
297		Resources: resources,
298	}); err != nil {
299		return errdefs.FromGRPC(err)
300	}
301	return nil
302}
303
304// Process returns a specific process inside the task by the process id
305func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) {
306	p := &Process{
307		id: id,
308		t:  t,
309	}
310	if _, err := p.State(ctx); err != nil {
311		return nil, err
312	}
313	return p, nil
314}
315
316// Stats returns runtime specific system level metric information for the task
317func (t *Task) Stats(ctx context.Context) (*types.Any, error) {
318	t.mu.Lock()
319	defer t.mu.Unlock()
320	if t.cg == nil {
321		return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
322	}
323	stats, err := t.cg.Stat(cgroups.IgnoreNotExist)
324	if err != nil {
325		return nil, err
326	}
327	return typeurl.MarshalAny(stats)
328}
329
330// Cgroup returns the underlying cgroup for a linux task
331func (t *Task) Cgroup() (cgroups.Cgroup, error) {
332	t.mu.Lock()
333	defer t.mu.Unlock()
334	if t.cg == nil {
335		return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
336	}
337	return t.cg, nil
338}
339
340// Wait for the task to exit returning the status and timestamp
341func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) {
342	r, err := t.shim.Wait(ctx, &shim.WaitRequest{
343		ID: t.id,
344	})
345	if err != nil {
346		return nil, err
347	}
348	return &runtime.Exit{
349		Timestamp: r.ExitedAt,
350		Status:    r.ExitStatus,
351	}, nil
352}
353