1// ================================================================ 2// These are handlers for print, dump, emit, etc in the put/filter verbs. 3// 4// * For "> filename" ">> filename", these handle the open/write/close file operations. 5// * For "| command", these handle open/write/close pipe operations. 6// * For stderr, these write to stderr immediately. 7// * For stdout, these write to the main record-output Go channel. 8// The reason for this is since we want all print statements and 9// record-output to be in the same goroutine, for deterministic output 10// ordering. (Main record-writer output is also to stdout.) 11// ================================================================ 12 13package output 14 15import ( 16 "errors" 17 "fmt" 18 "io" 19 "os" 20 21 "miller/src/cliutil" 22 "miller/src/lib" 23 "miller/src/types" 24) 25 26// ================================================================ 27type OutputHandlerManager interface { 28 29 // For print-variants and dump-variants 30 WriteString(outputString string, filename string) error 31 32 // For emit-variants and tee 33 WriteRecordAndContext(outrecAndContext *types.RecordAndContext, filename string) error 34 35 Close() []error 36} 37 38type OutputHandler interface { 39 WriteString(outputString string) error 40 WriteRecordAndContext(outrecAndContext *types.RecordAndContext) error 41 Close() error 42} 43 44// ================================================================ 45type MultiOutputHandlerManager struct { 46 outputHandlers map[string]OutputHandler 47 48 // For stdout or stderr 49 singleHandler *FileOutputHandler 50 51 // TOOD: make an enum 52 append bool // True for ">>", false for ">" and "|" 53 pipe bool // True for "|", false for ">" and ">>" 54 recordWriterOptions *cliutil.TWriterOptions 55} 56 57// ---------------------------------------------------------------- 58func NewFileWritetHandlerManager( 59 recordWriterOptions *cliutil.TWriterOptions, 60) *MultiOutputHandlerManager { 61 return &MultiOutputHandlerManager{ 62 outputHandlers: make(map[string]OutputHandler), 63 singleHandler: nil, 64 append: false, 65 pipe: false, 66 recordWriterOptions: recordWriterOptions, 67 } 68} 69 70func NewFileAppendHandlerManager( 71 recordWriterOptions *cliutil.TWriterOptions, 72) *MultiOutputHandlerManager { 73 return &MultiOutputHandlerManager{ 74 outputHandlers: make(map[string]OutputHandler), 75 singleHandler: nil, 76 append: true, 77 pipe: false, 78 recordWriterOptions: recordWriterOptions, 79 } 80} 81 82func NewPipeWriteHandlerManager( 83 recordWriterOptions *cliutil.TWriterOptions, 84) *MultiOutputHandlerManager { 85 return &MultiOutputHandlerManager{ 86 outputHandlers: make(map[string]OutputHandler), 87 singleHandler: nil, 88 append: false, 89 pipe: true, 90 recordWriterOptions: recordWriterOptions, 91 } 92} 93 94func NewStdoutWriteHandlerManager( 95 recordWriterOptions *cliutil.TWriterOptions, 96) *MultiOutputHandlerManager { 97 return &MultiOutputHandlerManager{ 98 outputHandlers: make(map[string]OutputHandler), 99 singleHandler: newStdoutOutputHandler(recordWriterOptions), 100 append: false, 101 pipe: false, 102 recordWriterOptions: recordWriterOptions, 103 } 104} 105 106func NewStderrWriteHandlerManager( 107 recordWriterOptions *cliutil.TWriterOptions, 108) *MultiOutputHandlerManager { 109 return &MultiOutputHandlerManager{ 110 outputHandlers: make(map[string]OutputHandler), 111 singleHandler: newStderrOutputHandler(recordWriterOptions), 112 append: false, 113 pipe: false, 114 recordWriterOptions: recordWriterOptions, 115 } 116} 117 118// ---------------------------------------------------------------- 119func (this *MultiOutputHandlerManager) WriteString( 120 outputString string, 121 filename string, 122) error { 123 outputHandler, err := this.getOutputHandlerFor(filename) 124 if err != nil { 125 return err 126 } 127 return outputHandler.WriteString(outputString) 128} 129 130func (this *MultiOutputHandlerManager) WriteRecordAndContext( 131 outrecAndContext *types.RecordAndContext, 132 filename string, 133) error { 134 outputHandler, err := this.getOutputHandlerFor(filename) 135 if err != nil { 136 return err 137 } 138 return outputHandler.WriteRecordAndContext(outrecAndContext) 139} 140 141func (this *MultiOutputHandlerManager) getOutputHandlerFor( 142 filename string, 143) (OutputHandler, error) { 144 if this.singleHandler != nil { 145 return this.singleHandler, nil 146 } 147 148 // TODO: LRU with close and re-open in case too many files are open 149 outputHandler := this.outputHandlers[filename] 150 if outputHandler == nil { 151 var err error = nil 152 if this.pipe { 153 outputHandler, err = NewPipeWriteOutputHandler( 154 filename, 155 this.recordWriterOptions, 156 ) 157 if err != nil { 158 return nil, err 159 } 160 if outputHandler != nil { 161 } 162 } else if this.append { 163 outputHandler, err = NewFileAppendOutputHandler( 164 filename, 165 this.recordWriterOptions, 166 ) 167 if err != nil { 168 return nil, err 169 } 170 } else { 171 outputHandler, err = NewFileWriteOutputHandler( 172 filename, 173 this.recordWriterOptions, 174 ) 175 if err != nil { 176 return nil, err 177 } 178 } 179 this.outputHandlers[filename] = outputHandler 180 } 181 return outputHandler, nil 182} 183 184func (this *MultiOutputHandlerManager) Close() []error { 185 errs := make([]error, 0) 186 if this.singleHandler != nil { 187 err := this.singleHandler.Close() 188 if err != nil { 189 errs = append(errs, err) 190 } 191 } 192 for _, outputHandler := range this.outputHandlers { 193 err := outputHandler.Close() 194 if err != nil { 195 errs = append(errs, err) 196 } 197 } 198 return errs 199} 200 201// ================================================================ 202type FileOutputHandler struct { 203 filename string 204 handle io.WriteCloser 205 closeable bool 206 207 // This will be nil if WriteRecordAndContext has never been called. It's 208 // lazily created on WriteRecord. The record-writer / channel parts are 209 // called only by WriteRecrod which is called by emit and tee variants; 210 // print and dump variants call WriteString. 211 recordWriterOptions *cliutil.TWriterOptions 212 recordWriter IRecordWriter 213 recordOutputChannel chan *types.RecordAndContext 214 recordDoneChannel chan bool 215} 216 217func newOutputHandlerCommon( 218 filename string, 219 handle io.WriteCloser, 220 closeable bool, 221 recordWriterOptions *cliutil.TWriterOptions, 222) *FileOutputHandler { 223 return &FileOutputHandler{ 224 filename: filename, 225 handle: handle, 226 closeable: closeable, 227 228 recordWriterOptions: recordWriterOptions, 229 recordWriter: nil, 230 recordOutputChannel: nil, 231 recordDoneChannel: nil, 232 } 233} 234 235// ---------------------------------------------------------------- 236func NewFileWriteOutputHandler( 237 filename string, 238 recordWriterOptions *cliutil.TWriterOptions, 239) (*FileOutputHandler, error) { 240 handle, err := os.OpenFile( 241 filename, 242 os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 243 0644, // TODO: let users parameterize this 244 ) 245 if err != nil { 246 return nil, err 247 } 248 return newOutputHandlerCommon( 249 filename, 250 handle, 251 true, 252 recordWriterOptions, 253 ), nil 254} 255 256func NewFileAppendOutputHandler( 257 filename string, 258 recordWriterOptions *cliutil.TWriterOptions, 259) (*FileOutputHandler, error) { 260 handle, err := os.OpenFile( 261 filename, 262 os.O_CREATE|os.O_WRONLY|os.O_APPEND, 263 0644, // TODO: let users parameterize this 264 ) 265 if err != nil { 266 return nil, err 267 } 268 return newOutputHandlerCommon( 269 filename, 270 handle, 271 true, 272 recordWriterOptions, 273 ), nil 274} 275 276func NewPipeWriteOutputHandler( 277 commandString string, 278 recordWriterOptions *cliutil.TWriterOptions, 279) (*FileOutputHandler, error) { 280 writePipe, err := lib.OpenOutboundHalfPipe(commandString) 281 if err != nil { 282 return nil, errors.New( 283 fmt.Sprintf( 284 "%s: could not launch command \"%s\" for pipe-to.", 285 lib.MlrExeName(), 286 commandString, 287 ), 288 ) 289 } 290 291 return newOutputHandlerCommon( 292 "| "+commandString, 293 writePipe, 294 true, 295 recordWriterOptions, 296 ), nil 297} 298 299func newStdoutOutputHandler( 300 recordWriterOptions *cliutil.TWriterOptions, 301) *FileOutputHandler { 302 return newOutputHandlerCommon( 303 "(stdout)", 304 os.Stdout, 305 false, 306 recordWriterOptions, 307 ) 308} 309 310func newStderrOutputHandler( 311 recordWriterOptions *cliutil.TWriterOptions, 312) *FileOutputHandler { 313 return newOutputHandlerCommon( 314 "(stderr)", 315 os.Stderr, 316 false, 317 recordWriterOptions, 318 ) 319} 320 321// ---------------------------------------------------------------- 322func (this *FileOutputHandler) WriteString(outputString string) error { 323 _, err := this.handle.Write([]byte(outputString)) 324 return err 325} 326 327// ---------------------------------------------------------------- 328func (this *FileOutputHandler) WriteRecordAndContext( 329 outrecAndContext *types.RecordAndContext, 330) error { 331 // Lazily create the record-writer and output channel. 332 if this.recordWriter == nil { 333 err := this.setUpRecordWriter() 334 if err != nil { 335 return err 336 } 337 } 338 339 this.recordOutputChannel <- outrecAndContext 340 return nil 341} 342 343func (this *FileOutputHandler) setUpRecordWriter() error { 344 if this.recordWriter != nil { 345 return nil 346 } 347 348 recordWriter := Create(this.recordWriterOptions) 349 if recordWriter == nil { 350 return errors.New( 351 "Output format not found: " + this.recordWriterOptions.OutputFileFormat, 352 ) 353 } 354 this.recordWriter = recordWriter 355 356 this.recordOutputChannel = make(chan *types.RecordAndContext, 1) 357 this.recordDoneChannel = make(chan bool, 1) 358 359 go ChannelWriter( 360 this.recordOutputChannel, 361 this.recordWriter, 362 this.recordDoneChannel, 363 this.handle, 364 ) 365 366 return nil 367} 368 369// ---------------------------------------------------------------- 370func (this *FileOutputHandler) Close() error { 371 if this.recordOutputChannel != nil { 372 // TODO: see if we need a real context 373 emptyContext := types.Context{} 374 this.recordOutputChannel <- types.NewEndOfStreamMarker(&emptyContext) 375 376 // Wait for the output channel to drain 377 done := false 378 for !done { 379 select { 380 case _ = <-this.recordDoneChannel: 381 done = true 382 break 383 } 384 } 385 } 386 387 if this.closeable { 388 return this.handle.Close() 389 } else { 390 return nil 391 } 392} 393