1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3 4import sys 5import os.path as osp 6import re 7import unittest 8from math import log10 9 10from common import dict_conf, execcmd, tmpdir 11from data import parametric_export, parametric_poursuite_export 12 13import asrun 14from asrun.run import AsRunFactory 15from asrun.profil import AsterProfil 16from asrun.repart import get_hostrc 17from asrun.parametric import is_list_of_dict 18from asrun.thread import Dispatcher 19from asrun.distrib import DistribParametricTask 20 21 22class TestParametric(unittest.TestCase): 23 24 def test01_cmdline(self): 25 export = osp.join(tmpdir, "parametric.export") 26 with open(export, "w") as f: 27 f.write(parametric_export % dict_conf) 28 cmd = dict_conf["as_run"] + [export] 29 iret, output = execcmd(cmd, "parametric.1", return_output=True) 30 assert iret == 0 31 assert len(re.findall("DIAGNOSTIC JOB *: *<A>", output)) == 1 # repe != base 32 assert len(re.findall("0 err", output)) == 1 33 assert osp.exists(osp.join(tmpdir, 'parametric.resu/calc_1/base/glob.1')) 34 with open(osp.join(tmpdir, 'parametric.resu/calc_1/command_0.comm'), 'r') as f: 35 content = f.read() 36 assert content.find("INCLUDE") > -1 37 38 39 def test02_cmdline_poursuite(self): 40 export = osp.join(tmpdir, "parametric_poursuite.export") 41 with open(export, "w") as f: 42 f.write(parametric_poursuite_export % dict_conf) 43 cmd = dict_conf["as_run"] + [export] 44 iret, output = execcmd(cmd, "parametric.2", return_output=True) 45 assert iret == 0 46 assert len(re.findall("DIAGNOSTIC JOB *: *OK", output)) == 1 47 48 49 def test03_using_api(self): 50 from asrun.core import magic 51 run = AsRunFactory() 52 magic.set_stdout(osp.join(tmpdir, "parametric.3" + ".out")) 53 prof = AsterProfil() 54 prof['actions'] = 'distribution' 55 prof['nomjob'] = 'parametric_api' 56 prof['mode'] = 'interactif' 57 prof['version'] = dict_conf['ASTER_VERSION'] 58 prof['debug'] = 'nodebug' 59 user, host = run.system.getuser_host() 60 prof['origine'] = 'as_run %s' % run['version'] 61 prof['mclient'] = host 62 prof['uclient'] = user 63 prof['serveur'] = host 64 prof['username'] = user 65 prof.args['memjeveux'] = 32 66 prof.args['tpmax'] = 60 67 prof['memjob'] = 65536 68 prof['tpsjob'] = 1 69 prof.Set('D', { 70 'type' : 'comm', 'isrep' : False, 'ul' : 1, 'compr' : False, 71 'path' : "%(DATA)s/study.comm" % dict_conf }) 72 prof.Set('D', { 73 'type' : 'hostfile', 'isrep' : False, 'ul' : 0, 'compr' : False, 74 'path' : "%(DATA)s/hostfile" % dict_conf }) 75 # result directories 76 resudir = "%(TMPDIR)s/parametric_api.resu" % dict_conf 77 flashdir = "%(TMPDIR)s/parametric_api.flash" % dict_conf 78 prof.Set('R', { 79 'type' : 'repe', 'isrep' : True, 'ul' : 0, 'compr' : False, 80 'path' : resudir }) 81 # add a result 82 prof.Set('R', { 83 'type' : 'libr', 'isrep' : False, 'ul' : 6, 'compr' : False, 84 'path' : "/unused_directory_name/study.mess" }) 85 # get hostrc object 86 hostrc = get_hostrc(run, prof) 87 # timeout before rejected a job 88 timeout = prof.get_timeout() 89 # list of parameters 90 list_val = [ 91 { 'P1' : 111., }, { 'P1' : 222., }, { 'P1' : 0., } 92 ] 93 assert is_list_of_dict(list_val) 94 nbval = len(list_val) 95 assert nbval == 3 96 # number of threads to follow execution 97 numthread = 1 98 # ----- Execute calcutions in parallel using a Dispatcher object 99 # elementary task... 100 task = DistribParametricTask(run=run, prof=prof, # IN 101 hostrc=hostrc, 102 nbmaxitem=0, timeout=timeout, 103 resudir=resudir, flashdir=flashdir, 104 reptrav=tmpdir, 105 info=1, 106 keywords={ 'POST_CALCUL' : """print 'test USE_POST_CALCUL'""" }, 107 nbnook=[0,]*numthread, exec_result=[]) # OUT 108 # ... and dispatch task on 'list_val' 109 etiq = 'calc_%%0%dd' % (int(log10(nbval)) + 1) 110 labels = [etiq % (i+1) for i in range(nbval)] 111 couples = list(zip(labels, list_val)) 112 execution = Dispatcher(couples, task, numthread=numthread) 113 # expect one error 114 assert task.nbnook[0] == 1, task.nbnook 115 list_output = [] 116 for result in task.exec_result: 117 calc, diag, output_filename = result[0], result[2], result[7] 118 gravity = run.GetGrav(diag) 119 if gravity > run.GetGrav("<A>"): 120 list_output.append((calc, diag, output_filename)) 121 # check that output files exist 122 assert osp.isfile(osp.join(resudir, calc, "study.mess")) 123 assert len(list_output) == 1, list_output 124 125 126if __name__ == "__main__": 127 unittest.main() 128