"""Provide mechanism to broadcast imports from master to other processes.

This reduces file system strain.

Use:

  with broadcast_imports:
      <execute import statements>

(``broadcast_imports`` is a ready-made context-manager instance; it is not
callable.)

This temporarily overrides the Python import mechanism so that

  1) master executes and caches import metadata and code
  2) import metadata and code are broadcast to all processes
  3) other processes execute the import statements from memory

Warning: Do not perform any parallel operations while broadcast imports
are enabled. Non-master processes assume that they will receive module
data and will crash or deadlock if master sends anything else.
"""


import os
import sys
import marshal
import importlib
import importlib.util
from importlib.machinery import PathFinder, ModuleSpec

import _gpaw


if hasattr(_gpaw, 'Communicator'):
    if '_gpaw' not in sys.builtin_module_names:
        # _gpaw is a separately loaded extension module: try to load the
        # library named by $GPAW_MPI (default 'libmpi.so') with RTLD_GLOBAL
        # before creating the communicator.
        # NOTE(review): presumably this makes the MPI symbols globally
        # visible to the extension -- confirm.
        libmpi = os.environ.get('GPAW_MPI', 'libmpi.so')
        import ctypes
        try:
            ctypes.CDLL(libmpi, ctypes.RTLD_GLOBAL)
        except OSError:
            # Deliberate best effort: a library that cannot be loaded here
            # is not treated as an error.
            pass
    world = _gpaw.Communicator()
else:
    # No Communicator in _gpaw: broadcast imports become a no-op (see
    # BroadcastImporter.enable/disable).
    world = None  # type: ignore


def marshal_broadcast(obj):
    """Marshal ``obj`` on rank 0 and return the unmarshalled broadcast result.

    Rank 0 serialises ``obj`` with ``marshal``; every other rank must pass
    ``None``.  The bytes are handed to ``_gpaw.globally_broadcast_bytes``
    (presumably a collective that delivers rank 0's buffer to all ranks --
    confirm against the extension) and the returned buffer is unmarshalled.

    Raises:
        ImportError: if the received buffer cannot be unmarshalled, which
            typically means some other parallel operation interfered.
    """
    if world.rank == 0:
        buf = marshal.dumps(obj)
    else:
        assert obj is None
        buf = None

    buf = _gpaw.globally_broadcast_bytes(buf)
    try:
        return marshal.loads(buf)
    except (EOFError, ValueError, TypeError) as err:
        # marshal.loads reports invalid data with any of these three
        # exception types; all of them mean "received garbage" here.
        msg = ('Parallel import failure -- probably received garbage. '
               'Error was: {}. This may happen if parallel operations are '
               'performed while parallel imports are enabled.'.format(err))
        raise ImportError(msg) from err


class BroadcastLoader:
    """Loader that executes module code taken from a shared module cache.

    ``module_cache`` maps a module's full name to
    ``((submodule_search_locations, origin), code)``.
    """

    def __init__(self, spec, module_cache):
        self.module_cache = module_cache
        self.spec = spec

    # NOTE: load_module() is the legacy loader API and is deprecated in
    # importlib in favour of create_module()/exec_module().  It is kept
    # because switching would change which spec ends up on the module.
    def load_module(self, fullname):
        """Load ``fullname``; on rank 0 first read its code into the cache."""
        if world.rank == 0:
            # Load from file and store in cache:
            code = self.spec.loader.get_code(fullname)
            metadata = (self.spec.submodule_search_locations, self.spec.origin)
            self.module_cache[fullname] = (metadata, code)
            # We could execute the default mechanism to load the module here.
            # Instead we load from cache using our own loader, like on the
            # other cores.

        return self.load_from_cache(fullname)

    def load_from_cache(self, fullname):
        """Create the module from ``self.spec`` and execute its cached code.

        The module is registered in ``sys.modules`` before its code runs.
        If execution fails, the entry is removed again so that no
        half-initialised module is left behind, and the error is re-raised.
        """
        metadata, code = self.module_cache[fullname]
        module = importlib.util.module_from_spec(self.spec)
        origin = metadata[1]
        module.__file__ = origin
        # __package__, __path__, __cached__?
        module.__loader__ = self
        sys.modules[fullname] = module
        try:
            exec(code, module.__dict__)
        except BaseException:
            # The legacy load_module() protocol makes the loader responsible
            # for cleaning up sys.modules on failure.  Only remove the entry
            # if it is still the module we inserted.
            if sys.modules.get(fullname) is module:
                del sys.modules[fullname]
            raise
        return module

    def __str__(self):
        return ('<{} for {}:{} [{} modules cached]>'
                .format(self.__class__.__name__,
                        self.spec.name, self.spec.origin,
                        len(self.module_cache)))


