1# coding: utf-8
2"""Work subclasses related to effective mass calculations."""
3
4import numpy as np
5import os
6
7from monty.json import jsanitize
8from abipy.core.kpoints import build_segments
9from .nodes import Node
10from .works import Work, PhononWork
11from .flows import Flow
12
13
14def _get_red_dirs_from_opts(red_dirs, cart_dirs, reciprocal_lattice):
15    """
16    Helper function to compute the list of directions from user input. Return numpy array.
17    """
18    all_red_dirs = []
19    if red_dirs is not None:
20        all_red_dirs.extend(np.reshape(red_dirs, (-1, 3)))
21
22    if cart_dirs is not None:
23        frac_coords = reciprocal_lattice.get_fractional_coords(np.reshape(cart_dirs, (-1, 3)))
24        all_red_dirs.extend(frac_coords)
25
26    return np.reshape(all_red_dirs, (-1, 3))
27
28
29class EffMassLineWork(Work):
30    """
31    Work for the computation of effective masses via finite differences along a k-line.
32    Useful for cases such as NC+SOC where DFPT is not implemented or if one is interested
33    in non-parabolic behaviour.
34
35    .. rubric:: Inheritance Diagram
36    .. inheritance-diagram:: EffMassLineWork
37    """
38
39    @classmethod
40    def from_scf_input(cls, scf_input, k0_list, step=0.01, npts=15,
41                       red_dirs=[[1, 0, 0], [0, 1, 0], [0, 0, 1]], cart_dirs=None,
42                       den_node=None, manager=None):
43        """
44        Build the Work from an |AbinitInput| representing a GS-SCF calculation.
45
46        Args:
47            scf_input: |AbinitInput| for GS-SCF used as template to generate the other inputs.
48            k0_list: List with the reduced coordinates of the k-points where effective masses are wanted.
49            step: Step for finite difference in Angstrom^-1
50            npts: Number of points sampled around each k-point for each direction.
51            red_dirs: List of reduced directions used to generate the segments passing through the k-point
52            cart_dirs: List of Cartesian directions used to generate the segments passing through the k-point
53            den_node: Path to the DEN file or Task object producing a DEN file.
54                Can be used to avoid the initial SCF calculation if a DEN file is already available.
55                If None, a GS calculation is performed.
56            manager: |TaskManager| instance. Use default if None.
57        """
58        if npts < 3:
59            raise ValueError("Number of points: `%s` should be >= 3 for finite differences" % npts)
60
61        # Define list of directions from user input.
62        reciprocal_lattice = scf_input.structure.lattice.reciprocal_lattice
63        all_red_dirs = _get_red_dirs_from_opts(red_dirs, cart_dirs, reciprocal_lattice)
64        kpts = build_segments(k0_list, npts, step, all_red_dirs, reciprocal_lattice)
65
66        # Now build NSCF input with explicit list of k-points.
67        nscf_input = scf_input.make_nscf_kptopt0_input(kpts)
68
69        new = cls(manager=manager)
70
71        # Need to perform SCF run if DEN file is not available.
72        scf_task = new.register_scf_task(scf_input) if den_node is None else Node.as_node(den_node)
73
74        new.register_nscf_task(nscf_input, deps={scf_task: "DEN"})
75        return new
76
77
78class EffMassDFPTWork(Work):
79    """
80    Work for the computation of effective masses with DFPT.
81    Requires explicit list of k-points and range of bands.
82
83    .. rubric:: Inheritance Diagram
84    .. inheritance-diagram:: EffMassDFPTWork
85    """
86
87    @classmethod
88    def from_scf_input(cls, scf_input, k0_list, effmass_bands_f90, den_node=None, manager=None):
89        """
90        Build the Work from an |AbinitInput| representing a GS-SCF calculation.
91
92        Args:
93            scf_input: |AbinitInput| for GS-SCF used as template to generate the other inputs.
94            k0_list: List with the reduced coordinates of the k-points where effective masses are wanted.
95            effmass_bands_f90: (nkpt, 2) array with band range for effmas computation.
96                WARNING: Assumes Fortran convention with indices starting from 1.
97            den_node: Path to the DEN file or Task object producing a DEN file.
98                Can be used to avoid the initial SCF calculation if a DEN file is already available.
99                If None, a GS calculation is performed.
100            manager: |TaskManager| instance. Use default if None.
101        """
102        multi = scf_input.make_dfpt_effmass_inputs(k0_list, effmass_bands_f90)
103        nscf_input, effmass_input = multi[0], multi[1]
104
105        new = cls(manager=manager)
106
107        # Important: keep a reference to the Frohlich input that can be used to run EPH calculations if needed.
108        new.frohlich_input = multi[2]
109
110        # Need to perform SCF run if DEN file is not available
111        scf_task = new.register_scf_task(scf_input) if den_node is None else Node.as_node(den_node)
112
113        nscf_task = new.register_nscf_task(nscf_input, deps={scf_task: "DEN"})
114        new.register_effmass_task(effmass_input, deps={scf_task: "DEN", nscf_task: "WFK"})
115        return new
116
117
118class EffMassAutoDFPTWork(Work):
119    """
120    Work for the automatic computation of effective masses with DFPT.
121    Band extrema are automatically detected by performing a NSCF calculation
122    along a high-symmetry k-path with ndivsm.
123    Requires more computation that EffMassWork since input variables (kpoints and band range)
124    are computed at runtime.
125
126    .. rubric:: Inheritance Diagram
127    .. inheritance-diagram:: EffMassAutoDFPTWork
128    """
129
130    @classmethod
131    def from_scf_input(cls, scf_input, ndivsm=15, tolwfr=1e-20, manager=None):
132        """
133        Build the Work from an |AbinitInput| representing a GS-SCF calculation.
134
135        Args:
136            scf_input: |AbinitInput| for GS-SCF used as template to generate the other inputs.
137            ndivsm: Number of divisions used to sample the smallest segment of the k-path.
138            tolwfr: Tolerance on residuals for NSCF calculation
139            manager: |TaskManager| instance. Use default if None.
140        """
141        if scf_input.get("nsppol", 1) != 1:
142            raise NotImplementedError("Magnetic semiconductors with nsppol = 2 are not implemented!")
143
144        new = cls(manager=manager)
145        # Keep a copy of the initial input.
146        new.scf_input = scf_input.deepcopy()
147
148        # Need SCF run to get DEN file.
149        new.scf_task = new.register_scf_task(new.scf_input)
150
151        # Perform NSCF run along k-path that will be used to find band extrema.
152        bands_input = scf_input.make_ebands_input(ndivsm=ndivsm, tolwfr=tolwfr)
153        new.bands_task = new.register_nscf_task(bands_input, deps={new.scf_task: "DEN"})
154
155        return new
156
157    def on_all_ok(self):
158        """
159        This method is called once the `Work` is completed i.e. when all tasks have reached status S_OK.
160        Here, we read the band structure from GSR to find the position of the band edges and use these values
161        to generate a new work for effective masses with DFPT.
162        """
163        with self.bands_task.open_gsr() as gsr:
164            ebands = gsr.ebands
165            # Warning: Assuming semiconductor with spin-unpolarized band energies.
166            # At present, Abinit input variables do not take into account nsppol.
167            ebands.set_fermie_to_vbm()
168            # Find k0_list and effmass_bands_f90
169            k0_list, effmass_bands_f90 = ebands.get_kpoints_and_band_range_for_edges()
170
171        # Create the work for effective mass computation with DFPT and add it to the flow.
172        # Keep a reference in generated_effmass_dfpt_work.
173        work = EffMassDFPTWork.from_scf_input(self.scf_input, k0_list, effmass_bands_f90, den_node=self.scf_task)
174
175        self.generated_effmass_dfpt_work = work
176        self.flow.register_work(work)
177        self.flow.allocate()
178        self.flow.build_and_pickle_dump()
179        self.flow.finalized = False
180
181        return super().on_all_ok()
182
183
184class FrohlichZPRFlow(Flow):
185    """
186    .. rubric:: Inheritance Diagram
187    .. inheritance-diagram:: FrohlichZPRFlow
188    """
189
190    @classmethod
191    def from_scf_input(cls, workdir, scf_input, ddb_node=None, ndivsm=15, tolwfr=1e-20, metadata=None, manager=None):
192        """
193        Build the Work from an |AbinitInput| representing a GS-SCF calculation.
194        Final results are stored in the "zprfrohl_results.json" in the outdata directory of the flow.
195
196        Args:
197            workdir: Working directory.
198            scf_input: |AbinitInput| for GS-SCF used as template to generate the other inputs.
199            ddb_node: Path to an external DDB file that is used to avoid the calculation of BECS/eps_inf and phonons.
200                If None, a DFPT calculation is automatically performed by the flow.
201            ndivsm: Number of divisions used to sample the smallest segment of the k-path.
202            tolwfr: Tolerance on residuals for NSCF calculation
203            manager: |TaskManager| instance. Use default if None.
204            metadata: Dictionary with metadata addeded to the final JSON file.
205        """
206        new = cls(workdir=workdir, manager=manager)
207        new.metadata = jsanitize(metadata) if metadata is not None else None
208
209        # Build work for the automatic computation of effective masses.
210        new.effmass_auto_work = EffMassAutoDFPTWork.from_scf_input(scf_input, ndivsm=ndivsm, tolwfr=tolwfr)
211        new.register_work(new.effmass_auto_work)
212        new.scf_task, new.ebands_kpath_task = new.effmass_auto_work[0], new.effmass_auto_work[1]
213
214        if ddb_node is not None:
215            new.ddb_node = Node.as_node(ddb_node)
216            new.ddb_file_path_if_ddb_node = os.path.abspath(ddb_node)
217        else:
218            # Compute DDB with BECS and eps_inf.
219            new.ddb_file_path_if_ddb_node = None
220            ph_work = PhononWork.from_scf_task(new.scf_task, qpoints=[0, 0, 0],
221                                               is_ngqpt=False, tolerance=None, with_becs=True,
222                                               ddk_tolerance=None)
223            new.register_work(ph_work)
224            new.ddb_node = ph_work
225
226        return new
227
228    def on_all_ok(self):
229        """
230        This method is called when all the works in the flow have reached S_OK.
231        This method shall return True if the calculation is completed or
232        False if the execution should continue due to side-effects such as adding a new work to the flow.
233        """
234        if self.on_all_ok_num_calls > 0: return True
235        self.on_all_ok_num_calls += 1
236
237        work = Work()
238        inp = self.effmass_auto_work.generated_effmass_dfpt_work.frohlich_input
239        wfk_task = self.effmass_auto_work.generated_effmass_dfpt_work[0]
240        self.effmass_task = self.effmass_auto_work.generated_effmass_dfpt_work[1]
241
242        deps = {wfk_task: "WFK", self.ddb_node: "DDB", self.effmass_task: "EFMAS.nc"}
243        self.frohl_task = work.register_eph_task(inp, deps=deps)
244        self.register_work(work)
245
246        self.allocate()
247        self.build_and_pickle_dump()
248        return False
249
250    def finalize(self):
251        """
252        This method is called when the flow is completed.
253        Here we write the final results in the "zprfrohl_results.json" file
254        in the `outdata` directory of the flow
255        """
256        d = {}
257        if self.metadata is not None: d.update({"metadata": self.metadata})
258
259        # Add GS results.
260        with self.scf_task.open_gsr() as gsr:
261            d["gsr_scf_path"] = gsr.filepath
262            d["pressure_GPa"] = float(gsr.pressure)
263            d["max_force_eV_Ang"] = float(gsr.max_force)
264            d["structure"] = gsr.structure
265
266        # Add NSCF band structure.
267        with self.ebands_kpath_task.open_gsr() as gsr:
268            d["gsr_nscf_kpath"] = gsr.filepath
269            gsr.ebands.set_fermie_to_vbm()
270            d["ebands_kpath"] = gsr.ebands
271            #d["ebands_info"] = gsr.ebands.get_dict4pandas(with_geo=False, with_spglib=False)
272
273        # TODO
274        # Extract results from run.abo
275        #d["effmass_results"] = self.effmass_task.yaml_parse_results()
276        #d["frohl_results"] = self.frohl_task.yaml_parse_results()
277
278        # Add epsinf, e0, BECS, alpha and DDB as string.
279        from abipy import abilab
280        if self.ddb_file_path_if_ddb_node is not None:
281            ddb_filepath = self.ddb_file_path_if_ddb_node
282        else:
283            ddb_filepath = self.ddb_node.outdir.path_in("out_DDB")
284
285        with abilab.abiopen(ddb_filepath) as ddb:
286            d["ddb_path"] = ddb.filepath
287            d["ddb_string"] = ddb.get_string()
288            epsinf, becs = ddb.anaget_epsinf_and_becs(chneut=1)
289            gen = ddb.anaget_dielectric_tensor_generator(asr=2, chneut=1, dipdip=1)
290            eps0 = gen.tensor_at_frequency(w=0.0)
291            d["becs"] = becs
292            d["epsinf_cart"] = epsinf
293            # FIXME Complex is not supported by JSON.
294            d["eps0_cart"] = eps0.real
295
296        #print(d)
297        abilab.mjson_write(d, self.outdir.path_in("zprfrohl_results.json"), indent=4)
298
299        return super().finalize()
300