1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package v2
18
19import (
20	"context"
21	"io"
22	"io/ioutil"
23	"os"
24	"path/filepath"
25	"time"
26
27	eventstypes "github.com/containerd/containerd/api/events"
28	"github.com/containerd/containerd/api/types"
29	tasktypes "github.com/containerd/containerd/api/types/task"
30	"github.com/containerd/containerd/errdefs"
31	"github.com/containerd/containerd/events/exchange"
32	"github.com/containerd/containerd/identifiers"
33	"github.com/containerd/containerd/log"
34	"github.com/containerd/containerd/namespaces"
35	"github.com/containerd/containerd/pkg/timeout"
36	"github.com/containerd/containerd/runtime"
37	client "github.com/containerd/containerd/runtime/v2/shim"
38	"github.com/containerd/containerd/runtime/v2/task"
39	"github.com/containerd/ttrpc"
40	ptypes "github.com/gogo/protobuf/types"
41	"github.com/pkg/errors"
42	"github.com/sirupsen/logrus"
43)
44
45const (
46	loadTimeout     = "io.containerd.timeout.shim.load"
47	cleanupTimeout  = "io.containerd.timeout.shim.cleanup"
48	shutdownTimeout = "io.containerd.timeout.shim.shutdown"
49)
50
51func init() {
52	timeout.Set(loadTimeout, 5*time.Second)
53	timeout.Set(cleanupTimeout, 5*time.Second)
54	timeout.Set(shutdownTimeout, 3*time.Second)
55}
56
// loadAddress returns the entire contents of the file at path as a string.
// The contents are not trimmed. Any read error is returned unchanged.
func loadAddress(path string) (string, error) {
	raw, readErr := ioutil.ReadFile(path)
	if readErr != nil {
		return "", readErr
	}
	address := string(raw)
	return address, nil
}
64
65func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) {
66	address, err := loadAddress(filepath.Join(bundle.Path, "address"))
67	if err != nil {
68		return nil, err
69	}
70	conn, err := client.Connect(address, client.AnonReconnectDialer)
71	if err != nil {
72		return nil, err
73	}
74	defer func() {
75		if err != nil {
76			conn.Close()
77		}
78	}()
79	f, err := openShimLog(ctx, bundle, client.AnonReconnectDialer)
80	if err != nil {
81		return nil, errors.Wrap(err, "open shim log pipe")
82	}
83	defer func() {
84		if err != nil {
85			f.Close()
86		}
87	}()
88	// open the log pipe and block until the writer is ready
89	// this helps with synchronization of the shim
90	// copy the shim's logs to containerd's output
91	go func() {
92		defer f.Close()
93		if _, err := io.Copy(os.Stderr, f); err != nil {
94			// When using a multi-container shim the 2nd to Nth container in the
95			// shim will not have a separate log pipe. Ignore the failure log
96			// message here when the shim connect times out.
97			if !errors.Is(err, os.ErrNotExist) {
98				log.G(ctx).WithError(err).Error("copy shim log")
99			}
100		}
101	}()
102
103	client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
104	defer func() {
105		if err != nil {
106			client.Close()
107		}
108	}()
109	s := &shim{
110		client:  client,
111		task:    task.NewTaskClient(client),
112		bundle:  bundle,
113		events:  events,
114		rtTasks: rt,
115	}
116	ctx, cancel := timeout.WithContext(ctx, loadTimeout)
117	defer cancel()
118	if err := s.Connect(ctx); err != nil {
119		return nil, err
120	}
121	return s, nil
122}
123
124func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) {
125	ctx = namespaces.WithNamespace(ctx, ns)
126	ctx, cancel := timeout.WithContext(ctx, cleanupTimeout)
127	defer cancel()
128
129	log.G(ctx).WithFields(logrus.Fields{
130		"id":        id,
131		"namespace": ns,
132	}).Warn("cleaning up after shim disconnected")
133	response, err := binaryCall.Delete(ctx)
134	if err != nil {
135		log.G(ctx).WithError(err).WithFields(logrus.Fields{
136			"id":        id,
137			"namespace": ns,
138		}).Warn("failed to clean up after shim disconnected")
139	}
140
141	var (
142		pid        uint32
143		exitStatus uint32
144		exitedAt   time.Time
145	)
146	if response != nil {
147		pid = response.Pid
148		exitStatus = response.Status
149		exitedAt = response.Timestamp
150	} else {
151		exitStatus = 255
152		exitedAt = time.Now()
153	}
154	events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{
155		ContainerID: id,
156		ID:          id,
157		Pid:         pid,
158		ExitStatus:  exitStatus,
159		ExitedAt:    exitedAt,
160	})
161
162	events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
163		ContainerID: id,
164		Pid:         pid,
165		ExitStatus:  exitStatus,
166		ExitedAt:    exitedAt,
167	})
168}
169
// shim is a client-side handle to a running shim and the task it manages.
//
// Fields:
//   - bundle: the task's bundle. Supplies the ID, namespace and path, and is
//     deleted by Delete.
//   - client: the ttrpc connection to the shim, closed by Close.
//   - task: the TaskService client all RPCs are issued through.
//   - taskPid: the task pid recorded from Connect, Create and Start responses.
//   - events: the event exchange passed to loadShim.
//   - rtTasks: the runtime task list that Delete removes this task from.
type shim struct {
	bundle  *Bundle
	client  *ttrpc.Client
	task    task.TaskService
	taskPid int
	events  *exchange.Exchange
	rtTasks *runtime.TaskList
}
178
179func (s *shim) Connect(ctx context.Context) error {
180	response, err := s.task.Connect(ctx, &task.ConnectRequest{
181		ID: s.ID(),
182	})
183	if err != nil {
184		return err
185	}
186	s.taskPid = int(response.TaskPid)
187	return nil
188}
189
190func (s *shim) Shutdown(ctx context.Context) error {
191	_, err := s.task.Shutdown(ctx, &task.ShutdownRequest{
192		ID: s.ID(),
193	})
194	if err != nil && !errors.Is(err, ttrpc.ErrClosed) {
195		return errdefs.FromGRPC(err)
196	}
197	return nil
198}
199
200func (s *shim) waitShutdown(ctx context.Context) error {
201	ctx, cancel := timeout.WithContext(ctx, shutdownTimeout)
202	defer cancel()
203	return s.Shutdown(ctx)
204}
205
206// ID of the shim/task
207func (s *shim) ID() string {
208	return s.bundle.ID
209}
210
211// PID of the task
212func (s *shim) PID() uint32 {
213	return uint32(s.taskPid)
214}
215
216func (s *shim) Namespace() string {
217	return s.bundle.Namespace
218}
219
220func (s *shim) Close() error {
221	return s.client.Close()
222}
223
224func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
225	response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{
226		ID: s.ID(),
227	})
228	if shimErr != nil {
229		log.G(ctx).WithField("id", s.ID()).WithError(shimErr).Debug("failed to delete task")
230		if !errors.Is(shimErr, ttrpc.ErrClosed) {
231			shimErr = errdefs.FromGRPC(shimErr)
232			if !errdefs.IsNotFound(shimErr) {
233				return nil, shimErr
234			}
235		}
236	}
237	// remove self from the runtime task list
238	// this seems dirty but it cleans up the API across runtimes, tasks, and the service
239	s.rtTasks.Delete(ctx, s.ID())
240	if err := s.waitShutdown(ctx); err != nil {
241		log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim")
242	}
243	s.Close()
244	if err := s.bundle.Delete(); err != nil {
245		log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
246	}
247	if shimErr != nil {
248		return nil, shimErr
249	}
250	return &runtime.Exit{
251		Status:    response.ExitStatus,
252		Timestamp: response.ExitedAt,
253		Pid:       response.Pid,
254	}, nil
255}
256
257func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) {
258	topts := opts.TaskOptions
259	if topts == nil {
260		topts = opts.RuntimeOptions
261	}
262	request := &task.CreateTaskRequest{
263		ID:         s.ID(),
264		Bundle:     s.bundle.Path,
265		Stdin:      opts.IO.Stdin,
266		Stdout:     opts.IO.Stdout,
267		Stderr:     opts.IO.Stderr,
268		Terminal:   opts.IO.Terminal,
269		Checkpoint: opts.Checkpoint,
270		Options:    topts,
271	}
272	for _, m := range opts.Rootfs {
273		request.Rootfs = append(request.Rootfs, &types.Mount{
274			Type:    m.Type,
275			Source:  m.Source,
276			Options: m.Options,
277		})
278	}
279	response, err := s.task.Create(ctx, request)
280	if err != nil {
281		return nil, errdefs.FromGRPC(err)
282	}
283	s.taskPid = int(response.Pid)
284	return s, nil
285}
286
287func (s *shim) Pause(ctx context.Context) error {
288	if _, err := s.task.Pause(ctx, &task.PauseRequest{
289		ID: s.ID(),
290	}); err != nil {
291		return errdefs.FromGRPC(err)
292	}
293	return nil
294}
295
296func (s *shim) Resume(ctx context.Context) error {
297	if _, err := s.task.Resume(ctx, &task.ResumeRequest{
298		ID: s.ID(),
299	}); err != nil {
300		return errdefs.FromGRPC(err)
301	}
302	return nil
303}
304
305func (s *shim) Start(ctx context.Context) error {
306	response, err := s.task.Start(ctx, &task.StartRequest{
307		ID: s.ID(),
308	})
309	if err != nil {
310		return errdefs.FromGRPC(err)
311	}
312	s.taskPid = int(response.Pid)
313	return nil
314}
315
316func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error {
317	if _, err := s.task.Kill(ctx, &task.KillRequest{
318		ID:     s.ID(),
319		Signal: signal,
320		All:    all,
321	}); err != nil {
322		return errdefs.FromGRPC(err)
323	}
324	return nil
325}
326
327func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
328	if err := identifiers.Validate(id); err != nil {
329		return nil, errors.Wrapf(err, "invalid exec id %s", id)
330	}
331	request := &task.ExecProcessRequest{
332		ID:       s.ID(),
333		ExecID:   id,
334		Stdin:    opts.IO.Stdin,
335		Stdout:   opts.IO.Stdout,
336		Stderr:   opts.IO.Stderr,
337		Terminal: opts.IO.Terminal,
338		Spec:     opts.Spec,
339	}
340	if _, err := s.task.Exec(ctx, request); err != nil {
341		return nil, errdefs.FromGRPC(err)
342	}
343	return &process{
344		id:   id,
345		shim: s,
346	}, nil
347}
348
349func (s *shim) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
350	resp, err := s.task.Pids(ctx, &task.PidsRequest{
351		ID: s.ID(),
352	})
353	if err != nil {
354		return nil, errdefs.FromGRPC(err)
355	}
356	var processList []runtime.ProcessInfo
357	for _, p := range resp.Processes {
358		processList = append(processList, runtime.ProcessInfo{
359			Pid:  p.Pid,
360			Info: p.Info,
361		})
362	}
363	return processList, nil
364}
365
366func (s *shim) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
367	_, err := s.task.ResizePty(ctx, &task.ResizePtyRequest{
368		ID:     s.ID(),
369		Width:  size.Width,
370		Height: size.Height,
371	})
372	if err != nil {
373		return errdefs.FromGRPC(err)
374	}
375	return nil
376}
377
378func (s *shim) CloseIO(ctx context.Context) error {
379	_, err := s.task.CloseIO(ctx, &task.CloseIORequest{
380		ID:    s.ID(),
381		Stdin: true,
382	})
383	if err != nil {
384		return errdefs.FromGRPC(err)
385	}
386	return nil
387}
388
389func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
390	response, err := s.task.Wait(ctx, &task.WaitRequest{
391		ID: s.ID(),
392	})
393	if err != nil {
394		return nil, errdefs.FromGRPC(err)
395	}
396	return &runtime.Exit{
397		Pid:       uint32(s.taskPid),
398		Timestamp: response.ExitedAt,
399		Status:    response.ExitStatus,
400	}, nil
401}
402
403func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error {
404	request := &task.CheckpointTaskRequest{
405		ID:      s.ID(),
406		Path:    path,
407		Options: options,
408	}
409	if _, err := s.task.Checkpoint(ctx, request); err != nil {
410		return errdefs.FromGRPC(err)
411	}
412	return nil
413}
414
415func (s *shim) Update(ctx context.Context, resources *ptypes.Any) error {
416	if _, err := s.task.Update(ctx, &task.UpdateTaskRequest{
417		ID:        s.ID(),
418		Resources: resources,
419	}); err != nil {
420		return errdefs.FromGRPC(err)
421	}
422	return nil
423}
424
425func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
426	response, err := s.task.Stats(ctx, &task.StatsRequest{
427		ID: s.ID(),
428	})
429	if err != nil {
430		return nil, errdefs.FromGRPC(err)
431	}
432	return response.Stats, nil
433}
434
435func (s *shim) Process(ctx context.Context, id string) (runtime.Process, error) {
436	p := &process{
437		id:   id,
438		shim: s,
439	}
440	if _, err := p.State(ctx); err != nil {
441		return nil, err
442	}
443	return p, nil
444}
445
446func (s *shim) State(ctx context.Context) (runtime.State, error) {
447	response, err := s.task.State(ctx, &task.StateRequest{
448		ID: s.ID(),
449	})
450	if err != nil {
451		if !errors.Is(err, ttrpc.ErrClosed) {
452			return runtime.State{}, errdefs.FromGRPC(err)
453		}
454		return runtime.State{}, errdefs.ErrNotFound
455	}
456	var status runtime.Status
457	switch response.Status {
458	case tasktypes.StatusCreated:
459		status = runtime.CreatedStatus
460	case tasktypes.StatusRunning:
461		status = runtime.RunningStatus
462	case tasktypes.StatusStopped:
463		status = runtime.StoppedStatus
464	case tasktypes.StatusPaused:
465		status = runtime.PausedStatus
466	case tasktypes.StatusPausing:
467		status = runtime.PausingStatus
468	}
469	return runtime.State{
470		Pid:        response.Pid,
471		Status:     status,
472		Stdin:      response.Stdin,
473		Stdout:     response.Stdout,
474		Stderr:     response.Stderr,
475		Terminal:   response.Terminal,
476		ExitStatus: response.ExitStatus,
477		ExitedAt:   response.ExitedAt,
478	}, nil
479}
480