1// Copyright (c) 2015 HPE Software Inc. All rights reserved.
2// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
3
4package tail
5
6import (
7	"bufio"
8	"errors"
9	"fmt"
10	"io"
11	"io/ioutil"
12	"log"
13	"os"
14	"strings"
15	"sync"
16	"time"
17
18	"github.com/hpcloud/tail/ratelimiter"
19	"github.com/hpcloud/tail/util"
20	"github.com/hpcloud/tail/watch"
21	"gopkg.in/tomb.v1"
22)
23
24var (
25	ErrStop = errors.New("tail should now stop")
26)
27
28type Line struct {
29	Text string
30	Time time.Time
31	Err  error // Error from tail
32}
33
34// NewLine returns a Line with present time.
35func NewLine(text string) *Line {
36	return &Line{text, time.Now(), nil}
37}
38
39// SeekInfo represents arguments to `os.Seek`
40type SeekInfo struct {
41	Offset int64
42	Whence int // os.SEEK_*
43}
44
45type logger interface {
46	Fatal(v ...interface{})
47	Fatalf(format string, v ...interface{})
48	Fatalln(v ...interface{})
49	Panic(v ...interface{})
50	Panicf(format string, v ...interface{})
51	Panicln(v ...interface{})
52	Print(v ...interface{})
53	Printf(format string, v ...interface{})
54	Println(v ...interface{})
55}
56
57// Config is used to specify how a file must be tailed.
58type Config struct {
59	// File-specifc
60	Location    *SeekInfo // Seek to this location before tailing
61	ReOpen      bool      // Reopen recreated files (tail -F)
62	MustExist   bool      // Fail early if the file does not exist
63	Poll        bool      // Poll for file changes instead of using inotify
64	Pipe        bool      // Is a named pipe (mkfifo)
65	RateLimiter *ratelimiter.LeakyBucket
66
67	// Generic IO
68	Follow      bool // Continue looking for new lines (tail -f)
69	MaxLineSize int  // If non-zero, split longer lines into multiple lines
70
71	// Logger, when nil, is set to tail.DefaultLogger
72	// To disable logging: set field to tail.DiscardingLogger
73	Logger logger
74}
75
76type Tail struct {
77	Filename string
78	Lines    chan *Line
79	Config
80
81	file   *os.File
82	reader *bufio.Reader
83
84	watcher watch.FileWatcher
85	changes *watch.FileChanges
86
87	tomb.Tomb // provides: Done, Kill, Dying
88
89	fileMtx sync.Mutex
90	lk      sync.Mutex
91}
92
93var (
94	// DefaultLogger is used when Config.Logger == nil
95	DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)
96	// DiscardingLogger can be used to disable logging output
97	DiscardingLogger = log.New(ioutil.Discard, "", 0)
98)
99
100// TailFile begins tailing the file. Output stream is made available
101// via the `Tail.Lines` channel. To handle errors during tailing,
102// invoke the `Wait` or `Err` method after finishing reading from the
103// `Lines` channel.
104func TailFile(filename string, config Config) (*Tail, error) {
105	if config.ReOpen && !config.Follow {
106		util.Fatal("cannot set ReOpen without Follow.")
107	}
108
109	t := &Tail{
110		Filename: filename,
111		Lines:    make(chan *Line),
112		Config:   config,
113	}
114
115	// when Logger was not specified in config, use default logger
116	if t.Logger == nil {
117		t.Logger = log.New(os.Stderr, "", log.LstdFlags)
118	}
119
120	if t.Poll {
121		t.watcher = watch.NewPollingFileWatcher(filename)
122	} else {
123		t.watcher = watch.NewInotifyFileWatcher(filename)
124	}
125
126	if t.MustExist {
127		var err error
128		t.file, err = OpenFile(t.Filename)
129		if err != nil {
130			return nil, err
131		}
132	}
133
134	go t.tailFileSync()
135
136	return t, nil
137}
138
139// Return the file's current position, like stdio's ftell().
140// But this value is not very accurate.
141// it may readed one line in the chan(tail.Lines),
142// so it may lost one line.
143func (tail *Tail) Tell() (int64, error) {
144	tail.fileMtx.Lock()
145	f := tail.file
146	tail.fileMtx.Unlock()
147	if f == nil {
148		return 0, os.ErrNotExist
149	}
150	offset, err := f.Seek(0, io.SeekCurrent)
151	if err != nil {
152		return 0, err
153	}
154
155	tail.lk.Lock()
156	defer tail.lk.Unlock()
157	if tail.reader == nil {
158		return 0, nil
159	}
160
161	offset -= int64(tail.reader.Buffered())
162	return offset, nil
163}
164
165// Size returns the length in bytes of the file being tailed,
166// or 0 with an error if there was an error Stat'ing the file.
167func (tail *Tail) Size() (int64, error) {
168	tail.fileMtx.Lock()
169	f := tail.file
170	tail.fileMtx.Unlock()
171	if f == nil {
172		return 0, os.ErrNotExist
173	}
174	fi, err := f.Stat()
175	if err != nil {
176		return 0, err
177	}
178	size := fi.Size()
179	return size, nil
180}
181
182// Stop stops the tailing activity.
183func (tail *Tail) Stop() error {
184	tail.Kill(nil)
185	return tail.Wait()
186}
187
188// StopAtEOF stops tailing as soon as the end of the file is reached.
189func (tail *Tail) StopAtEOF() error {
190	tail.Kill(errStopAtEOF)
191	return tail.Wait()
192}
193
194var errStopAtEOF = errors.New("tail: stop at eof")
195
196func (tail *Tail) close() {
197	close(tail.Lines)
198	tail.closeFile()
199}
200
201func (tail *Tail) closeFile() {
202	tail.fileMtx.Lock()
203	defer tail.fileMtx.Unlock()
204	if tail.file != nil {
205		tail.file.Close()
206		tail.file = nil
207	}
208}
209
210func (tail *Tail) reopen(truncated bool) error {
211
212	// There are cases where the file is reopened so quickly it's still the same file
213	// which causes the poller to hang on an open file handle to a file no longer being written to
214	// and which eventually gets deleted.  Save the current file handle info to make sure we only
215	// start tailing a different file.
216	cf, err := tail.file.Stat()
217	if !truncated && err != nil {
218		log.Print("stat of old file returned, this is not expected and may result in unexpected behavior")
219		// We don't action on this error but are logging it, not expecting to see it happen and not sure if we
220		// need to action on it, cf is checked for nil later on to accommodate this
221	}
222
223	tail.closeFile()
224	for {
225		var err error
226		tail.fileMtx.Lock()
227		tail.file, err = OpenFile(tail.Filename)
228		tail.fileMtx.Unlock()
229		if err != nil {
230			if os.IsNotExist(err) {
231				tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
232				if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil {
233					if err == tomb.ErrDying {
234						return err
235					}
236					return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
237				}
238				continue
239			}
240			return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
241		}
242
243		// File exists and is opened, get information about it.
244		nf, err := tail.file.Stat()
245		if err != nil {
246			tail.Logger.Print("Failed to stat new file to be tailed, will try to open it again")
247			tail.closeFile()
248			continue
249		}
250
251		// Check to see if we are trying to reopen and tail the exact same file (and it was not truncated).
252		retries := 20
253		if !truncated && cf != nil && os.SameFile(cf, nf) {
254			retries--
255			if retries <= 0 {
256				return errors.New("gave up trying to reopen log file with a different handle")
257			}
258			select {
259			case <-time.After(watch.POLL_DURATION):
260				tail.closeFile()
261				continue
262			case <-tail.Tomb.Dying():
263				return tomb.ErrDying
264			}
265		}
266		break
267	}
268	return nil
269}
270
271func (tail *Tail) readLine() (string, error) {
272	tail.lk.Lock()
273	line, err := tail.reader.ReadString('\n')
274	tail.lk.Unlock()
275	if err != nil {
276		// Note ReadString "returns the data read before the error" in
277		// case of an error, including EOF, so we return it as is. The
278		// caller is expected to process it if err is EOF.
279		return line, err
280	}
281
282	line = strings.TrimRight(line, "\n")
283
284	return line, err
285}
286
287func (tail *Tail) tailFileSync() {
288	defer tail.Done()
289	defer tail.close()
290
291	if !tail.MustExist {
292		// deferred first open, not technically truncated but we don't need to check for changed files
293		err := tail.reopen(true)
294		if err != nil {
295			if err != tomb.ErrDying {
296				tail.Kill(err)
297			}
298			return
299		}
300	}
301
302	// Seek to requested location on first open of the file.
303	if tail.Location != nil {
304		_, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
305		tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location)
306		if err != nil {
307			tail.Killf("Seek error on %s: %s", tail.Filename, err)
308			return
309		}
310	}
311
312	tail.openReader()
313
314	var offset int64
315	var err error
316	oneMoreRun := false
317
318	// Read line by line.
319	for {
320		// do not seek in named pipes
321		if !tail.Pipe {
322			// grab the position in case we need to back up in the event of a half-line
323			offset, err = tail.Tell()
324			if err != nil {
325				tail.Kill(err)
326				return
327			}
328		}
329
330		line, err := tail.readLine()
331
332		// Process `line` even if err is EOF.
333		if err == nil {
334			cooloff := !tail.sendLine(line)
335			if cooloff {
336				// Wait a second before seeking till the end of
337				// file when rate limit is reached.
338				msg := ("Too much log activity; waiting a second " +
339					"before resuming tailing")
340				tail.Lines <- &Line{msg, time.Now(), errors.New(msg)}
341				select {
342				case <-time.After(time.Second):
343				case <-tail.Dying():
344					return
345				}
346				if err := tail.seekEnd(); err != nil {
347					tail.Kill(err)
348					return
349				}
350			}
351		} else if err == io.EOF {
352			if !tail.Follow {
353				if line != "" {
354					tail.sendLine(line)
355				}
356				return
357			}
358
359			if tail.Follow && line != "" {
360				// this has the potential to never return the last line if
361				// it's not followed by a newline; seems a fair trade here
362				err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
363				if err != nil {
364					tail.Kill(err)
365					return
366				}
367			}
368
369			// oneMoreRun is set true when a file is deleted,
370			// this is to catch events which might get missed in polling mode.
371			// now that the last run is completed, finish deleting the file
372			if oneMoreRun {
373				oneMoreRun = false
374				err = tail.finishDelete()
375				if err != nil {
376					if err != ErrStop {
377						tail.Kill(err)
378					}
379					return
380				}
381			}
382
383			// When EOF is reached, wait for more data to become
384			// available. Wait strategy is based on the `tail.watcher`
385			// implementation (inotify or polling).
386			oneMoreRun, err = tail.waitForChanges()
387			if err != nil {
388				if err != ErrStop {
389					tail.Kill(err)
390				}
391				return
392			}
393		} else {
394			// non-EOF error
395			tail.Killf("Error reading %s: %s", tail.Filename, err)
396			return
397		}
398
399		select {
400		case <-tail.Dying():
401			if tail.Err() == errStopAtEOF {
402				continue
403			}
404			return
405		default:
406		}
407	}
408}
409
410// waitForChanges waits until the file has been appended, deleted,
411// moved or truncated. When moved or deleted - the file will be
412// reopened if ReOpen is true. Truncated files are always reopened.
413func (tail *Tail) waitForChanges() (bool, error) {
414	if tail.changes == nil {
415		pos, err := tail.file.Seek(0, io.SeekCurrent)
416		if err != nil {
417			return false, err
418		}
419		tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
420		if err != nil {
421			return false, err
422		}
423	}
424
425	select {
426	case <-tail.changes.Modified:
427		return false, nil
428	case <-tail.changes.Deleted:
429		// In polling mode we could miss events when a file is deleted, so before we give up our file handle
430		// run the poll one more time to catch anything we may have missed since the last poll.
431		return true, nil
432	case <-tail.changes.Truncated:
433		// Always reopen truncated files (Follow is true)
434		tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
435		if err := tail.reopen(true); err != nil {
436			return false, err
437		}
438		tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
439		tail.openReader()
440		return false, nil
441	case <-tail.Dying():
442		return false, ErrStop
443	}
444	panic("unreachable")
445}
446
447func (tail *Tail) finishDelete() error {
448	tail.changes = nil
449	if tail.ReOpen {
450		// XXX: we must not log from a library.
451		tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
452		if err := tail.reopen(false); err != nil {
453			return err
454		}
455		tail.Logger.Printf("Successfully reopened %s", tail.Filename)
456		tail.openReader()
457		return nil
458	} else {
459		tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
460		return ErrStop
461	}
462}
463
464func (tail *Tail) openReader() {
465	if tail.MaxLineSize > 0 {
466		// add 2 to account for newline characters
467		tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
468	} else {
469		tail.reader = bufio.NewReader(tail.file)
470	}
471}
472
473func (tail *Tail) seekEnd() error {
474	return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END})
475}
476
477func (tail *Tail) seekTo(pos SeekInfo) error {
478	_, err := tail.file.Seek(pos.Offset, pos.Whence)
479	if err != nil {
480		return fmt.Errorf("Seek error on %s: %s", tail.Filename, err)
481	}
482	// Reset the read buffer whenever the file is re-seek'ed
483	tail.reader.Reset(tail.file)
484	return nil
485}
486
487// sendLine sends the line(s) to Lines channel, splitting longer lines
488// if necessary. Return false if rate limit is reached.
489func (tail *Tail) sendLine(line string) bool {
490	now := time.Now()
491	lines := []string{line}
492
493	// Split longer lines
494	if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
495		lines = util.PartitionString(line, tail.MaxLineSize)
496	}
497
498	for _, line := range lines {
499		tail.Lines <- &Line{line, now, nil}
500	}
501
502	if tail.Config.RateLimiter != nil {
503		ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
504		if !ok {
505			tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n",
506				tail.Filename)
507			return false
508		}
509	}
510
511	return true
512}
513
514// Cleanup removes inotify watches added by the tail package. This function is
515// meant to be invoked from a process's exit handler. Linux kernel may not
516// automatically remove inotify watches after the process exits.
517func (tail *Tail) Cleanup() {
518	watch.Cleanup(tail.Filename)
519}
520