1# -*- coding: utf-8 -*-
2"""
3    livereload.watcher
4    ~~~~~~~~~~~~~~~~~~
5
6    A file watch management for LiveReload Server.
7
8    :copyright: (c) 2013 - 2015 by Hsiaoming Yang
9    :license: BSD, see LICENSE for more details.
10"""
11
12import glob
13import logging
14import os
15import time
16
17try:
18    import pyinotify
19except ImportError:
20    pyinotify = None
21
22logger = logging.getLogger('livereload')
23
24
25class Watcher(object):
26    """A file watcher registry."""
27    def __init__(self):
28        self._tasks = {}
29
30        # modification time of filepaths for each task,
31        # before and after checking for changes
32        self._task_mtimes = {}
33        self._new_mtimes = {}
34
35        # setting changes
36        self._changes = []
37
38        # filepath that is changed
39        self.filepath = None
40        self._start = time.time()
41
42        # list of ignored dirs
43        self.ignored_dirs = ['.git', '.hg', '.svn', '.cvs']
44
45    def ignore_dirs(self, *args):
46        self.ignored_dirs.extend(args)
47
48    def remove_dirs_from_ignore(self, *args):
49        for a in args:
50            self.ignored_dirs.remove(a)
51
52    def ignore(self, filename):
53        """Ignore a given filename or not."""
54        _, ext = os.path.splitext(filename)
55        return ext in ['.pyc', '.pyo', '.o', '.swp']
56
57    def watch(self, path, func=None, delay=0, ignore=None):
58        """Add a task to watcher.
59
60        :param path: a filepath or directory path or glob pattern
61        :param func: the function to be executed when file changed
62        :param delay: Delay sending the reload message. Use 'forever' to
63                      not send it. This is useful to compile sass files to
64                      css, but reload on changed css files then only.
65        :param ignore: A function return True to ignore a certain pattern of
66                       filepath.
67        """
68        self._tasks[path] = {
69            'func': func,
70            'delay': delay,
71            'ignore': ignore,
72            'mtimes': {},
73        }
74
75    def start(self, callback):
76        """Start the watcher running, calling callback when changes are
77        observed. If this returns False, regular polling will be used."""
78        return False
79
80    def examine(self):
81        """Check if there are changes. If so, run the given task.
82
83        Returns a tuple of modified filepath and reload delay.
84        """
85        if self._changes:
86            return self._changes.pop()
87
88        # clean filepath
89        self.filepath = None
90        delays = set()
91        for path in self._tasks:
92            item = self._tasks[path]
93            self._task_mtimes = item['mtimes']
94            if self.is_changed(path, item['ignore']):
95                func = item['func']
96                delay = item['delay']
97                if delay and isinstance(delay, float):
98                    delays.add(delay)
99                if func:
100                    name = getattr(func, 'name', None)
101                    if not name:
102                        name = getattr(func, '__name__', 'anonymous')
103                    logger.info(
104                        "Running task: {} (delay: {})".format(name, delay))
105                    func()
106
107        if delays:
108            delay = max(delays)
109        else:
110            delay = None
111        return self.filepath, delay
112
113    def is_changed(self, path, ignore=None):
114        """Check if any filepaths have been added, modified, or removed.
115
116        Updates filepath modification times in self._task_mtimes.
117        """
118        self._new_mtimes = {}
119        changed = False
120
121        if os.path.isfile(path):
122            changed = self.is_file_changed(path, ignore)
123        elif os.path.isdir(path):
124            changed = self.is_folder_changed(path, ignore)
125        else:
126            changed = self.is_glob_changed(path, ignore)
127
128        if not changed:
129            changed = self.is_file_removed()
130
131        self._task_mtimes.update(self._new_mtimes)
132        return changed
133
134    def is_file_removed(self):
135        """Check if any filepaths have been removed since last check.
136
137        Deletes removed paths from self._task_mtimes.
138        Sets self.filepath to one of the removed paths.
139        """
140        removed_paths = set(self._task_mtimes) - set(self._new_mtimes)
141        if not removed_paths:
142            return False
143
144        for path in removed_paths:
145            self._task_mtimes.pop(path)
146            # self.filepath seems purely informational, so setting one
147            # of several removed files seems sufficient
148            self.filepath = path
149        return True
150
151    def is_file_changed(self, path, ignore=None):
152        """Check if filepath has been added or modified since last check.
153
154        Updates filepath modification times in self._new_mtimes.
155        Sets self.filepath to changed path.
156        """
157        if not os.path.isfile(path):
158            return False
159
160        if self.ignore(path):
161            return False
162
163        if ignore and ignore(path):
164            return False
165
166        mtime = os.path.getmtime(path)
167
168        if path not in self._task_mtimes:
169            self._new_mtimes[path] = mtime
170            self.filepath = path
171            return mtime > self._start
172
173        if self._task_mtimes[path] != mtime:
174            self._new_mtimes[path] = mtime
175            self.filepath = path
176            return True
177
178        self._new_mtimes[path] = mtime
179        return False
180
181    def is_folder_changed(self, path, ignore=None):
182        """Check if directory path has any changed filepaths."""
183        for root, dirs, files in os.walk(path, followlinks=True):
184            for d in self.ignored_dirs:
185                if d in dirs:
186                    dirs.remove(d)
187
188            for f in files:
189                if self.is_file_changed(os.path.join(root, f), ignore):
190                    return True
191        return False
192
193    def is_glob_changed(self, path, ignore=None):
194        """Check if glob path has any changed filepaths."""
195        for f in glob.glob(path):
196            if self.is_file_changed(f, ignore):
197                return True
198        return False
199
200
201class INotifyWatcher(Watcher):
202    def __init__(self):
203        Watcher.__init__(self)
204
205        self.wm = pyinotify.WatchManager()
206        self.notifier = None
207        self.callback = None
208
209    def watch(self, path, func=None, delay=None, ignore=None):
210        flag = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY
211        self.wm.add_watch(path, flag, rec=True, do_glob=True, auto_add=True)
212        Watcher.watch(self, path, func, delay, ignore)
213
214    def inotify_event(self, event):
215        self.callback()
216
217    def start(self, callback):
218        if not self.notifier:
219            self.callback = callback
220
221            from tornado import ioloop
222            self.notifier = pyinotify.TornadoAsyncNotifier(
223                self.wm, ioloop.IOLoop.instance(),
224                default_proc_fun=self.inotify_event
225            )
226            callback()
227        return True
228
229
230def get_watcher_class():
231    if pyinotify is None or not hasattr(pyinotify, 'TornadoAsyncNotifier'):
232        return Watcher
233    return INotifyWatcher
234