# -*- coding: utf-8 -*-

"""
This module adjusts some execution parameters according to
the type of execution.

For Aster4 cluster.

These asrun customizations are called through (in asrun configuration file) :

    schema_calcul : plugins.aster4_calcul.modifier

    schema_execute : plugins.aster4_calcul.change_command_line
"""

import os
from math import ceil

from asrun.core import magic
from asrun.runner import Runner

# Memory (MB) added to memjob for every job that is not a 'distribution' job.
# It is multiplied by 1024 before being added, so memjob is presumably
# expressed in kB -- TODO confirm against asrun.
MEMSUP = 400


def iceil(x):
    """Return the ceiling of *x* as an int."""
    return int(ceil(x))


def modifier(calcul):
    """Adjust the profile of a calculation (batch mode only) :
        - batch parameters (queue, memory, time limit),
        - arguments of the bsub command.
    Argument : ASTER_CALCUL object
    Return value : ASTER_PROFIL object."""
    serv = calcul.serv
    prof = calcul.prof
    if prof['mode'][0] == 'batch':
        prof = change_batch_parameters(serv, prof)
        change_bsub_command(prof)
    return prof


def change_bsub_command(prof):
    """Add process-count and placement arguments to the bsub command.

    The command configured in asrun (``magic.run['batch_sub']``) is split,
    enriched, then stored in ``prof['batch_sub']``.  ``prof`` is modified in
    place; nothing is returned.
    """
    run = magic.run
    lcmd = run['batch_sub'].split()
    cpu_mpi, node_mpi, _, _ = _get_cpu_parameters(prof)
    # only if the configured command does not already set it
    if '-n' not in lcmd:
        lcmd.insert(1, '-n %d' % cpu_mpi)
    # to place processes on nodes
    if cpu_mpi > 1:
        if "intelmpi" not in lcmd:
            lcmd.append("-a intelmpi -x")
        # _get_cpu_parameters returns a huge sentinel (999999) when no node
        # count was requested, and the request may exceed cpu_mpi: in both
        # cases no valid (non-zero) ptile exists, so leave placement alone.
        if 1 < node_mpi <= cpu_mpi:
            # round up so that node_mpi nodes can hold all cpu_mpi processes
            cpu_per_node = iceil(1. * cpu_mpi / node_mpi)
            lcmd.append('-R "span[ptile=%d]"' % cpu_per_node)
    elif "seq" not in lcmd:
        lcmd.append("-a seq")
    prof['batch_sub'] = " ".join(lcmd)


def change_batch_parameters(serv, prof):
    """Change the batch parameters in an export object (classe, memjob, tpsjob).

    Arguments :
        serv : name of the service ('study', 'testcase', 'parametric_study'...)
        prof : ASTER_PROFIL object, modified in place
    Return value : the same ASTER_PROFIL object.
    """
    # available services are defined in calcul.py
    cpu_mpi, _, _, _ = _get_cpu_parameters(prof)

    # change job queue if :
    # - it's a study and the batch queue is not defined
    # - or it's a testcase.
    DEFAULT_QUEUE = 'prod'
    g0 = group = prof['classe'][0]
    if group == '':
        # by default : prod
        group = DEFAULT_QUEUE

    if 'distribution' not in prof['actions']:
        # add MEMSUP MB
        prof['memjob'] = int(float(prof['memjob'][0])) + MEMSUP * 1024
        if g0 == 'distM':
            group = ''
    elif g0 == '':
        group = 'distM'

    if g0 in ('TI', 'MRI'):
        pass
    elif 'astout' in prof['actions']:
        group = 'astout'
        prof.set_param_memory(512)
        prof.set_param_time(3600 * 24)
    elif serv == 'study':
        if cpu_mpi > 1:
            group = 'mpi'
    elif serv == 'testcase':
        group = 'test'
        if cpu_mpi > 1:
            group = 'mpi'  # mpi_test ?
    elif serv == 'parametric_study':
        # parametric_study : group not yet exists
        group = 'distr'
        if cpu_mpi > 1:
            group = 'mpi'

    if group in ('mpi', 'TI', 'MRI'):
        # multiply the time limit (plus a 2 s margin) by the number of processors
        if prof['tpsjob'][0]:
            new_tpsjob = (float(prof['tpsjob'][0]) + 2.0) * cpu_mpi
            magic.run.DBG("Change tpsjob from %s to %s" % (prof['tpsjob'][0], new_tpsjob))
            prof['tpsjob'] = new_tpsjob
    # the 'urgent' queue chosen by the user is never overridden
    if group != g0 and g0 != "urgent":
        prof['classe'] = group
        magic.run.DBG("Change batch queue group to : %s" % group)
    return prof


