1// ================================================================
2// These are handlers for print, dump, emit, etc in the put/filter verbs.
3//
4// * For "> filename" ">> filename", these handle the open/write/close file operations.
5// * For "| command", these handle open/write/close pipe operations.
6// * For stderr, these write to stderr immediately.
7// * For stdout, these write to the main record-output Go channel.
8//   The reason for this is since we want all print statements and
9//   record-output to be in the same goroutine, for deterministic output
10//   ordering. (Main record-writer output is also to stdout.)
11//   ================================================================
12
13package output
14
15import (
16	"errors"
17	"fmt"
18	"io"
19	"os"
20
21	"miller/src/cliutil"
22	"miller/src/lib"
23	"miller/src/types"
24)
25
26// ================================================================
27type OutputHandlerManager interface {
28
29	// For print-variants and dump-variants
30	WriteString(outputString string, filename string) error
31
32	// For emit-variants and tee
33	WriteRecordAndContext(outrecAndContext *types.RecordAndContext, filename string) error
34
35	Close() []error
36}
37
38type OutputHandler interface {
39	WriteString(outputString string) error
40	WriteRecordAndContext(outrecAndContext *types.RecordAndContext) error
41	Close() error
42}
43
44// ================================================================
45type MultiOutputHandlerManager struct {
46	outputHandlers map[string]OutputHandler
47
48	// For stdout or stderr
49	singleHandler *FileOutputHandler
50
51	// TOOD: make an enum
52	append              bool // True for ">>", false for ">" and "|"
53	pipe                bool // True for "|", false for ">" and ">>"
54	recordWriterOptions *cliutil.TWriterOptions
55}
56
57// ----------------------------------------------------------------
58func NewFileWritetHandlerManager(
59	recordWriterOptions *cliutil.TWriterOptions,
60) *MultiOutputHandlerManager {
61	return &MultiOutputHandlerManager{
62		outputHandlers:      make(map[string]OutputHandler),
63		singleHandler:       nil,
64		append:              false,
65		pipe:                false,
66		recordWriterOptions: recordWriterOptions,
67	}
68}
69
70func NewFileAppendHandlerManager(
71	recordWriterOptions *cliutil.TWriterOptions,
72) *MultiOutputHandlerManager {
73	return &MultiOutputHandlerManager{
74		outputHandlers:      make(map[string]OutputHandler),
75		singleHandler:       nil,
76		append:              true,
77		pipe:                false,
78		recordWriterOptions: recordWriterOptions,
79	}
80}
81
82func NewPipeWriteHandlerManager(
83	recordWriterOptions *cliutil.TWriterOptions,
84) *MultiOutputHandlerManager {
85	return &MultiOutputHandlerManager{
86		outputHandlers:      make(map[string]OutputHandler),
87		singleHandler:       nil,
88		append:              false,
89		pipe:                true,
90		recordWriterOptions: recordWriterOptions,
91	}
92}
93
94func NewStdoutWriteHandlerManager(
95	recordWriterOptions *cliutil.TWriterOptions,
96) *MultiOutputHandlerManager {
97	return &MultiOutputHandlerManager{
98		outputHandlers:      make(map[string]OutputHandler),
99		singleHandler:       newStdoutOutputHandler(recordWriterOptions),
100		append:              false,
101		pipe:                false,
102		recordWriterOptions: recordWriterOptions,
103	}
104}
105
106func NewStderrWriteHandlerManager(
107	recordWriterOptions *cliutil.TWriterOptions,
108) *MultiOutputHandlerManager {
109	return &MultiOutputHandlerManager{
110		outputHandlers:      make(map[string]OutputHandler),
111		singleHandler:       newStderrOutputHandler(recordWriterOptions),
112		append:              false,
113		pipe:                false,
114		recordWriterOptions: recordWriterOptions,
115	}
116}
117
118// ----------------------------------------------------------------
119func (this *MultiOutputHandlerManager) WriteString(
120	outputString string,
121	filename string,
122) error {
123	outputHandler, err := this.getOutputHandlerFor(filename)
124	if err != nil {
125		return err
126	}
127	return outputHandler.WriteString(outputString)
128}
129
130func (this *MultiOutputHandlerManager) WriteRecordAndContext(
131	outrecAndContext *types.RecordAndContext,
132	filename string,
133) error {
134	outputHandler, err := this.getOutputHandlerFor(filename)
135	if err != nil {
136		return err
137	}
138	return outputHandler.WriteRecordAndContext(outrecAndContext)
139}
140
141func (this *MultiOutputHandlerManager) getOutputHandlerFor(
142	filename string,
143) (OutputHandler, error) {
144	if this.singleHandler != nil {
145		return this.singleHandler, nil
146	}
147
148	// TODO: LRU with close and re-open in case too many files are open
149	outputHandler := this.outputHandlers[filename]
150	if outputHandler == nil {
151		var err error = nil
152		if this.pipe {
153			outputHandler, err = NewPipeWriteOutputHandler(
154				filename,
155				this.recordWriterOptions,
156			)
157			if err != nil {
158				return nil, err
159			}
160			if outputHandler != nil {
161			}
162		} else if this.append {
163			outputHandler, err = NewFileAppendOutputHandler(
164				filename,
165				this.recordWriterOptions,
166			)
167			if err != nil {
168				return nil, err
169			}
170		} else {
171			outputHandler, err = NewFileWriteOutputHandler(
172				filename,
173				this.recordWriterOptions,
174			)
175			if err != nil {
176				return nil, err
177			}
178		}
179		this.outputHandlers[filename] = outputHandler
180	}
181	return outputHandler, nil
182}
183
184func (this *MultiOutputHandlerManager) Close() []error {
185	errs := make([]error, 0)
186	if this.singleHandler != nil {
187		err := this.singleHandler.Close()
188		if err != nil {
189			errs = append(errs, err)
190		}
191	}
192	for _, outputHandler := range this.outputHandlers {
193		err := outputHandler.Close()
194		if err != nil {
195			errs = append(errs, err)
196		}
197	}
198	return errs
199}
200
201// ================================================================
202type FileOutputHandler struct {
203	filename  string
204	handle    io.WriteCloser
205	closeable bool
206
207	// This will be nil if WriteRecordAndContext has never been called. It's
208	// lazily created on WriteRecord. The record-writer / channel parts are
209	// called only by WriteRecrod which is called by emit and tee variants;
210	// print and dump variants call WriteString.
211	recordWriterOptions *cliutil.TWriterOptions
212	recordWriter        IRecordWriter
213	recordOutputChannel chan *types.RecordAndContext
214	recordDoneChannel   chan bool
215}
216
217func newOutputHandlerCommon(
218	filename string,
219	handle io.WriteCloser,
220	closeable bool,
221	recordWriterOptions *cliutil.TWriterOptions,
222) *FileOutputHandler {
223	return &FileOutputHandler{
224		filename:  filename,
225		handle:    handle,
226		closeable: closeable,
227
228		recordWriterOptions: recordWriterOptions,
229		recordWriter:        nil,
230		recordOutputChannel: nil,
231		recordDoneChannel:   nil,
232	}
233}
234
235// ----------------------------------------------------------------
236func NewFileWriteOutputHandler(
237	filename string,
238	recordWriterOptions *cliutil.TWriterOptions,
239) (*FileOutputHandler, error) {
240	handle, err := os.OpenFile(
241		filename,
242		os.O_CREATE|os.O_WRONLY|os.O_TRUNC,
243		0644, // TODO: let users parameterize this
244	)
245	if err != nil {
246		return nil, err
247	}
248	return newOutputHandlerCommon(
249		filename,
250		handle,
251		true,
252		recordWriterOptions,
253	), nil
254}
255
256func NewFileAppendOutputHandler(
257	filename string,
258	recordWriterOptions *cliutil.TWriterOptions,
259) (*FileOutputHandler, error) {
260	handle, err := os.OpenFile(
261		filename,
262		os.O_CREATE|os.O_WRONLY|os.O_APPEND,
263		0644, // TODO: let users parameterize this
264	)
265	if err != nil {
266		return nil, err
267	}
268	return newOutputHandlerCommon(
269		filename,
270		handle,
271		true,
272		recordWriterOptions,
273	), nil
274}
275
276func NewPipeWriteOutputHandler(
277	commandString string,
278	recordWriterOptions *cliutil.TWriterOptions,
279) (*FileOutputHandler, error) {
280	writePipe, err := lib.OpenOutboundHalfPipe(commandString)
281	if err != nil {
282		return nil, errors.New(
283			fmt.Sprintf(
284				"%s: could not launch command \"%s\" for pipe-to.",
285				lib.MlrExeName(),
286				commandString,
287			),
288		)
289	}
290
291	return newOutputHandlerCommon(
292		"| "+commandString,
293		writePipe,
294		true,
295		recordWriterOptions,
296	), nil
297}
298
299func newStdoutOutputHandler(
300	recordWriterOptions *cliutil.TWriterOptions,
301) *FileOutputHandler {
302	return newOutputHandlerCommon(
303		"(stdout)",
304		os.Stdout,
305		false,
306		recordWriterOptions,
307	)
308}
309
310func newStderrOutputHandler(
311	recordWriterOptions *cliutil.TWriterOptions,
312) *FileOutputHandler {
313	return newOutputHandlerCommon(
314		"(stderr)",
315		os.Stderr,
316		false,
317		recordWriterOptions,
318	)
319}
320
321// ----------------------------------------------------------------
322func (this *FileOutputHandler) WriteString(outputString string) error {
323	_, err := this.handle.Write([]byte(outputString))
324	return err
325}
326
327// ----------------------------------------------------------------
328func (this *FileOutputHandler) WriteRecordAndContext(
329	outrecAndContext *types.RecordAndContext,
330) error {
331	// Lazily create the record-writer and output channel.
332	if this.recordWriter == nil {
333		err := this.setUpRecordWriter()
334		if err != nil {
335			return err
336		}
337	}
338
339	this.recordOutputChannel <- outrecAndContext
340	return nil
341}
342
343func (this *FileOutputHandler) setUpRecordWriter() error {
344	if this.recordWriter != nil {
345		return nil
346	}
347
348	recordWriter := Create(this.recordWriterOptions)
349	if recordWriter == nil {
350		return errors.New(
351			"Output format not found: " + this.recordWriterOptions.OutputFileFormat,
352		)
353	}
354	this.recordWriter = recordWriter
355
356	this.recordOutputChannel = make(chan *types.RecordAndContext, 1)
357	this.recordDoneChannel = make(chan bool, 1)
358
359	go ChannelWriter(
360		this.recordOutputChannel,
361		this.recordWriter,
362		this.recordDoneChannel,
363		this.handle,
364	)
365
366	return nil
367}
368
369// ----------------------------------------------------------------
370func (this *FileOutputHandler) Close() error {
371	if this.recordOutputChannel != nil {
372		// TODO: see if we need a real context
373		emptyContext := types.Context{}
374		this.recordOutputChannel <- types.NewEndOfStreamMarker(&emptyContext)
375
376		// Wait for the output channel to drain
377		done := false
378		for !done {
379			select {
380			case _ = <-this.recordDoneChannel:
381				done = true
382				break
383			}
384		}
385	}
386
387	if this.closeable {
388		return this.handle.Close()
389	} else {
390		return nil
391	}
392}
393