1package file
2
3import (
4	"flag"
5	"os"
6	"path/filepath"
7	"time"
8
9	"github.com/bmatcuk/doublestar"
10	"github.com/go-kit/kit/log"
11	"github.com/go-kit/kit/log/level"
12	"github.com/pkg/errors"
13	"github.com/prometheus/common/model"
14	fsnotify "gopkg.in/fsnotify.v1"
15
16	"github.com/grafana/loki/clients/pkg/promtail/api"
17	"github.com/grafana/loki/clients/pkg/promtail/client"
18	"github.com/grafana/loki/clients/pkg/promtail/positions"
19	"github.com/grafana/loki/clients/pkg/promtail/targets/target"
20
21	"github.com/grafana/loki/pkg/util"
22)
23
24const (
25	FilenameLabel = "filename"
26)
27
28// Config describes behavior for Target
29type Config struct {
30	SyncPeriod time.Duration `yaml:"sync_period"`
31	Stdin      bool          `yaml:"stdin"`
32}
33
34// RegisterFlags with prefix registers flags where every name is prefixed by
35// prefix. If prefix is a non-empty string, prefix should end with a period.
36func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
37	f.DurationVar(&cfg.SyncPeriod, prefix+"target.sync-period", 10*time.Second, "Period to resync directories being watched and files being tailed.")
38	f.BoolVar(&cfg.Stdin, prefix+"stdin", false, "Set to true to pipe logs to promtail.")
39}
40
41// RegisterFlags register flags.
42func (cfg *Config) RegisterFlags(flags *flag.FlagSet) {
43	cfg.RegisterFlagsWithPrefix("", flags)
44}
45
46// FileTarget describes a particular set of logs.
47// nolint:revive
48type FileTarget struct {
49	metrics *Metrics
50	logger  log.Logger
51
52	handler          api.EntryHandler
53	positions        positions.Positions
54	labels           model.LabelSet
55	discoveredLabels model.LabelSet
56
57	watcher *fsnotify.Watcher
58	watches map[string]struct{}
59	path    string
60	quit    chan struct{}
61	done    chan struct{}
62
63	tails map[string]*tailer
64
65	targetConfig *Config
66}
67
68// NewFileTarget create a new FileTarget.
69func NewFileTarget(
70	metrics *Metrics,
71	logger log.Logger,
72	handler api.EntryHandler,
73	positions positions.Positions,
74	path string,
75	labels model.LabelSet,
76	discoveredLabels model.LabelSet,
77	targetConfig *Config,
78) (*FileTarget, error) {
79
80	watcher, err := fsnotify.NewWatcher()
81	if err != nil {
82		return nil, errors.Wrap(err, "filetarget.fsnotify.NewWatcher")
83	}
84
85	t := &FileTarget{
86		logger:           logger,
87		metrics:          metrics,
88		watcher:          watcher,
89		path:             path,
90		labels:           labels,
91		discoveredLabels: discoveredLabels,
92		handler:          api.AddLabelsMiddleware(labels).Wrap(handler),
93		positions:        positions,
94		quit:             make(chan struct{}),
95		done:             make(chan struct{}),
96		tails:            map[string]*tailer{},
97		targetConfig:     targetConfig,
98	}
99
100	err = t.sync()
101	if err != nil {
102		return nil, errors.Wrap(err, "filetarget.sync")
103	}
104
105	go t.run()
106	return t, nil
107}
108
109// Ready if at least one file is being tailed
110func (t *FileTarget) Ready() bool {
111	return len(t.tails) > 0
112}
113
114// Stop the target.
115func (t *FileTarget) Stop() {
116	close(t.quit)
117	<-t.done
118	t.handler.Stop()
119}
120
121// Type implements a Target
122func (t *FileTarget) Type() target.TargetType {
123	return target.FileTargetType
124}
125
126// DiscoveredLabels implements a Target
127func (t *FileTarget) DiscoveredLabels() model.LabelSet {
128	return t.discoveredLabels
129}
130
131// Labels implements a Target
132func (t *FileTarget) Labels() model.LabelSet {
133	return t.labels
134}
135
136// Details implements a Target
137func (t *FileTarget) Details() interface{} {
138	files := map[string]int64{}
139	for fileName := range t.tails {
140		files[fileName], _ = t.positions.Get(fileName)
141	}
142	return files
143}
144
145func (t *FileTarget) run() {
146	defer func() {
147		util.LogError("closing watcher", t.watcher.Close)
148		for _, v := range t.tails {
149			v.stop()
150		}
151		level.Info(t.logger).Log("msg", "filetarget: watcher closed, tailer stopped, positions saved", "path", t.path)
152		close(t.done)
153	}()
154
155	ticker := time.NewTicker(t.targetConfig.SyncPeriod)
156
157	for {
158		select {
159		case event := <-t.watcher.Events:
160			switch event.Op {
161			case fsnotify.Create:
162				matched, err := doublestar.Match(t.path, event.Name)
163				if err != nil {
164					level.Error(t.logger).Log("msg", "failed to match file", "error", err, "filename", event.Name)
165					continue
166				}
167				if !matched {
168					level.Debug(t.logger).Log("msg", "new file does not match glob", "filename", event.Name)
169					continue
170				}
171				t.startTailing([]string{event.Name})
172			default:
173				// No-op we only care about Create events
174			}
175		case err := <-t.watcher.Errors:
176			level.Error(t.logger).Log("msg", "error from fswatch", "error", err)
177		case <-ticker.C:
178			err := t.sync()
179			if err != nil {
180				level.Error(t.logger).Log("msg", "error running sync function", "error", err)
181			}
182		case <-t.quit:
183			return
184		}
185	}
186}
187
188func (t *FileTarget) sync() error {
189
190	// Gets current list of files to tail.
191	matches, err := doublestar.Glob(t.path)
192	if err != nil {
193		return errors.Wrap(err, "filetarget.sync.filepath.Glob")
194	}
195
196	if len(matches) == 0 {
197		level.Debug(t.logger).Log("msg", "no files matched requested path, nothing will be tailed", "path", t.path)
198	}
199
200	// Gets absolute path for each pattern.
201	for i := 0; i < len(matches); i++ {
202		if !filepath.IsAbs(matches[i]) {
203			path, err := filepath.Abs(matches[i])
204			if err != nil {
205				return errors.Wrap(err, "filetarget.sync.filepath.Abs")
206			}
207			matches[i] = path
208		}
209	}
210
211	// Record the size of all the files matched by the Glob pattern.
212	t.reportSize(matches)
213
214	// Get the current unique set of dirs to watch.
215	dirs := map[string]struct{}{}
216	for _, p := range matches {
217		dirs[filepath.Dir(p)] = struct{}{}
218	}
219
220	// Add any directories which are not already being watched.
221	toStartWatching := missing(t.watches, dirs)
222	t.startWatching(toStartWatching)
223
224	// Remove any directories which no longer need watching.
225	toStopWatching := missing(dirs, t.watches)
226	t.stopWatching(toStopWatching)
227
228	// fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves.
229	t.watches = dirs
230
231	// Check if any running tailers have stopped because of errors and remove them from the running list
232	// (They will be restarted in startTailing)
233	t.pruneStoppedTailers()
234
235	// Start tailing all of the matched files if not already doing so.
236	t.startTailing(matches)
237
238	// Stop tailing any files which no longer exist
239	toStopTailing := toStopTailing(matches, t.tails)
240	t.stopTailingAndRemovePosition(toStopTailing)
241
242	return nil
243}
244
245func (t *FileTarget) startWatching(dirs map[string]struct{}) {
246	for dir := range dirs {
247		if _, ok := t.watches[dir]; ok {
248			continue
249		}
250		level.Debug(t.logger).Log("msg", "watching new directory", "directory", dir)
251		if err := t.watcher.Add(dir); err != nil {
252			level.Error(t.logger).Log("msg", "error adding directory to watcher", "error", err)
253		}
254	}
255}
256
257func (t *FileTarget) stopWatching(dirs map[string]struct{}) {
258	for dir := range dirs {
259		if _, ok := t.watches[dir]; !ok {
260			continue
261		}
262		level.Debug(t.logger).Log("msg", "removing directory from watcher", "directory", dir)
263		err := t.watcher.Remove(dir)
264		if err != nil {
265			level.Error(t.logger).Log("msg", " failed to remove directory from watcher", "error", err)
266		}
267	}
268}
269
270func (t *FileTarget) startTailing(ps []string) {
271	for _, p := range ps {
272		if _, ok := t.tails[p]; ok {
273			continue
274		}
275		fi, err := os.Stat(p)
276		if err != nil {
277			level.Error(t.logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", p)
278			continue
279		}
280		if fi.IsDir() {
281			level.Error(t.logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", p)
282			continue
283		}
284		level.Debug(t.logger).Log("msg", "tailing new file", "filename", p)
285		tailer, err := newTailer(t.metrics, t.logger, t.handler, t.positions, p)
286		if err != nil {
287			level.Error(t.logger).Log("msg", "failed to start tailer", "error", err, "filename", p)
288			continue
289		}
290		t.tails[p] = tailer
291	}
292}
293
294// stopTailingAndRemovePosition will stop the tailer and remove the positions entry.
295// Call this when a file no longer exists and you want to remove all traces of it.
296func (t *FileTarget) stopTailingAndRemovePosition(ps []string) {
297	for _, p := range ps {
298		if tailer, ok := t.tails[p]; ok {
299			tailer.stop()
300			t.positions.Remove(tailer.path)
301			delete(t.tails, p)
302		}
303		if h, ok := t.handler.(api.InstrumentedEntryHandler); ok {
304			h.UnregisterLatencyMetric(model.LabelSet{model.LabelName(client.LatencyLabel): model.LabelValue(p)})
305		}
306	}
307}
308
309// pruneStoppedTailers removes any tailers which have stopped running from
310// the list of active tailers. This allows them to be restarted if there were errors.
311func (t *FileTarget) pruneStoppedTailers() {
312	toRemove := make([]string, 0, len(t.tails))
313	for k, t := range t.tails {
314		if !t.isRunning() {
315			toRemove = append(toRemove, k)
316		}
317	}
318	for _, tr := range toRemove {
319		delete(t.tails, tr)
320	}
321}
322
323func toStopTailing(nt []string, et map[string]*tailer) []string {
324	// Make a set of all existing tails
325	existingTails := make(map[string]struct{}, len(et))
326	for file := range et {
327		existingTails[file] = struct{}{}
328	}
329	// Make a set of what we are about to start tailing
330	newTails := make(map[string]struct{}, len(nt))
331	for _, p := range nt {
332		newTails[p] = struct{}{}
333	}
334	// Find the tails in our existing which are not in the new, these need to be stopped!
335	ts := missing(newTails, existingTails)
336	ta := make([]string, len(ts))
337	i := 0
338	for t := range ts {
339		ta[i] = t
340		i++
341	}
342	return ta
343}
344
345func (t *FileTarget) reportSize(ms []string) {
346	for _, m := range ms {
347		// Ask the tailer to update the size if a tailer exists, this keeps position and size metrics in sync
348		if tailer, ok := t.tails[m]; ok {
349			err := tailer.markPositionAndSize()
350			if err != nil {
351				level.Warn(t.logger).Log("msg", "failed to get file size from tailer, ", "file", m, "error", err)
352				return
353			}
354		} else {
355			// Must be a new file, just directly read the size of it
356			fi, err := os.Stat(m)
357			if err != nil {
358				// If the file was deleted between when the glob match and here,
359				// we just ignore recording a size for it,
360				// the tail code will also check if the file exists before creating a tailer.
361				return
362			}
363			t.metrics.totalBytes.WithLabelValues(m).Set(float64(fi.Size()))
364		}
365
366	}
367}
368
369// Returns the elements from set b which are missing from set a
370func missing(as map[string]struct{}, bs map[string]struct{}) map[string]struct{} {
371	c := map[string]struct{}{}
372	for a := range bs {
373		if _, ok := as[a]; !ok {
374			c[a] = struct{}{}
375		}
376	}
377	return c
378}
379