1// Copyright (c) 2015 HPE Software Inc. All rights reserved.
2// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
3
4package tail
5
6import (
7	"bufio"
8	"errors"
9	"fmt"
10	"io"
11	"io/ioutil"
12	"log"
13	"os"
14	"strings"
15	"sync"
16	"time"
17
18	"github.com/nxadm/tail/ratelimiter"
19	"github.com/nxadm/tail/util"
20	"github.com/nxadm/tail/watch"
21	"gopkg.in/tomb.v1"
22)
23
24var (
25	ErrStop = errors.New("tail should now stop")
26)
27
28type Line struct {
29	Text     string
30	Num      int
31	SeekInfo SeekInfo
32	Time     time.Time
33	Err      error // Error from tail
34}
35
36// NewLine returns a Line with present time.
37func NewLine(text string, lineNum int) *Line {
38	return &Line{text, lineNum, SeekInfo{}, time.Now(), nil}
39}
40
41// SeekInfo represents arguments to `io.Seek`
42type SeekInfo struct {
43	Offset int64
44	Whence int // io.Seek*
45}
46
47type logger interface {
48	Fatal(v ...interface{})
49	Fatalf(format string, v ...interface{})
50	Fatalln(v ...interface{})
51	Panic(v ...interface{})
52	Panicf(format string, v ...interface{})
53	Panicln(v ...interface{})
54	Print(v ...interface{})
55	Printf(format string, v ...interface{})
56	Println(v ...interface{})
57}
58
59// Config is used to specify how a file must be tailed.
60type Config struct {
61	// File-specifc
62	Location    *SeekInfo // Seek to this location before tailing
63	ReOpen      bool      // Reopen recreated files (tail -F)
64	MustExist   bool      // Fail early if the file does not exist
65	Poll        bool      // Poll for file changes instead of using inotify
66	Pipe        bool      // Is a named pipe (mkfifo)
67	RateLimiter *ratelimiter.LeakyBucket
68
69	// Generic IO
70	Follow      bool // Continue looking for new lines (tail -f)
71	MaxLineSize int  // If non-zero, split longer lines into multiple lines
72
73	// Logger, when nil, is set to tail.DefaultLogger
74	// To disable logging: set field to tail.DiscardingLogger
75	Logger logger
76}
77
78type Tail struct {
79	Filename string
80	Lines    chan *Line
81	Config
82
83	file    *os.File
84	reader  *bufio.Reader
85	lineNum int
86
87	watcher watch.FileWatcher
88	changes *watch.FileChanges
89
90	tomb.Tomb // provides: Done, Kill, Dying
91
92	lk sync.Mutex
93}
94
95var (
96	// DefaultLogger is used when Config.Logger == nil
97	DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)
98	// DiscardingLogger can be used to disable logging output
99	DiscardingLogger = log.New(ioutil.Discard, "", 0)
100)
101
102// TailFile begins tailing the file. Output stream is made available
103// via the `Tail.Lines` channel. To handle errors during tailing,
104// invoke the `Wait` or `Err` method after finishing reading from the
105// `Lines` channel.
106func TailFile(filename string, config Config) (*Tail, error) {
107	if config.ReOpen && !config.Follow {
108		util.Fatal("cannot set ReOpen without Follow.")
109	}
110
111	t := &Tail{
112		Filename: filename,
113		Lines:    make(chan *Line),
114		Config:   config,
115	}
116
117	// when Logger was not specified in config, use default logger
118	if t.Logger == nil {
119		t.Logger = DefaultLogger
120	}
121
122	if t.Poll {
123		t.watcher = watch.NewPollingFileWatcher(filename)
124	} else {
125		t.watcher = watch.NewInotifyFileWatcher(filename)
126	}
127
128	if t.MustExist {
129		var err error
130		t.file, err = OpenFile(t.Filename)
131		if err != nil {
132			return nil, err
133		}
134	}
135
136	go t.tailFileSync()
137
138	return t, nil
139}
140
141// Tell returns the file's current position, like stdio's ftell().
142// But this value is not very accurate.
143// One line from the chan(tail.Lines) may have been read,
144// so it may have lost one line.
145func (tail *Tail) Tell() (offset int64, err error) {
146	if tail.file == nil {
147		return
148	}
149	offset, err = tail.file.Seek(0, io.SeekCurrent)
150	if err != nil {
151		return
152	}
153
154	tail.lk.Lock()
155	defer tail.lk.Unlock()
156	if tail.reader == nil {
157		return
158	}
159
160	offset -= int64(tail.reader.Buffered())
161	return
162}
163
164// Stop stops the tailing activity.
165func (tail *Tail) Stop() error {
166	tail.Kill(nil)
167	return tail.Wait()
168}
169
170// StopAtEOF stops tailing as soon as the end of the file is reached.
171func (tail *Tail) StopAtEOF() error {
172	tail.Kill(errStopAtEOF)
173	return tail.Wait()
174}
175
176var errStopAtEOF = errors.New("tail: stop at eof")
177
178func (tail *Tail) close() {
179	close(tail.Lines)
180	tail.closeFile()
181}
182
183func (tail *Tail) closeFile() {
184	if tail.file != nil {
185		tail.file.Close()
186		tail.file = nil
187	}
188}
189
190func (tail *Tail) reopen() error {
191	tail.closeFile()
192	tail.lineNum = 0
193	for {
194		var err error
195		tail.file, err = OpenFile(tail.Filename)
196		if err != nil {
197			if os.IsNotExist(err) {
198				tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
199				if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil {
200					if err == tomb.ErrDying {
201						return err
202					}
203					return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
204				}
205				continue
206			}
207			return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
208		}
209		break
210	}
211	return nil
212}
213
214func (tail *Tail) readLine() (string, error) {
215	tail.lk.Lock()
216	line, err := tail.reader.ReadString('\n')
217	tail.lk.Unlock()
218	if err != nil {
219		// Note ReadString "returns the data read before the error" in
220		// case of an error, including EOF, so we return it as is. The
221		// caller is expected to process it if err is EOF.
222		return line, err
223	}
224
225	line = strings.TrimRight(line, "\n")
226
227	return line, err
228}
229
230func (tail *Tail) tailFileSync() {
231	defer tail.Done()
232	defer tail.close()
233
234	if !tail.MustExist {
235		// deferred first open.
236		err := tail.reopen()
237		if err != nil {
238			if err != tomb.ErrDying {
239				tail.Kill(err)
240			}
241			return
242		}
243	}
244
245	// Seek to requested location on first open of the file.
246	if tail.Location != nil {
247		_, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
248		if err != nil {
249			tail.Killf("Seek error on %s: %s", tail.Filename, err)
250			return
251		}
252	}
253
254	tail.openReader()
255
256	// Read line by line.
257	for {
258		// do not seek in named pipes
259		if !tail.Pipe {
260			// grab the position in case we need to back up in the event of a half-line
261			if _, err := tail.Tell(); err != nil {
262				tail.Kill(err)
263				return
264			}
265		}
266
267		line, err := tail.readLine()
268
269		// Process `line` even if err is EOF.
270		if err == nil {
271			cooloff := !tail.sendLine(line)
272			if cooloff {
273				// Wait a second before seeking till the end of
274				// file when rate limit is reached.
275				msg := ("Too much log activity; waiting a second before resuming tailing")
276				offset, _ := tail.Tell()
277				tail.Lines <- &Line{msg, tail.lineNum, SeekInfo{Offset: offset}, time.Now(), errors.New(msg)}
278				select {
279				case <-time.After(time.Second):
280				case <-tail.Dying():
281					return
282				}
283				if err := tail.seekEnd(); err != nil {
284					tail.Kill(err)
285					return
286				}
287			}
288		} else if err == io.EOF {
289			if !tail.Follow {
290				if line != "" {
291					tail.sendLine(line)
292				}
293				return
294			}
295
296			if tail.Follow && line != "" {
297				tail.sendLine(line)
298				if err := tail.seekEnd(); err != nil {
299					tail.Kill(err)
300					return
301				}
302			}
303
304			// When EOF is reached, wait for more data to become
305			// available. Wait strategy is based on the `tail.watcher`
306			// implementation (inotify or polling).
307			err := tail.waitForChanges()
308			if err != nil {
309				if err != ErrStop {
310					tail.Kill(err)
311				}
312				return
313			}
314		} else {
315			// non-EOF error
316			tail.Killf("Error reading %s: %s", tail.Filename, err)
317			return
318		}
319
320		select {
321		case <-tail.Dying():
322			if tail.Err() == errStopAtEOF {
323				continue
324			}
325			return
326		default:
327		}
328	}
329}
330
331// waitForChanges waits until the file has been appended, deleted,
332// moved or truncated. When moved or deleted - the file will be
333// reopened if ReOpen is true. Truncated files are always reopened.
334func (tail *Tail) waitForChanges() error {
335	if tail.changes == nil {
336		pos, err := tail.file.Seek(0, io.SeekCurrent)
337		if err != nil {
338			return err
339		}
340		tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
341		if err != nil {
342			return err
343		}
344	}
345
346	select {
347	case <-tail.changes.Modified:
348		return nil
349	case <-tail.changes.Deleted:
350		tail.changes = nil
351		if tail.ReOpen {
352			// XXX: we must not log from a library.
353			tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
354			if err := tail.reopen(); err != nil {
355				return err
356			}
357			tail.Logger.Printf("Successfully reopened %s", tail.Filename)
358			tail.openReader()
359			return nil
360		}
361		tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
362		return ErrStop
363	case <-tail.changes.Truncated:
364		// Always reopen truncated files (Follow is true)
365		tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
366		if err := tail.reopen(); err != nil {
367			return err
368		}
369		tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
370		tail.openReader()
371		return nil
372	case <-tail.Dying():
373		return ErrStop
374	}
375}
376
377func (tail *Tail) openReader() {
378	tail.lk.Lock()
379	if tail.MaxLineSize > 0 {
380		// add 2 to account for newline characters
381		tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
382	} else {
383		tail.reader = bufio.NewReader(tail.file)
384	}
385	tail.lk.Unlock()
386}
387
388func (tail *Tail) seekEnd() error {
389	return tail.seekTo(SeekInfo{Offset: 0, Whence: io.SeekEnd})
390}
391
392func (tail *Tail) seekTo(pos SeekInfo) error {
393	_, err := tail.file.Seek(pos.Offset, pos.Whence)
394	if err != nil {
395		return fmt.Errorf("Seek error on %s: %s", tail.Filename, err)
396	}
397	// Reset the read buffer whenever the file is re-seek'ed
398	tail.reader.Reset(tail.file)
399	return nil
400}
401
402// sendLine sends the line(s) to Lines channel, splitting longer lines
403// if necessary. Return false if rate limit is reached.
404func (tail *Tail) sendLine(line string) bool {
405	now := time.Now()
406	lines := []string{line}
407
408	// Split longer lines
409	if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
410		lines = util.PartitionString(line, tail.MaxLineSize)
411	}
412
413	for _, line := range lines {
414		tail.lineNum++
415		offset, _ := tail.Tell()
416		select {
417		case tail.Lines <- &Line{line, tail.lineNum, SeekInfo{Offset: offset}, now, nil}:
418		case <-tail.Dying():
419			return true
420		}
421	}
422
423	if tail.Config.RateLimiter != nil {
424		ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
425		if !ok {
426			tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.",
427				tail.Filename)
428			return false
429		}
430	}
431
432	return true
433}
434
435// Cleanup removes inotify watches added by the tail package. This function is
436// meant to be invoked from a process's exit handler. Linux kernel may not
437// automatically remove inotify watches after the process exits.
438func (tail *Tail) Cleanup() {
439	watch.Cleanup(tail.Filename)
440}
441