1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package cio
18
19import (
20	"context"
21	"errors"
22	"fmt"
23	"io"
24	"net/url"
25	"os"
26	"path/filepath"
27	"strings"
28	"sync"
29
30	"github.com/containerd/containerd/defaults"
31)
32
33var bufPool = sync.Pool{
34	New: func() interface{} {
35		buffer := make([]byte, 32<<10)
36		return &buffer
37	},
38}
39
40// Config holds the IO configurations.
41type Config struct {
42	// Terminal is true if one has been allocated
43	Terminal bool
44	// Stdin path
45	Stdin string
46	// Stdout path
47	Stdout string
48	// Stderr path
49	Stderr string
50}
51
52// IO holds the io information for a task or process
53type IO interface {
54	// Config returns the IO configuration.
55	Config() Config
56	// Cancel aborts all current io operations.
57	Cancel()
58	// Wait blocks until all io copy operations have completed.
59	Wait()
60	// Close cleans up all open io resources. Cancel() is always called before
61	// Close()
62	Close() error
63}
64
65// Creator creates new IO sets for a task
66type Creator func(id string) (IO, error)
67
68// Attach allows callers to reattach to running tasks
69//
70// There should only be one reader for a task's IO set
71// because fifo's can only be read from one reader or the output
72// will be sent only to the first reads
73type Attach func(*FIFOSet) (IO, error)
74
75// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams
76type FIFOSet struct {
77	Config
78	close func() error
79}
80
81// Close the FIFOSet
82func (f *FIFOSet) Close() error {
83	if f != nil && f.close != nil {
84		return f.close()
85	}
86	return nil
87}
88
89// NewFIFOSet returns a new FIFOSet from a Config and a close function
90func NewFIFOSet(config Config, close func() error) *FIFOSet {
91	return &FIFOSet{Config: config, close: close}
92}
93
94// Streams used to configure a Creator or Attach
95type Streams struct {
96	Stdin    io.Reader
97	Stdout   io.Writer
98	Stderr   io.Writer
99	Terminal bool
100	FIFODir  string
101}
102
103// Opt customize options for creating a Creator or Attach
104type Opt func(*Streams)
105
106// WithStdio sets stream options to the standard input/output streams
107func WithStdio(opt *Streams) {
108	WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt)
109}
110
111// WithTerminal sets the terminal option
112func WithTerminal(opt *Streams) {
113	opt.Terminal = true
114}
115
116// WithStreams sets the stream options to the specified Reader and Writers
117func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
118	return func(opt *Streams) {
119		opt.Stdin = stdin
120		opt.Stdout = stdout
121		opt.Stderr = stderr
122	}
123}
124
125// WithFIFODir sets the fifo directory.
126// e.g. "/run/containerd/fifo", "/run/users/1001/containerd/fifo"
127func WithFIFODir(dir string) Opt {
128	return func(opt *Streams) {
129		opt.FIFODir = dir
130	}
131}
132
133// NewCreator returns an IO creator from the options
134func NewCreator(opts ...Opt) Creator {
135	streams := &Streams{}
136	for _, opt := range opts {
137		opt(streams)
138	}
139	if streams.FIFODir == "" {
140		streams.FIFODir = defaults.DefaultFIFODir
141	}
142	return func(id string) (IO, error) {
143		fifos, err := NewFIFOSetInDir(streams.FIFODir, id, streams.Terminal)
144		if err != nil {
145			return nil, err
146		}
147		if streams.Stdin == nil {
148			fifos.Stdin = ""
149		}
150		if streams.Stdout == nil {
151			fifos.Stdout = ""
152		}
153		if streams.Stderr == nil {
154			fifos.Stderr = ""
155		}
156		return copyIO(fifos, streams)
157	}
158}
159
160// NewAttach attaches the existing io for a task to the provided io.Reader/Writers
161func NewAttach(opts ...Opt) Attach {
162	streams := &Streams{}
163	for _, opt := range opts {
164		opt(streams)
165	}
166	return func(fifos *FIFOSet) (IO, error) {
167		if fifos == nil {
168			return nil, fmt.Errorf("cannot attach, missing fifos")
169		}
170		return copyIO(fifos, streams)
171	}
172}
173
174// NullIO redirects the container's IO into /dev/null
175func NullIO(_ string) (IO, error) {
176	return &cio{}, nil
177}
178
179// cio is a basic container IO implementation.
180type cio struct {
181	config  Config
182	wg      *sync.WaitGroup
183	closers []io.Closer
184	cancel  context.CancelFunc
185}
186
187func (c *cio) Config() Config {
188	return c.config
189}
190
191func (c *cio) Wait() {
192	if c.wg != nil {
193		c.wg.Wait()
194	}
195}
196
197func (c *cio) Close() error {
198	var lastErr error
199	for _, closer := range c.closers {
200		if closer == nil {
201			continue
202		}
203		if err := closer.Close(); err != nil {
204			lastErr = err
205		}
206	}
207	return lastErr
208}
209
210func (c *cio) Cancel() {
211	if c.cancel != nil {
212		c.cancel()
213	}
214}
215
216type pipes struct {
217	Stdin  io.WriteCloser
218	Stdout io.ReadCloser
219	Stderr io.ReadCloser
220}
221
222// DirectIO allows task IO to be handled externally by the caller
223type DirectIO struct {
224	pipes
225	cio
226}
227
228var (
229	_ IO = &DirectIO{}
230	_ IO = &logURI{}
231)
232
233// LogURI provides the raw logging URI
234func LogURI(uri *url.URL) Creator {
235	return func(_ string) (IO, error) {
236		return &logURI{
237			config: Config{
238				Stdout: uri.String(),
239				Stderr: uri.String(),
240			},
241		}, nil
242	}
243}
244
245// BinaryIO forwards container STDOUT|STDERR directly to a logging binary
246func BinaryIO(binary string, args map[string]string) Creator {
247	return func(_ string) (IO, error) {
248		uri, err := LogURIGenerator("binary", binary, args)
249		if err != nil {
250			return nil, err
251		}
252
253		res := uri.String()
254		return &logURI{
255			config: Config{
256				Stdout: res,
257				Stderr: res,
258			},
259		}, nil
260	}
261}
262
263// TerminalBinaryIO forwards container STDOUT|STDERR directly to a logging binary
264// It also sets the terminal option to true
265func TerminalBinaryIO(binary string, args map[string]string) Creator {
266	return func(_ string) (IO, error) {
267		uri, err := LogURIGenerator("binary", binary, args)
268		if err != nil {
269			return nil, err
270		}
271
272		res := uri.String()
273		return &logURI{
274			config: Config{
275				Stdout:   res,
276				Stderr:   res,
277				Terminal: true,
278			},
279		}, nil
280	}
281}
282
283// LogFile creates a file on disk that logs the task's STDOUT,STDERR.
284// If the log file already exists, the logs will be appended to the file.
285func LogFile(path string) Creator {
286	return func(_ string) (IO, error) {
287		uri, err := LogURIGenerator("file", path, nil)
288		if err != nil {
289			return nil, err
290		}
291
292		res := uri.String()
293		return &logURI{
294			config: Config{
295				Stdout: res,
296				Stderr: res,
297			},
298		}, nil
299	}
300}
301
302// LogURIGenerator is the helper to generate log uri with specific scheme.
303func LogURIGenerator(scheme string, path string, args map[string]string) (*url.URL, error) {
304	path = filepath.Clean(path)
305	if !strings.HasPrefix(path, "/") {
306		return nil, errors.New("absolute path needed")
307	}
308
309	uri := &url.URL{
310		Scheme: scheme,
311		Path:   path,
312	}
313
314	if len(args) == 0 {
315		return uri, nil
316	}
317
318	q := uri.Query()
319	for k, v := range args {
320		q.Set(k, v)
321	}
322	uri.RawQuery = q.Encode()
323	return uri, nil
324}
325
326type logURI struct {
327	config Config
328}
329
330func (l *logURI) Config() Config {
331	return l.config
332}
333
334func (l *logURI) Cancel() {
335
336}
337
338func (l *logURI) Wait() {
339
340}
341
342func (l *logURI) Close() error {
343	return nil
344}
345
346// Load the io for a container but do not attach
347//
348// Allows io to be loaded on the task for deletion without
349// starting copy routines
350func Load(set *FIFOSet) (IO, error) {
351	return &cio{
352		config:  set.Config,
353		closers: []io.Closer{set},
354	}, nil
355}
356
357func (p *pipes) closers() []io.Closer {
358	return []io.Closer{p.Stdin, p.Stdout, p.Stderr}
359}
360