1// +build linux
2
3package linux
4
5import (
6	"context"
7	"sync"
8
9	"github.com/pkg/errors"
10	"google.golang.org/grpc"
11
12	"github.com/containerd/cgroups"
13	eventstypes "github.com/containerd/containerd/api/events"
14	"github.com/containerd/containerd/api/types/task"
15	"github.com/containerd/containerd/errdefs"
16	"github.com/containerd/containerd/events/exchange"
17	"github.com/containerd/containerd/identifiers"
18	"github.com/containerd/containerd/linux/shim/client"
19	shim "github.com/containerd/containerd/linux/shim/v1"
20	"github.com/containerd/containerd/runtime"
21	runc "github.com/containerd/go-runc"
22	"github.com/gogo/protobuf/types"
23)
24
// Task on a linux based system
//
// A Task pairs a shim client with the identity (id, namespace, pid) of the
// container it drives. The cgroup is populated either at construction time or
// on Start, and is the only field accessed under mu in this file.
type Task struct {
	mu        sync.Mutex          // guards cg
	id        string              // task id, sent as the ID of shim requests
	pid       int                 // pid given to newTask; replaced by the pid the shim reports on Start
	shim      *client.Client      // client used for every shim call made by the task
	namespace string              // namespace reported by Info
	cg        cgroups.Cgroup      // cgroup loaded for pid; nil until one has been loaded
	monitor   runtime.TaskMonitor // Start registers the task here after loading its cgroup
	events    *exchange.Exchange  // exchange the task lifecycle events are published to
	runtime   *runc.Runc          // stored by newTask; not read by the methods visible in this file
}
37
38func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor, events *exchange.Exchange, runtime *runc.Runc) (*Task, error) {
39	var (
40		err error
41		cg  cgroups.Cgroup
42	)
43	if pid > 0 {
44		cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
45		if err != nil && err != cgroups.ErrCgroupDeleted {
46			return nil, err
47		}
48	}
49	return &Task{
50		id:        id,
51		pid:       pid,
52		shim:      shim,
53		namespace: namespace,
54		cg:        cg,
55		monitor:   monitor,
56		events:    events,
57		runtime:   runtime,
58	}, nil
59}
60
// ID returns the id the task was created with.
func (t *Task) ID() string {
	return t.id
}
65
66// Info returns task information about the runtime and namespace
67func (t *Task) Info() runtime.TaskInfo {
68	return runtime.TaskInfo{
69		ID:        t.id,
70		Runtime:   pluginID,
71		Namespace: t.namespace,
72	}
73}
74
75// Start the task
76func (t *Task) Start(ctx context.Context) error {
77	t.mu.Lock()
78	hasCgroup := t.cg != nil
79	t.mu.Unlock()
80	r, err := t.shim.Start(ctx, &shim.StartRequest{
81		ID: t.id,
82	})
83	if err != nil {
84		return errdefs.FromGRPC(err)
85	}
86	t.pid = int(r.Pid)
87	if !hasCgroup {
88		cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(t.pid))
89		if err != nil {
90			return err
91		}
92		t.mu.Lock()
93		t.cg = cg
94		t.mu.Unlock()
95		if err := t.monitor.Monitor(t); err != nil {
96			return err
97		}
98	}
99	t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{
100		ContainerID: t.id,
101		Pid:         uint32(t.pid),
102	})
103	return nil
104}
105
106// State returns runtime information for the task
107func (t *Task) State(ctx context.Context) (runtime.State, error) {
108	response, err := t.shim.State(ctx, &shim.StateRequest{
109		ID: t.id,
110	})
111	if err != nil {
112		if err != grpc.ErrServerStopped {
113			return runtime.State{}, errdefs.FromGRPC(err)
114		}
115		return runtime.State{}, errdefs.ErrNotFound
116	}
117	var status runtime.Status
118	switch response.Status {
119	case task.StatusCreated:
120		status = runtime.CreatedStatus
121	case task.StatusRunning:
122		status = runtime.RunningStatus
123	case task.StatusStopped:
124		status = runtime.StoppedStatus
125	case task.StatusPaused:
126		status = runtime.PausedStatus
127	case task.StatusPausing:
128		status = runtime.PausingStatus
129	}
130	return runtime.State{
131		Pid:        response.Pid,
132		Status:     status,
133		Stdin:      response.Stdin,
134		Stdout:     response.Stdout,
135		Stderr:     response.Stderr,
136		Terminal:   response.Terminal,
137		ExitStatus: response.ExitStatus,
138		ExitedAt:   response.ExitedAt,
139	}, nil
140}
141
142// Pause the task and all processes
143func (t *Task) Pause(ctx context.Context) error {
144	if _, err := t.shim.Pause(ctx, empty); err != nil {
145		return errdefs.FromGRPC(err)
146	}
147	t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
148		ContainerID: t.id,
149	})
150	return nil
151}
152
153// Resume the task and all processes
154func (t *Task) Resume(ctx context.Context) error {
155	if _, err := t.shim.Resume(ctx, empty); err != nil {
156		return errdefs.FromGRPC(err)
157	}
158	t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
159		ContainerID: t.id,
160	})
161	return nil
162}
163
164// Kill the task using the provided signal
165//
166// Optionally send the signal to all processes that are a child of the task
167func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
168	if _, err := t.shim.Kill(ctx, &shim.KillRequest{
169		ID:     t.id,
170		Signal: signal,
171		All:    all,
172	}); err != nil {
173		return errdefs.FromGRPC(err)
174	}
175	return nil
176}
177
178// Exec creates a new process inside the task
179func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
180	if err := identifiers.Validate(id); err != nil {
181		return nil, errors.Wrapf(err, "invalid exec id")
182	}
183	request := &shim.ExecProcessRequest{
184		ID:       id,
185		Stdin:    opts.IO.Stdin,
186		Stdout:   opts.IO.Stdout,
187		Stderr:   opts.IO.Stderr,
188		Terminal: opts.IO.Terminal,
189		Spec:     opts.Spec,
190	}
191	if _, err := t.shim.Exec(ctx, request); err != nil {
192		return nil, errdefs.FromGRPC(err)
193	}
194	return &Process{
195		id: id,
196		t:  t,
197	}, nil
198}
199
200// Pids returns all system level process ids running inside the task
201func (t *Task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
202	resp, err := t.shim.ListPids(ctx, &shim.ListPidsRequest{
203		ID: t.id,
204	})
205	if err != nil {
206		return nil, errdefs.FromGRPC(err)
207	}
208	var processList []runtime.ProcessInfo
209	for _, p := range resp.Processes {
210		processList = append(processList, runtime.ProcessInfo{
211			Pid:  p.Pid,
212			Info: p.Info,
213		})
214	}
215	return processList, nil
216}
217
218// ResizePty changes the side of the task's PTY to the provided width and height
219func (t *Task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
220	_, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{
221		ID:     t.id,
222		Width:  size.Width,
223		Height: size.Height,
224	})
225	if err != nil {
226		err = errdefs.FromGRPC(err)
227	}
228	return err
229}
230
231// CloseIO closes the provided IO on the task
232func (t *Task) CloseIO(ctx context.Context) error {
233	_, err := t.shim.CloseIO(ctx, &shim.CloseIORequest{
234		ID:    t.id,
235		Stdin: true,
236	})
237	if err != nil {
238		err = errdefs.FromGRPC(err)
239	}
240	return err
241}
242
243// Checkpoint creates a system level dump of the task and process information that can be later restored
244func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) error {
245	r := &shim.CheckpointTaskRequest{
246		Path:    path,
247		Options: options,
248	}
249	if _, err := t.shim.Checkpoint(ctx, r); err != nil {
250		return errdefs.FromGRPC(err)
251	}
252	t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{
253		ContainerID: t.id,
254	})
255	return nil
256}
257
258// DeleteProcess removes the provided process from the task and deletes all on disk state
259func (t *Task) DeleteProcess(ctx context.Context, id string) (*runtime.Exit, error) {
260	r, err := t.shim.DeleteProcess(ctx, &shim.DeleteProcessRequest{
261		ID: id,
262	})
263	if err != nil {
264		return nil, errdefs.FromGRPC(err)
265	}
266	return &runtime.Exit{
267		Status:    r.ExitStatus,
268		Timestamp: r.ExitedAt,
269		Pid:       r.Pid,
270	}, nil
271}
272
273// Update changes runtime information of a running task
274func (t *Task) Update(ctx context.Context, resources *types.Any) error {
275	if _, err := t.shim.Update(ctx, &shim.UpdateTaskRequest{
276		Resources: resources,
277	}); err != nil {
278		return errdefs.FromGRPC(err)
279	}
280	return nil
281}
282
283// Process returns a specific process inside the task by the process id
284func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) {
285	// TODO: verify process exists for container
286	return &Process{
287		id: id,
288		t:  t,
289	}, nil
290}
291
292// Metrics returns runtime specific system level metric information for the task
293func (t *Task) Metrics(ctx context.Context) (interface{}, error) {
294	t.mu.Lock()
295	defer t.mu.Unlock()
296	if t.cg == nil {
297		return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
298	}
299	stats, err := t.cg.Stat(cgroups.IgnoreNotExist)
300	if err != nil {
301		return nil, err
302	}
303	return stats, nil
304}
305
306// Cgroup returns the underlying cgroup for a linux task
307func (t *Task) Cgroup() (cgroups.Cgroup, error) {
308	t.mu.Lock()
309	defer t.mu.Unlock()
310	if t.cg == nil {
311		return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
312	}
313	return t.cg, nil
314}
315
316// Wait for the task to exit returning the status and timestamp
317func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) {
318	r, err := t.shim.Wait(ctx, &shim.WaitRequest{
319		ID: t.id,
320	})
321	if err != nil {
322		return nil, err
323	}
324	return &runtime.Exit{
325		Timestamp: r.ExitedAt,
326		Status:    r.ExitStatus,
327	}, nil
328}
329