1// Copyright (c) 2015 HPE Software Inc. All rights reserved. 2// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. 3 4package tail 5 6import ( 7 "bufio" 8 "errors" 9 "fmt" 10 "io" 11 "io/ioutil" 12 "log" 13 "os" 14 "sync" 15 "time" 16 17 "github.com/gcla/tail/ratelimiter" 18 "github.com/gcla/tail/util" 19 "github.com/gcla/tail/watch" 20 "gopkg.in/tomb.v1" 21) 22 23var ( 24 ErrStop = errors.New("tail should now stop") 25) 26 27type Chunk struct { 28 Text []byte 29 Time time.Time 30 Err error // Error from tail 31} 32 33// SeekInfo represents arguments to `os.Seek` 34type SeekInfo struct { 35 Offset int64 36 Whence int // os.SEEK_* 37} 38 39type logger interface { 40 Fatal(v ...interface{}) 41 Fatalf(format string, v ...interface{}) 42 Fatalln(v ...interface{}) 43 Panic(v ...interface{}) 44 Panicf(format string, v ...interface{}) 45 Panicln(v ...interface{}) 46 Print(v ...interface{}) 47 Printf(format string, v ...interface{}) 48 Println(v ...interface{}) 49} 50 51// Config is used to specify how a file must be tailed. 52type Config struct { 53 // File-specifc 54 Location *SeekInfo // Seek to this location before tailing 55 ReOpen bool // Reopen recreated files (tail -F) 56 MustExist bool // Fail early if the file does not exist 57 Poll bool // Poll for file changes instead of using inotify 58 Pipe bool // Is a named pipe (mkfifo) 59 RateLimiter *ratelimiter.LeakyBucket 60 61 // Generic IO 62 Follow bool // Continue looking for new lines (tail -f) 63 64 // Logger, when nil, is set to tail.DefaultLogger 65 // To disable logging: set field to tail.DiscardingLogger 66 Logger logger 67} 68 69type Tail struct { 70 Filename string 71 Bytes chan *Chunk 72 Config 73 74 file *os.File 75 reader *bufio.Reader 76 77 watcher watch.FileWatcher 78 changes *watch.FileChanges 79 80 tomb.Tomb // provides: Done, Kill, Dying 81 82 lk sync.Mutex 83} 84 85var ( 86 // DefaultLogger is used when Config.Logger == nil 87 DefaultLogger = log.New(os.Stderr, "", log.LstdFlags) 88 // DiscardingLogger can be used to disable logging output 89 DiscardingLogger = log.New(ioutil.Discard, "", 0) 90) 91 92// TailFile begins tailing the file. Output stream is made available 93// via the `Tail.Lines` channel. To handle errors during tailing, 94// invoke the `Wait` or `Err` method after finishing reading from the 95// `Lines` channel. 96func TailFile(filename string, config Config) (*Tail, error) { 97 if config.ReOpen && !config.Follow { 98 util.Fatal("cannot set ReOpen without Follow.") 99 } 100 101 t := &Tail{ 102 Filename: filename, 103 Bytes: make(chan *Chunk), 104 Config: config, 105 } 106 107 // when Logger was not specified in config, use default logger 108 if t.Logger == nil { 109 t.Logger = log.New(os.Stderr, "", log.LstdFlags) 110 } 111 112 if t.Poll { 113 t.watcher = watch.NewPollingFileWatcher(filename) 114 } else { 115 t.watcher = watch.NewInotifyFileWatcher(filename) 116 } 117 118 if t.MustExist { 119 var err error 120 t.file, err = OpenFile(t.Filename) 121 if err != nil { 122 return nil, err 123 } 124 } 125 126 go t.tailFileSync() 127 128 return t, nil 129} 130 131// Return the file's current position, like stdio's ftell(). 132// But this value is not very accurate. 133// it may readed one line in the chan(tail.Lines), 134// so it may lost one line. 135func (tail *Tail) Tell() (offset int64, err error) { 136 if tail.file == nil { 137 return 138 } 139 offset, err = tail.file.Seek(0, os.SEEK_CUR) 140 if err != nil { 141 return 142 } 143 144 tail.lk.Lock() 145 defer tail.lk.Unlock() 146 if tail.reader == nil { 147 return 148 } 149 150 offset -= int64(tail.reader.Buffered()) 151 return 152} 153 154// Stop stops the tailing activity. 155func (tail *Tail) Stop() error { 156 tail.Kill(nil) 157 return tail.Wait() 158} 159 160// StopAtEOF stops tailing as soon as the end of the file is reached. 161func (tail *Tail) StopAtEOF() error { 162 tail.Kill(errStopAtEOF) 163 return tail.Wait() 164} 165 166var errStopAtEOF = errors.New("tail: stop at eof") 167 168func (tail *Tail) close() { 169 close(tail.Bytes) 170 tail.closeFile() 171} 172 173func (tail *Tail) closeFile() { 174 if tail.file != nil { 175 tail.file.Close() 176 tail.file = nil 177 } 178} 179 180func (tail *Tail) reopen() error { 181 tail.closeFile() 182 for { 183 var err error 184 tail.file, err = OpenFile(tail.Filename) 185 if err != nil { 186 if os.IsNotExist(err) { 187 tail.Logger.Printf("Waiting for %s to appear...", tail.Filename) 188 if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil { 189 if err == tomb.ErrDying { 190 return err 191 } 192 return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err) 193 } 194 continue 195 } 196 return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err) 197 } 198 break 199 } 200 return nil 201} 202 203func (tail *Tail) readLine() ([]byte, error) { 204 tail.lk.Lock() 205 defer tail.lk.Unlock() 206 b, err := tail.reader.ReadByte() 207 return []byte{b}, err 208} 209 210func (tail *Tail) tailFileSync() { 211 defer tail.Done() 212 defer tail.close() 213 214 if !tail.MustExist { 215 // deferred first open. 216 err := tail.reopen() 217 if err != nil { 218 if err != tomb.ErrDying { 219 tail.Kill(err) 220 } 221 return 222 } 223 } 224 225 // Seek to requested location on first open of the file. 226 if tail.Location != nil { 227 _, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence) 228 tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location) 229 if err != nil { 230 tail.Killf("Seek error on %s: %s", tail.Filename, err) 231 return 232 } 233 } 234 235 tail.openReader() 236 237 var offset int64 238 var err error 239 240 // Read line by line. 241 for { 242 // do not seek in named pipes 243 if !tail.Pipe { 244 // grab the position in case we need to back up in the event of a half-line 245 offset, err = tail.Tell() 246 if err != nil { 247 tail.Kill(err) 248 return 249 } 250 } 251 252 line, err := tail.readLine() 253 254 // Process `line` even if err is EOF. 255 if err == nil { 256 cooloff := !tail.sendChunk(line) 257 if cooloff { 258 // Wait a second before seeking till the end of 259 // file when rate limit is reached. 260 msg := ("Too much log activity; waiting a second " + 261 "before resuming tailing") 262 tail.Bytes <- &Chunk{[]byte(msg), time.Now(), errors.New(msg)} 263 select { 264 case <-time.After(time.Second): 265 case <-tail.Dying(): 266 return 267 } 268 if err := tail.seekEnd(); err != nil { 269 tail.Kill(err) 270 return 271 } 272 } 273 } else if err == io.EOF { 274 if !tail.Follow { 275 if len(line) > 0 { 276 tail.sendChunk(line) 277 } 278 return 279 } 280 281 if tail.Follow && len(line) > 0 { 282 // this has the potential to never return the last line if 283 // it's not followed by a newline; seems a fair trade here 284 err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0}) 285 if err != nil { 286 tail.Kill(err) 287 return 288 } 289 } 290 291 // When EOF is reached, wait for more data to become 292 // available. Wait strategy is based on the `tail.watcher` 293 // implementation (inotify or polling). 294 err := tail.waitForChanges() 295 if err != nil { 296 if err != ErrStop { 297 tail.Kill(err) 298 } 299 return 300 } 301 } else { 302 // non-EOF error 303 tail.Killf("Error reading %s: %s", tail.Filename, err) 304 return 305 } 306 307 select { 308 case <-tail.Dying(): 309 if tail.Err() == errStopAtEOF { 310 continue 311 } 312 return 313 default: 314 } 315 } 316} 317 318// waitForChanges waits until the file has been appended, deleted, 319// moved or truncated. When moved or deleted - the file will be 320// reopened if ReOpen is true. Truncated files are always reopened. 321func (tail *Tail) waitForChanges() error { 322 if tail.changes == nil { 323 pos, err := tail.file.Seek(0, os.SEEK_CUR) 324 if err != nil { 325 return err 326 } 327 tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos) 328 if err != nil { 329 return err 330 } 331 } 332 333 select { 334 case <-tail.changes.Modified: 335 return nil 336 case <-tail.changes.Deleted: 337 tail.changes = nil 338 if tail.ReOpen { 339 // XXX: we must not log from a library. 340 tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename) 341 if err := tail.reopen(); err != nil { 342 return err 343 } 344 tail.Logger.Printf("Successfully reopened %s", tail.Filename) 345 tail.openReader() 346 return nil 347 } else { 348 tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename) 349 return ErrStop 350 } 351 case <-tail.changes.Truncated: 352 // Always reopen truncated files (Follow is true) 353 tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename) 354 if err := tail.reopen(); err != nil { 355 return err 356 } 357 tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename) 358 tail.openReader() 359 return nil 360 case <-tail.Dying(): 361 return ErrStop 362 } 363 panic("unreachable") 364} 365 366func (tail *Tail) openReader() { 367 tail.reader = bufio.NewReader(tail.file) 368} 369 370func (tail *Tail) seekEnd() error { 371 return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END}) 372} 373 374func (tail *Tail) seekTo(pos SeekInfo) error { 375 _, err := tail.file.Seek(pos.Offset, pos.Whence) 376 if err != nil { 377 return fmt.Errorf("Seek error on %s: %s", tail.Filename, err) 378 } 379 // Reset the read buffer whenever the file is re-seek'ed 380 tail.reader.Reset(tail.file) 381 return nil 382} 383 384// sendChunk sends the line(s) to Lines channel, splitting longer lines 385// if necessary. Return false if rate limit is reached. 386func (tail *Tail) sendChunk(line []byte) bool { 387 now := time.Now() 388 389 tail.Bytes <- &Chunk{line, now, nil} 390 391 return true 392} 393 394// Cleanup removes inotify watches added by the tail package. This function is 395// meant to be invoked from a process's exit handler. Linux kernel may not 396// automatically remove inotify watches after the process exits. 397func (tail *Tail) Cleanup() { 398 watch.Cleanup(tail.Filename) 399} 400