1// Copyright (c) 2015 HPE Software Inc. All rights reserved.
2// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
3
4package tail
5
6import (
7	"bufio"
8	"errors"
9	"fmt"
10	"io"
11	"io/ioutil"
12	"log"
13	"os"
14	"strings"
15	"sync"
16	"time"
17
18	"github.com/hpcloud/tail/ratelimiter"
19	"github.com/hpcloud/tail/util"
20	"github.com/hpcloud/tail/watch"
21	"gopkg.in/tomb.v1"
22)
23
// ErrStop is the sentinel returned by waitForChanges when tailing should
// end without being reported as a failure: either the tomb is dying, or the
// file disappeared and Config.ReOpen is false. tailFileSync returns on it
// without killing the tomb.
var ErrStop = errors.New("tail should now stop")
27
// Line is one entry delivered on the Tail.Lines channel.
type Line struct {
	Text string    // content of the line
	Time time.Time // timestamp assigned when the Line was created
	Err  error     // Error from tail
}

// NewLine builds a Line carrying text, stamped with the current time and
// with no error attached.
func NewLine(text string) *Line {
	return &Line{
		Text: text,
		Time: time.Now(),
		Err:  nil,
	}
}
38
// SeekInfo holds the offset/whence pair handed to the Seek method of the
// tailed *os.File (see Config.Location and seekTo).
type SeekInfo struct {
	// Offset is the byte offset, interpreted relative to Whence.
	Offset int64
	// Whence selects the reference point for Offset.
	Whence int // os.SEEK_*
}
44
// logger is the set of logging methods the package requires of
// Config.Logger. A *log.Logger satisfies it (TailFile assigns one when no
// logger is configured).
type logger interface {
	Fatal(v ...interface{})
	Fatalf(format string, v ...interface{})
	Fatalln(v ...interface{})
	Panic(v ...interface{})
	Panicf(format string, v ...interface{})
	Panicln(v ...interface{})
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
56
// Config is used to specify how a file must be tailed.
// TailFile rejects a Config that sets ReOpen without Follow.
type Config struct {
	// File-specific
	Location    *SeekInfo // Seek to this location before tailing
	ReOpen      bool      // Reopen recreated files (tail -F)
	MustExist   bool      // Fail early if the file does not exist
	Poll        bool      // Poll for file changes instead of using inotify
	Pipe        bool      // Is a named pipe (mkfifo)
	// RateLimiter, when non-nil, is poured into for every line sent; once
	// it reports full, tailing pauses for a second and then skips to the
	// end of the file.
	RateLimiter *ratelimiter.LeakyBucket

	// Generic IO
	Follow      bool // Continue looking for new lines (tail -f)
	MaxLineSize int  // If non-zero, split longer lines into multiple lines

	// Logger, when nil, is set to tail.DefaultLogger
	// To disable logging: set field to tail.DiscardingLogger
	Logger logger
}
75
// Tail is the handle for one tailed file. It is created by TailFile, which
// also starts the goroutine that feeds Lines.
type Tail struct {
	// Filename is the path being tailed.
	Filename string
	// Lines receives every line read from the file; it is closed when
	// tailing ends.
	Lines    chan *Line
	Config

	// file is the currently open file, or nil when none is open.
	file   *os.File
	// reader buffers reads from file; it is created by openReader.
	reader *bufio.Reader

	// watcher reports creation and change events for Filename (polling or
	// inotify, depending on Config.Poll).
	watcher watch.FileWatcher
	// changes is the active change subscription; waitForChanges creates it
	// on demand and clears it when the file is deleted.
	changes *watch.FileChanges

	tomb.Tomb // provides: Done, Kill, Dying

	// lk guards access to reader in readLine and Tell.
	lk sync.Mutex
}
91
// DefaultLogger is used when Config.Logger == nil. It writes to standard
// error with the standard date and time flags and no prefix.
var DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)

