# coding: utf-8
# Copyright (c) Pymatgen Development Team.
# Distributed under the terms of the MIT License.


"""
This module defines the BorgQueen class, which manages drones to assimilate
data using Python's multiprocessing.
"""

import json
import logging
import os
from multiprocessing import Manager, Pool

from monty.io import zopen
from monty.json import MontyDecoder, MontyEncoder

logger = logging.getLogger("BorgQueen")


class BorgQueen:
    """
    The Borg Queen controls the drones to assimilate data in an entire
    directory tree. Uses multiprocessing to speed up things considerably. It
    also contains convenience methods to save and load data between sessions.
    """

    def __init__(self, drone, rootpath=None, number_of_drones=1):
        """
        Args:
            drone (Drone): An implementation of
                :class:`pymatgen.apps.borg.hive.AbstractDrone` to use for
                assimilation.
            rootpath (str): The root directory to start assimilation. Leave it
                as None if you want to do assimilation later, or are using the
                BorgQueen to load previously assimilated data.
            number_of_drones (int): Number of drones (worker processes) to
                parallelize over. A value greater than 1 assimilates with
                :meth:`parallel_assimilate`; otherwise
                :meth:`serial_assimilate` is used. Note that the speedup is
                generally not linear in the number of drones.
        """
        self._drone = drone
        self._num_drones = number_of_drones
        self._data = []

        if rootpath:
            if number_of_drones > 1:
                self.parallel_assimilate(rootpath)
            else:
                self.serial_assimilate(rootpath)

    def _get_valid_paths(self, rootpath):
        """
        Walk the directory tree under rootpath and collect every path the
        drone reports as valid for assimilation.

        The (parent, subdirs, files) tuple from os.walk is handed to the drone
        unchanged, so a drone that prunes ``subdirs`` in place still controls
        which directories are descended into.

        Args:
            rootpath (str): Root directory of the walk.

        Returns:
            list: Valid paths, in the order they were discovered.
        """
        valid_paths = []
        for parent, subdirs, files in os.walk(rootpath):
            valid_paths.extend(self._drone.get_valid_paths((parent, subdirs, files)))
        return valid_paths

    def parallel_assimilate(self, rootpath):
        """
        Assimilate the entire subdirectory structure in rootpath, distributing
        the valid paths over a pool of ``number_of_drones`` worker processes.

        Workers send results back as MontyEncoder JSON strings. These are
        decoded with MontyDecoder and appended to the stored data. Falsy
        results from the drone are discarded.

        Args:
            rootpath (str): Root directory to assimilate.
        """
        logger.info("Scanning for valid paths...")
        valid_paths = self._get_valid_paths(rootpath)
        logger.info("%d valid paths found.", len(valid_paths))
        # The manager runs a server process. Using it as a context manager
        # guarantees that process is shut down when we are done.
        with Manager() as manager:
            data = manager.list()
            status = manager.dict()
            status["count"] = 0
            status["total"] = len(valid_paths)
            # "+=" on a dict proxy is a separate get and set. The lock keeps
            # concurrent workers from losing or duplicating progress counts.
            lock = manager.Lock()
            with Pool(self._num_drones) as p:
                p.map(
                    order_assimilation,
                    ((path, self._drone, data, status, lock) for path in valid_paths),
                )
            # Copy out of the proxy before the manager process goes away.
            results = list(data)
        for d in results:
            self._data.append(json.loads(d, cls=MontyDecoder))

    def serial_assimilate(self, rootpath):
        """
        Assimilate the entire subdirectory structure in rootpath serially.

        Falsy results from the drone are discarded, matching
        :meth:`parallel_assimilate`.

        Args:
            rootpath (str): Root directory to assimilate.
        """
        valid_paths = self._get_valid_paths(rootpath)
        total = len(valid_paths)
        for count, path in enumerate(valid_paths, start=1):
            newdata = self._drone.assimilate(path)
            # Keep results consistent with the parallel path, which drops
            # falsy (e.g. None) assimilation results.
            if newdata:
                self._data.append(newdata)
            logger.info("%d/%d (%.2f%%) done", count, total, count / total * 100)

    def get_data(self):
        """
        Returns a list of assimilated objects
        """
        return self._data

    def save_data(self, filename):
        """
        Save the assimilated data to a file as JSON (encoded with MontyEncoder).

        Args:
            filename (str): filename to save the assimilated data to. Note
                that if the filename ends with gz or bz2, the relevant gzip
                or bz2 compression will be applied.
        """
        with zopen(filename, "wt") as f:
            json.dump(list(self._data), f, cls=MontyEncoder)

    def load_data(self, filename):
        """
        Load assimilated data from a file, replacing any data currently held.

        The file is read with zopen and decoded with MontyDecoder, so files
        written by save_data (including compressed ones) can be loaded.

        Args:
            filename (str): filename to load the assimilated data from.
        """
        with zopen(filename, "rt") as f:
            self._data = json.load(f, cls=MontyDecoder)


def order_assimilation(args):
    """
    Internal helper method for BorgQueen to process assimilation.

    Run in a pool worker. It assimilates a single path with the drone and
    appends the MontyEncoder-serialized result to the shared ``data`` list if
    the result is truthy. It then increments and logs the shared progress
    counter.

    Args:
        args (tuple): ``(path, drone, data, status)`` or
            ``(path, drone, data, status, lock)``. ``data`` is a shared list,
            and ``status`` is a shared dict with "count" and "total" keys. The
            optional ``lock`` serializes updates to ``status["count"]``.
    """
    path, drone, data, status = args[:4]
    lock = args[4] if len(args) > 4 else None
    newdata = drone.assimilate(path)
    if newdata:
        data.append(json.dumps(newdata, cls=MontyEncoder))
    if lock is not None:
        with lock:
            status["count"] += 1
            count = status["count"]
    else:
        # Legacy 4-tuple callers: unsynchronized update, as before.
        status["count"] += 1
        count = status["count"]
    total = status["total"]
    logger.info("%d/%d (%.2f%%) done", count, total, count / total * 100)