1package eval 2 3import ( 4 "bufio" 5 "errors" 6 "fmt" 7 "io" 8 "os" 9 "sync" 10 11 "src.elv.sh/pkg/eval/errs" 12 "src.elv.sh/pkg/eval/vals" 13 "src.elv.sh/pkg/strutil" 14) 15 16// Port conveys data stream. It always consists of a byte band and a channel band. 17type Port struct { 18 File *os.File 19 Chan chan interface{} 20 closeFile bool 21 closeChan bool 22 23 // The following two fields are populated as an additional control 24 // mechanism for output ports. When no more value should be send on Chan, 25 // chanSendError is populated and chanSendStop is closed. This is used for 26 // both detection of reader termination (see readerGone below) and closed 27 // ports. 28 sendStop chan struct{} 29 sendError *error 30 31 // Only populated in output ports writing to another command in a pipeline. 32 // When the reading end of the pipe exits, it stores 1 in readerGone. This 33 // is used to check if an external command killed by SIGPIPE is caused by 34 // the termination of the reader of the pipe. 35 readerGone *int32 36} 37 38// ErrNoValueOutput is thrown when writing to a pipe without a value output 39// component. 40var ErrNoValueOutput = errors.New("port has no value output") 41 42// A closed channel, suitable as a value for Port.sendStop when there is no 43// reader to start with. 44var closedSendStop = make(chan struct{}) 45 46func init() { close(closedSendStop) } 47 48// Returns a copy of the Port with the Close* flags unset. 49func (p *Port) fork() *Port { 50 return &Port{p.File, p.Chan, false, false, p.sendStop, p.sendError, p.readerGone} 51} 52 53// Closes a Port. 54func (p *Port) close() { 55 if p == nil { 56 return 57 } 58 if p.closeFile { 59 p.File.Close() 60 } 61 if p.closeChan { 62 close(p.Chan) 63 } 64} 65 66var ( 67 // ClosedChan is a closed channel, suitable as a placeholder input channel. 68 ClosedChan = getClosedChan() 69 // BlackholeChan is a channel that absorbs all values written to it, 70 // suitable as a placeholder output channel. 71 BlackholeChan = getBlackholeChan() 72 // DevNull is /dev/null, suitable as a placeholder file for either input or 73 // output. 74 DevNull = getDevNull() 75 76 // DummyInputPort is a port made up from DevNull and ClosedChan, suitable as 77 // a placeholder input port. 78 DummyInputPort = &Port{File: DevNull, Chan: ClosedChan} 79 // DummyOutputPort is a port made up from DevNull and BlackholeChan, 80 // suitable as a placeholder output port. 81 DummyOutputPort = &Port{File: DevNull, Chan: BlackholeChan} 82 83 // DummyPorts contains 3 dummy ports, suitable as stdin, stdout and stderr. 84 DummyPorts = []*Port{DummyInputPort, DummyOutputPort, DummyOutputPort} 85) 86 87func getClosedChan() chan interface{} { 88 ch := make(chan interface{}) 89 close(ch) 90 return ch 91} 92 93func getBlackholeChan() chan interface{} { 94 ch := make(chan interface{}) 95 go func() { 96 for range ch { 97 } 98 }() 99 return ch 100} 101 102func getDevNull() *os.File { 103 f, err := os.Open(os.DevNull) 104 if err != nil { 105 fmt.Fprintf(os.Stderr, 106 "cannot open %s, shell might not function normally\n", os.DevNull) 107 } 108 return f 109} 110 111// PipePort returns an output *Port whose value and byte components are both 112// piped. The supplied functions are called on a separate goroutine with the 113// read ends of the value and byte components of the port. It also returns a 114// function to clean up the port and wait for the callbacks to finish. 115func PipePort(vCb func(<-chan interface{}), bCb func(*os.File)) (*Port, func(), error) { 116 r, w, err := os.Pipe() 117 if err != nil { 118 return nil, nil, err 119 } 120 ch := make(chan interface{}, outputCaptureBufferSize) 121 122 var wg sync.WaitGroup 123 wg.Add(2) 124 go func() { 125 defer wg.Done() 126 vCb(ch) 127 }() 128 go func() { 129 defer wg.Done() 130 defer r.Close() 131 bCb(r) 132 }() 133 134 port := &Port{Chan: ch, closeChan: true, File: w, closeFile: true} 135 done := func() { 136 port.close() 137 wg.Wait() 138 } 139 return port, done, nil 140} 141 142// CapturePort returns an output *Port whose value and byte components are 143// both connected to an internal pipe that saves the output. It also returns a 144// function to call to obtain the captured output. 145func CapturePort() (*Port, func() []interface{}, error) { 146 vs := []interface{}{} 147 var m sync.Mutex 148 port, done, err := PipePort( 149 func(ch <-chan interface{}) { 150 for v := range ch { 151 m.Lock() 152 vs = append(vs, v) 153 m.Unlock() 154 } 155 }, 156 func(r *os.File) { 157 buffered := bufio.NewReader(r) 158 for { 159 line, err := buffered.ReadString('\n') 160 if line != "" { 161 v := strutil.ChopLineEnding(line) 162 m.Lock() 163 vs = append(vs, v) 164 m.Unlock() 165 } 166 if err != nil { 167 if err != io.EOF { 168 logger.Println("error on reading:", err) 169 } 170 break 171 } 172 } 173 }) 174 if err != nil { 175 return nil, nil, err 176 } 177 return port, func() []interface{} { 178 done() 179 return vs 180 }, nil 181} 182 183// StringCapturePort is like CapturePort, but processes value outputs by 184// stringifying them and prepending an output marker. 185func StringCapturePort() (*Port, func() []string, error) { 186 var lines []string 187 var mu sync.Mutex 188 addLine := func(line string) { 189 mu.Lock() 190 defer mu.Unlock() 191 lines = append(lines, line) 192 } 193 port, done, err := PipePort( 194 func(ch <-chan interface{}) { 195 for v := range ch { 196 addLine("▶ " + vals.ToString(v)) 197 } 198 }, 199 func(r *os.File) { 200 bufr := bufio.NewReader(r) 201 for { 202 line, err := bufr.ReadString('\n') 203 if err != nil { 204 if err != io.EOF { 205 addLine("i/o error: " + err.Error()) 206 } 207 break 208 } 209 addLine(strutil.ChopLineEnding(line)) 210 } 211 }) 212 if err != nil { 213 return nil, nil, err 214 } 215 return port, func() []string { 216 done() 217 return lines 218 }, nil 219} 220 221// Buffer size for the channel to use in FilePort. The value has been chosen 222// arbitrarily. 223const filePortChanSize = 32 224 225// FilePort returns an output *Port where the byte component is the file itself, 226// and the value component is converted to an internal channel that writes 227// each value to the file, prepending with a prefix. It also returns a cleanup 228// function, which should be called when the *Port is no longer needed. 229func FilePort(f *os.File, valuePrefix string) (*Port, func()) { 230 ch := make(chan interface{}, filePortChanSize) 231 relayDone := make(chan struct{}) 232 go func() { 233 for v := range ch { 234 f.WriteString(valuePrefix) 235 f.WriteString(vals.Repr(v, vals.NoPretty)) 236 f.WriteString("\n") 237 } 238 close(relayDone) 239 }() 240 return &Port{File: f, Chan: ch}, func() { 241 close(ch) 242 <-relayDone 243 } 244} 245 246// PortsFromStdFiles is a shorthand for calling PortsFromFiles with os.Stdin, 247// os.Stdout and os.Stderr. 248func PortsFromStdFiles(prefix string) ([]*Port, func()) { 249 return PortsFromFiles([3]*os.File{os.Stdin, os.Stdout, os.Stderr}, prefix) 250} 251 252// PortsFromFiles builds 3 ports from 3 files. It also returns a function that 253// should be called when the ports are no longer needed. 254func PortsFromFiles(files [3]*os.File, prefix string) ([]*Port, func()) { 255 port1, cleanup1 := FilePort(files[1], prefix) 256 port2, cleanup2 := FilePort(files[2], prefix) 257 return []*Port{{File: files[0], Chan: ClosedChan}, port1, port2}, func() { 258 cleanup1() 259 cleanup2() 260 } 261} 262 263// ValueOutput defines the interface through which builtin commands access the 264// value output. 265// 266// The value output is backed by two channels, one for writing output, another 267// for the back-chanel signal that the reader of the channel has gone. 268type ValueOutput interface { 269 // Outputs a value. Returns errs.ReaderGone if the reader is gone. 270 Put(v interface{}) error 271} 272 273type valueOutput struct { 274 data chan<- interface{} 275 sendStop <-chan struct{} 276 sendError *error 277} 278 279func (vo valueOutput) Put(v interface{}) error { 280 select { 281 case vo.data <- v: 282 return nil 283 case <-vo.sendStop: 284 return *vo.sendError 285 } 286} 287 288// ByteOutput defines the interface through which builtin commands access the 289// byte output. 290// 291// It is a thin wrapper around the underlying *os.File value, only exposing 292// the necessary methods for writing bytes and strings, and converting any 293// syscall.EPIPE errors to errs.ReaderGone. 294type ByteOutput interface { 295 io.Writer 296 io.StringWriter 297} 298 299type byteOutput struct { 300 f *os.File 301} 302 303func (bo byteOutput) Write(p []byte) (int, error) { 304 n, err := bo.f.Write(p) 305 return n, convertReaderGone(err) 306} 307 308func (bo byteOutput) WriteString(s string) (int, error) { 309 n, err := bo.f.WriteString(s) 310 return n, convertReaderGone(err) 311} 312 313func convertReaderGone(err error) error { 314 if pathErr, ok := err.(*os.PathError); ok { 315 if pathErr.Err == epipe { 316 return errs.ReaderGone{} 317 } 318 } 319 return err 320} 321