# -*- coding: utf-8 -*-

"""
This module adjusts some parameters according to
the type of execution.

For the athosdev cluster.

These asrun customizations are called through (in the asrun configuration file):

    schema_calcul : plugins.athosdev.modifier

    schema_execute : plugins.athosdev.change_command_line
"""

import os

from asrun.core import magic
from asrun.runner import Runner
from asrun.build import AsterBuild
from asrun.config import build_config_of_version
from asrun.common_func import get_tmpname
from asrun.plugins.generic_func import getCpuParameters, setDistrLimits


# memory (MB) added to memjob for testcases
MEMSUP = 0


def modifier(calcul):
    """Call elementary functions to adjust:
        - batch parameters,
        - TODO: submit interactive MPI executions in interactive queues.
    Argument: ASTER_CALCUL object.
    Return value: ASTER_PROFIL object."""
    serv = calcul.serv
    prof = calcul.prof
    if prof['mode'][0] == 'batch':
        prof = change_batch_parameters(serv, prof)
    # limit distributed studies to 128 jobs; the longest queue on athosdev
    # allows 24 hours, so stay just below that limit
    setDistrLimits(prof, 128, 24 * 3600 - 1, 'batch')
    return prof


def getCpuParametersLocal(prof):
    """Force the use of all available threads for OpenMP AND BLAS.
    See the `asrun.plugins.generic_func.getCpuParameters` function."""
    # Fix the number of physical processors (2) & cores per processor (12).
    cpu_mpi, node_mpi, cpu_openmp, blas_thread = getCpuParameters(2, 12, prof)
    cpu_openmp = cpu_openmp * blas_thread
    return cpu_mpi, node_mpi, cpu_openmp, blas_thread
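
# Illustrative note (assumed behavior of getCpuParameters, hypothetical
# numbers): on a 2 x 12-core athosdev node, if getCpuParameters returned
# cpu_openmp=4 and blas_thread=2, the product above would yield cpu_openmp=8,
# letting OpenMP regions reuse the threads otherwise reserved for the BLAS.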


def change_batch_parameters(serv, prof):
    """Change the batch parameters in an export object (classe...)."""
    # available services are defined in calcul.py

    cpu_mpi, node_mpi, cpu_openmp, blas_thread = getCpuParametersLocal(prof)
    # number of MPI processes per node (float division for Python 2)
    cpu_per_node = 1. * cpu_mpi / node_mpi

    # change the job queue if:
    #  - it is a study and the batch queue is not defined,
    #  - or it is a testcase.
    DEFAULT_QUEUE = 'seq'
    g0 = group = prof['classe'][0]
    if group == '':
        # no queue given: fall back to the default queue
        group = DEFAULT_QUEUE
    batch_custom = "--wckey=P11YB:ASTER"

    # add MEMSUP MB to the requested memory (memjob is in KB)
    if 'distribution' not in prof['actions']:
        prof['memjob'] = int(float(prof['memjob'][0])) + MEMSUP * 1024
    if 'astout' in prof['actions']:
        prof['memjob'] = 1000 * 1024    # 1000 MB, in KB
        prof['tpsjob'] = 60 * 24        # 24 hours, in minutes
    if cpu_mpi > 1:
        # should allow ncpu=2, node=1 but it does not work.
        batch_custom += ' --exclusive'
        group = 'par'
        # --nodes is now required, even if it is equal to 1
        if not prof['mpi_nbnoeud'][0]:
            prof['mpi_nbnoeud'] = 1
    else:
        # --nodes must not be set in sequential
        if prof['mpi_nbnoeud'][0]:
            prof['mpi_nbnoeud'] = ""

    # memory limit per MPI process in MB (memjob is in KB)
    memory_limit = float(prof['memjob'][0]) / 1024.
    prof.set_param_memory(memory_limit)
    prof['memoryNode'] = memory_limit * cpu_per_node
    # memory per node in GB
    memGB = memory_limit * cpu_per_node / 1024.
    if cpu_mpi == 1:
        if memGB > 256:
            group = 'bm512g'
        elif memGB > 64:
            group = 'bm256g'
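
    # Worked example (hypothetical numbers, KB units as assumed above):
    # memjob=134217728 KB gives memory_limit=131072 MB; with cpu_mpi=1 (so
    # cpu_per_node=1), memGB=128, which routes the job to 'bm256g'
    # (more than 64 GB but no more than 256 GB).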

    # time limit in hours (tpsjob is in minutes)
    tpsjob = float(prof['tpsjob'][0]) * 60. / 3600.

    # special hook for performance testcases with 1 cpu
    if cpu_mpi == 1 and 'performance' in prof['testlist'][0] and memGB < 64:
        batch_custom += ' --exclusive'
        group = 'par'

    # set the number of threads when not given in the export (capped at 6)
    if not prof['ncpus'][0]:
        magic.run.DBG("Change number of threads: %s" % cpu_openmp)
        prof['ncpus'] = min(6, cpu_openmp)
    else:
        prof['ncpus'] = int(prof['ncpus'][0])
    if prof['ncpus'][0] > 1:
        group = 'par'

    # CPU binding options for parallel queues - see
    # https://computing.llnl.gov/linux/slurm/cpu_management.html
    if 'par' in group:
        batch_custom += (' --cpus-per-task={0} --threads-per-core=1 '
                         '--distribution=block:block '
                         '--mem_bind=local').format(prof['ncpus'][0])
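    # Standard Slurm semantics of the options above, for reference:
    # --cpus-per-task reserves one core per thread of the task,
    # --threads-per-core=1 keeps threads off hyperthreaded siblings,
    # --distribution=block:block packs the cores of a task contiguously,
    # --mem_bind=local allocates memory on the NUMA node of each core.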

    prof['batch_custom'] = batch_custom
    if group != g0:
        prof['classe'] = group
        magic.run.DBG("Change batch queue group to: %s" % group)
    return prof


def change_command_line(prof):
    """Change the mpirun command line and arguments.
    Argument: ASTER_PROFIL object.
    Return value: a class derived from Runner.
    """
    cpu_mpi, node_mpi, cpu_openmp, blas_thread = getCpuParametersLocal(prof)
    # for compatibility with versions < 13.1
    use_numthreads = False
    vers = prof['version'][0]
    if vers:
        conf = build_config_of_version(magic.run, vers, error=False)
        if conf:
            build = AsterBuild(magic.run, conf)
            use_numthreads = build.support('use_numthreads')
    if not use_numthreads:
        # older versions take the number of threads from 'ncpus'
        cpu_openmp = prof['ncpus'][0]
    # end of compatibility block

    class ModifiedRunner(Runner):
        """Modified Runner that exports some variables before execution."""

        def get_exec_command(self, cmd_in, add_tee=False, env=None):
            """Return the command to run Code_Aster.
            Export specific variables for Intel MKL and Intel MPI."""
            cmd = Runner.get_exec_command(self, cmd_in, add_tee, env)
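            # OMP_NUM_THREADS sizes the OpenMP thread pool and
            # MKL_NUM_THREADS bounds the threads used by Intel MKL;
            # I_MPI_PIN_DOMAIN=omp:compact asks Intel MPI to pin each
            # process to a compact domain of OMP_NUM_THREADS logical cores.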
            cmd = (
                "export OMP_NUM_THREADS={openmp} ; "
                "export MKL_NUM_THREADS={blas} ; "
                "export I_MPI_PIN_DOMAIN=omp:compact ; "
            ).format(openmp=cpu_openmp, blas=blas_thread) + cmd
            return cmd

    return ModifiedRunner
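
# Usage sketch (illustrative only): asrun normally drives these hooks itself
# through the `schema_calcul` / `schema_execute` settings quoted in the module
# docstring; called by hand, the flow would look roughly like:
#
#     prof = modifier(calcul)                   # calcul: ASTER_CALCUL object
#     runner_class = change_command_line(prof)  # class derived from Runner
#
# Building the `calcul` object is asrun's responsibility and is not shown.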