def change_command_line(prof):
    """Return a Runner subclass that adapts the mpirun command line and the
    working directories for this cluster.

    Argument : ASTER_PROFIL object (kept by the returned class instances).
    """

    class ModifiedRunner(Runner):
        """Runner bound to ``prof``."""

        def __init__(self, *args, **kwargs):
            Runner.__init__(self, *args, **kwargs)
            self._prof = prof

        def set_rep_trav(self, reptrav, basename=''):
            """Set temporary directory for Code_Aster executions.

            When no directory is given and $TMPDIR is defined, the global
            working directory is $TMPDIR (or $TMPDIR/global for an MPI run).
            Return value : the global working directory.
            """
            # NOTE(review): ``basename`` is not forwarded -- the parent is
            # always called with basename=''. Confirm this is intended.
            Runner.set_rep_trav(self, reptrav, basename='')
            run = magic.run
            if not reptrav and os.environ.get('TMPDIR'):
                self.global_reptrav = os.environ['TMPDIR']
                if self.really():  # for a MPI execution
                    self.global_reptrav = os.path.join(self.global_reptrav, 'global')
            run.DBG("global_reptrav set to %s" % self.global_reptrav)
            run.DBG("local_reptrav set to %s" % self.local_reptrav)
            return self.global_reptrav

        def build_dict_mpi_args(self):
            """Return dict arguments to build the script."""
            dict_mpi_args = Runner.build_dict_mpi_args(self)
            # to use a different mpirun in interactive mode
            if self._prof['mode'][0] != 'batch':
                dict_mpi_args['mpirun_cmd'] = "/logiciels/impi/bin64/mpirun -r ssh -IB -rr -l -np %(mpi_nbcpu)s %(program)s"
            return dict_mpi_args

        def really(self):
            """Return True if Code_Aster executions need mpirun."""
            return self._use_mpi and self.nbcpu() > 1

    return ModifiedRunner


def _get_cpu_parameters(prof):
    """Return the numbers of MPI cpus, MPI nodes and OpenMP threads asked in
    the export, plus the number of BLAS threads (always 1 here).

    Return value : (cpu_mpi, node_mpi, cpu_openmp, blas_thread).
    Missing or invalid values default to 1, except node_mpi which defaults
    to the sentinel 999999 ("not requested").
    """
    try:
        cpu_openmp = int(prof['ncpus'][0] or 1)
    except ValueError:
        cpu_openmp = 1
    cpu_openmp = max(cpu_openmp, 1)
    try:
        cpu_mpi = int(prof['mpi_nbcpu'][0] or 1)
    except ValueError:
        cpu_mpi = 1
    cpu_mpi = max(cpu_mpi, 1)
    node_mpi = 999999
    try:
        node_mpi = int(prof['mpi_nbnoeud'][0]) or node_mpi
    except ValueError:
        pass
    return cpu_mpi, node_mpi, cpu_openmp, 1


def adjust_cpu_parameters(cpu_mpi, node_mpi, cpu_openmp):
    """Adjust the number of processors, nodes to optimize the
    utilization of resources and performances.

    Return value : (cpu_mpi, node_mpi, cpu_openmp, blas_thread), all ints.
    Warnings are printed when the nodes would be oversubscribed.
    """
    PHYSICAL_PROC = 2  # number of physical processors
    CORE_PER_PROC = 4  # number of cores on each processor

    cpu_per_node = iceil(1. * cpu_mpi / node_mpi)
    # use at least PHYSICAL_PROC procs per node
    if cpu_per_node < PHYSICAL_PROC:
        cpu_per_node = PHYSICAL_PROC
    # do not allocate more nodes than necessary
    if node_mpi > 1. * cpu_mpi / cpu_per_node:
        node_mpi = iceil(1. * cpu_mpi / cpu_per_node)
    # because the nodes are exclusive, use all the processors (if cpu_mpi > 1)
    if cpu_mpi > 1 and cpu_mpi < node_mpi * PHYSICAL_PROC:
        cpu_mpi = node_mpi * PHYSICAL_PROC

    # recommendations
    if cpu_per_node > PHYSICAL_PROC:
        print("Warning: more MPI processors per node (%d) than physical processors (%d)."
              % (cpu_per_node, PHYSICAL_PROC))
    # not yet used and usable
    # integer division: a thread count must be a whole number
    thread_per_cpu = PHYSICAL_PROC * CORE_PER_PROC // cpu_per_node
    blas_thread = max(thread_per_cpu // cpu_openmp, 1)
    if cpu_openmp * blas_thread * cpu_per_node > PHYSICAL_PROC * CORE_PER_PROC:
        print("Warning: more threads (%d) than cores (%d)."
              % (cpu_openmp * blas_thread * cpu_per_node, PHYSICAL_PROC * CORE_PER_PROC))
    return cpu_mpi, node_mpi, cpu_openmp, blas_thread


# unittest
if __name__ == '__main__':
    res = adjust_cpu_parameters(1, 5, 1)  # seq
    assert res == (1, 1, 1, 4), res
    res = adjust_cpu_parameters(8, 999999, 1)
    assert res == (8, 4, 1, 4), res
    res = adjust_cpu_parameters(12, 3, 1)
    assert res == (12, 3, 1, 2), res
    res = adjust_cpu_parameters(13, 8, 1)
    assert res == (14, 7, 1, 4), res
    res = adjust_cpu_parameters(11, 11, 1)
    assert res == (12, 6, 1, 4), res
    res = adjust_cpu_parameters(9, 2, 4)
    assert res == (9, 2, 4, 1), res
    res = adjust_cpu_parameters(17, 3, 2)
    assert res == (17, 3, 2, 1), res
    res = adjust_cpu_parameters(6, 2, 1)  # 8 cores / 3 procs per node
    assert res == (6, 2, 1, 2) and isinstance(res[3], int), res