"""Provide a mechanism to broadcast imports from master to other processes.

This reduces file system strain, since only master reads modules from disk.
4
5Use:
6
  with broadcast_imports:
8      <execute import statements>
9
10This temporarily overrides the Python import mechanism so that
11
12  1) master executes and caches import metadata and code
13  2) import metadata and code are broadcast to all processes
14  3) other processes execute the import statements from memory
15
16Warning: Do not perform any parallel operations while broadcast imports
17are enabled.  Non-master processes assume that they will receive module
18data and will crash or deadlock if master sends anything else.
19"""
20
21
22import os
23import sys
24import marshal
25import importlib
26import importlib.util
27from importlib.machinery import PathFinder, ModuleSpec
28
29import _gpaw
30
31
# Set up the communicator used for broadcasting.  It only exists when _gpaw
# provides a Communicator; otherwise ``world`` is None and
# BroadcastImporter.enable()/disable() return immediately.
if hasattr(_gpaw, 'Communicator'):
    if '_gpaw' not in sys.builtin_module_names:
        # _gpaw is not compiled into the interpreter: preload the MPI shared
        # library with RTLD_GLOBAL so its symbols are visible to libraries
        # loaded afterwards.  The library name can be overridden through the
        # GPAW_MPI environment variable.
        # NOTE(review): presumably required for MPI to work when _gpaw is a
        # dynamically loaded extension -- confirm.
        libmpi = os.environ.get('GPAW_MPI', 'libmpi.so')
        import ctypes
        try:
            ctypes.CDLL(libmpi, ctypes.RTLD_GLOBAL)
        except OSError:
            # Best effort: if the library cannot be loaded, carry on and let
            # Communicator() below succeed or fail on its own.
            pass
    world = _gpaw.Communicator()
else:
    world = None  # type: ignore
43
44
def marshal_broadcast(obj):
    """Broadcast a marshal-serializable object from rank 0 to all ranks.

    On rank 0, *obj* is serialized with :mod:`marshal` and handed to
    ``_gpaw.globally_broadcast_bytes``; on every other rank *obj* must be
    None.  Every rank (rank 0 included) unmarshals and returns the buffer
    returned by the broadcast.

    Raises
    ------
    ImportError
        If the received bytes cannot be unmarshalled, which most likely
        means another parallel operation interleaved with the broadcast.
    """
    if world.rank == 0:
        buf = marshal.dumps(obj)
    else:
        assert obj is None
        buf = None

    buf = _gpaw.globally_broadcast_bytes(buf)
    try:
        return marshal.loads(buf)
    except (EOFError, ValueError, TypeError) as err:
        # marshal.loads() may signal invalid data with any of these three.
        msg = ('Parallel import failure -- probably received garbage.  '
               'Error was: {}.  This may happen if parallel operations are '
               'performed while parallel imports are enabled.'.format(err))
        raise ImportError(msg) from err
60
61
class BroadcastLoader:
    """Loader that executes module code taken from a shared cache.

    On rank 0 the code is first obtained from the wrapped spec's own loader
    and stored in the cache, so that it can later be broadcast.  Every rank
    then executes the cached code via :meth:`load_from_cache`.

    Parameters
    ----------
    spec : ModuleSpec or None
        On rank 0, the spec found by PathFinder (its loader supplies the
        code).  Callers may pass None and assign ``spec`` afterwards, as
        long as it is set before the module is loaded.
    module_cache : dict
        ``{fullname: ((submodule_search_locations, origin), code)}``.
    """

    def __init__(self, spec, module_cache):
        self.module_cache = module_cache
        self.spec = spec

    def load_module(self, fullname):
        """Load module *fullname* (legacy PEP 302 API) and return it."""
        if world.rank == 0:
            # Load from file and store in cache, together with the metadata
            # other ranks need to rebuild the spec:
            code = self.spec.loader.get_code(fullname)
            metadata = (self.spec.submodule_search_locations, self.spec.origin)
            self.module_cache[fullname] = (metadata, code)
            # We could execute the default mechanism to load the module here.
            # Instead we load from cache using our own loader, like on the
            # other cores.

        return self.load_from_cache(fullname)

    def load_from_cache(self, fullname):
        """Create module *fullname* from cached code, execute it, return it.

        The module is placed in ``sys.modules`` before its code runs, so
        circular imports can see it.  If execution raises, the previous
        ``sys.modules`` entry (if any) is restored, or ours is removed, and
        the exception propagates.
        """
        metadata, code = self.module_cache[fullname]
        _searchloc, origin = metadata
        module = importlib.util.module_from_spec(self.spec)
        module.__file__ = origin
        # NOTE(review): __package__, __path__ and __cached__ are whatever
        # module_from_spec() derives from self.spec -- confirm they are
        # right on both master and non-master ranks.
        module.__loader__ = self

        previous = sys.modules.get(fullname)
        sys.modules[fullname] = module
        try:
            exec(code, module.__dict__)
        except BaseException:
            # PEP 302: a failed load must not leave a half-initialised
            # module behind in sys.modules.
            if previous is None:
                sys.modules.pop(fullname, None)
            else:
                sys.modules[fullname] = previous
            raise
        return module

    def __str__(self):
        return ('<{} for {}:{} [{} modules cached]>'
                .format(self.__class__.__name__,
                        self.spec.name, self.spec.origin,
                        len(self.module_cache)))
