# -*- coding: utf-8 -*-

"""
This module adjusts some execution parameters according to
the type of execution.

For athosdev cluster.

These asrun customizations are called through (in asrun configuration file) :

    schema_calcul : plugins.athosdev.modifier

    schema_execute : plugins.athosdev.change_command_line
"""

import os

from asrun.core import magic
from asrun.runner import Runner
from asrun.build import AsterBuild
from asrun.config import build_config_of_version
from asrun.common_func import get_tmpname
from asrun.plugins.generic_func import getCpuParameters, setDistrLimits


# memory (MB) added to memjob for testcases
MEMSUP = 0


def modifier(calcul):
    """Call elementary functions to adjust :
        - batch parameters,
        - TODO submit interactive mpi execution in interactive queues.

    The batch parameters are only changed when the profile's 'mode' is
    'batch'; `setDistrLimits` is then applied to the resulting profile.

    Argument : ASTER_CALCUL object
    Return value : ASTER_PROFIL object."""
    serv = calcul.serv
    prof = calcul.prof
    if prof['mode'][0] == 'batch':
        prof = change_batch_parameters(serv, prof)
    setDistrLimits(prof, 512, 3 * 24 * 3600 - 1, 'batch')
    return prof


def getCpuParametersLocal(prof):
    """Force to use all available threads for OpenMP AND Blas.
    See `asrun.plugins.generic_func.getCpuParameters` function.

    Argument : ASTER_PROFIL object
    Return value : tuple (cpu_mpi, node_mpi, cpu_openmp, blas_thread) where
    `cpu_openmp` is the value returned by `getCpuParameters` multiplied by
    `blas_thread`."""
    # Fix the number of physical processors (2) & cores per processor (18).
    cpu_mpi, node_mpi, cpu_openmp, blas_thread = getCpuParameters(2, 18, prof)
    cpu_openmp = cpu_openmp * blas_thread
    return cpu_mpi, node_mpi, cpu_openmp, blas_thread


def change_batch_parameters(serv, prof):
    """Change the batch parameters in an export object (classe...).

    Updates, in place, the 'memjob', 'tpsjob', 'mpi_nbnoeud', 'memoryNode',
    'ncpus', 'batch_custom' and 'classe' parameters of the profile.

    Arguments :
        serv : server description (not used here, kept for the interface),
        prof : ASTER_PROFIL object.
    Return value : the same ASTER_PROFIL object."""
    cpu_mpi, node_mpi, cpu_openmp, blas_thread = getCpuParametersLocal(prof)
    cpu_per_node = cpu_mpi / node_mpi

    # change job queue if :
    #  - it's a study and the batch queue is not defined
    #  - or it's a testcase.
    DEFAULT_QUEUE = 'cn'
    # default number of threads when 'ncpus' is not given in the export
    DEFAULT_MAX_THREADS = 6
    g0 = group = prof['classe'][0]
    if group == '':
        # by default : prod
        group = DEFAULT_QUEUE
    batch_custom = "--wckey=P11YB:ASTER"

    # add MEMSUP MB
    if 'distribution' not in prof['actions']:
        prof['memjob'] = int(float(prof['memjob'][0])) + MEMSUP * 1024
    if 'astout' in prof['actions']:
        # fixed resources for a run of the testcases list
        prof['memjob'] = 1000 * 1024
        prof['tpsjob'] = 60 * 24
    if cpu_mpi > 1:
        # should allow ncpu=2, node=1 but it does not work.
        batch_custom += ' --exclusive'
        # --nodes is now required, even if it is equal to 1
        if not prof['mpi_nbnoeud'][0]:
            prof['mpi_nbnoeud'] = 1
    else:
        # --nodes must not be set in sequential
        if prof['mpi_nbnoeud'][0]:
            prof['mpi_nbnoeud'] = ""

    memory_limit = float(prof['memjob'][0]) / 1024.
    prof.set_param_memory(memory_limit)
    prof['memoryNode'] = memory_limit * cpu_per_node
    # memory per node in GB
    memGB = memory_limit * cpu_per_node / 1024.
    if memGB > 180:
        # jobs needing more than 180 GB per node are sent to the 'bm' queue
        group = 'bm'

    # special hook for performance testcases with 1 cpu
    if cpu_mpi == 1 and 'performance' in prof['testlist'][0] and memGB < 64:
        batch_custom += ' --exclusive'

    # if not given in export, use the available cores but never more than
    # DEFAULT_MAX_THREADS
    if not prof['ncpus'][0]:
        ncpus = min(DEFAULT_MAX_THREADS, cpu_openmp)
        # report the value that is actually stored in the profile
        magic.run.DBG("Change number of threads: %s" % ncpus)
        prof['ncpus'] = ncpus

    # general - see https://computing.llnl.gov/linux/slurm/cpu_management.html

    batch_custom += (' --cpus-per-task={0} --threads-per-core=1 '
                     '--distribution=block:block '
                     '--mem-bind=local').format(prof['ncpus'][0])

    prof['batch_custom'] = batch_custom
    if group != g0:
        prof['classe'] = group
        magic.run.DBG("Change batch queue group to : %s" % group)
    return prof


# NOT USED ANYMORE - SET TOO MANY THREADS
def change_command_line(prof):
    """Change mpirun command line and arguments.

    The returned class prefixes the execution command with exports of
    OMP_NUM_THREADS, MKL_NUM_THREADS and I_MPI_PIN_DOMAIN.

    Argument : ASTER_PROFIL object
    Return value : derivated of Runner class.
    """
    _cpu_mpi, _node_mpi, cpu_openmp, blas_thread = getCpuParametersLocal(prof)
    # for compatibility with version < 13.1
    use_numthreads = False
    vers = prof['version'][0]
    if vers:
        conf = build_config_of_version(magic.run, vers, error=False)
        if conf:
            build = AsterBuild(magic.run, conf)
            use_numthreads = build.support('use_numthreads')
    if not use_numthreads:
        # fall back to the number of threads given in the export
        cpu_openmp = prof['ncpus'][0]
    # end of compatibility block

    class ModifiedRunner(Runner):
        """Modified Runner to export some variables before execution"""

        def get_exec_command(self, cmd_in, add_tee=False, env=None):
            """Return command to run Code_Aster.
            Export specific variables for Intel MKL"""
            cmd = Runner.get_exec_command(self, cmd_in, add_tee, env)
            # cpu_openmp and blas_thread are captured from the enclosing call
            exports = (
                "export OMP_NUM_THREADS={openmp} ; "
                "export MKL_NUM_THREADS={blas} ; "
                "export I_MPI_PIN_DOMAIN=omp:compact ; "
            ).format(openmp=cpu_openmp, blas=blas_thread)
            return exports + cmd

    return ModifiedRunner