# -*- coding: utf-8 -*-
"""Import hooks for importing xonsh source files.

This module registers the hooks it defines when it is imported.
"""
import os
import re
import sys
import types
import builtins
import contextlib
import importlib
import importlib.util
from importlib.machinery import ModuleSpec
from importlib.abc import MetaPathFinder, SourceLoader, Loader

from xonsh.events import events
from xonsh.execer import Execer
from xonsh.platform import scandir
from xonsh.lazyasd import lazyobject


@lazyobject
def ENCODING_LINE():
    """Lazily compiled bytes regex matching a PEP 263 encoding declaration."""
    # this regex comes from PEP 263
    # https://www.python.org/dev/peps/pep-0263/#defining-the-encoding
    # A raw bytes literal is used so that \t, \f and \v reach the regex engine
    # as whitespace escapes rather than being read as the letters "t"/"v".
    return re.compile(rb"^[ \t\f\v]*#.*?coding[:=][ \t]*([-_.a-zA-Z0-9]+)")


def find_source_encoding(src):
    """Finds the source encoding given bytes representing a file. If
    no encoding is found, UTF-8 will be returned as per the docs
    https://docs.python.org/3/howto/unicode.html#unicode-literals-in-python-source-code

    Only the first two lines of ``src`` are inspected.
    """
    utf8 = "UTF-8"
    first, _, rest = src.partition(b"\n")
    m = ENCODING_LINE.match(first)
    if m is not None:
        return m.group(1).decode(utf8)
    second, _, _ = rest.partition(b"\n")
    m = ENCODING_LINE.match(second)
    if m is not None:
        return m.group(1).decode(utf8)
    return utf8


class XonshImportHook(MetaPathFinder, SourceLoader):
    """Implements the import hook for xonsh source files."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # maps a module's fullname to the .xsh file found for it by find_spec()
        self._filenames = {}
        # private fallback execer, used only when builtins has none
        self._execer = None

    @property
    def execer(self):
        """The execer used to compile xonsh source.

        ``builtins.__xonsh_execer__`` is preferred when it exists (and any
        private execer is dropped); otherwise a private ``Execer`` is created
        on first use and reused afterwards.
        """
        if hasattr(builtins, "__xonsh_execer__"):
            execer = builtins.__xonsh_execer__
            if self._execer is not None:
                self._execer = None
        elif self._execer is None:
            self._execer = execer = Execer(unload=False)
        else:
            execer = self._execer
        return execer

    #
    # MetaPathFinder methods
    #
    def find_spec(self, fullname, path, target=None):
        """Finds the spec for a xonsh module if it exists.

        Searches each readable directory in ``path`` (``sys.path`` when
        ``path`` is None) for ``<name>.xsh``. Returns a ``ModuleSpec`` using
        this object as the loader, or None if no such file was found.
        """
        dot = "."
        spec = None
        path = sys.path if path is None else path
        if dot not in fullname and dot not in path:
            # top-level names are also looked up in the current directory
            path = [dot] + path
        name = fullname.rsplit(dot, 1)[-1]
        fname = name + ".xsh"
        for p in path:
            if not isinstance(p, str):
                continue
            if not os.path.isdir(p) or not os.access(p, os.R_OK):
                continue
            if fname not in (x.name for x in scandir(p)):
                continue
            spec = ModuleSpec(fullname, self)
            self._filenames[fullname] = os.path.join(p, fname)
            break
        return spec

    #
    # SourceLoader methods
    #
    def create_module(self, spec):
        """Create a xonsh module with the appropriate attributes."""
        mod = types.ModuleType(spec.name)
        mod.__file__ = self.get_filename(spec.name)
        mod.__loader__ = self
        mod.__package__ = spec.parent or ""
        return mod

    def get_filename(self, fullname):
        """Returns the filename for a module's fullname.

        Raises KeyError if ``fullname`` was never located by find_spec().
        """
        return self._filenames[fullname]

    def get_data(self, path):
        """Gets the bytes for a path."""
        raise NotImplementedError

    def get_code(self, fullname):
        """Gets the code object for a xonsh file.

        Raises ImportError if no xonsh file is known for ``fullname``.
        """
        # get_filename() signals an unknown module with KeyError; translate
        # that into the ImportError that the import machinery expects.
        try:
            filename = self.get_filename(fullname)
        except KeyError:
            filename = None
        if filename is None:
            msg = "xonsh file {0!r} could not be found".format(fullname)
            raise ImportError(msg)
        with open(filename, "rb") as f:
            src = f.read()
        enc = find_source_encoding(src)
        src = src.decode(encoding=enc)
        # make sure the source is newline-terminated before compiling
        src = src if src.endswith("\n") else src + "\n"
        execer = self.execer
        execer.filename = filename
        ctx = {}  # dummy for modules
        code = execer.compile(src, glbs=ctx, locs=ctx)
        return code


#
# Import events
#
events.doc(
    "on_import_pre_find_spec",
    """
on_import_pre_find_spec(fullname: str, path: str, target: module or None) -> None

Fires before any import find_spec() calls have been executed. The parameters
here are the same as importlib.abc.MetaPathFinder.find_spec(). Namely,

:``fullname``: The full name of the module to import.
:``path``: None if a top-level import, otherwise the ``__path__`` of the parent
          package.
:``target``: Target module used to make a better guess about the package spec.
""",
)

events.doc(
    "on_import_post_find_spec",
    """
