1// Copyright (c) 2015 HPE Software Inc. All rights reserved. 2// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. 3 4package tail 5 6import ( 7 "bufio" 8 "errors" 9 "fmt" 10 "io" 11 "io/ioutil" 12 "log" 13 "os" 14 "strings" 15 "sync" 16 "time" 17 18 "github.com/hpcloud/tail/ratelimiter" 19 "github.com/hpcloud/tail/util" 20 "github.com/hpcloud/tail/watch" 21 "gopkg.in/tomb.v1" 22) 23 24var ( 25 ErrStop = errors.New("tail should now stop") 26) 27 28type Line struct { 29 Text string 30 Time time.Time 31 Err error // Error from tail 32} 33 34// NewLine returns a Line with present time. 35func NewLine(text string) *Line { 36 return &Line{text, time.Now(), nil} 37} 38 39// SeekInfo represents arguments to `os.Seek` 40type SeekInfo struct { 41 Offset int64 42 Whence int // os.SEEK_* 43} 44 45type logger interface { 46 Fatal(v ...interface{}) 47 Fatalf(format string, v ...interface{}) 48 Fatalln(v ...interface{}) 49 Panic(v ...interface{}) 50 Panicf(format string, v ...interface{}) 51 Panicln(v ...interface{}) 52 Print(v ...interface{}) 53 Printf(format string, v ...interface{}) 54 Println(v ...interface{}) 55} 56 57// Config is used to specify how a file must be tailed. 
type Config struct {
	// File-specific
	Location    *SeekInfo // Seek to this location before tailing
	ReOpen      bool      // Reopen recreated files (tail -F)
	MustExist   bool      // Fail early if the file does not exist
	Poll        bool      // Poll for file changes instead of using inotify
	Pipe        bool      // Is a named pipe (mkfifo)
	RateLimiter *ratelimiter.LeakyBucket

	// Generic IO
	Follow      bool // Continue looking for new lines (tail -f)
	MaxLineSize int  // If non-zero, split longer lines into multiple lines

	// Logger, when nil, is set to tail.DefaultLogger
	// To disable logging: set field to tail.DiscardingLogger
	Logger logger
}

// Tail is the handle for one tailed file. It is created by TailFile and
// embeds both the Config it was started with and a tomb.Tomb that tracks
// the lifetime of the background tailing goroutine.
type Tail struct {
	Filename string     // Path passed to TailFile
	Lines    chan *Line // Output channel; closed when tailing ends
	Config

	file   *os.File      // Currently open file, nil while closed
	reader *bufio.Reader // Buffered reader wrapped around file

	watcher watch.FileWatcher  // Polling or inotify watcher, chosen by Config.Poll
	changes *watch.FileChanges // Active change subscription; nil until first wait and after a delete

	tomb.Tomb // provides: Done, Kill, Dying

	fileMtx sync.Mutex // Held while the file field is swapped, closed or captured
	lk      sync.Mutex // Held around reads from reader and around Buffered() in Tell
}

var (
	// DefaultLogger is used when Config.Logger == nil
	DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)
	// DiscardingLogger can be used to disable logging output
	DiscardingLogger = log.New(ioutil.Discard, "", 0)
)