class BroadcastImporter:
    """Meta-path finder that serves imports from a broadcast module cache.

    Rank 0 finds modules on the file system and records their code; the
    other ranks receive the cache when imports are enabled and resolve
    modules from it.  Also usable as a context manager.
    """

    def __init__(self):
        # fullname -> ((submodule_search_locations, origin), code)
        self.module_cache = {}
        # Names of all modules that have been in the cache at disable() time.
        self.cached_modules = []

    def find_spec(self, fullname, path=None, target=None):
        """Return a spec whose loader is a BroadcastLoader, where possible.

        On rank 0, ``None`` is returned when PathFinder finds nothing, when
        the found spec has no loader, or when the loader has no code for the
        module (C extensions).  On other ranks, modules missing from the
        cache fall back to ``PathFinder.find_spec``.
        """
        if world.rank == 0:
            spec = PathFinder.find_spec(fullname, path, target)
            if spec is None:
                return None

            if spec.loader is None:
                return None

            code = spec.loader.get_code(fullname)
            if code is None:  # C extensions
                return None

            # The loader keeps the original spec so it can read the code
            # from file in load_module().
            loader = BroadcastLoader(spec, self.module_cache)
            assert fullname == spec.name

            searchloc = spec.submodule_search_locations
            spec = ModuleSpec(fullname, loader, origin=spec.origin,
                              is_package=searchloc is not None)
            if searchloc is not None:
                spec.submodule_search_locations += searchloc
            return spec
        else:
            if fullname not in self.module_cache:
                # Could this in principle interfere with builtin imports?
                return PathFinder.find_spec(fullname, path, target)

            searchloc, origin = self.module_cache[fullname][0]
            loader = BroadcastLoader(None, self.module_cache)
            spec = ModuleSpec(fullname, loader, origin=origin,
                              is_package=searchloc is not None)
            if searchloc is not None:
                spec.submodule_search_locations += searchloc
            loader.spec = spec  # XXX loader.loader is still None
            return spec

    def broadcast(self):
        """Send the module cache from rank 0; replace it on other ranks."""
        if world.rank == 0:
            # print('bcast {} modules'.format(len(self.module_cache)))
            marshal_broadcast(self.module_cache)
        else:
            self.module_cache = marshal_broadcast(None)
            # print('recv {} modules'.format(len(self.module_cache)))

    def enable(self):
        """Install this finder at the front of ``sys.meta_path``.

        Ranks other than 0 immediately take part in the broadcast, i.e. they
        wait here for the cache that rank 0 sends from disable().
        Does nothing when ``world`` is None.
        """
        if world is None:
            return

        # There is the question of whether we lose anything by inserting
        # ourselves further on in the meta_path list. Maybe not, and maybe
        # that is a less violent act.
        sys.meta_path.insert(0, self)
        if world.rank != 0:
            self.broadcast()

    def disable(self):
        """Uninstall this finder; on rank 0 broadcast the cache first.

        The cached module names are appended to ``cached_modules`` and the
        cache is emptied.  Does nothing when ``world`` is None.
        """
        if world is None:
            return

        if world.rank == 0:
            self.broadcast()
        self.cached_modules.extend(self.module_cache)
        self.module_cache = {}
        # Remove exactly this finder rather than whatever happens to sit at
        # index 0: another finder may have been inserted in front of us, and
        # an assert-based check would vanish under ``python -O``.
        sys.meta_path.remove(self)

    def __enter__(self):
        self.enable()

    def __exit__(self, *args):
        self.disable()


broadcast_imports = BroadcastImporter()