1# Copyright 2019 The Cirq Developers
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
from typing import Any, Callable, List, Optional

from contextlib import contextmanager
import importlib
import importlib.abc
import sys
20
# Bug workaround: https://github.com/python/mypy/issues/1498
# Alias used in this file's annotations wherever a module object is passed
# around; it is bound to `Any` as the workaround for the mypy issue above.
ModuleType = Any
23
24
class InstrumentedFinder(importlib.abc.MetaPathFinder):
    """A module finder used to hook the python import statement."""

    def __init__(
        self,
        finder: Any,
        module_name: str,
        wrap_module: Callable[[ModuleType], Optional[ModuleType]],
        after_exec: Callable[[ModuleType], None],
    ):
        """A module finder that uses an existing module finder to find a python
        module spec and intercept the execution of matching modules.

        Replace finders in `sys.meta_path` with instances of this class to
        instrument import statements.

        Args:
            finder: The original module finder to wrap.
            module_name: The fully qualified module name to instrument e.g.
                `'pkg.submodule'`.  Submodules of this are also instrumented.
                An empty name yields an empty component list, which matches
                every module.
            wrap_module: A callback function that takes a module object before
                it is run and either modifies or replaces it before it is run.
                The module returned by this function will be executed.  If None
                is returned the module is not executed and may be executed
                later.
            after_exec: A callback function that is called with the return value
                of `wrap_module` after that module was executed if `wrap_module`
                didn't return None.
        """

        self.finder = finder
        self.module_name = module_name
        # Dotted name split into components so that matching is done per
        # component ('pkg' matches 'pkg.sub' but not 'pkgother').
        self.match_components: List[str] = module_name.split('.') if module_name else []
        self.wrap_module = wrap_module
        self.after_exec = after_exec

    def find_spec(self, fullname: str, path: Any = None, target: Any = None) -> Any:
        """Finds the spec with the wrapped finder and instruments it on a match.

        Args:
            fullname: The fully qualified name of the module being imported.
            path: Forwarded unchanged to the wrapped finder.
            target: Forwarded unchanged to the wrapped finder.

        Returns:
            None if the wrapped finder found nothing, otherwise the spec it
            returned.  When `fullname` is the instrumented module or one of
            its submodules, the spec has been passed through `wrap_spec`.
        """
        spec = self.finder.find_spec(fullname, path=path, target=target)
        if spec is None:
            return None
        components = fullname.split('.')
        if components[: len(self.match_components)] == self.match_components:
            spec = self.wrap_spec(spec)
        return spec

    def wrap_spec(self, spec: Any) -> Any:
        """Replaces the loader of `spec` with an instrumented loader.

        The spec is modified in place and returned.  A spec without a loader
        (`spec.loader is None`) is returned untouched: there is no loader whose
        execution could be intercepted, and wrapping None would make the
        wrapper fail as soon as it delegates to it.
        """
        if spec.loader is None:
            return spec
        spec.loader = InstrumentedLoader(spec.loader, self.wrap_module, self.after_exec)
        return spec
75
76
class InstrumentedLoader(importlib.abc.Loader):
    """A loader that delegates to another loader and hooks module execution."""

    def __init__(
        self,
        loader: Any,
        wrap_module: Callable[[ModuleType], Optional[ModuleType]],
        after_exec: Callable[[ModuleType], None],
    ):
        """Wraps an existing module loader so that executing a module goes
        through the given callbacks.

        Use `InstrumentedFinder` to instrument modules with instances of this
        class.

        Args:
            loader: The original module loader to delegate to.
            wrap_module: A callback function that receives the module object
                before it is run and either modifies or replaces it.  The
                module it returns is the one that gets executed.  If it
                returns None the module is not executed and may be executed
                later.
            after_exec: A callback function that is called with the return value
                of `wrap_module` after that module was executed, provided
                `wrap_module` didn't return None.
        """
        self.after_exec = after_exec
        self.wrap_module = wrap_module
        self.loader = loader

    def create_module(self, spec: ModuleType) -> ModuleType:
        """Creates the module by handing `spec` to the wrapped loader."""
        delegate = self.loader
        return delegate.create_module(spec)

    def exec_module(self, module: ModuleType) -> None:
        """Executes the module produced by `wrap_module`, if there is one.

        When `wrap_module` returns None nothing is executed and `after_exec`
        is not called.
        """
        replacement = self.wrap_module(module)
        if replacement is None:
            return
        self.loader.exec_module(replacement)
        self.after_exec(replacement)
117
118
@contextmanager
def wrap_module_executions(
    module_name: str,
    wrap_func: Callable[[ModuleType], Optional[ModuleType]],
    after_exec: Callable[[ModuleType], None] = lambda m: None,
    assert_meta_path_unchanged: bool = True,
):
    """A context manager that hooks python's import machinery within the
    context.

    `wrap_func` is called before executing the module called `module_name` and
    any of its submodules.  The module returned by `wrap_func` will be executed.

    Args:
        module_name: The fully qualified name of the module to instrument.
        wrap_func: Called with each matching module before it is executed; the
            module it returns is executed instead.  Returning None skips the
            execution.
        after_exec: Called with the module returned by `wrap_func` after it
            was executed.  Does nothing by default.
        assert_meta_path_unchanged: If True, assert on exit that
            `sys.meta_path` still equals the instrumented list installed on
            entry.

    Raises:
        AssertionError: If `assert_meta_path_unchanged` is set and
            `sys.meta_path` was changed inside the context.  The original
            `sys.meta_path` is restored even in that case.
    """

    def wrap(finder: Any) -> Any:
        # Finders that do not offer `find_spec` are kept as they are.
        if not hasattr(finder, 'find_spec'):
            return finder
        return InstrumentedFinder(finder, module_name, wrap_func, after_exec)

    new_meta_path = [wrap(finder) for finder in sys.meta_path]
    orig_meta_path = sys.meta_path
    sys.meta_path = new_meta_path

    try:
        yield
    finally:
        # Restore in a nested `finally` so that a failing assertion cannot
        # leave the instrumented finders installed after the context exits.
        try:
            if assert_meta_path_unchanged:
                assert sys.meta_path == new_meta_path
        finally:
            sys.meta_path = orig_meta_path
147
148
@contextmanager
def delay_import(module_name: str):
    """A context manager that imports `module_name` (and whatever matching
    submodules that import pulls in) without running the module contents.

    The import happens on entry with execution intercepted; the withheld
    modules are executed, in the order they were intercepted, once the body
    of the context has finished.
    """
    pending = []
    delay = True

    def hold_or_release(module: ModuleType) -> Optional[ModuleType]:
        if not delay:
            return module  # Released: let the module execute now
        pending.append(module)
        return None  # Still holding: remember the module, skip execution

    with wrap_module_executions(module_name, hold_or_release):
        importlib.import_module(module_name)

    yield  # Run the body of the context

    # From here on the hook lets modules through instead of recording them.
    delay = False
    for deferred in pending:
        # The module's loader routes this call through hold_or_release again.
        deferred.__loader__.exec_module(deferred)
172