95
96
class BroadcastImporter:
    """Meta-path finder that lets rank 0 import modules for all ranks.

    Use the instance as a context manager, or call enable()/disable().
    While it is enabled on rank 0, modules found by PathFinder that have
    Python code are loaded through a BroadcastLoader, which records their
    code in ``module_cache``; disable() then broadcasts that cache.
    Non-master ranks block in enable() until they receive the cache, and
    afterwards serve the cached modules from memory.

    Attributes
    ----------
    module_cache : dict
        ``{fullname: ((submodule_search_locations, origin), code)}`` for
        the current enable()/disable() cycle.
    cached_modules : list
        Names of the modules cached in earlier, completed cycles.
    """

    def __init__(self):
        self.module_cache = {}
        self.cached_modules = []

    def find_spec(self, fullname, path=None, target=None):
        """Return a spec served by a BroadcastLoader, or None to decline.

        Declining (returning None) lets the remaining finders on
        ``sys.meta_path`` handle the import in the usual way.
        """
        if world.rank == 0:
            spec = PathFinder.find_spec(fullname, path, target)
            if spec is None or spec.loader is None:
                return None

            get_code = getattr(spec.loader, 'get_code', None)
            if get_code is None:
                # Loader from a custom path hook without get_code():
                # there is no code object we could cache.
                return None
            code = get_code(fullname)
            if code is None:  # C extensions
                return None
            # NOTE: BroadcastLoader.load_module() fetches the code again;
            # here it only decides whether the module can be cached.

            loader = BroadcastLoader(spec, self.module_cache)
            assert fullname == spec.name

            searchloc = spec.submodule_search_locations
            spec = ModuleSpec(fullname, loader, origin=spec.origin,
                              is_package=searchloc is not None)
            if searchloc is not None:
                spec.submodule_search_locations += searchloc
            return spec

        if fullname not in self.module_cache:
            # Master did not cache this module (e.g. builtin or C extension).
            # Decline rather than calling PathFinder ourselves, so the other
            # meta-path finders are consulted in their normal order instead
            # of being bypassed.
            return None

        searchloc, origin = self.module_cache[fullname][0]
        loader = BroadcastLoader(None, self.module_cache)
        spec = ModuleSpec(fullname, loader, origin=origin,
                          is_package=searchloc is not None)
        if searchloc is not None:
            spec.submodule_search_locations += searchloc
        # The loader needs the spec (for module_from_spec) and the spec was
        # built around the loader, so the link is completed here.  On
        # non-master ranks load_module() never calls self.spec.loader.
        loader.spec = spec
        return spec

    def broadcast(self):
        """Send (rank 0) or receive (other ranks) the module cache."""
        if world.rank == 0:
            marshal_broadcast(self.module_cache)
        else:
            self.module_cache = marshal_broadcast(None)

    def enable(self):
        """Install this finder at the front of ``sys.meta_path``.

        On non-master ranks this first blocks until rank 0 broadcasts its
        module cache, which rank 0 does in disable().
        """
        if world is None:
            return

        if world.rank != 0:
            # Receive before installing, so that a failed broadcast does not
            # leave a finder on sys.meta_path (__exit__ never runs when
            # __enter__ raises).
            self.broadcast()
        # There is the question of whether we lose anything by inserting
        # ourselves further on in the meta_path list.  Maybe not, and maybe
        # that is a less violent act.
        sys.meta_path.insert(0, self)

    def disable(self):
        """Broadcast the cache (rank 0), archive its names and uninstall.

        The finder is removed from ``sys.meta_path`` even if broadcasting
        fails, so rank 0 is not left routing later imports through the
        caching loader.
        """
        if world is None:
            return

        try:
            if world.rank == 0:
                self.broadcast()
            self.cached_modules += self.module_cache.keys()
            self.module_cache = {}
        finally:
            # remove() instead of pop(0) + assert: another finder may have
            # been inserted in front of us, and with asserts stripped (-O)
            # pop(0) would silently discard that other finder.
            sys.meta_path.remove(self)

    def __enter__(self):
        self.enable()
        return self

    def __exit__(self, *args):
        self.disable()
173
174
# Module-level singleton; use the instance itself as a context manager:
#
#     with broadcast_imports:
#         import ...
broadcast_imports = BroadcastImporter()
176