// TailFile begins tailing the file. Output stream is made available
// via the `Tail.Lines` channel. To handle errors during tailing,
// invoke the `Wait` or `Err` method after finishing reading from the
// `Lines` channel.
104func TailFile(filename string, config Config) (*Tail, error) { 105 if config.ReOpen && !config.Follow { 106 util.Fatal("cannot set ReOpen without Follow.") 107 } 108 109 t := &Tail{ 110 Filename: filename, 111 Lines: make(chan *Line), 112 Config: config, 113 } 114 115 // when Logger was not specified in config, use default logger 116 if t.Logger == nil { 117 t.Logger = log.New(os.Stderr, "", log.LstdFlags) 118 } 119 120 if t.Poll { 121 t.watcher = watch.NewPollingFileWatcher(filename) 122 } else { 123 t.watcher = watch.NewInotifyFileWatcher(filename) 124 } 125 126 if t.MustExist { 127 var err error 128 t.file, err = OpenFile(t.Filename) 129 if err != nil { 130 return nil, err 131 } 132 } 133 134 go t.tailFileSync() 135 136 return t, nil 137} 138 139// Return the file's current position, like stdio's ftell(). 140// But this value is not very accurate. 141// it may readed one line in the chan(tail.Lines), 142// so it may lost one line. 143func (tail *Tail) Tell() (int64, error) { 144 tail.fileMtx.Lock() 145 f := tail.file 146 tail.fileMtx.Unlock() 147 if f == nil { 148 return 0, os.ErrNotExist 149 } 150 offset, err := f.Seek(0, io.SeekCurrent) 151 if err != nil { 152 return 0, err 153 } 154 155 tail.lk.Lock() 156 defer tail.lk.Unlock() 157 if tail.reader == nil { 158 return 0, nil 159 } 160 161 offset -= int64(tail.reader.Buffered()) 162 return offset, nil 163} 164 165// Size returns the length in bytes of the file being tailed, 166// or 0 with an error if there was an error Stat'ing the file. 167func (tail *Tail) Size() (int64, error) { 168 tail.fileMtx.Lock() 169 f := tail.file 170 tail.fileMtx.Unlock() 171 if f == nil { 172 return 0, os.ErrNotExist 173 } 174 fi, err := f.Stat() 175 if err != nil { 176 return 0, err 177 } 178 size := fi.Size() 179 return size, nil 180} 181 182// Stop stops the tailing activity. 
183func (tail *Tail) Stop() error { 184 tail.Kill(nil) 185 return tail.Wait() 186} 187 188// StopAtEOF stops tailing as soon as the end of the file is reached. 189func (tail *Tail) StopAtEOF() error { 190 tail.Kill(errStopAtEOF) 191 return tail.Wait() 192} 193 194var errStopAtEOF = errors.New("tail: stop at eof") 195 196func (tail *Tail) close() { 197 close(tail.Lines) 198 tail.closeFile() 199} 200 201func (tail *Tail) closeFile() { 202 tail.fileMtx.Lock() 203 defer tail.fileMtx.Unlock() 204 if tail.file != nil { 205 tail.file.Close() 206 tail.file = nil 207 } 208} 209 210func (tail *Tail) reopen(truncated bool) error { 211 212 // There are cases where the file is reopened so quickly it's still the same file 213 // which causes the poller to hang on an open file handle to a file no longer being written to 214 // and which eventually gets deleted. Save the current file handle info to make sure we only 215 // start tailing a different file. 216 cf, err := tail.file.Stat() 217 if !truncated && err != nil { 218 log.Print("stat of old file returned, this is not expected and may result in unexpected behavior") 219 // We don't action on this error but are logging it, not expecting to see it happen and not sure if we 220 // need to action on it, cf is checked for nil later on to accommodate this 221 } 222 223 tail.closeFile() 224 for { 225 var err error 226 tail.fileMtx.Lock() 227 tail.file, err = OpenFile(tail.Filename) 228 tail.fileMtx.Unlock() 229 if err != nil { 230 if os.IsNotExist(err) { 231 tail.Logger.Printf("Waiting for %s to appear...", tail.Filename) 232 if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil { 233 if err == tomb.ErrDying { 234 return err 235 } 236 return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err) 237 } 238 continue 239 } 240 return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err) 241 } 242 243 // File exists and is opened, get information about it. 
244 nf, err := tail.file.Stat() 245 if err != nil { 246 tail.Logger.Print("Failed to stat new file to be tailed, will try to open it again") 247 tail.closeFile() 248 continue 249 } 250 251 // Check to see if we are trying to reopen and tail the exact same file (and it was not truncated). 252 retries := 20 253 if !truncated && cf != nil && os.SameFile(cf, nf) { 254 retries-- 255 if retries <= 0 { 256 return errors.New("gave up trying to reopen log file with a different handle") 257 } 258 select { 259 case <-time.After(watch.POLL_DURATION): 260 tail.closeFile() 261 continue 262 case <-tail.Tomb.Dying(): 263 return tomb.ErrDying 264 } 265 } 266 break 267 } 268 return nil 269} 270 271func (tail *Tail) readLine() (string, error) { 272 tail.lk.Lock() 273 line, err := tail.reader.ReadString('\n') 274 tail.lk.Unlock() 275 if err != nil { 276 // Note ReadString "returns the data read before the error" in 277 // case of an error, including EOF, so we return it as is. The 278 // caller is expected to process it if err is EOF. 279 return line, err 280 } 281 282 line = strings.TrimRight(line, "\n") 283 284 return line, err 285} 286 287func (tail *Tail) tailFileSync() { 288 defer tail.Done() 289 defer tail.close() 290 291 if !tail.MustExist { 292 // deferred first open, not technically truncated but we don't need to check for changed files 293 err := tail.reopen(true) 294 if err != nil { 295 if err != tomb.ErrDying { 296 tail.Kill(err) 297 } 298 return 299 } 300 } 301 302 // Seek to requested location on first open of the file. 303 if tail.Location != nil { 304 _, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence) 305 tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location) 306 if err != nil { 307 tail.Killf("Seek error on %s: %s", tail.Filename, err) 308 return 309 } 310 } 311 312 tail.openReader() 313 314 var offset int64 315 var err error 316 oneMoreRun := false 317 318 // Read line by line. 
319 for { 320 // do not seek in named pipes 321 if !tail.Pipe { 322 // grab the position in case we need to back up in the event of a half-line 323 offset, err = tail.Tell() 324 if err != nil { 325 tail.Kill(err) 326 return 327 } 328 } 329 330 line, err := tail.readLine() 331 332 // Process `line` even if err is EOF. 333 if err == nil { 334 cooloff := !tail.sendLine(line) 335 if cooloff { 336 // Wait a second before seeking till the end of 337 // file when rate limit is reached. 338 msg := ("Too much log activity; waiting a second " + 339 "before resuming tailing") 340 tail.Lines <- &Line{msg, time.Now(), errors.New(msg)} 341 select { 342 case <-time.After(time.Second): 343 case <-tail.Dying(): 344 return 345 } 346 if err := tail.seekEnd(); err != nil { 347 tail.Kill(err) 348 return 349 } 350 } 351 } else if err == io.EOF { 352 if !tail.Follow { 353 if line != "" { 354 tail.sendLine(line) 355 } 356 return 357 } 358 359 if tail.Follow && line != "" { 360 // this has the potential to never return the last line if 361 // it's not followed by a newline; seems a fair trade here 362 err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0}) 363 if err != nil { 364 tail.Kill(err) 365 return 366 } 367 } 368 369 // oneMoreRun is set true when a file is deleted, 370 // this is to catch events which might get missed in polling mode. 371 // now that the last run is completed, finish deleting the file 372 if oneMoreRun { 373 oneMoreRun = false 374 err = tail.finishDelete() 375 if err != nil { 376 if err != ErrStop { 377 tail.Kill(err) 378 } 379 return 380 } 381 } 382 383 // When EOF is reached, wait for more data to become 384 // available. Wait strategy is based on the `tail.watcher` 385 // implementation (inotify or polling). 
386 oneMoreRun, err = tail.waitForChanges() 387 if err != nil { 388 if err != ErrStop { 389 tail.Kill(err) 390 } 391 return 392 } 393 } else { 394 // non-EOF error 395 tail.Killf("Error reading %s: %s", tail.Filename, err) 396 return 397 } 398 399 select { 400 case <-tail.Dying(): 401 if tail.Err() == errStopAtEOF { 402 continue 403 } 404 return 405 default: 406 } 407 } 408} 409 410// waitForChanges waits until the file has been appended, deleted, 411// moved or truncated. When moved or deleted - the file will be 412// reopened if ReOpen is true. Truncated files are always reopened. 413func (tail *Tail) waitForChanges() (bool, error) { 414 if tail.changes == nil { 415 pos, err := tail.file.Seek(0, io.SeekCurrent) 416 if err != nil { 417 return false, err 418 } 419 tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos) 420 if err != nil { 421 return false, err 422 } 423 } 424 425 select { 426 case <-tail.changes.Modified: 427 return false, nil 428 case <-tail.changes.Deleted: 429 // In polling mode we could miss events when a file is deleted, so before we give up our file handle 430 // run the poll one more time to catch anything we may have missed since the last poll. 431 return true, nil 432 case <-tail.changes.Truncated: 433 // Always reopen truncated files (Follow is true) 434 tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename) 435 if err := tail.reopen(true); err != nil { 436 return false, err 437 } 438 tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename) 439 tail.openReader() 440 return false, nil 441 case <-tail.Dying(): 442 return false, ErrStop 443 } 444 panic("unreachable") 445} 446 447func (tail *Tail) finishDelete() error { 448 tail.changes = nil 449 if tail.ReOpen { 450 // XXX: we must not log from a library. 
451 tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename) 452 if err := tail.reopen(false); err != nil { 453 return err 454 } 455 tail.Logger.Printf("Successfully reopened %s", tail.Filename) 456 tail.openReader() 457 return nil 458 } else { 459 tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename) 460 return ErrStop 461 } 462} 463 464func (tail *Tail) openReader() { 465 if tail.MaxLineSize > 0 { 466 // add 2 to account for newline characters 467 tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2) 468 } else { 469 tail.reader = bufio.NewReader(tail.file) 470 } 471} 472 473func (tail *Tail) seekEnd() error { 474 return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END}) 475} 476 477func (tail *Tail) seekTo(pos SeekInfo) error { 478 _, err := tail.file.Seek(pos.Offset, pos.Whence) 479 if err != nil { 480 return fmt.Errorf("Seek error on %s: %s", tail.Filename, err) 481 } 482 // Reset the read buffer whenever the file is re-seek'ed 483 tail.reader.Reset(tail.file) 484 return nil 485} 486 487// sendLine sends the line(s) to Lines channel, splitting longer lines 488// if necessary. Return false if rate limit is reached. 489func (tail *Tail) sendLine(line string) bool { 490 now := time.Now() 491 lines := []string{line} 492 493 // Split longer lines 494 if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize { 495 lines = util.PartitionString(line, tail.MaxLineSize) 496 } 497 498 for _, line := range lines { 499 tail.Lines <- &Line{line, now, nil} 500 } 501 502 if tail.Config.RateLimiter != nil { 503 ok := tail.Config.RateLimiter.Pour(uint16(len(lines))) 504 if !ok { 505 tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n", 506 tail.Filename) 507 return false 508 } 509 } 510 511 return true 512} 513 514// Cleanup removes inotify watches added by the tail package. This function is 515// meant to be invoked from a process's exit handler. 
Linux kernel may not 516// automatically remove inotify watches after the process exits. 517func (tail *Tail) Cleanup() { 518 watch.Cleanup(tail.Filename) 519} 520