1# coding: utf-8
2# Copyright (c) Pymatgen Development Team.
3# Distributed under the terms of the MIT License.
4
5
6"""
7This module defines the BorgQueen class, which manages drones to assimilate
8data using Python's multiprocessing.
9"""
10
11import json
12import logging
13import os
14from multiprocessing import Manager, Pool
15
16from monty.io import zopen
17from monty.json import MontyDecoder, MontyEncoder
18
19logger = logging.getLogger("BorgQueen")
20
21
class BorgQueen:
    """
    The Borg Queen controls the drones to assimilate data in an entire
    directory tree. Uses multiprocessing to speed up things considerably. It
    also contains convenience methods to save and load data between sessions.
    """

    def __init__(self, drone, rootpath=None, number_of_drones=1):
        """
        Args:
            drone (Drone): An implementation of
                :class:`pymatgen.apps.borg.hive.AbstractDrone` to use for
                assimilation.
            rootpath (str): The root directory to start assimilation. Leave it
                as None if you want to do assimilation later, or if you are
                using the BorgQueen to load previously assimilated data.
            number_of_drones (int): Number of drones (worker processes) to
                parallelize over. Values greater than 1 assimilate with
                :meth:`parallel_assimilate`; otherwise :meth:`serial_assimilate`
                is used. Note that you won't see a 100% improvement with two
                drones over one, but you will typically see a significant
                speedup, and more so on machines with many processors.
        """
        self._drone = drone
        self._num_drones = number_of_drones
        self._data = []

        if rootpath:
            if number_of_drones > 1:
                self.parallel_assimilate(rootpath)
            else:
                self.serial_assimilate(rootpath)

    def _get_valid_paths(self, rootpath):
        """
        Walk rootpath and collect every path the drone reports as valid.

        Args:
            rootpath (str): Root directory to walk.

        Returns:
            list: Concatenation of ``drone.get_valid_paths`` over every
            ``(parent, subdirs, files)`` triple yielded by ``os.walk``.
        """
        valid_paths = []
        for parent, subdirs, files in os.walk(rootpath):
            valid_paths.extend(self._drone.get_valid_paths((parent, subdirs, files)))
        return valid_paths

    def parallel_assimilate(self, rootpath):
        """
        Assimilate the entire subdirectory structure in rootpath using a pool
        of ``number_of_drones`` worker processes.

        Each worker JSON-encodes its result (see ``order_assimilation``); the
        strings are decoded here with MontyDecoder and appended to the stored
        data. Falsy drone results are not collected.

        Args:
            rootpath (str): Root directory to assimilate.
        """
        logger.info("Scanning for valid paths...")
        valid_paths = self._get_valid_paths(rootpath)
        logger.info("{} valid paths found.".format(len(valid_paths)))
        if not valid_paths:
            # Nothing to do: avoid spawning a manager and a worker pool.
            return

        # The Manager runs its own server process; the context manager makes
        # sure it is shut down once the results have been copied out.
        with Manager() as manager:
            data = manager.list()
            status = manager.dict()
            status["count"] = 0
            status["total"] = len(valid_paths)
            with Pool(self._num_drones) as pool:
                pool.map(
                    order_assimilation,
                    ((path, self._drone, data, status) for path in valid_paths),
                )
            # Slicing the list proxy fetches everything in a single round trip
            # instead of one IPC call per element.
            encoded = data[:]

        self._data.extend(json.loads(d, cls=MontyDecoder) for d in encoded)

    def serial_assimilate(self, rootpath):
        """
        Assimilate the entire subdirectory structure in rootpath serially.

        Like :meth:`parallel_assimilate`, falsy drone results (such as None)
        are skipped, so both code paths produce the same kind of data.

        Args:
            rootpath (str): Root directory to assimilate.
        """
        valid_paths = self._get_valid_paths(rootpath)
        total = len(valid_paths)
        for count, path in enumerate(valid_paths, start=1):
            newdata = self._drone.assimilate(path)
            if newdata:
                self._data.append(newdata)
            logger.info("{}/{} ({:.2f}%) done".format(count, total, count / total * 100))

    def get_data(self):
        """
        Returns a list of assimilated objects.
        """
        return self._data

    def save_data(self, filename):
        """
        Save the assimilated data to a file as JSON (encoded with MontyEncoder).

        Args:
            filename (str): filename to save the assimilated data to. Note
                that if the filename ends with gz or bz2, the relevant gzip
                or bz2 compression will be applied.
        """
        with zopen(filename, "wt") as f:
            json.dump(list(self._data), f, cls=MontyEncoder)

    def load_data(self, filename):
        """
        Load assimilated data from a file, replacing any data already held.

        Args:
            filename (str): JSON file (optionally gz/bz2 compressed) previously
                written by :meth:`save_data`; decoded with MontyDecoder.
        """
        with zopen(filename, "rt") as f:
            self._data = json.load(f, cls=MontyDecoder)
119
120
def order_assimilation(args):
    """
    Worker helper used by BorgQueen.parallel_assimilate for a single path.

    Args:
        args (tuple): ``(path, drone, data, status)``. ``drone.assimilate(path)``
            is called; a truthy result is JSON-encoded with MontyEncoder and
            appended to ``data``. ``status`` must hold integer ``"count"`` and
            ``"total"`` entries; ``"count"`` is incremented and a progress line
            is logged.
    """
    path, drone, sink, progress = args
    assimilated = drone.assimilate(path)
    if assimilated:
        sink.append(json.dumps(assimilated, cls=MontyEncoder))
    # NOTE(review): when ``progress`` is a dict shared between processes, this
    # read-modify-write is not atomic, so logged counts may occasionally repeat.
    progress["count"] += 1
    finished, expected = progress["count"], progress["total"]
    percent = finished / expected * 100
    logger.info("{}/{} ({:.2f}%) done".format(finished, expected, percent))
133