1package file 2 3import ( 4 "flag" 5 "os" 6 "path/filepath" 7 "time" 8 9 "github.com/bmatcuk/doublestar" 10 "github.com/go-kit/kit/log" 11 "github.com/go-kit/kit/log/level" 12 "github.com/pkg/errors" 13 "github.com/prometheus/common/model" 14 fsnotify "gopkg.in/fsnotify.v1" 15 16 "github.com/grafana/loki/clients/pkg/promtail/api" 17 "github.com/grafana/loki/clients/pkg/promtail/client" 18 "github.com/grafana/loki/clients/pkg/promtail/positions" 19 "github.com/grafana/loki/clients/pkg/promtail/targets/target" 20 21 "github.com/grafana/loki/pkg/util" 22) 23 24const ( 25 FilenameLabel = "filename" 26) 27 28// Config describes behavior for Target 29type Config struct { 30 SyncPeriod time.Duration `yaml:"sync_period"` 31 Stdin bool `yaml:"stdin"` 32} 33 34// RegisterFlags with prefix registers flags where every name is prefixed by 35// prefix. If prefix is a non-empty string, prefix should end with a period. 36func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { 37 f.DurationVar(&cfg.SyncPeriod, prefix+"target.sync-period", 10*time.Second, "Period to resync directories being watched and files being tailed.") 38 f.BoolVar(&cfg.Stdin, prefix+"stdin", false, "Set to true to pipe logs to promtail.") 39} 40 41// RegisterFlags register flags. 42func (cfg *Config) RegisterFlags(flags *flag.FlagSet) { 43 cfg.RegisterFlagsWithPrefix("", flags) 44} 45 46// FileTarget describes a particular set of logs. 47// nolint:revive 48type FileTarget struct { 49 metrics *Metrics 50 logger log.Logger 51 52 handler api.EntryHandler 53 positions positions.Positions 54 labels model.LabelSet 55 discoveredLabels model.LabelSet 56 57 watcher *fsnotify.Watcher 58 watches map[string]struct{} 59 path string 60 quit chan struct{} 61 done chan struct{} 62 63 tails map[string]*tailer 64 65 targetConfig *Config 66} 67 68// NewFileTarget create a new FileTarget. 69func NewFileTarget( 70 metrics *Metrics, 71 logger log.Logger, 72 handler api.EntryHandler, 73 positions positions.Positions, 74 path string, 75 labels model.LabelSet, 76 discoveredLabels model.LabelSet, 77 targetConfig *Config, 78) (*FileTarget, error) { 79 80 watcher, err := fsnotify.NewWatcher() 81 if err != nil { 82 return nil, errors.Wrap(err, "filetarget.fsnotify.NewWatcher") 83 } 84 85 t := &FileTarget{ 86 logger: logger, 87 metrics: metrics, 88 watcher: watcher, 89 path: path, 90 labels: labels, 91 discoveredLabels: discoveredLabels, 92 handler: api.AddLabelsMiddleware(labels).Wrap(handler), 93 positions: positions, 94 quit: make(chan struct{}), 95 done: make(chan struct{}), 96 tails: map[string]*tailer{}, 97 targetConfig: targetConfig, 98 } 99 100 err = t.sync() 101 if err != nil { 102 return nil, errors.Wrap(err, "filetarget.sync") 103 } 104 105 go t.run() 106 return t, nil 107} 108 109// Ready if at least one file is being tailed 110func (t *FileTarget) Ready() bool { 111 return len(t.tails) > 0 112} 113 114// Stop the target. 115func (t *FileTarget) Stop() { 116 close(t.quit) 117 <-t.done 118 t.handler.Stop() 119} 120 121// Type implements a Target 122func (t *FileTarget) Type() target.TargetType { 123 return target.FileTargetType 124} 125 126// DiscoveredLabels implements a Target 127func (t *FileTarget) DiscoveredLabels() model.LabelSet { 128 return t.discoveredLabels 129} 130 131// Labels implements a Target 132func (t *FileTarget) Labels() model.LabelSet { 133 return t.labels 134} 135 136// Details implements a Target 137func (t *FileTarget) Details() interface{} { 138 files := map[string]int64{} 139 for fileName := range t.tails { 140 files[fileName], _ = t.positions.Get(fileName) 141 } 142 return files 143} 144 145func (t *FileTarget) run() { 146 defer func() { 147 util.LogError("closing watcher", t.watcher.Close) 148 for _, v := range t.tails { 149 v.stop() 150 } 151 level.Info(t.logger).Log("msg", "filetarget: watcher closed, tailer stopped, positions saved", "path", t.path) 152 close(t.done) 153 }() 154 155 ticker := time.NewTicker(t.targetConfig.SyncPeriod) 156 157 for { 158 select { 159 case event := <-t.watcher.Events: 160 switch event.Op { 161 case fsnotify.Create: 162 matched, err := doublestar.Match(t.path, event.Name) 163 if err != nil { 164 level.Error(t.logger).Log("msg", "failed to match file", "error", err, "filename", event.Name) 165 continue 166 } 167 if !matched { 168 level.Debug(t.logger).Log("msg", "new file does not match glob", "filename", event.Name) 169 continue 170 } 171 t.startTailing([]string{event.Name}) 172 default: 173 // No-op we only care about Create events 174 } 175 case err := <-t.watcher.Errors: 176 level.Error(t.logger).Log("msg", "error from fswatch", "error", err) 177 case <-ticker.C: 178 err := t.sync() 179 if err != nil { 180 level.Error(t.logger).Log("msg", "error running sync function", "error", err) 181 } 182 case <-t.quit: 183 return 184 } 185 } 186} 187 188func (t *FileTarget) sync() error { 189 190 // Gets current list of files to tail. 191 matches, err := doublestar.Glob(t.path) 192 if err != nil { 193 return errors.Wrap(err, "filetarget.sync.filepath.Glob") 194 } 195 196 if len(matches) == 0 { 197 level.Debug(t.logger).Log("msg", "no files matched requested path, nothing will be tailed", "path", t.path) 198 } 199 200 // Gets absolute path for each pattern. 201 for i := 0; i < len(matches); i++ { 202 if !filepath.IsAbs(matches[i]) { 203 path, err := filepath.Abs(matches[i]) 204 if err != nil { 205 return errors.Wrap(err, "filetarget.sync.filepath.Abs") 206 } 207 matches[i] = path 208 } 209 } 210 211 // Record the size of all the files matched by the Glob pattern. 212 t.reportSize(matches) 213 214 // Get the current unique set of dirs to watch. 215 dirs := map[string]struct{}{} 216 for _, p := range matches { 217 dirs[filepath.Dir(p)] = struct{}{} 218 } 219 220 // Add any directories which are not already being watched. 221 toStartWatching := missing(t.watches, dirs) 222 t.startWatching(toStartWatching) 223 224 // Remove any directories which no longer need watching. 225 toStopWatching := missing(dirs, t.watches) 226 t.stopWatching(toStopWatching) 227 228 // fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves. 229 t.watches = dirs 230 231 // Check if any running tailers have stopped because of errors and remove them from the running list 232 // (They will be restarted in startTailing) 233 t.pruneStoppedTailers() 234 235 // Start tailing all of the matched files if not already doing so. 236 t.startTailing(matches) 237 238 // Stop tailing any files which no longer exist 239 toStopTailing := toStopTailing(matches, t.tails) 240 t.stopTailingAndRemovePosition(toStopTailing) 241 242 return nil 243} 244 245func (t *FileTarget) startWatching(dirs map[string]struct{}) { 246 for dir := range dirs { 247 if _, ok := t.watches[dir]; ok { 248 continue 249 } 250 level.Debug(t.logger).Log("msg", "watching new directory", "directory", dir) 251 if err := t.watcher.Add(dir); err != nil { 252 level.Error(t.logger).Log("msg", "error adding directory to watcher", "error", err) 253 } 254 } 255} 256 257func (t *FileTarget) stopWatching(dirs map[string]struct{}) { 258 for dir := range dirs { 259 if _, ok := t.watches[dir]; !ok { 260 continue 261 } 262 level.Debug(t.logger).Log("msg", "removing directory from watcher", "directory", dir) 263 err := t.watcher.Remove(dir) 264 if err != nil { 265 level.Error(t.logger).Log("msg", " failed to remove directory from watcher", "error", err) 266 } 267 } 268} 269 270func (t *FileTarget) startTailing(ps []string) { 271 for _, p := range ps { 272 if _, ok := t.tails[p]; ok { 273 continue 274 } 275 fi, err := os.Stat(p) 276 if err != nil { 277 level.Error(t.logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", p) 278 continue 279 } 280 if fi.IsDir() { 281 level.Error(t.logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", p) 282 continue 283 } 284 level.Debug(t.logger).Log("msg", "tailing new file", "filename", p) 285 tailer, err := newTailer(t.metrics, t.logger, t.handler, t.positions, p) 286 if err != nil { 287 level.Error(t.logger).Log("msg", "failed to start tailer", "error", err, "filename", p) 288 continue 289 } 290 t.tails[p] = tailer 291 } 292} 293 294// stopTailingAndRemovePosition will stop the tailer and remove the positions entry. 295// Call this when a file no longer exists and you want to remove all traces of it. 296func (t *FileTarget) stopTailingAndRemovePosition(ps []string) { 297 for _, p := range ps { 298 if tailer, ok := t.tails[p]; ok { 299 tailer.stop() 300 t.positions.Remove(tailer.path) 301 delete(t.tails, p) 302 } 303 if h, ok := t.handler.(api.InstrumentedEntryHandler); ok { 304 h.UnregisterLatencyMetric(model.LabelSet{model.LabelName(client.LatencyLabel): model.LabelValue(p)}) 305 } 306 } 307} 308 309// pruneStoppedTailers removes any tailers which have stopped running from 310// the list of active tailers. This allows them to be restarted if there were errors. 311func (t *FileTarget) pruneStoppedTailers() { 312 toRemove := make([]string, 0, len(t.tails)) 313 for k, t := range t.tails { 314 if !t.isRunning() { 315 toRemove = append(toRemove, k) 316 } 317 } 318 for _, tr := range toRemove { 319 delete(t.tails, tr) 320 } 321} 322 323func toStopTailing(nt []string, et map[string]*tailer) []string { 324 // Make a set of all existing tails 325 existingTails := make(map[string]struct{}, len(et)) 326 for file := range et { 327 existingTails[file] = struct{}{} 328 } 329 // Make a set of what we are about to start tailing 330 newTails := make(map[string]struct{}, len(nt)) 331 for _, p := range nt { 332 newTails[p] = struct{}{} 333 } 334 // Find the tails in our existing which are not in the new, these need to be stopped! 335 ts := missing(newTails, existingTails) 336 ta := make([]string, len(ts)) 337 i := 0 338 for t := range ts { 339 ta[i] = t 340 i++ 341 } 342 return ta 343} 344 345func (t *FileTarget) reportSize(ms []string) { 346 for _, m := range ms { 347 // Ask the tailer to update the size if a tailer exists, this keeps position and size metrics in sync 348 if tailer, ok := t.tails[m]; ok { 349 err := tailer.markPositionAndSize() 350 if err != nil { 351 level.Warn(t.logger).Log("msg", "failed to get file size from tailer, ", "file", m, "error", err) 352 return 353 } 354 } else { 355 // Must be a new file, just directly read the size of it 356 fi, err := os.Stat(m) 357 if err != nil { 358 // If the file was deleted between when the glob match and here, 359 // we just ignore recording a size for it, 360 // the tail code will also check if the file exists before creating a tailer. 361 return 362 } 363 t.metrics.totalBytes.WithLabelValues(m).Set(float64(fi.Size())) 364 } 365 366 } 367} 368 369// Returns the elements from set b which are missing from set a 370func missing(as map[string]struct{}, bs map[string]struct{}) map[string]struct{} { 371 c := map[string]struct{}{} 372 for a := range bs { 373 if _, ok := as[a]; !ok { 374 c[a] = struct{}{} 375 } 376 } 377 return c 378} 379