1package eval
2
3import (
4	"bufio"
5	"errors"
6	"fmt"
7	"io"
8	"os"
9	"sync"
10
11	"src.elv.sh/pkg/eval/errs"
12	"src.elv.sh/pkg/eval/vals"
13	"src.elv.sh/pkg/strutil"
14)
15
// Port conveys data stream. It always consists of a byte band and a channel band.
type Port struct {
	// File is the byte band and Chan is the channel band. closeFile and
	// closeChan record whether the close method should close File and Chan
	// respectively; fork clears both flags on the copy it returns.
	File      *os.File
	Chan      chan interface{}
	closeFile bool
	closeChan bool

	// The following two fields are populated as an additional control
	// mechanism for output ports. When no more value should be sent on Chan,
	// sendError is populated and sendStop is closed. This is used for
	// both detection of reader termination (see readerGone below) and closed
	// ports.
	sendStop  chan struct{}
	sendError *error

	// Only populated in output ports writing to another command in a pipeline.
	// When the reading end of the pipe exits, it stores 1 in readerGone. This
	// is used to check if an external command killed by SIGPIPE is caused by
	// the termination of the reader of the pipe.
	readerGone *int32
}
37
// ErrNoValueOutput is thrown when writing to a pipe without a value output
// component. It is a package-level sentinel error value, so it can be compared
// against directly.
var ErrNoValueOutput = errors.New("port has no value output")
41
// closedSendStop is a channel that is already closed by the time package
// initialization finishes. It is suitable as a value for Port.sendStop when
// there is no reader to start with.
var closedSendStop = func() chan struct{} {
	stop := make(chan struct{})
	close(stop)
	return stop
}()
47
48// Returns a copy of the Port with the Close* flags unset.
49func (p *Port) fork() *Port {
50	return &Port{p.File, p.Chan, false, false, p.sendStop, p.sendError, p.readerGone}
51}
52
53// Closes a Port.
54func (p *Port) close() {
55	if p == nil {
56		return
57	}
58	if p.closeFile {
59		p.File.Close()
60	}
61	if p.closeChan {
62		close(p.Chan)
63	}
64}
65
var (
	// ClosedChan is a closed channel, suitable as a placeholder input channel.
	// Receiving from it returns immediately.
	ClosedChan = getClosedChan()
	// BlackholeChan is a channel that absorbs all values written to it,
	// suitable as a placeholder output channel. A goroutine started by
	// getBlackholeChan keeps draining it.
	BlackholeChan = getBlackholeChan()
	// DevNull is /dev/null, suitable as a placeholder file for either input or
	// output. It is nil if the null device could not be opened; getDevNull
	// prints a warning to stderr in that case.
	DevNull = getDevNull()

	// DummyInputPort is a port made up from DevNull and ClosedChan, suitable as
	// a placeholder input port.
	DummyInputPort = &Port{File: DevNull, Chan: ClosedChan}
	// DummyOutputPort is a port made up from DevNull and BlackholeChan,
	// suitable as a placeholder output port.
	DummyOutputPort = &Port{File: DevNull, Chan: BlackholeChan}

	// DummyPorts contains 3 dummy ports, suitable as stdin, stdout and stderr.
	// The second and third elements are the same *Port, DummyOutputPort.
	DummyPorts = []*Port{DummyInputPort, DummyOutputPort, DummyOutputPort}
)
86
// getClosedChan returns a fresh unbuffered value channel that has already
// been closed, so that receiving from it never blocks.
func getClosedChan() chan interface{} {
	closed := make(chan interface{})
	close(closed)
	return closed
}
92
// getBlackholeChan returns a fresh unbuffered value channel together with a
// background goroutine that receives and discards everything sent on it. The
// goroutine exits once the channel is closed.
func getBlackholeChan() chan interface{} {
	sink := make(chan interface{})
	go func() {
		for {
			if _, open := <-sink; !open {
				return
			}
		}
	}()
	return sink
}
101
// getDevNull opens the null device for both reading and writing, so that the
// resulting file can serve as a placeholder for either the input or the
// output side of a port.
//
// If the device cannot be opened, a warning is printed to stderr and the
// returned file is nil.
func getDevNull() *os.File {
	// os.Open would yield a read-only descriptor, on which every write fails;
	// the file is also used as an output placeholder, so open it read-write.
	f, err := os.OpenFile(os.DevNull, os.O_RDWR, 0)
	if err != nil {
		fmt.Fprintf(os.Stderr,
			"cannot open %s, shell might not function normally\n", os.DevNull)
		return nil
	}
	return f
}
110
111// PipePort returns an output *Port whose value and byte components are both
112// piped. The supplied functions are called on a separate goroutine with the
113// read ends of the value and byte components of the port. It also returns a
114// function to clean up the port and wait for the callbacks to finish.
115func PipePort(vCb func(<-chan interface{}), bCb func(*os.File)) (*Port, func(), error) {
116	r, w, err := os.Pipe()
117	if err != nil {
118		return nil, nil, err
119	}
120	ch := make(chan interface{}, outputCaptureBufferSize)
121
122	var wg sync.WaitGroup
123	wg.Add(2)
124	go func() {
125		defer wg.Done()
126		vCb(ch)
127	}()
128	go func() {
129		defer wg.Done()
130		defer r.Close()
131		bCb(r)
132	}()
133
134	port := &Port{Chan: ch, closeChan: true, File: w, closeFile: true}
135	done := func() {
136		port.close()
137		wg.Wait()
138	}
139	return port, done, nil
140}
141
142// CapturePort returns an output *Port whose value and byte components are
143// both connected to an internal pipe that saves the output. It also returns a
144// function to call to obtain the captured output.
145func CapturePort() (*Port, func() []interface{}, error) {
146	vs := []interface{}{}
147	var m sync.Mutex
148	port, done, err := PipePort(
149		func(ch <-chan interface{}) {
150			for v := range ch {
151				m.Lock()
152				vs = append(vs, v)
153				m.Unlock()
154			}
155		},
156		func(r *os.File) {
157			buffered := bufio.NewReader(r)
158			for {
159				line, err := buffered.ReadString('\n')
160				if line != "" {
161					v := strutil.ChopLineEnding(line)
162					m.Lock()
163					vs = append(vs, v)
164					m.Unlock()
165				}
166				if err != nil {
167					if err != io.EOF {
168						logger.Println("error on reading:", err)
169					}
170					break
171				}
172			}
173		})
174	if err != nil {
175		return nil, nil, err
176	}
177	return port, func() []interface{} {
178		done()
179		return vs
180	}, nil
181}
182
183// StringCapturePort is like CapturePort, but processes value outputs by
184// stringifying them and prepending an output marker.
185func StringCapturePort() (*Port, func() []string, error) {
186	var lines []string
187	var mu sync.Mutex
188	addLine := func(line string) {
189		mu.Lock()
190		defer mu.Unlock()
191		lines = append(lines, line)
192	}
193	port, done, err := PipePort(
194		func(ch <-chan interface{}) {
195			for v := range ch {
196				addLine("▶ " + vals.ToString(v))
197			}
198		},
199		func(r *os.File) {
200			bufr := bufio.NewReader(r)
201			for {
202				line, err := bufr.ReadString('\n')
203				if err != nil {
204					if err != io.EOF {
205						addLine("i/o error: " + err.Error())
206					}
207					break
208				}
209				addLine(strutil.ChopLineEnding(line))
210			}
211		})
212	if err != nil {
213		return nil, nil, err
214	}
215	return port, func() []string {
216		done()
217		return lines
218	}, nil
219}
220
221// Buffer size for the channel to use in FilePort. The value has been chosen
222// arbitrarily.
223const filePortChanSize = 32
224
225// FilePort returns an output *Port where the byte component is the file itself,
226// and the value component is converted to an internal channel that writes
227// each value to the file, prepending with a prefix. It also returns a cleanup
228// function, which should be called when the *Port is no longer needed.
229func FilePort(f *os.File, valuePrefix string) (*Port, func()) {
230	ch := make(chan interface{}, filePortChanSize)
231	relayDone := make(chan struct{})
232	go func() {
233		for v := range ch {
234			f.WriteString(valuePrefix)
235			f.WriteString(vals.Repr(v, vals.NoPretty))
236			f.WriteString("\n")
237		}
238		close(relayDone)
239	}()
240	return &Port{File: f, Chan: ch}, func() {
241		close(ch)
242		<-relayDone
243	}
244}
245
246// PortsFromStdFiles is a shorthand for calling PortsFromFiles with os.Stdin,
247// os.Stdout and os.Stderr.
248func PortsFromStdFiles(prefix string) ([]*Port, func()) {
249	return PortsFromFiles([3]*os.File{os.Stdin, os.Stdout, os.Stderr}, prefix)
250}
251
252// PortsFromFiles builds 3 ports from 3 files. It also returns a function that
253// should be called when the ports are no longer needed.
254func PortsFromFiles(files [3]*os.File, prefix string) ([]*Port, func()) {
255	port1, cleanup1 := FilePort(files[1], prefix)
256	port2, cleanup2 := FilePort(files[2], prefix)
257	return []*Port{{File: files[0], Chan: ClosedChan}, port1, port2}, func() {
258		cleanup1()
259		cleanup2()
260	}
261}
262
// ValueOutput defines the interface through which builtin commands access the
// value output.
//
// The value output is backed by two channels, one for writing output, another
// for the back-channel signal that the reader of the channel has gone.
type ValueOutput interface {
	// Outputs a value. Returns errs.ReaderGone if the reader is gone.
	Put(v interface{}) error
}
272
// valueOutput bundles the send side of a value channel with a stop signal.
// Its Put method sends on data unless sendStop is ready for receiving, in
// which case it returns the error that sendError points to.
type valueOutput struct {
	// Channel that Put sends values on.
	data      chan<- interface{}
	// Channel that Put receives from to learn that it should stop sending.
	sendStop  <-chan struct{}
	// Points to the error that Put returns once sendStop is ready.
	sendError *error
}
278
// Put sends v on the data channel, blocking until either the send completes
// or sendStop becomes ready for receiving (for example because it has been
// closed).
//
// It returns nil when v has been sent, and the error pointed to by sendError
// otherwise.
func (vo valueOutput) Put(v interface{}) error {
	// When both cases are ready, select picks one of them at random, so a
	// send may still succeed after sendStop has been closed.
	select {
	case vo.data <- v:
		return nil
	case <-vo.sendStop:
		// NOTE(review): sendError is dereferenced unconditionally, so it must
		// be non-nil by the time sendStop becomes ready.
		return *vo.sendError
	}
}
287
// ByteOutput defines the interface through which builtin commands access the
// byte output.
//
// It is a thin wrapper around the underlying *os.File value, only exposing
// the necessary methods for writing bytes and strings, and converting any
// syscall.EPIPE errors to errs.ReaderGone.
//
// The interface consists solely of the Write method from io.Writer and the
// WriteString method from io.StringWriter.
type ByteOutput interface {
	io.Writer
	io.StringWriter
}
298
// byteOutput wraps an *os.File. Its Write and WriteString methods forward to
// the file and pass the resulting error through convertReaderGone.
type byteOutput struct {
	// The file that all writes are forwarded to.
	f *os.File
}
302
303func (bo byteOutput) Write(p []byte) (int, error) {
304	n, err := bo.f.Write(p)
305	return n, convertReaderGone(err)
306}
307
308func (bo byteOutput) WriteString(s string) (int, error) {
309	n, err := bo.f.WriteString(s)
310	return n, convertReaderGone(err)
311}
312
313func convertReaderGone(err error) error {
314	if pathErr, ok := err.(*os.PathError); ok {
315		if pathErr.Err == epipe {
316			return errs.ReaderGone{}
317		}
318	}
319	return err
320}
321