1// Copyright (c) 2015 HPE Software Inc. All rights reserved.
2// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
3
4package tail
5
6import (
7	"bufio"
8	"errors"
9	"fmt"
10	"io"
11	"io/ioutil"
12	"log"
13	"os"
14	"sync"
15	"time"
16
17	"github.com/gcla/tail/ratelimiter"
18	"github.com/gcla/tail/util"
19	"github.com/gcla/tail/watch"
20	"gopkg.in/tomb.v1"
21)
22
var (
	// ErrStop is returned by waitForChanges when tailing should end without
	// being reported as a failure: either the tomb is dying, or the file was
	// moved/deleted and ReOpen is not set. tailFileSync returns on this
	// error without passing it to Kill.
	ErrStop = errors.New("tail should now stop")
)
26
// Chunk is the unit of output delivered on the Tail.Bytes channel.
type Chunk struct {
	// Text holds the bytes being delivered. For the rate-limit notice
	// emitted by tailFileSync it holds the notice message instead.
	Text []byte
	// Time is the time.Now() value captured when the chunk was created.
	Time time.Time
	Err  error // Error from tail
}
32
// SeekInfo holds the two arguments handed to (*os.File).Seek: a byte
// offset and the reference point (whence) the offset is relative to.
type SeekInfo struct {
	Offset int64
	Whence int // os.SEEK_*
}
38
// logger is the set of logging methods required of Config.Logger.
// A *log.Logger from the standard library satisfies it, which is what
// DefaultLogger and DiscardingLogger are.
type logger interface {
	Fatal(v ...interface{})
	Fatalf(format string, v ...interface{})
	Fatalln(v ...interface{})
	Panic(v ...interface{})
	Panicf(format string, v ...interface{})
	Panicln(v ...interface{})
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
50
// Config is used to specify how a file must be tailed.
//
// TailFile rejects ReOpen without Follow. RateLimiter is carried in the
// configuration, but nothing in this file's visible code consults it.
// NOTE(review): confirm whether rate limiting is applied elsewhere.
type Config struct {
	// File-specific
	Location    *SeekInfo // Seek to this location before tailing
	ReOpen      bool      // Reopen recreated files (tail -F)
	MustExist   bool      // Fail early if the file does not exist
	Poll        bool      // Poll for file changes instead of using inotify
	Pipe        bool      // Is a named pipe (mkfifo)
	RateLimiter *ratelimiter.LeakyBucket

	// Generic IO
	Follow bool // Continue looking for new lines (tail -f)

	// Logger, when nil, is set to tail.DefaultLogger
	// To disable logging: set field to tail.DiscardingLogger
	Logger logger
}
68
// Tail is one tailing session on a single file, created by TailFile.
// Output is read from the Bytes channel; the embedded tomb.Tomb is used
// to stop the session and to retrieve its final error.
type Tail struct {
	// Filename is the path being tailed, Bytes is the channel on which
	// chunks are delivered (closed when the tailing goroutine exits), and
	// the embedded Config holds the options passed to TailFile.
	Filename string
	Bytes    chan *Chunk
	Config

	// The currently open file and the buffered reader wrapped around it.
	file   *os.File
	reader *bufio.Reader

	// The watcher used to detect file events, and the change channels
	// obtained from it (nil until waitForChanges first requests them).
	watcher watch.FileWatcher
	changes *watch.FileChanges

	tomb.Tomb // provides: Done, Kill, Dying

	// lk is held by Tell and readLine while they access reader.
	lk sync.Mutex
}
84
var (
	// DefaultLogger is the logger meant for use when Config.Logger == nil.
	// It writes to standard error with the standard log flags.
	DefaultLogger = log.New(os.Stderr, "", log.LstdFlags)
	// DiscardingLogger can be used to disable logging output; everything
	// written to it goes to ioutil.Discard.
	DiscardingLogger = log.New(ioutil.Discard, "", 0)
)
91
92// TailFile begins tailing the file. Output stream is made available
93// via the `Tail.Lines` channel. To handle errors during tailing,
94// invoke the `Wait` or `Err` method after finishing reading from the
95// `Lines` channel.
96func TailFile(filename string, config Config) (*Tail, error) {
97	if config.ReOpen && !config.Follow {
98		util.Fatal("cannot set ReOpen without Follow.")
99	}
100
101	t := &Tail{
102		Filename: filename,
103		Bytes:    make(chan *Chunk),
104		Config:   config,
105	}
106
107	// when Logger was not specified in config, use default logger
108	if t.Logger == nil {
109		t.Logger = log.New(os.Stderr, "", log.LstdFlags)
110	}
111
112	if t.Poll {
113		t.watcher = watch.NewPollingFileWatcher(filename)
114	} else {
115		t.watcher = watch.NewInotifyFileWatcher(filename)
116	}
117
118	if t.MustExist {
119		var err error
120		t.file, err = OpenFile(t.Filename)
121		if err != nil {
122			return nil, err
123		}
124	}
125
126	go t.tailFileSync()
127
128	return t, nil
129}
130
131// Return the file's current position, like stdio's ftell().
132// But this value is not very accurate.
133// it may readed one line in the chan(tail.Lines),
134// so it may lost one line.
135func (tail *Tail) Tell() (offset int64, err error) {
136	if tail.file == nil {
137		return
138	}
139	offset, err = tail.file.Seek(0, os.SEEK_CUR)
140	if err != nil {
141		return
142	}
143
144	tail.lk.Lock()
145	defer tail.lk.Unlock()
146	if tail.reader == nil {
147		return
148	}
149
150	offset -= int64(tail.reader.Buffered())
151	return
152}
153
154// Stop stops the tailing activity.
155func (tail *Tail) Stop() error {
156	tail.Kill(nil)
157	return tail.Wait()
158}
159
160// StopAtEOF stops tailing as soon as the end of the file is reached.
161func (tail *Tail) StopAtEOF() error {
162	tail.Kill(errStopAtEOF)
163	return tail.Wait()
164}
165
// errStopAtEOF is the kill reason used by StopAtEOF. While the tomb is
// dying with this error, tailFileSync keeps iterating instead of
// returning immediately.
var errStopAtEOF = errors.New("tail: stop at eof")
167
// close tears down the session's output side: it closes the Bytes
// channel and then closes the open file, if any. It is deferred by
// tailFileSync, so it runs when the tailing goroutine exits.
func (tail *Tail) close() {
	close(tail.Bytes)
	tail.closeFile()
}
172
173func (tail *Tail) closeFile() {
174	if tail.file != nil {
175		tail.file.Close()
176		tail.file = nil
177	}
178}
179
180func (tail *Tail) reopen() error {
181	tail.closeFile()
182	for {
183		var err error
184		tail.file, err = OpenFile(tail.Filename)
185		if err != nil {
186			if os.IsNotExist(err) {
187				tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
188				if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil {
189					if err == tomb.ErrDying {
190						return err
191					}
192					return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err)
193				}
194				continue
195			}
196			return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
197		}
198		break
199	}
200	return nil
201}
202
203func (tail *Tail) readLine() ([]byte, error) {
204	tail.lk.Lock()
205	defer tail.lk.Unlock()
206	b, err := tail.reader.ReadByte()
207	return []byte{b}, err
208}
209
// tailFileSync is the body of the goroutine started by TailFile. It opens
// the file (unless TailFile already did so for MustExist), applies the
// optional initial seek, and then loops: read via readLine, forward the
// data with sendChunk, and at EOF either finish (Follow unset) or wait
// for the watcher to report a change.
//
// Failures are recorded on the tomb with Kill/Killf. On exit the deferred
// calls run in reverse order: close() first (closing Bytes and the file),
// then Done().
func (tail *Tail) tailFileSync() {
	defer tail.Done()
	defer tail.close()

	if !tail.MustExist {
		// deferred first open.
		err := tail.reopen()
		if err != nil {
			// ErrDying means the tomb was already killed while waiting;
			// there is nothing further to record in that case.
			if err != tomb.ErrDying {
				tail.Kill(err)
			}
			return
		}
	}

	// Seek to requested location on first open of the file.
	if tail.Location != nil {
		_, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence)
		tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location)
		if err != nil {
			tail.Killf("Seek error on %s: %s", tail.Filename, err)
			return
		}
	}

	tail.openReader()

	// offset remembers the read position taken before each readLine call.
	// It stays 0 for named pipes, where the position is never queried.
	var offset int64
	var err error

	// Main read loop: one readLine call per iteration.
	for {
		// do not seek in named pipes
		if !tail.Pipe {
			// grab the position in case we need to back up in the event of a half-line
			offset, err = tail.Tell()
			if err != nil {
				tail.Kill(err)
				return
			}
		}

		// Note: := declares a new err scoped to this iteration, shadowing
		// the one declared above the loop.
		line, err := tail.readLine()

		// Process `line` even if err is EOF.
		if err == nil {
			// sendChunk reports false when the caller should back off.
			cooloff := !tail.sendChunk(line)
			if cooloff {
				// Wait a second before seeking till the end of
				// file when rate limit is reached.
				msg := ("Too much log activity; waiting a second " +
					"before resuming tailing")
				// The notice is delivered in-band, with Err set.
				tail.Bytes <- &Chunk{[]byte(msg), time.Now(), errors.New(msg)}
				select {
				case <-time.After(time.Second):
				case <-tail.Dying():
					return
				}
				// Skip whatever accumulated during the cooloff.
				if err := tail.seekEnd(); err != nil {
					tail.Kill(err)
					return
				}
			}
		} else if err == io.EOF {
			if !tail.Follow {
				// One-shot mode: flush anything returned alongside EOF
				// and finish.
				if len(line) > 0 {
					tail.sendChunk(line)
				}
				return
			}

			if tail.Follow && len(line) > 0 {
				// this has the potential to never return the last line if
				// it's not followed by a newline; seems a fair trade here
				// Rewind to the position recorded before this read so the
				// data is read again once more has been written.
				err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
				if err != nil {
					tail.Kill(err)
					return
				}
			}

			// When EOF is reached, wait for more data to become
			// available. Wait strategy is based on the `tail.watcher`
			// implementation (inotify or polling).
			err := tail.waitForChanges()
			if err != nil {
				// ErrStop is a clean shutdown signal, not a failure.
				if err != ErrStop {
					tail.Kill(err)
				}
				return
			}
		} else {
			// non-EOF error
			tail.Killf("Error reading %s: %s", tail.Filename, err)
			return
		}

		// Non-blocking check for a stop request between reads. A
		// StopAtEOF request keeps the loop running; any other kill
		// reason ends it here.
		select {
		case <-tail.Dying():
			if tail.Err() == errStopAtEOF {
				continue
			}
			return
		default:
		}
	}
}
317
318// waitForChanges waits until the file has been appended, deleted,
319// moved or truncated. When moved or deleted - the file will be
320// reopened if ReOpen is true. Truncated files are always reopened.
321func (tail *Tail) waitForChanges() error {
322	if tail.changes == nil {
323		pos, err := tail.file.Seek(0, os.SEEK_CUR)
324		if err != nil {
325			return err
326		}
327		tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
328		if err != nil {
329			return err
330		}
331	}
332
333	select {
334	case <-tail.changes.Modified:
335		return nil
336	case <-tail.changes.Deleted:
337		tail.changes = nil
338		if tail.ReOpen {
339			// XXX: we must not log from a library.
340			tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
341			if err := tail.reopen(); err != nil {
342				return err
343			}
344			tail.Logger.Printf("Successfully reopened %s", tail.Filename)
345			tail.openReader()
346			return nil
347		} else {
348			tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
349			return ErrStop
350		}
351	case <-tail.changes.Truncated:
352		// Always reopen truncated files (Follow is true)
353		tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
354		if err := tail.reopen(); err != nil {
355			return err
356		}
357		tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename)
358		tail.openReader()
359		return nil
360	case <-tail.Dying():
361		return ErrStop
362	}
363	panic("unreachable")
364}
365
366func (tail *Tail) openReader() {
367	tail.reader = bufio.NewReader(tail.file)
368}
369
370func (tail *Tail) seekEnd() error {
371	return tail.seekTo(SeekInfo{Offset: 0, Whence: os.SEEK_END})
372}
373
374func (tail *Tail) seekTo(pos SeekInfo) error {
375	_, err := tail.file.Seek(pos.Offset, pos.Whence)
376	if err != nil {
377		return fmt.Errorf("Seek error on %s: %s", tail.Filename, err)
378	}
379	// Reset the read buffer whenever the file is re-seek'ed
380	tail.reader.Reset(tail.file)
381	return nil
382}
383
384// sendChunk sends the line(s) to Lines channel, splitting longer lines
385// if necessary. Return false if rate limit is reached.
386func (tail *Tail) sendChunk(line []byte) bool {
387	now := time.Now()
388
389	tail.Bytes <- &Chunk{line, now, nil}
390
391	return true
392}
393
394// Cleanup removes inotify watches added by the tail package. This function is
395// meant to be invoked from a process's exit handler. Linux kernel may not
396// automatically remove inotify watches after the process exits.
397func (tail *Tail) Cleanup() {
398	watch.Cleanup(tail.Filename)
399}
400