1# -*- coding: utf-8 -*-
2
3"""
This module adjusts some parameters (batch queue, memory and time limits,
bsub and mpirun command lines) according to the type of execution.

For the Aster4 cluster.
8
These asrun customizations are enabled in the asrun configuration file by:
10
11    schema_calcul : plugins.aster4_calcul.modifier
12
13    schema_execute : plugins.aster4_calcul.change_command_line
14"""
15
16import os
17from math import ceil
18
19from asrun.core import magic
20from asrun.runner import Runner
21
# Extra memory (in MB) added to memjob for every job that is not part of a
# distribution (change_batch_parameters adds MEMSUP * 1024 to memjob).
MEMSUP = 400


def iceil(x):
    """Return ``ceil(x)`` converted to a Python ``int``."""
    rounded_up = ceil(x)
    return int(rounded_up)
26
27
def modifier(calcul):
    """Adjust the profile of a calculation before its submission.

    Only batch executions are modified: their batch parameters are changed
    by change_batch_parameters, then the bsub command line is completed by
    change_bsub_command. Other modes are returned untouched.

    Argument : ASTER_CALCUL object
    Return value : ASTER_PROFIL object."""
    serv, prof = calcul.serv, calcul.prof
    if prof['mode'][0] != 'batch':
        return prof
    prof = change_batch_parameters(serv, prof)
    change_bsub_command(prof)
    return prof