// DiscardingLogger can be used to disable logging output: everything
// written to it is discarded.
var DiscardingLogger = log.New(ioutil.Discard, "", 0)
98
99// TailFile begins tailing the file. Output stream is made available
100// via the `Tail.Lines` channel. To handle errors during tailing,
101// invoke the `Wait` or `Err` method after finishing reading from the
102// `Lines` channel.
103func TailFile(filename string, config Config) (*Tail, error) {
104	if config.ReOpen && !config.Follow {
105		util.Fatal("cannot set ReOpen without Follow.")
106	}
107
108	t := &Tail{
109		Filename: filename,
110		Lines:    make(chan *Line),
111		Config:   config,
112	}
113
114	// when Logger was not specified in config, use default logger
115	if t.Logger == nil {
116		t.Logger = log.New(os.Stderr, "", log.LstdFlags)
117	}
118
119	if t.Poll {
120		t.watcher = watch.NewPollingFileWatcher(filename)
121	} else {
122		t.watcher = watch.NewInotifyFileWatcher(filename)
123	}
124
125	if t.MustExist {
126		var err error
127		t.file, err = OpenFile(t.Filename)
128		if err != nil {
129			return nil, err
130		}
131	}
132
133	go t.tailFileSync()
134
135	return t, nil
136}
137
138// Return the file's current position, like stdio's ftell().
139// But this value is not very accurate.
140// it may readed one line in the chan(tail.Lines),
141// so it may lost one line.
142func (tail *Tail) Tell() (offset int64, err error) {
143	if tail.file == nil {
144		return
145	}
146	offset, err = tail.file.Seek(0, os.SEEK_CUR)
147	if err != nil {
148		return
149	}
150
151	tail.lk.Lock()
152	defer tail.lk.Unlock()
153	if tail.reader == nil {
154		return
155	}
156
157	offset -= int64(tail.reader.Buffered())
158	return
159}
160
// Stop stops the tailing activity. It kills the tomb with a nil reason and
// then blocks in Wait until the tailing goroutine has finished, returning
// whatever Wait reports.
func (tail *Tail) Stop() error {
	tail.Kill(nil)
	return tail.Wait()
}
166
// StopAtEOF stops tailing as soon as the end of the file is reached.
// It kills the tomb with errStopAtEOF, which the read loop recognises and
// keeps reading until it would otherwise wait for changes, then blocks in
// Wait until the tailing goroutine has finished.
func (tail *Tail) StopAtEOF() error {
	tail.Kill(errStopAtEOF)
	return tail.Wait()
}

