1/* 2 Copyright The containerd Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15*/ 16 17package cio 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "io" 24 "net/url" 25 "os" 26 "path/filepath" 27 "strings" 28 "sync" 29 30 "github.com/containerd/containerd/defaults" 31) 32 33var bufPool = sync.Pool{ 34 New: func() interface{} { 35 buffer := make([]byte, 32<<10) 36 return &buffer 37 }, 38} 39 40// Config holds the IO configurations. 41type Config struct { 42 // Terminal is true if one has been allocated 43 Terminal bool 44 // Stdin path 45 Stdin string 46 // Stdout path 47 Stdout string 48 // Stderr path 49 Stderr string 50} 51 52// IO holds the io information for a task or process 53type IO interface { 54 // Config returns the IO configuration. 55 Config() Config 56 // Cancel aborts all current io operations. 57 Cancel() 58 // Wait blocks until all io copy operations have completed. 59 Wait() 60 // Close cleans up all open io resources. Cancel() is always called before 61 // Close() 62 Close() error 63} 64 65// Creator creates new IO sets for a task 66type Creator func(id string) (IO, error) 67 68// Attach allows callers to reattach to running tasks 69// 70// There should only be one reader for a task's IO set 71// because fifo's can only be read from one reader or the output 72// will be sent only to the first reads 73type Attach func(*FIFOSet) (IO, error) 74 75// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams 76type FIFOSet struct { 77 Config 78 close func() error 79} 80 81// Close the FIFOSet 82func (f *FIFOSet) Close() error { 83 if f != nil && f.close != nil { 84 return f.close() 85 } 86 return nil 87} 88 89// NewFIFOSet returns a new FIFOSet from a Config and a close function 90func NewFIFOSet(config Config, close func() error) *FIFOSet { 91 return &FIFOSet{Config: config, close: close} 92} 93 94// Streams used to configure a Creator or Attach 95type Streams struct { 96 Stdin io.Reader 97 Stdout io.Writer 98 Stderr io.Writer 99 Terminal bool 100 FIFODir string 101} 102 103// Opt customize options for creating a Creator or Attach 104type Opt func(*Streams) 105 106// WithStdio sets stream options to the standard input/output streams 107func WithStdio(opt *Streams) { 108 WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt) 109} 110 111// WithTerminal sets the terminal option 112func WithTerminal(opt *Streams) { 113 opt.Terminal = true 114} 115 116// WithStreams sets the stream options to the specified Reader and Writers 117func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt { 118 return func(opt *Streams) { 119 opt.Stdin = stdin 120 opt.Stdout = stdout 121 opt.Stderr = stderr 122 } 123} 124 125// WithFIFODir sets the fifo directory. 126// e.g. "/run/containerd/fifo", "/run/users/1001/containerd/fifo" 127func WithFIFODir(dir string) Opt { 128 return func(opt *Streams) { 129 opt.FIFODir = dir 130 } 131} 132 133// NewCreator returns an IO creator from the options 134func NewCreator(opts ...Opt) Creator { 135 streams := &Streams{} 136 for _, opt := range opts { 137 opt(streams) 138 } 139 if streams.FIFODir == "" { 140 streams.FIFODir = defaults.DefaultFIFODir 141 } 142 return func(id string) (IO, error) { 143 fifos, err := NewFIFOSetInDir(streams.FIFODir, id, streams.Terminal) 144 if err != nil { 145 return nil, err 146 } 147 if streams.Stdin == nil { 148 fifos.Stdin = "" 149 } 150 if streams.Stdout == nil { 151 fifos.Stdout = "" 152 } 153 if streams.Stderr == nil { 154 fifos.Stderr = "" 155 } 156 return copyIO(fifos, streams) 157 } 158} 159 160// NewAttach attaches the existing io for a task to the provided io.Reader/Writers 161func NewAttach(opts ...Opt) Attach { 162 streams := &Streams{} 163 for _, opt := range opts { 164 opt(streams) 165 } 166 return func(fifos *FIFOSet) (IO, error) { 167 if fifos == nil { 168 return nil, fmt.Errorf("cannot attach, missing fifos") 169 } 170 return copyIO(fifos, streams) 171 } 172} 173 174// NullIO redirects the container's IO into /dev/null 175func NullIO(_ string) (IO, error) { 176 return &cio{}, nil 177} 178 179// cio is a basic container IO implementation. 180type cio struct { 181 config Config 182 wg *sync.WaitGroup 183 closers []io.Closer 184 cancel context.CancelFunc 185} 186 187func (c *cio) Config() Config { 188 return c.config 189} 190 191func (c *cio) Wait() { 192 if c.wg != nil { 193 c.wg.Wait() 194 } 195} 196 197func (c *cio) Close() error { 198 var lastErr error 199 for _, closer := range c.closers { 200 if closer == nil { 201 continue 202 } 203 if err := closer.Close(); err != nil { 204 lastErr = err 205 } 206 } 207 return lastErr 208} 209 210func (c *cio) Cancel() { 211 if c.cancel != nil { 212 c.cancel() 213 } 214} 215 216type pipes struct { 217 Stdin io.WriteCloser 218 Stdout io.ReadCloser 219 Stderr io.ReadCloser 220} 221 222// DirectIO allows task IO to be handled externally by the caller 223type DirectIO struct { 224 pipes 225 cio 226} 227 228var ( 229 _ IO = &DirectIO{} 230 _ IO = &logURI{} 231) 232 233// LogURI provides the raw logging URI 234func LogURI(uri *url.URL) Creator { 235 return func(_ string) (IO, error) { 236 return &logURI{ 237 config: Config{ 238 Stdout: uri.String(), 239 Stderr: uri.String(), 240 }, 241 }, nil 242 } 243} 244 245// BinaryIO forwards container STDOUT|STDERR directly to a logging binary 246func BinaryIO(binary string, args map[string]string) Creator { 247 return func(_ string) (IO, error) { 248 uri, err := LogURIGenerator("binary", binary, args) 249 if err != nil { 250 return nil, err 251 } 252 253 res := uri.String() 254 return &logURI{ 255 config: Config{ 256 Stdout: res, 257 Stderr: res, 258 }, 259 }, nil 260 } 261} 262 263// TerminalBinaryIO forwards container STDOUT|STDERR directly to a logging binary 264// It also sets the terminal option to true 265func TerminalBinaryIO(binary string, args map[string]string) Creator { 266 return func(_ string) (IO, error) { 267 uri, err := LogURIGenerator("binary", binary, args) 268 if err != nil { 269 return nil, err 270 } 271 272 res := uri.String() 273 return &logURI{ 274 config: Config{ 275 Stdout: res, 276 Stderr: res, 277 Terminal: true, 278 }, 279 }, nil 280 } 281} 282 283// LogFile creates a file on disk that logs the task's STDOUT,STDERR. 284// If the log file already exists, the logs will be appended to the file. 285func LogFile(path string) Creator { 286 return func(_ string) (IO, error) { 287 uri, err := LogURIGenerator("file", path, nil) 288 if err != nil { 289 return nil, err 290 } 291 292 res := uri.String() 293 return &logURI{ 294 config: Config{ 295 Stdout: res, 296 Stderr: res, 297 }, 298 }, nil 299 } 300} 301 302// LogURIGenerator is the helper to generate log uri with specific scheme. 303func LogURIGenerator(scheme string, path string, args map[string]string) (*url.URL, error) { 304 path = filepath.Clean(path) 305 if !strings.HasPrefix(path, "/") { 306 return nil, errors.New("absolute path needed") 307 } 308 309 uri := &url.URL{ 310 Scheme: scheme, 311 Path: path, 312 } 313 314 if len(args) == 0 { 315 return uri, nil 316 } 317 318 q := uri.Query() 319 for k, v := range args { 320 q.Set(k, v) 321 } 322 uri.RawQuery = q.Encode() 323 return uri, nil 324} 325 326type logURI struct { 327 config Config 328} 329 330func (l *logURI) Config() Config { 331 return l.config 332} 333 334func (l *logURI) Cancel() { 335 336} 337 338func (l *logURI) Wait() { 339 340} 341 342func (l *logURI) Close() error { 343 return nil 344} 345 346// Load the io for a container but do not attach 347// 348// Allows io to be loaded on the task for deletion without 349// starting copy routines 350func Load(set *FIFOSet) (IO, error) { 351 return &cio{ 352 config: set.Config, 353 closers: []io.Closer{set}, 354 }, nil 355} 356 357func (p *pipes) closers() []io.Closer { 358 return []io.Closer{p.Stdin, p.Stdout, p.Stderr} 359} 360