1# -*- coding: utf-8 -*-
2"""Import hooks for importing xonsh source files.
3
4This module registers the hooks it defines when it is imported.
5"""
6import os
7import re
8import sys
9import types
10import builtins
11import contextlib
12import importlib
13from importlib.machinery import ModuleSpec
14from importlib.abc import MetaPathFinder, SourceLoader, Loader
15
16from xonsh.events import events
17from xonsh.execer import Execer
18from xonsh.platform import scandir
19from xonsh.lazyasd import lazyobject
20
21
22@lazyobject
23def ENCODING_LINE():
24    # this regex comes from PEP 263
25    # https://www.python.org/dev/peps/pep-0263/#defining-the-encoding
26    return re.compile(b"^[ tv]*#.*?coding[:=][ t]*([-_.a-zA-Z0-9]+)")
27
28
29def find_source_encoding(src):
30    """Finds the source encoding given bytes representing a file. If
31    no encoding is found, UTF-8 will be returned as per the docs
32    https://docs.python.org/3/howto/unicode.html#unicode-literals-in-python-source-code
33    """
34    utf8 = "UTF-8"
35    first, _, rest = src.partition(b"\n")
36    m = ENCODING_LINE.match(first)
37    if m is not None:
38        return m.group(1).decode(utf8)
39    second, _, _ = rest.partition(b"\n")
40    m = ENCODING_LINE.match(second)
41    if m is not None:
42        return m.group(1).decode(utf8)
43    return utf8
44
45
46class XonshImportHook(MetaPathFinder, SourceLoader):
47    """Implements the import hook for xonsh source files."""
48
49    def __init__(self, *args, **kwargs):
50        super(XonshImportHook, self).__init__(*args, **kwargs)
51        self._filenames = {}
52        self._execer = None
53
54    @property
55    def execer(self):
56        if hasattr(builtins, "__xonsh_execer__"):
57            execer = builtins.__xonsh_execer__
58            if self._execer is not None:
59                self._execer = None
60        elif self._execer is None:
61            self._execer = execer = Execer(unload=False)
62        else:
63            execer = self._execer
64        return execer
65
66    #
67    # MetaPathFinder methods
68    #
69    def find_spec(self, fullname, path, target=None):
70        """Finds the spec for a xonsh module if it exists."""
71        dot = "."
72        spec = None
73        path = sys.path if path is None else path
74        if dot not in fullname and dot not in path:
75            path = [dot] + path
76        name = fullname.rsplit(dot, 1)[-1]
77        fname = name + ".xsh"
78        for p in path:
79            if not isinstance(p, str):
80                continue
81            if not os.path.isdir(p) or not os.access(p, os.R_OK):
82                continue
83            if fname not in (x.name for x in scandir(p)):
84                continue
85            spec = ModuleSpec(fullname, self)
86            self._filenames[fullname] = os.path.join(p, fname)
87            break
88        return spec
89
90    #
91    # SourceLoader methods
92    #
93    def create_module(self, spec):
94        """Create a xonsh module with the appropriate attributes."""
95        mod = types.ModuleType(spec.name)
96        mod.__file__ = self.get_filename(spec.name)
97        mod.__loader__ = self
98        mod.__package__ = spec.parent or ""
99        return mod
100
101    def get_filename(self, fullname):
102        """Returns the filename for a module's fullname."""
103        return self._filenames[fullname]
104
105    def get_data(self, path):
106        """Gets the bytes for a path."""
107        raise NotImplementedError
108
109    def get_code(self, fullname):
110        """Gets the code object for a xonsh file."""
111        filename = self.get_filename(fullname)
112        if filename is None:
113            msg = "xonsh file {0!r} could not be found".format(fullname)
114            raise ImportError(msg)
115        with open(filename, "rb") as f:
116            src = f.read()
117        enc = find_source_encoding(src)
118        src = src.decode(encoding=enc)
119        src = src if src.endswith("\n") else src + "\n"
120        execer = self.execer
121        execer.filename = filename
122        ctx = {}  # dummy for modules
123        code = execer.compile(src, glbs=ctx, locs=ctx)
124        return code
125
126
127#
128# Import events
129#
130events.doc(
131    "on_import_pre_find_spec",
132    """
133on_import_pre_find_spec(fullname: str, path: str, target: module or None) -> None
134
135Fires before any import find_spec() calls have been executed. The parameters
136here are the same as importlib.abc.MetaPathFinder.find_spec(). Namely,
137
138:``fullname``: The full name of the module to import.
139:``path``: None if a top-level import, otherwise the ``__path__`` of the parent
140          package.
141:``target``: Target module used to make a better guess about the package spec.
142""",
143)
144
145events.doc(
146    "on_import_post_find_spec",
147    """
148on_import_post_find_spec(spec, fullname, path, target) -> None
149
150Fires after all import find_spec() calls have been executed. The parameters
151here the spec and the arguments importlib.abc.MetaPathFinder.find_spec(). Namely,
152
153:``spec``: A ModuleSpec object if the spec was found, or None if it was not.
154:``fullname``: The full name of the module to import.
155:``path``: None if a top-level import, otherwise the ``__path__`` of the parent
156          package.
157:``target``: Target module used to make a better guess about the package spec.
158""",
159)
160
161events.doc(
162    "on_import_pre_create_module",
163    """
164on_import_pre_create_module(spec: ModuleSpec) -> None
165
166Fires right before a module is created by its loader. The only parameter
167is the spec object. See importlib for more details.
168""",
169)
170
171events.doc(
172    "on_import_post_create_module",
173    """
174on_import_post_create_module(module: Module, spec: ModuleSpec) -> None
175
176Fires after a module is created by its loader but before the loader returns it.
177The parameters here are the module object itself and the spec object.
178See importlib for more details.
179""",
180)
181
182events.doc(
183    "on_import_pre_exec_module",
184    """
185on_import_pre_exec_module(module: Module) -> None
186
187Fires right before a module is executed by its loader. The only parameter
188is the module itself. See importlib for more details.
189""",
190)
191
192events.doc(
193    "on_import_post_exec_module",
194    """
195on_import_post_create_module(module: Module) -> None
196
197Fires after a module is executed by its loader but before the loader returns it.
198The only parameter is the module itself. See importlib for more details.
199""",
200)
201
202
203def _should_dispatch_xonsh_import_event_loader():
204    """Figures out if we should dispatch to a load event"""
205    return (
206        len(events.on_import_pre_create_module) > 0
207        or len(events.on_import_post_create_module) > 0
208        or len(events.on_import_pre_exec_module) > 0
209        or len(events.on_import_post_exec_module) > 0
210    )
211
212
213class XonshImportEventHook(MetaPathFinder):
214    """Implements the import hook for firing xonsh events on import."""
215
216    def __init__(self, *args, **kwargs):
217        super().__init__(*args, **kwargs)
218        self._fullname_stack = []
219
220    @contextlib.contextmanager
221    def append_stack(self, fullname):
222        """A context manager for appending and then removing a name from the
223        fullname stack.
224        """
225        self._fullname_stack.append(fullname)
226        yield
227        del self._fullname_stack[-1]
228
229    #
230    # MetaPathFinder methods
231    #
232    def find_spec(self, fullname, path, target=None):
233        """Finds the spec for a xonsh module if it exists."""
234        if fullname in reversed(self._fullname_stack):
235            # don't execute if we are already in the stack.
236            return None
237        npre = len(events.on_import_pre_find_spec)
238        npost = len(events.on_import_post_find_spec)
239        dispatch_load = _should_dispatch_xonsh_import_event_loader()
240        if npre > 0:
241            events.on_import_pre_find_spec.fire(
242                fullname=fullname, path=path, target=target
243            )
244        elif npost == 0 and not dispatch_load:
245            # no events to fire, proceed normally and prevent recursion
246            return None
247        # now find the spec
248        with self.append_stack(fullname):
249            spec = importlib.util.find_spec(fullname)
250        # fire post event
251        if npost > 0:
252            events.on_import_post_find_spec.fire(
253                spec=spec, fullname=fullname, path=path, target=target
254            )
255        if dispatch_load and spec is not None and hasattr(spec.loader, "create_module"):
256            spec.loader = XonshImportEventLoader(spec.loader)
257        return spec
258
259
260class XonshImportEventLoader(Loader):
261    """A class that dispatches loader calls to another loader and fires relevant
262    xonsh events.
263    """
264
265    def __init__(self, loader):
266        self.loader = loader
267
268    #
269    # Loader methods
270    #
271    def create_module(self, spec):
272        """Creates and returns the module object."""
273        events.on_import_pre_create_module.fire(spec=spec)
274        mod = self.loader.create_module(spec)
275        events.on_import_post_create_module.fire(module=mod, spec=spec)
276        return mod
277
278    def exec_module(self, module):
279        """Executes the module in its own namespace."""
280        events.on_import_pre_exec_module.fire(module=module)
281        rtn = self.loader.exec_module(module)
282        events.on_import_post_exec_module.fire(module=module)
283        return rtn
284
285    def load_module(self, fullname):
286        """Legacy module loading, provided for backwards compatibility."""
287        return self.loader.load_module(fullname)
288
289    def module_repr(self, module):
290        """Legacy module repr, provided for backwards compatibility."""
291        return self.loader.module_repr(module)
292
293
294def install_import_hooks():
295    """
296    Install Xonsh import hooks in ``sys.meta_path`` in order for ``.xsh`` files
297    to be importable and import events to be fired.
298
299    Can safely be called many times, will be no-op if xonsh import hooks are
300    already present.
301    """
302    found_imp = found_event = False
303    for hook in sys.meta_path:
304        if isinstance(hook, XonshImportHook):
305            found_imp = True
306        elif isinstance(hook, XonshImportEventHook):
307            found_event = True
308    if not found_imp:
309        sys.meta_path.append(XonshImportHook())
310    if not found_event:
311        sys.meta_path.insert(0, XonshImportEventHook())
312
313
314# alias to deprecated name
315install_hook = install_import_hooks
316