// errStopAtEOF is the kill reason that marks a "finish the file, then
// stop" request, as opposed to an immediate stop.
var errStopAtEOF = errors.New("tail: stop at eof")
174
// close ends the output stream by closing the Lines channel and then
// releases the underlying file. tailFileSync defers it, so it runs once
// when the tailing goroutine exits.
func (tail *Tail) close() {
	close(tail.Lines)
	tail.closeFile()
}
179
180func (tail *Tail) closeFile() {
181	if tail.file != nil {
182		tail.file.Close()
183		tail.file = nil
184	}
185}
186
187func (tail *Tail) reopen() error {
188	tail.closeFile()
189	for {
190		var err error
191		tail.file, err = OpenFile(tail.Filename)
192		if err != nil {
193			if os.IsNotExist(err) {
194				tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
195				if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil {
196					if err == tomb.ErrDying {
197						return err
198					}
199					return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
200				}
201				continue
202			}
203			return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
204		}
205		break
206	}
207	return nil
208}
209
210func (tail *Tail) readLine() (string, error) {
211	tail.lk.Lock()
212	line, err := tail.reader.ReadString('\n')
213	tail.lk.Unlock()
214	if err != nil {
215		// Note ReadString "returns the data read before the error" in
216		// case of an error, including EOF, so we return it as is. The
217		// caller is expected to process it if err is EOF.
218		return line, err
219	}
220
221	line = strings.TrimRight(line, "\n")
222
223	return line, err
224}
225
226func (tail *Tail) tailFileSync() {
227	defer tail.Done()
228	defer tail.close()
229
230	if !tail.MustExist {
231		// deferred first open.
232		err := tail.reopen()
233		if err != nil {
234			if err != tomb.ErrDying {
235				tail.Kill(err)
236			}
237			return
238		}
239	}
240
241	// Seek to requested location on first open of the file.
242	if tail.Location != nil {
243		_, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
244		tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location)
245		if err != nil {
246			tail.Killf("Seek error on %s: %s", tail.Filename, err)
247			return
248		}
249	}
250
251	tail.openReader()
252
253	var offset int64 = 0
254	var err error
255
256	// Read line by line.
257	for {
258		// do not seek in named pipes
259		if !tail.Pipe {
260			// grab the position in case we need to back up in the event of a half-line
261			offset, err = tail.Tell()
262			if err != nil {
263				tail.Kill(err)
264				return
265			}
266		}
267
268		line, err := tail.readLine()
269
270		// Process `line` even if err is EOF.
271		if err == nil {
272			cooloff := !tail.sendLine(line)
273			if cooloff {
274				// Wait a second before seeking till the end of
275				// file when rate limit is reached.
276				msg := fmt.Sprintf(
277					"Too much log activity; waiting a second " +
278						"before resuming tailing")
279				tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)}
280				select {
281				case <-time.After(time.Second):
282				case <-tail.Dying():
283					return
284				}
285				if err := tail.seekEnd(); err != nil {
286					tail.Kill(err)
287					return
288				}
289			}
290		} else if err == io.EOF {
291			if !tail.Follow {
292				if line != "" {
293					tail.sendLine(line)
294				}
295				return
296			}
297
298			if tail.Follow && line != "" {
299				// this has the potential to never return the last line if
300				// it's not followed by a newline; seems a fair trade here
301				err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
302				if err != nil {
303					tail.Kill(err)
304					return
305				}
306			}
307
308			// When EOF is reached, wait for more data to become
309			// available. Wait strategy is based on the `tail.watcher`
310			// implementation (inotify or polling).
311			err := tail.waitForChanges()
312			if err != nil {
313				if err != ErrStop {
314					tail.Kill(err)
315				}
316				return
317			}
318		} else {
319			// non-EOF error
320			tail.Killf("Error reading %s: %s", tail.Filename, err)
321			return
322		}
323
324		select {
325		case <-tail.Dying():
326			if tail.Err() == errStopAtEOF {
327				continue
328			}
329			return
330		default:
331		}
332	}
333}
334
335// waitForChanges waits until the file has been appended, deleted,
336// moved or truncated. When moved or deleted - the file will be
337// reopened if ReOpen is true. Truncated files are always reopened.
338func (tail *Tail) waitForChanges() error {
339	if tail.changes == nil {
340		pos, err := tail.file.Seek(0, os.SEEK_CUR)
341		if err != nil {
342			return err
343		}
344		tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
345		if err != nil {
346			return err
347		}
348	}
349
350	select {
351	case <-tail.changes.Modified:
352		return nil
353	case <-tail.changes.Deleted:
354		tail.changes = nil
355		if tail.ReOpen {
356			// XXX: we must not log from a library.
357			tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
358			if err := tail.reopen(); err != nil {
359				return err
360			}
361			tail.Logger.Printf("Successfully reopened %s", tail.Filename)
362			tail.openReader()
363			return nil
364		} else {
365			tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
366			return ErrStop
367		}
368	case <-tail.changes.Truncated:
369		// Always reopen truncated files (Follow is true)
370		tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
371		if err := tail.reopen(); err != nil {
372			return err
373		}
374		tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
375		tail.openReader()
376		return nil
377	case <-tail.Dying():
378		return ErrStop
379	}
380	panic("unreachable")
381}
382
383func (tail *Tail) openReader() {
384	if tail.MaxLineSize > 0 {
385		// add 2 to account for newline characters
386		tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2)
387	} else {
388		tail.reader = bufio.NewReader(tail.file)
389	}
390}
391
392func (tail *Tail) seekEnd() error {
393	return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END})
394}
395
396func (tail *Tail) seekTo(pos SeekInfo) error {
397	_, err := tail.file.Seek(pos.Offset, pos.Whence)
398	if err != nil {
399		return fmt.Errorf("Seek error on %s: %s", tail.Filename, err)
400	}
401	// Reset the read buffer whenever the file is re-seek'ed
402	tail.reader.Reset(tail.file)
403	return nil
404}
405
406// sendLine sends the line(s) to Lines channel, splitting longer lines
407// if necessary. Return false if rate limit is reached.
408func (tail *Tail) sendLine(line string) bool {
409	now := time.Now()
410	lines := []string{line}
411
412	// Split longer lines
413	if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
414		lines = util.PartitionString(line, tail.MaxLineSize)
415	}
416
417	for _, line := range lines {
418		tail.Lines <- &Line{line, now, nil}
419	}
420
421	if tail.Config.RateLimiter != nil {
422		ok := tail.Config.RateLimiter.Pour(uint16(len(lines)))
423		if !ok {
424			tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n",
425				tail.Filename)
426			return false
427		}
428	}
429
430	return true
431}
432
// Cleanup removes inotify watches added by the tail package. This function is
// meant to be invoked from a process's exit handler. Linux kernel may not
// automatically remove inotify watches after the process exits.
// It delegates to watch.Cleanup for tail.Filename.
func (tail *Tail) Cleanup() {
	watch.Cleanup(tail.Filename)
}
439