1// Copyright (c) 2014-2015 The Notify Authors. All rights reserved.
2// Use of this source code is governed by the MIT license that can be
3// found in the LICENSE file.
4
5// +build darwin,kqueue dragonfly freebsd netbsd openbsd solaris
6
// watcher_trigger is used for FEN and kqueue, which behave similarly:
// only files and directories can be watched directly, but not the files
// inside a watched directory. As a result, Create events have to be generated
// by this implementation: after a Write event is returned for a watched
// directory, the directory is rescanned, a Create event is returned for each
// new file, and the new files are automatically added to the watchlist.
// When a watched directory is removed, the native system returns events for
// all of its files, but for Rename they also need to be generated here.
// The native system therefore works like a trigger for a rescan, although it
// also carries data about the directory in which the changes occurred.
// For files, detailed data is returned.
//
// Usage of watcher_trigger requires:
// - trigger implementation,
// - encode func,
// - not2nat, nat2not maps.
// The required manual operations on the filesystem can lead to loss of
// precision.
22
23package notify
24
25import (
26	"fmt"
27	"os"
28	"path/filepath"
29	"strings"
30	"sync"
31	"syscall"
32)
33
34// trigger is to be implemented by platform implementation like FEN or kqueue.
35type trigger interface {
36	// Close closes watcher's main native file descriptor.
37	Close() error
38	// Stop waiting for new events.
39	Stop() error
40	// Create new instance of watched.
41	NewWatched(string, os.FileInfo) (*watched, error)
42	// Record internally new *watched instance.
43	Record(*watched)
44	// Del removes internal copy of *watched instance.
45	Del(*watched)
46	// Watched returns *watched instance and native events for native type.
47	Watched(interface{}) (*watched, int64, error)
48	// Init initializes native watcher call.
49	Init() error
50	// Watch starts watching provided file/dir.
51	Watch(os.FileInfo, *watched, int64) error
52	// Unwatch stops watching provided file/dir.
53	Unwatch(*watched) error
54	// Wait for new events.
55	Wait() (interface{}, error)
56	// IsStop checks if Wait finished because of request watcher's stop.
57	IsStop(n interface{}, err error) bool
58}
59
60// trgWatched is a the base data structure representing watched file/directory.
61// The platform specific full data structure (watched) must embed this type.
62type trgWatched struct {
63	// p is a path to watched file/directory.
64	p string
65	// fi provides information about watched file/dir.
66	fi os.FileInfo
67	// eDir represents events watched directly.
68	eDir Event
69	// eNonDir represents events watched indirectly.
70	eNonDir Event
71}
72
73// encode Event to native representation. Implementation is to be provided by
74// platform specific implementation.
75var encode func(Event, bool) int64
76
77var (
78	// nat2not matches native events to notify's ones. To be initialized by
79	// platform dependent implementation.
80	nat2not map[Event]Event
81	// not2nat matches notify's events to native ones. To be initialized by
82	// platform dependent implementation.
83	not2nat map[Event]Event
84)
85
86// trg is a main structure implementing watcher.
87type trg struct {
88	sync.Mutex
89	// s is a channel used to stop monitoring.
90	s chan struct{}
91	// c is a channel used to pass events further.
92	c chan<- EventInfo
93	// pthLkp is a data structure mapping file names with data about watching
94	// represented by them files/directories.
95	pthLkp map[string]*watched
96	// t is a platform dependent implementation of trigger.
97	t trigger
98}
99
100// newWatcher returns new watcher's implementation.
101func newWatcher(c chan<- EventInfo) watcher {
102	t := &trg{
103		s:      make(chan struct{}, 1),
104		pthLkp: make(map[string]*watched, 0),
105		c:      c,
106	}
107	t.t = newTrigger(t.pthLkp)
108	if err := t.t.Init(); err != nil {
109		t.Close()
110		return watcherStub{fmt.Errorf("failed setting up watcher: %v", err)}
111	}
112	go t.monitor()
113	return t
114}
115
116// Close implements watcher.
117func (t *trg) Close() (err error) {
118	t.Lock()
119	if err = t.t.Stop(); err != nil {
120		t.Unlock()
121		return
122	}
123	<-t.s
124	var e error
125	for _, w := range t.pthLkp {
126		if e = t.unwatch(w.p, w.fi); e != nil {
127			dbgprintf("trg: unwatch %q failed: %q\n", w.p, e)
128			err = nonil(err, e)
129		}
130	}
131	if e = t.t.Close(); e != nil {
132		dbgprintf("trg: closing native watch failed: %q\n", e)
133		err = nonil(err, e)
134	}
135	if remaining := len(t.pthLkp); remaining != 0 {
136		err = nonil(err, fmt.Errorf("Not all watches were removed: len(t.pthLkp) == %v", len(t.pthLkp)))
137	}
138	t.Unlock()
139	return
140}
141
142// send reported events one by one through chan.
143func (t *trg) send(evn []event) {
144	for i := range evn {
145		t.c <- &evn[i]
146	}
147}
148
149// singlewatch starts to watch given p file/directory.
150func (t *trg) singlewatch(p string, e Event, direct mode, fi os.FileInfo) (err error) {
151	w, ok := t.pthLkp[p]
152	if !ok {
153		if w, err = t.t.NewWatched(p, fi); err != nil {
154			return
155		}
156	}
157	switch direct {
158	case dir:
159		w.eDir |= e
160	case ndir:
161		w.eNonDir |= e
162	case both:
163		w.eDir |= e
164		w.eNonDir |= e
165	}
166	if err = t.t.Watch(fi, w, encode(w.eDir|w.eNonDir, fi.IsDir())); err != nil {
167		return
168	}
169	if !ok {
170		t.t.Record(w)
171		return nil
172	}
173	return errAlreadyWatched
174}
175
176// decode converts event received from native to notify.Event
177// representation taking into account requested events (w).
178func decode(o int64, w Event) (e Event) {
179	for f, n := range nat2not {
180		if o&int64(f) != 0 {
181			if w&f != 0 {
182				e |= f
183			}
184			if w&n != 0 {
185				e |= n
186			}
187		}
188	}
189
190	return
191}
192
193func (t *trg) watch(p string, e Event, fi os.FileInfo) error {
194	if err := t.singlewatch(p, e, dir, fi); err != nil {
195		if err != errAlreadyWatched {
196			return err
197		}
198	}
199	if fi.IsDir() {
200		err := t.walk(p, func(fi os.FileInfo) (err error) {
201			if err = t.singlewatch(filepath.Join(p, fi.Name()), e, ndir,
202				fi); err != nil {
203				if err != errAlreadyWatched {
204					return
205				}
206			}
207			return nil
208		})
209		if err != nil {
210			return err
211		}
212	}
213	return nil
214}
215
216// walk runs f func on each file/dir from p directory.
217func (t *trg) walk(p string, fn func(os.FileInfo) error) error {
218	fp, err := os.Open(p)
219	if err != nil {
220		return err
221	}
222	ls, err := fp.Readdir(0)
223	fp.Close()
224	if err != nil {
225		return err
226	}
227	for i := range ls {
228		if err := fn(ls[i]); err != nil {
229			return err
230		}
231	}
232	return nil
233}
234
235func (t *trg) unwatch(p string, fi os.FileInfo) error {
236	if fi.IsDir() {
237		err := t.walk(p, func(fi os.FileInfo) error {
238			err := t.singleunwatch(filepath.Join(p, fi.Name()), ndir)
239			if err != errNotWatched {
240				return err
241			}
242			return nil
243		})
244		if err != nil {
245			return err
246		}
247	}
248	return t.singleunwatch(p, dir)
249}
250
251// Watch implements Watcher interface.
252func (t *trg) Watch(p string, e Event) error {
253	fi, err := os.Stat(p)
254	if err != nil {
255		return err
256	}
257	t.Lock()
258	err = t.watch(p, e, fi)
259	t.Unlock()
260	return err
261}
262
263// Unwatch implements Watcher interface.
264func (t *trg) Unwatch(p string) error {
265	fi, err := os.Stat(p)
266	if err != nil {
267		return err
268	}
269	t.Lock()
270	err = t.unwatch(p, fi)
271	t.Unlock()
272	return err
273}
274
275// Rewatch implements Watcher interface.
276//
277// TODO(rjeczalik): This is a naive hack. Rewrite might help.
278func (t *trg) Rewatch(p string, _, e Event) error {
279	fi, err := os.Stat(p)
280	if err != nil {
281		return err
282	}
283	t.Lock()
284	if err = t.unwatch(p, fi); err == nil {
285		// TODO(rjeczalik): If watch fails then we leave trigger in inconsistent
286		// state. Handle? Panic? Native version of rewatch?
287		err = t.watch(p, e, fi)
288	}
289	t.Unlock()
290	return nil
291}
292
293func (*trg) file(w *watched, n interface{}, e Event) (evn []event) {
294	evn = append(evn, event{w.p, e, w.fi.IsDir(), n})
295	return
296}
297
298func (t *trg) dir(w *watched, n interface{}, e, ge Event) (evn []event) {
299	// If it's dir and delete we have to send it and continue, because
300	// other processing relies on opening (in this case not existing) dir.
301	// Events for contents of this dir are reported by native impl.
302	// However events for rename must be generated for all monitored files
303	// inside of moved directory, because native impl does not report it independently
304	// for each file descriptor being moved in result of move action on
305	// parent directory.
306	if (ge & (not2nat[Rename] | not2nat[Remove])) != 0 {
307		// Write is reported also for Remove on directory. Because of that
308		// we have to filter it out explicitly.
309		evn = append(evn, event{w.p, e & ^Write & ^not2nat[Write], true, n})
310		if ge&not2nat[Rename] != 0 {
311			for p := range t.pthLkp {
312				if strings.HasPrefix(p, w.p+string(os.PathSeparator)) {
313					if err := t.singleunwatch(p, both); err != nil && err != errNotWatched &&
314						!os.IsNotExist(err) {
315						dbgprintf("trg: failed stop watching moved file (%q): %q\n",
316							p, err)
317					}
318					if (w.eDir|w.eNonDir)&(not2nat[Rename]|Rename) != 0 {
319						evn = append(evn, event{
320							p, (w.eDir | w.eNonDir) & e &^ Write &^ not2nat[Write],
321							w.fi.IsDir(), nil,
322						})
323					}
324				}
325			}
326		}
327		t.t.Del(w)
328		return
329	}
330	if (ge & not2nat[Write]) != 0 {
331		switch err := t.walk(w.p, func(fi os.FileInfo) error {
332			p := filepath.Join(w.p, fi.Name())
333			switch err := t.singlewatch(p, w.eDir, ndir, fi); {
334			case os.IsNotExist(err) && ((w.eDir & Remove) != 0):
335				evn = append(evn, event{p, Remove, fi.IsDir(), n})
336			case err == errAlreadyWatched:
337			case err != nil:
338				dbgprintf("trg: watching %q failed: %q", p, err)
339			case (w.eDir & Create) != 0:
340				evn = append(evn, event{p, Create, fi.IsDir(), n})
341			default:
342			}
343			return nil
344		}); {
345		case os.IsNotExist(err):
346			return
347		case err != nil:
348			dbgprintf("trg: dir processing failed: %q", err)
349		default:
350		}
351	}
352	return
353}
354
355type mode uint
356
357const (
358	dir mode = iota
359	ndir
360	both
361)
362
363// unwatch stops watching p file/directory.
364func (t *trg) singleunwatch(p string, direct mode) error {
365	w, ok := t.pthLkp[p]
366	if !ok {
367		return errNotWatched
368	}
369	switch direct {
370	case dir:
371		w.eDir = 0
372	case ndir:
373		w.eNonDir = 0
374	case both:
375		w.eDir, w.eNonDir = 0, 0
376	}
377	if err := t.t.Unwatch(w); err != nil {
378		return err
379	}
380	if w.eNonDir|w.eDir != 0 {
381		mod := dir
382		if w.eNonDir != 0 {
383			mod = ndir
384		}
385		if err := t.singlewatch(p, w.eNonDir|w.eDir, mod,
386			w.fi); err != nil && err != errAlreadyWatched {
387			return err
388		}
389	} else {
390		t.t.Del(w)
391	}
392	return nil
393}
394
395func (t *trg) monitor() {
396	var (
397		n   interface{}
398		err error
399	)
400	for {
401		switch n, err = t.t.Wait(); {
402		case err == syscall.EINTR:
403		case t.t.IsStop(n, err):
404			t.s <- struct{}{}
405			return
406		case err != nil:
407			dbgprintf("trg: failed to read events: %q\n", err)
408		default:
409			t.send(t.process(n))
410		}
411	}
412}
413
414// process event returned by native call.
415func (t *trg) process(n interface{}) (evn []event) {
416	t.Lock()
417	w, ge, err := t.t.Watched(n)
418	if err != nil {
419		t.Unlock()
420		dbgprintf("trg: %v event lookup failed: %q", Event(ge), err)
421		return
422	}
423
424	e := decode(ge, w.eDir|w.eNonDir)
425	if ge&int64(not2nat[Remove]|not2nat[Rename]) == 0 {
426		switch fi, err := os.Stat(w.p); {
427		case err != nil:
428		default:
429			if err = t.t.Watch(fi, w, encode(w.eDir|w.eNonDir, fi.IsDir())); err != nil {
430				dbgprintf("trg: %q is no longer watched: %q", w.p, err)
431				t.t.Del(w)
432			}
433		}
434	}
435	if e == Event(0) && (!w.fi.IsDir() || (ge&int64(not2nat[Write])) == 0) {
436		t.Unlock()
437		return
438	}
439
440	if w.fi.IsDir() {
441		evn = append(evn, t.dir(w, n, e, Event(ge))...)
442	} else {
443		evn = append(evn, t.file(w, n, e)...)
444	}
445	if Event(ge)&(not2nat[Remove]|not2nat[Rename]) != 0 {
446		t.t.Del(w)
447	}
448	t.Unlock()
449	return
450}
451