on_import_post_find_spec(spec, fullname, path, target) -> None

Fires after all import find_spec() calls have been executed. The parameters
here the spec and the arguments importlib.abc.MetaPathFinder.find_spec(). Namely,

:``spec``: A ModuleSpec object if the spec was found, or None if it was not.
:``fullname``: The full name of the module to import.
:``path``: None if a top-level import, otherwise the ``__path__`` of the parent
          package.
:``target``: Target module used to make a better guess about the package spec.
""",
)

events.doc(
    "on_import_pre_create_module",
    """
on_import_pre_create_module(spec: ModuleSpec) -> None

Fires right before a module is created by its loader. The only parameter
is the spec object. See importlib for more details.
""",
)

events.doc(
    "on_import_post_create_module",
    """
on_import_post_create_module(module: Module, spec: ModuleSpec) -> None

Fires after a module is created by its loader but before the loader returns it.
The parameters here are the module object itself and the spec object.
See importlib for more details.
""",
)

events.doc(
    "on_import_pre_exec_module",
    """
on_import_pre_exec_module(module: Module) -> None

Fires right before a module is executed by its loader. The only parameter
is the module itself. See importlib for more details.
""",
)

events.doc(
    "on_import_post_exec_module",
    """
on_import_post_exec_module(module: Module) -> None

Fires after a module is executed by its loader but before the loader returns it.
The only parameter is the module itself. See importlib for more details.
""",
)


def _should_dispatch_xonsh_import_event_loader():
    """Figures out if we should dispatch to a load event"""
    return (
        len(events.on_import_pre_create_module) > 0
        or len(events.on_import_post_create_module) > 0
        or len(events.on_import_pre_exec_module) > 0
        or len(events.on_import_post_exec_module) > 0
    )


class XonshImportEventHook(MetaPathFinder):
    """Implements the import hook for firing xonsh events on import."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # names whose find_spec() is currently in progress through this hook
        self._fullname_stack = []

    @contextlib.contextmanager
    def append_stack(self, fullname):
        """A context manager for appending and then removing a name from the
        fullname stack.
        """
        self._fullname_stack.append(fullname)
        try:
            yield
        finally:
            # always pop, even if the nested lookup raised; a stale entry
            # would make find_spec() skip this name on every later import
            del self._fullname_stack[-1]

    #
    # MetaPathFinder methods
    #
    def find_spec(self, fullname, path, target=None):
        """Fires the find_spec events around a regular spec lookup.

        Returns None when this name is already being resolved through this
        hook, or when no import event has any handler. Otherwise returns the
        spec found by ``importlib.util.find_spec()``, with its loader wrapped
        in ``XonshImportEventLoader`` when load events need to be dispatched.
        """
        if fullname in reversed(self._fullname_stack):
            # don't execute if we are already in the stack.
            return None
        npre = len(events.on_import_pre_find_spec)
        npost = len(events.on_import_post_find_spec)
        dispatch_load = _should_dispatch_xonsh_import_event_loader()
        if npre > 0:
            events.on_import_pre_find_spec.fire(
                fullname=fullname, path=path, target=target
            )
        elif npost == 0 and not dispatch_load:
            # no events to fire, proceed normally and prevent recursion
            return None
        # now find the spec
        with self.append_stack(fullname):
            spec = importlib.util.find_spec(fullname)
        # fire post event
        if npost > 0:
            events.on_import_post_find_spec.fire(
                spec=spec, fullname=fullname, path=path, target=target
            )
        if dispatch_load and spec is not None and hasattr(spec.loader, "create_module"):
            spec.loader = XonshImportEventLoader(spec.loader)
        return spec


class XonshImportEventLoader(Loader):
    """A class that dispatches loader calls to another loader and fires relevant
    xonsh events.
    """

    def __init__(self, loader):
        # the real loader that every call is delegated to
        self.loader = loader

    #
    # Loader methods
    #
    def create_module(self, spec):
        """Creates and returns the module object."""
        events.on_import_pre_create_module.fire(spec=spec)
        mod = self.loader.create_module(spec)
        events.on_import_post_create_module.fire(module=mod, spec=spec)
        return mod

    def exec_module(self, module):
        """Executes the module in its own namespace."""
        events.on_import_pre_exec_module.fire(module=module)
        rtn = self.loader.exec_module(module)
        events.on_import_post_exec_module.fire(module=module)
        return rtn

    def load_module(self, fullname):
        """Legacy module loading, provided for backwards compatibility."""
        return self.loader.load_module(fullname)

    def module_repr(self, module):
        """Legacy module repr, provided for backwards compatibility."""
        return self.loader.module_repr(module)


def install_import_hooks():
    """
    Install Xonsh import hooks in ``sys.meta_path`` in order for ``.xsh`` files
    to be importable and import events to be fired.

    Can safely be called many times, will be no-op if xonsh import hooks are
    already present.
    """
    found_imp = found_event = False
    for hook in sys.meta_path:
        if isinstance(hook, XonshImportHook):
            found_imp = True
        elif isinstance(hook, XonshImportEventHook):
            found_event = True
    if not found_imp:
        # the .xsh finder goes last so regular finders are consulted first
        sys.meta_path.append(XonshImportHook())
    if not found_event:
        # the event hook goes first so it sees every import
        sys.meta_path.insert(0, XonshImportEventHook())


# alias to deprecated name
install_hook = install_import_hooks