1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import sys
5import os.path as osp
6import re
7import unittest
8from math import log10
9
10from common import dict_conf, execcmd, tmpdir
11from data   import parametric_export, parametric_poursuite_export
12
13import asrun
14from asrun.run          import AsRunFactory
15from asrun.profil       import AsterProfil
16from asrun.repart       import get_hostrc
17from asrun.parametric   import is_list_of_dict
18from asrun.thread       import Dispatcher
19from asrun.distrib      import DistribParametricTask
20
21
22class TestParametric(unittest.TestCase):
23
24    def test01_cmdline(self):
25        export = osp.join(tmpdir, "parametric.export")
26        with open(export, "w") as f:
27            f.write(parametric_export % dict_conf)
28        cmd = dict_conf["as_run"] + [export]
29        iret, output = execcmd(cmd, "parametric.1", return_output=True)
30        assert iret == 0
31        assert len(re.findall("DIAGNOSTIC JOB *: *<A>", output)) == 1   # repe != base
32        assert len(re.findall("0 err", output)) == 1
33        assert osp.exists(osp.join(tmpdir, 'parametric.resu/calc_1/base/glob.1'))
34        with open(osp.join(tmpdir, 'parametric.resu/calc_1/command_0.comm'), 'r') as f:
35            content = f.read()
36        assert content.find("INCLUDE") > -1
37
38
39    def test02_cmdline_poursuite(self):
40        export = osp.join(tmpdir, "parametric_poursuite.export")
41        with open(export, "w") as f:
42            f.write(parametric_poursuite_export % dict_conf)
43        cmd = dict_conf["as_run"] + [export]
44        iret, output = execcmd(cmd, "parametric.2", return_output=True)
45        assert iret == 0
46        assert len(re.findall("DIAGNOSTIC JOB *: *OK", output)) == 1
47
48
49    def test03_using_api(self):
50        from asrun.core import magic
51        run = AsRunFactory()
52        magic.set_stdout(osp.join(tmpdir, "parametric.3" + ".out"))
53        prof = AsterProfil()
54        prof['actions'] = 'distribution'
55        prof['nomjob'] = 'parametric_api'
56        prof['mode']    = 'interactif'
57        prof['version'] = dict_conf['ASTER_VERSION']
58        prof['debug']   = 'nodebug'
59        user, host = run.system.getuser_host()
60        prof['origine']  = 'as_run %s' % run['version']
61        prof['mclient']  = host
62        prof['uclient']  = user
63        prof['serveur']  = host
64        prof['username'] = user
65        prof.args['memjeveux'] = 32
66        prof.args['tpmax']     = 60
67        prof['memjob'] = 65536
68        prof['tpsjob'] = 1
69        prof.Set('D', {
70            'type' : 'comm', 'isrep' : False, 'ul' : 1, 'compr' : False,
71            'path' : "%(DATA)s/study.comm" % dict_conf })
72        prof.Set('D', {
73            'type' : 'hostfile', 'isrep' : False, 'ul' : 0, 'compr' : False,
74            'path' : "%(DATA)s/hostfile" % dict_conf })
75        # result directories
76        resudir  = "%(TMPDIR)s/parametric_api.resu" % dict_conf
77        flashdir = "%(TMPDIR)s/parametric_api.flash" % dict_conf
78        prof.Set('R', {
79            'type' : 'repe', 'isrep' : True, 'ul' : 0, 'compr' : False,
80            'path' : resudir })
81        # add a result
82        prof.Set('R', {
83            'type' : 'libr', 'isrep' : False, 'ul' : 6, 'compr' : False,
84            'path' : "/unused_directory_name/study.mess" })
85        # get hostrc object
86        hostrc = get_hostrc(run, prof)
87        # timeout before rejected a job
88        timeout = prof.get_timeout()
89        # list of parameters
90        list_val = [
91            { 'P1' : 111., }, { 'P1' : 222., }, { 'P1' : 0., }
92        ]
93        assert is_list_of_dict(list_val)
94        nbval = len(list_val)
95        assert nbval == 3
96        # number of threads to follow execution
97        numthread = 1
98        # ----- Execute calcutions in parallel using a Dispatcher object
99        # elementary task...
100        task = DistribParametricTask(run=run, prof=prof, # IN
101                             hostrc=hostrc,
102                             nbmaxitem=0, timeout=timeout,
103                             resudir=resudir, flashdir=flashdir,
104                             reptrav=tmpdir,
105                             info=1,
106                             keywords={ 'POST_CALCUL' : """print 'test USE_POST_CALCUL'""" },
107                             nbnook=[0,]*numthread, exec_result=[])            # OUT
108        # ... and dispatch task on 'list_val'
109        etiq = 'calc_%%0%dd' % (int(log10(nbval)) + 1)
110        labels = [etiq % (i+1) for i in range(nbval)]
111        couples = list(zip(labels, list_val))
112        execution = Dispatcher(couples, task, numthread=numthread)
113        # expect one error
114        assert task.nbnook[0] == 1, task.nbnook
115        list_output = []
116        for result in task.exec_result:
117            calc, diag, output_filename = result[0], result[2], result[7]
118            gravity = run.GetGrav(diag)
119            if gravity > run.GetGrav("<A>"):
120                list_output.append((calc, diag, output_filename))
121            # check that output files exist
122            assert osp.isfile(osp.join(resudir, calc, "study.mess"))
123        assert len(list_output) == 1, list_output
124
125
126if __name__ == "__main__":
127    unittest.main()
128