40
41
def change_bsub_command(prof):
    """Add bsub arguments.

    Starts from the ``batch_sub`` command of the asrun configuration
    (``magic.run['batch_sub']``), adds the number of MPI processes and the
    ``-a`` / ``-R`` options, and stores the result in ``prof['batch_sub']``.
    """
    run = magic.run
    lcmd = run['batch_sub'].split()
    cpu_mpi, node_mpi = _get_cpu_parameters(prof)[:2]
    # options already present in the configured command are not added twice
    if '-n' not in lcmd:
        lcmd.insert(1, '-n %d' % cpu_mpi)
    if cpu_mpi > 1:
        if "intelmpi" not in lcmd:
            lcmd.append("-a intelmpi -x")
            # Place the processes on the requested nodes. A node count larger
            # than the number of processes (e.g. the "not given" default)
            # cannot be expressed as a per-host tile (it would give ptile=0),
            # so no placement constraint is added in that case.
            if 1 < node_mpi <= cpu_mpi:
                # round up so that node_mpi hosts can hold all cpu_mpi processes
                cpu_per_node = -(-cpu_mpi // node_mpi)
                lcmd.append('-R "span[ptile=%d]"' % cpu_per_node)
    elif "seq" not in lcmd:
        lcmd.append("-a seq")
    prof['batch_sub'] = " ".join(lcmd)
63
64
def change_batch_parameters(serv, prof):
    """Change the batch parameters in an export object (classe, memjob,
    tpsjob...).

    Arguments :
        serv : type of the calculation ('study', 'testcase',
               'parametric_study', ...),
        prof : export object, modified in place.
    Return value : the modified export object.

    Rules applied :
        - memjob is increased by MEMSUP MB unless the job belongs to a
          distribution; an empty queue defaults to 'prod' (or 'distM' in a
          distribution) and a 'distM' queue is dropped outside a distribution,
        - the 'TI' and 'MRI' queues are kept,
        - 'astout' jobs go to the 'astout' queue with a fixed memory and time,
        - studies using several MPI processes go to 'mpi',
        - testcases go to 'test', or 'mpi' when parallel,
        - parametric studies go to 'distr', or 'mpi' when parallel,
        - in the 'mpi', 'TI' and 'MRI' queues the time limit becomes
          (tpsjob + 2) * cpu_mpi,
        - a requested 'urgent' queue is never replaced.
    """
    cpu_mpi = _get_cpu_parameters(prof)[0]

    DEFAULT_QUEUE = 'prod'
    g0 = group = prof['classe'][0]
    if group == '':
        group = DEFAULT_QUEUE

    if 'distribution' not in prof['actions']:
        # add MEMSUP MB (the factor 1024 suggests memjob is in KB - to confirm)
        prof['memjob'] = int(float(prof['memjob'][0])) + MEMSUP * 1024
        if g0 == 'distM':
            group = ''
    elif g0 == '':
        group = 'distM'

    if g0 in ('TI', 'MRI'):
        # these queues are kept as requested
        pass
    elif 'astout' in prof['actions']:
        group = 'astout'
        prof.set_param_memory(512)
        prof.set_param_time(3600 * 24)
    elif serv == 'study':
        if cpu_mpi > 1:
            group = 'mpi'
    elif serv == 'testcase':
        group = 'test'
        if cpu_mpi > 1:
            group = 'mpi'  # mpi_test ?
    elif serv == 'parametric_study':
        # parametric_study : group not yet exists
        group = 'distr'
        if cpu_mpi > 1:
            group = 'mpi'

    if group in ('mpi', 'TI', 'MRI') and prof['tpsjob'][0]:
        # multiply the time limit by the number of processors
        new_tpsjob = (float(prof['tpsjob'][0]) + 2.0) * cpu_mpi
        magic.run.DBG("Change tpsjob from %s to %s" % (prof['tpsjob'][0], new_tpsjob))
        prof['tpsjob'] = new_tpsjob

    if group != g0 and g0 != "urgent":
        prof['classe'] = group
        magic.run.DBG("Change batch queue group to : %s" % group)
    return prof
116
117
def change_command_line(prof):
    """Change mpirun command line and arguments.

    Argument : prof, the export object of the execution.
    Return value : a subclass of asrun.runner.Runner (the class itself, not
    an instance) which:
        - uses $TMPDIR as working directory when none is given,
        - uses a specific mpirun command for non-batch executions,
        - only considers an execution as MPI when more than one processor
          is used.
    """

    class ModifiedRunner(Runner):
        """Runner bound to the export object given to change_command_line."""

        def __init__(self, *args, **kwargs):
            Runner.__init__(self, *args, **kwargs)
            self._prof = prof

        def set_rep_trav(self, reptrav, basename=''):
            """Set temporary directory for Code_Aster executions.

            When no directory is given and $TMPDIR is set, $TMPDIR is used
            as global directory (its 'global' subdirectory for an MPI
            execution).
            """
            # forward the caller's basename to the base implementation
            Runner.set_rep_trav(self, reptrav, basename=basename)
            run = magic.run
            tmpdir = os.environ.get('TMPDIR')
            if not reptrav and tmpdir:
                self.global_reptrav = tmpdir
                if self.really():   # for a MPI execution
                    self.global_reptrav = os.path.join(tmpdir, 'global')
                run.DBG("global_reptrav set to %s" % self.global_reptrav)
                run.DBG("local_reptrav  set to %s" % self.local_reptrav)
            return self.global_reptrav

        def build_dict_mpi_args(self):
            """Return dict arguments to build the script."""
            dict_mpi_args = Runner.build_dict_mpi_args(self)
            # to use a different mpirun outside of batch mode
            if self._prof['mode'][0] != 'batch':
                dict_mpi_args['mpirun_cmd'] = "/logiciels/impi/bin64/mpirun -r ssh -IB -rr -l -np %(mpi_nbcpu)s %(program)s"
            return dict_mpi_args

        def really(self):
            """Return True if Code_Aster executions need mpirun."""
            return self._use_mpi and self.nbcpu() > 1

    return ModifiedRunner
152
153
def _get_cpu_parameters(prof):
    """Return the CPU parameters asked in the export.

    Return value : tuple (cpu_mpi, node_mpi, cpu_openmp, blas_thread) where
        - cpu_mpi is the number of MPI processes ('mpi_nbcpu'), 1 by default,
        - node_mpi is the number of MPI nodes ('mpi_nbnoeud'), 999999 when
          it is not given,
        - cpu_openmp is the number of OpenMP threads ('ncpus'), 1 by default,
        - blas_thread is always 1.
    Empty, None, non-numeric or non-positive values fall back to the defaults.
    """
    def _positive_int(key):
        """Return prof[key][0] as a positive int, or None if unusable."""
        try:
            value = int(prof[key][0])
        except (TypeError, ValueError):
            # None, empty string or non-numeric value
            return None
        return value if value > 0 else None

    cpu_openmp = _positive_int('ncpus') or 1
    cpu_mpi = _positive_int('mpi_nbcpu') or 1
    node_mpi = _positive_int('mpi_nbnoeud') or 999999
    return cpu_mpi, node_mpi, cpu_openmp, 1
173
def adjust_cpu_parameters(cpu_mpi, node_mpi, cpu_openmp):
    """Adjust the number of processors, nodes to optimize the
    utilization of resources and performances.

    Each node is assumed to have PHYSICAL_PROC processors of CORE_PER_PROC
    cores each.

    Arguments :
        cpu_mpi : number of MPI processes requested,
        node_mpi : number of nodes requested (values below 1 count as 1),
        cpu_openmp : number of OpenMP threads per MPI process (values below
                     1 count as 1).
    Return value : tuple (cpu_mpi, node_mpi, cpu_openmp, blas_thread) of
    integer counts. Warnings are printed when a node is oversubscribed.
    """
    PHYSICAL_PROC = 2  # number of physical processors
    CORE_PER_PROC = 4  # number of cores on each processor
    cores_per_node = PHYSICAL_PROC * CORE_PER_PROC

    def ceil_div(num, den):
        """Return ceil(num / den) using exact integer arithmetic."""
        return -(-num // den)

    # avoid divisions by zero on meaningless requests
    node_mpi = max(node_mpi, 1)
    cpu_openmp = max(cpu_openmp, 1)

    # use at least PHYSICAL_PROC procs per node
    cpu_per_node = max(ceil_div(cpu_mpi, node_mpi), PHYSICAL_PROC)
    # do not allocate more nodes than necessary
    node_mpi = min(node_mpi, ceil_div(cpu_mpi, cpu_per_node))
    # because the nodes are exclusive, use all the processors (if cpu_mpi > 1)
    if 1 < cpu_mpi < node_mpi * PHYSICAL_PROC:
        cpu_mpi = node_mpi * PHYSICAL_PROC

    # recommendations
    if cpu_per_node > PHYSICAL_PROC:
        print("Warning: more MPI processors per node (%d) than physical processors (%d)."
              % (cpu_per_node, PHYSICAL_PROC))
    # not yet used and usable
    # integer divisions: these are thread counts, not fractions
    thread_per_cpu = cores_per_node // cpu_per_node
    blas_thread = max(thread_per_cpu // cpu_openmp, 1)
    nb_threads = cpu_openmp * blas_thread * cpu_per_node
    if nb_threads > cores_per_node:
        print("Warning: more threads (%d) than cores (%d)."
              % (nb_threads, cores_per_node))
    return cpu_mpi, node_mpi, cpu_openmp, blas_thread
206
207
208
# unittest : run this module directly to check adjust_cpu_parameters
if __name__ == '__main__':
    # (cpu_mpi, node_mpi, cpu_openmp) -> (cpu_mpi, node_mpi, cpu_openmp, blas_thread)
    expected_results = (
        ((1, 5, 1), (1, 1, 1, 4)),  # seq
        ((8, 999999, 1), (8, 4, 1, 4)),
        ((12, 3, 1), (12, 3, 1, 2)),
        ((13, 8, 1), (14, 7, 1, 4)),
        ((11, 11, 1), (12, 6, 1, 4)),
        ((9, 2, 4), (9, 2, 4, 1)),
        ((17, 3, 2), (17, 3, 2, 1)),
    )
    for requested, expected in expected_results:
        res = adjust_cpu_parameters(*requested)
        assert res == expected, res
225