1# -*- coding: utf-8 -*-
2
3# ==============================================================================
4# COPYRIGHT (C) 1991 - 2015  EDF R&D                  WWW.CODE-ASTER.ORG
5# THIS PROGRAM IS FREE SOFTWARE; YOU CAN REDISTRIBUTE IT AND/OR MODIFY
6# IT UNDER THE TERMS OF THE GNU GENERAL PUBLIC LICENSE AS PUBLISHED BY
7# THE FREE SOFTWARE FOUNDATION; EITHER VERSION 2 OF THE LICENSE, OR
8# (AT YOUR OPTION) ANY LATER VERSION.
9#
10# THIS PROGRAM IS DISTRIBUTED IN THE HOPE THAT IT WILL BE USEFUL, BUT
11# WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF
12# MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. SEE THE GNU
13# GENERAL PUBLIC LICENSE FOR MORE DETAILS.
14#
15# YOU SHOULD HAVE RECEIVED A COPY OF THE GNU GENERAL PUBLIC LICENSE
16# ALONG WITH THIS PROGRAM; IF NOT, WRITE TO EDF R&D CODE_ASTER,
17#    1 AVENUE DU GENERAL DE GAULLE, 92141 CLAMART CEDEX, FRANCE.
18# ==============================================================================
19
20"""
21This module defines the default schemes.
22
Additional modules can be added to define other schemes to extend
the capabilities of asrun.
25
26These plugins can be added in any directory listed in PYTHONPATH.
27But it's recommended to place them in etc/codeaster/plugins because
28the modules added in this directory will be kept during updates of asrun.
29"""
30
31import os
32import os.path as osp
33
34from asrun.common.i18n  import _
35from asrun.core         import magic
36from asrun.calcul       import parse_submission_result, parse_consbtc
37from asrun.job          import parse_actu_result, print_actu_result
38from asrun.profil       import AsterProfil
39from asrun.core.configuration import get_plt_exec_name
40from asrun.common_func  import flash_filename, edit_file
41from asrun.common.utils import get_absolute_path, unique_basename
42from asrun.common.sysutils import local_user, local_full_host, get_home_directory
43from asrun.core.server  import build_server_from_profile, TYPES
44from asrun.rex          import parse_issue_file
45from asrun.profile_modifier import apply_special_service
46
47
48
def serv(prof, args, print_output=True, **kwargs):
    """Call --serv action on a server.

    The data files are first copied on the server (see
    `copy_datafiles_on_server`), then `as_run --serv` is executed on the
    server with the relocated export.

    Arguments:
        prof: profile (export) of the study to submit.
        args: not used by this scheme.
        print_output: if True, print the "JOBID=... QUEUE=... STUDYID=..."
            line and, when a btc file is found in the output, the
            "BTCFILE=..." line.
        kwargs: 'num_job' overrides the value stored in `magic.run`.

    Return a tuple (exit code, output). The output is an empty string if
    the copy of the data files failed. If the remote command failed, the
    error stream is appended to the output after a separator line.
    """
    run = magic.run
    num_job = kwargs.get('num_job', run['num_job'])
    # decode special service
    serv, prof = apply_special_service(prof, run, on_client_side=True)
    if serv != "":
        magic.log.info(_("special service : %s"), serv)
    # set studyid (if already defined in prof, use this one)
    studyid = prof['studyid'][0]
    if studyid == '':
        studyid = "%s-%s" % (num_job, prof['mclient'][0].split('.')[0])
    # read server informations
    sexec = build_server_from_profile(prof, TYPES.EXEC)
    # NOTE(review): `scopy` is not used below (copy_datafiles_on_server builds
    # its own servers); kept in case building it has side effects - confirm.
    scopy = build_server_from_profile(prof, TYPES.COPY_TO, jobid=studyid)
    magic.log.info(_("prepare execution for %s@%s"), sexec.user, sexec.host)

    # prepare export for execution with all files present in the remote directory
    fprof = osp.join(run['tmp_user'], "%s.export" % studyid)
    forig = osp.join(run['tmp_user'], "%s.orig.export" % studyid)
    prof.set_filename(forig)
    prof.WriteExportTo(forig)
    # both local exports are temporary files
    run.ToDelete(fprof)
    run.ToDelete(forig)

    iret, remote_prof = copy_datafiles_on_server(prof, studyid, fprof)
    if iret != 0:
        return iret, ''

    # launch the study
    cmd = [osp.join(sexec.get_aster_root(), "bin",
                    get_plt_exec_name(remote_prof.get_platform(), "as_run")), ]
    cmd.append("--serv")
    cmd.append("--num_job=%s" % studyid)
    cmd.extend(run.get_rcdir_arg())
    cmd.append(remote_prof.get_filename())

    iret, output, err = sexec.exec_command(cmd, display_forwarding=True)
    run.DBG("******************** OUTPUT of as_run --serv ********************", output,
            "******************** ERROR of as_run --serv ********************", err,
            "******************** END of as_run --serv ********************",
            all=True, prefix="    ")
    jobid, queue, stid2 = parse_submission_result(output)
    run.DBG("The server returns %s and studyid is set to %s" % (stid2, studyid))
    if print_output:
        print("JOBID=%s QUEUE=%s STUDYID=%s" % (jobid, queue, studyid))
    btc = parse_consbtc(output)
    if btc is not None and print_output:
        print("BTCFILE=%s" % btc)
    if iret != 0:
        # append the error stream once (using `+=` here would repeat the
        # whole output a second time)
        output = os.linesep.join([output,
            "******************** ERROR of as_run --serv ********************", err])
    return iret, output
102
103
def get_results(prof, args, **kwargs):
    """Download the result files of a study from the server.

    Return 4 if the export of the study can not be retrieved, otherwise
    the code returned by the copies (0 means success). The remote
    directory is removed only when everything has been copied.
    """
    run = magic.run
    magic.log.info(_("get result files from the server"))

    # the studyid is read from the export of the study
    study_prof = get_study_export(prof)
    if study_prof is None:
        return 4
    studyid = study_prof['studyid'][0]
    forig = osp.join(run['tmp_user'], "%s.orig.export" % studyid)

    # server holding the files of the study
    scopy = build_server_from_profile(study_prof, TYPES.COPY_FROM, jobid=studyid)

    # the original export is needed to know where results must be put
    iret = scopy.copyfrom(forig)
    if iret != 0:
        magic.log.warn(_("the results seem already downloaded."))
        return iret
    oprof = AsterProfil(forig, run)
    magic.run.DBG("original export :\n%s" % repr(oprof), all=True)

    # results expected on the client: nothing to copy if the study ran
    # on localhost
    client_paths = []
    if not scopy.is_localhost():
        on_client = oprof.get_result().get_on_serv(local_full_host)
        res_nom, res_other = on_client.get_type('nom', with_completion=True)
        iret = max(scopy.copyfrom(convert=unique_basename, *res_other.topath()),
                   scopy.copyfrom(*res_nom.topath()))
        client_paths = on_client.topath()

    # what is neither on the client nor on the server is on a third host
    server_paths = oprof.get_result().get_on_serv(scopy.host, scopy.user).topath()
    third_host = set(oprof.get_result().topath())
    third_host.difference_update(client_paths, server_paths)
    if third_host:
        magic.log.warn(_("files on a third host should have been copied "
            "at the end of the calculation (if possible) : %s"),
            [path.repr() for path in third_host])

    # remove remote repository
    if iret == 0:
        scopy.delete_proxy_dir()
    return iret
151
152
def get_study_export(prof):
    """Copy locally the export of the study stored in the 'flasheur'
    and return it as an AsterProfil object.
    Return None if the file can not be copied."""
    run = magic.run
    # read server informations
    serv = build_server_from_profile(prof, TYPES.COPY_FROM)

    jobid, jobname, mode = [prof[key][0] for key in ('jobid', 'nomjob', 'mode')]
    # flasheur is in the home directory
    flash_export = flash_filename("flasheur", jobname, jobid, "export", mode)
    dirname, fname = osp.split(flash_export)
    run.DBG("export file in %s is named %s" % (dirname, fname))

    # the local copy is stored in a directory named after the server
    serv.set_proxy_dir(dirname)
    local_copy = osp.join(run['tmp_user'], 'flasheur_%s' % serv.host, fname)
    if serv.copyfrom(local_copy) != 0:
        return None

    return AsterProfil(local_copy, run)
176
177
def call_generic_service(action, serv, prof, args, options=None):
    """Generic function : call the service on the given server.

    Arguments:
        action: name of the as_run action, passed as "--<action>".
        serv: server on which the command is executed. It is created by
            the caller essentially to give relevant progress informations.
        prof: profile, only used here to get the platform.
        args: positional arguments of the action. BE CAREFUL: the first
            item is ignored (supposed to be equal to `prof`).
        options: optional dict of additional options. A value that is
            True gives "--key", any other value gives "--key=value".

    Return a tuple (exit code, output, error). If the exit code is not
    null, the error stream is appended to the output after a separator
    line.
    """
    run = magic.run
    cmd = [osp.join(serv.get_aster_root(), "bin",
                    get_plt_exec_name(prof.get_platform(), "as_run")), ]
    cmd.append("--" + action)
    cmd.extend(run.get_rcdir_arg())
    # `options` defaults to None to avoid a mutable default argument
    for key, val in (options or {}).items():
        suffix = "" if val is True else "=%s" % val
        cmd.append("--%s%s" % (key, suffix))
    cmd.extend(args[1:])

    iret, output, err = serv.exec_command(cmd)
    # the error stream is not included in the debug message
    run.DBG("******************** OUTPUT of as_run --%s ********************" % action, output,
            "******************** END of as_run --%s ********************" % action,
            all=True, prefix="    ")
    if iret != 0:
        # append the error stream once (using `+=` here would repeat the
        # whole output a second time)
        output = os.linesep.join([output,
            "******************** ERROR of as_run --%s ********************" % action, err])
    return iret, output, err
205
206
def copy_datafiles_on_server(prof, studyid, fprof):
    """Copy data files on the server, relocate the export and write it (locally)
    into 'fprof'.

    Arguments:
        prof: original export; its filename is the original export file
            that is copied on the server with the relocated one.
        studyid: identifier of the study, also used as jobid of the
            server receiving the copies.
        fprof: local filename in which the relocated export is written.

    Return a tuple (code, relocated export). The code is the highest of
    the codes returned by the copies (export files and data files), so 0
    means that all the copies succeeded.
    """
    # read server informations
    sexec = build_server_from_profile(prof, TYPES.EXEC)
    scopy = build_server_from_profile(prof, TYPES.COPY_TO, jobid=studyid)
    forig = prof.get_filename()
    magic.log.debug("original export name is %s", forig)
    magic.run.DBG("original export :\n%s" % repr(prof), all=True)

    run_on_localhost = sexec.is_localhost()
    remote_prof = prof.copy()
    if not run_on_localhost:
        # local files will be found in the proxy directory of the server
        remote_prof.relocate(local_full_host, scopy.get_proxy_dir(),
                             convert=unique_basename)
    remote_prof.relocate(sexec.host, convert=unique_basename)
    remote_prof['studyid'] = studyid
    remote_prof.set_filename(fprof)
    remote_prof.WriteExportTo(fprof)
    magic.run.DBG("remote export :\n%s" % repr(remote_prof), all=True)

    # copy data files
    # - local_data : files which are on localhost and to copy on the compute server
    all_data = prof.get_data()
    if not run_on_localhost:
        local_data = all_data.get_on_serv(local_full_host)
    else:
        local_data = []
    # - remote_data : these files are already on the server, use them directly
    remote_data = all_data.get_on_serv(sexec.host, sexec.user).topath()
    # - foreign_data : if they are on a foreign server (not localhost or
    #   compute server), we can :
    #   - first copy them locally, and send them to the compute server.
    #   - or try to copy them from the compute server (<<< let this choice right now).
    foreign_data = set(all_data.topath())
    if local_data:
        foreign_data.difference_update(local_data.topath())
    foreign_data.difference_update(remote_data)
    if foreign_data:
        magic.log.warn(_("files on a third host may be unavailable "
                          "for calculation : %s"), foreign_data)

    # copy files on the server
    magic.log.info(_("copy export files..."))
    iret = scopy.copyto(fprof, forig)
    magic.log.info(_("copy data files..."))
    if local_data:
        local_nom, local_other = local_data.get_type('nom', with_completion=True)
        jret = scopy.copyto(convert=unique_basename, *local_other.topath())
        kret = scopy.copyto(*local_nom.topath())
        # keep the worst code: a failure of the copy of the export files
        # must not be hidden by a successful copy of the data files
        iret = max(iret, jret, kret)

    # set remote filename
    remote_prof.set_filename(scopy.get_remote_filename(fprof))
    return iret, remote_prof
262
263
def actu(prof, args, **kwargs):
    """Default scheme for the 'actu' action: delegate to
    `actu_and_results`."""
    default_scheme = actu_and_results
    return default_scheme(prof, args, **kwargs)
267
268
def actu_and_results(prof, args, print_output=True, **kwargs):
    """Call --actu action on a server and download the results
    (as --get_results does) when the job is ended.

    Return the tuple (exit code, output) of the --actu call; the code
    returned by the download is not reported.
    """
    iret, output = _call_actu(prof, args)
    status = parse_actu_result(output)
    if print_output:
        print_actu_result(*status)
    job_is_ended = iret == 0 and status[0] == "ENDED"
    if job_is_ended:
        get_results(prof, args)
    return iret, output
279
280
def actu_simple(prof, args, print_output=True, **kwargs):
    """Call --actu action on a server, without downloading the results.

    Return the tuple (exit code, output) of the call.
    """
    answer = _call_actu(prof, args)
    iret, output = answer
    status = parse_actu_result(output)
    if print_output:
        print_actu_result(*status)
    return iret, output
288
289
def _call_actu(prof, args):
    """Ask the server for the status of a job (--actu action).

    The job is identified by the 'jobid', 'nomjob' and 'mode' fields of
    `prof`; `args` is not used. Return the tuple (exit code, output).
    """
    jobid, jobname, mode = [prof[key][0] for key in ('jobid', 'nomjob', 'mode')]
    # first item is a placeholder: it is ignored by call_generic_service
    targs = (None, jobid, jobname, mode)

    # read server informations
    serv = build_server_from_profile(prof, TYPES.EXEC)

    magic.log.info(_("ask the server for the job status"))
    iret, output, err = call_generic_service("actu", serv, prof, targs)
    magic.log.debug("server returns %s", output)
    status = parse_actu_result(output)[0]
    magic.log.info(_("job status is %s"), status)
    return iret, output
306
307
def stop_del(prof, args, **kwargs):
    """Call --del action on a server.

    The remote directory of the study is also removed when the signal is
    'KILL' and the export of the study has been retrieved.
    Return the exit code of the --del call.
    """
    # retrieve the study export before it would be removed
    study_prof = get_study_export(prof)

    # first item is a placeholder: it is ignored by call_generic_service
    targs = (None,
             prof['jobid'][0], prof['nomjob'][0], prof['mode'][0],
             prof['noeud'][0])
    signal = kwargs.get('signal', magic.run['signal'])

    # read server informations
    serv = build_server_from_profile(prof, TYPES.EXEC)
    magic.log.info(_("ask the server to cancel the job and remove its "
                      "files from 'flasheur'"))
    iret, output, err = call_generic_service("del", serv, prof, targs,
        { 'signal' : signal })

    # remove remote repository only if the job is killed and its
    # study id is known
    if signal == 'KILL' and study_prof is not None:
        studyid = study_prof['studyid'][0]
        scopy = build_server_from_profile(study_prof, TYPES.COPY_FROM, jobid=studyid)
        scopy.delete_proxy_dir()
    return iret
336
337
def purge_flash(prof, args, **kwargs):
    """Call --purge_flash action on a server and return its exit code."""
    exec_server = build_server_from_profile(prof, TYPES.EXEC)
    code, _output, _err = call_generic_service("purge_flash", exec_server,
                                               prof, args)
    return code
345
346
def tail(prof, args, print_output=True, **kwargs):
    """Call --tail action on a server.

    The job and the filter are read from `prof` ('jobid', 'nomjob',
    'mode', 'tail_nbline', 'tail_regexp'). The output of the service is
    printed if `print_output` is True.
    Return the tuple (exit code, output).
    """
    # first item is a placeholder: it is ignored by call_generic_service
    targs = (None,
             prof['jobid'][0], prof['nomjob'][0], prof['mode'][0],
             'None',
             prof['tail_nbline'][0], prof['tail_regexp'][0])

    # read server informations
    serv = build_server_from_profile(prof, TYPES.EXEC)

    iret, output, err = call_generic_service(
        "tail", serv, prof, targs, { 'result_to_output' : True })
    if print_output:
        print(output)
    # not expected in output
    magic.run.PrintExitCode = False
    return iret, output
366
367
def info(prof, args, print_output=True, **kwargs):
    """Call --info action on a server.

    The output of the service is printed if `print_output` is True.
    Return the tuple (exit code, output).
    """
    exec_server = build_server_from_profile(prof, TYPES.EXEC)

    magic.log.info(_("retreive configuration informations of the server"))
    answer = call_generic_service("info", exec_server, prof, args)
    iret, output, err = answer
    if print_output:
        print(output)
    # already in output
    magic.run.PrintExitCode = False
    return iret, output
380
381
def edit(prof, args, **kwargs):
    """Default scheme for the 'edit' action: delegate to `local_edit`."""
    default_scheme = local_edit
    return default_scheme(prof, args, **kwargs)
385
386
def remote_edit(prof, args, **kwargs):
    """Call --edit action on a server: the editor is opened on the
    server. Return the exit code of the remote command."""
    #XXX --edit does not yet support display argument: remote_edit may not work
    # maybe by adding a sleeping time...?
    action = "edit"
    jobid, jobname, mode, typ, displ = [
        prof[key][0]
        for key in ('jobid', 'nomjob', 'mode', 'edit_type', 'display')]

    # read server informations
    serv = build_server_from_profile(prof, TYPES.EXEC)

    # NOTE(review): `displ` is not passed to the command below (see the
    # XXX note above).
    if serv.support_display_forwarding():
        # --edit will use os.environ['DISPLAY']
        # but this will not work if ssh is run in background...
        displ = "None:0"

    # build the command line of the service
    as_run = osp.join(serv.get_aster_root(), "bin",
                      get_plt_exec_name(prof.get_platform(), "as_run"))
    cmd = [as_run, "--" + action]
    cmd += magic.run.get_rcdir_arg()
    cmd += (jobid, jobname, mode, typ)

    iret, output, err = serv.exec_command(cmd, display_forwarding=True)
    magic.run.DBG("******************** OUTPUT of as_run --%s ********************" % action, output,
            "******************** END of as_run --%s ********************" % action,
            all=True, prefix="    ")
    return iret
420
421
def local_edit(prof, args, **kwargs):
    """Edit a file of a job stored in the 'flasheur' with a local editor.

    If the server is not localhost, the file is first copied locally.
    The editor is not opened if the copy fails or if 'result_to_output'
    is set (in `kwargs` or in `magic.run`).
    Return the code of the copy (0 if no copy was necessary).
    """
    run = magic.run
    jobid = prof['jobid'][0]
    jobname = prof['nomjob'][0]
    mode = prof['mode'][0]
    typ = prof['edit_type'][0]
    to_output = kwargs.get('result_to_output', run['result_to_output'])

    # read server informations
    serv = build_server_from_profile(prof, TYPES.COPY_FROM)

    # flasheur is in the home directory
    dirname, fname = osp.split(flash_filename("flasheur", jobname, jobid, typ, mode))
    run.DBG("file to edit is in %s named %s" % (dirname, fname))

    iret = 0
    if serv.is_localhost():
        # the file can be read directly
        dst = osp.join(get_home_directory(), dirname, fname)
    else:
        serv.set_proxy_dir(dirname)
        dst = osp.join(run['tmp_user'], 'flasheur_%s' % serv.host, fname)
        # files of these jobs are copied even if a local copy exists
        is_agla_astout = jobname.startswith(('pre_eda', 'asrest'))
        if is_agla_astout or not osp.exists(dst):
            iret = serv.copyfrom(dst)

    if iret != 0 or to_output:
        return iret
    magic.log.info(_("edit file %s"), dst)
    edit_file(run, dst)
    return iret
456
457
def sendmail(prof, args, **kwargs):
    """Call --sendmail action on a server.
    Allow to send a mail even if it's not configured on localhost.

    The text file (second item of `args`) is copied on the server before
    calling the service. Return the exit code of the service.
    """
    run = magic.run
    num_job = kwargs.get('num_job', run['num_job'])
    to = kwargs.get('report_to', run['report_to'])
    jobid = '%s-%s' % (num_job, "sendmail")
    # read server informations
    sexec = build_server_from_profile(prof, TYPES.EXEC)
    scopy = build_server_from_profile(prof, TYPES.COPY_TO, jobid=jobid)

    # put the text file on the server
    text_file = args[1]
    scopy.copyto(text_file)
    # first item is a placeholder: it is ignored by call_generic_service
    targs = (None, scopy.get_remote_filename(text_file))

    options = { 'report_to' : to }
    iret, output, err = call_generic_service("sendmail", sexec, prof, targs,
                                             options)
    # the remote copy is not needed anymore
    scopy.delete_proxy_dir()
    return iret
479
480
def get_export(prof, args, print_output=True, **kwargs):
    """Call --get_export action on a server.

    The version is taken from `kwargs['vers']` or, by default, from
    `magic.run['aster_vers']`. The output of the service is printed if
    `print_output` is True. Return the tuple (exit code, output).
    """
    exec_server = build_server_from_profile(prof, TYPES.EXEC)
    options = { 'vers' : kwargs.get('vers', magic.run['aster_vers']) }

    iret, output, err = call_generic_service("get_export", exec_server, prof,
                                             args, options)
    if print_output:
        print(output)
    return iret, output
492
493
def serv_with_reverse_access(prof, args, print_output=True, **kwargs):
    """The old way to call a server :
    - the server is called directly through ssh/rsh
    - reverse access to the client from the server is required to
      read the export file and other data files.

    Return the exit code of the remote command.
    """
    action = "serv"
    # read server informations
    serv = build_server_from_profile(prof, TYPES.EXEC)

    export_path = get_absolute_path(prof.get_filename())
    if not serv.is_localhost():
        # rewrite the export for an execution on a remote server
        prof.from_remote_server()
        prof.WriteExportTo(export_path)

    # address of the export as "[user@]host:path"
    export_addr = "%s:%s" % (local_full_host, export_path)
    if local_user != '':
        export_addr = "%s@%s" % (local_user, export_addr)

    as_run = osp.join(serv.get_aster_root(), "bin",
                      get_plt_exec_name(prof.get_platform(), "as_run"))
    cmd = [as_run, "--" + action, export_addr]
    # for compatibility with old asrun server
    cmd += magic.run.get_as_run_args()

    iret, output, err = serv.exec_command(cmd, display_forwarding=True)
    magic.run.DBG("******************** OUTPUT of as_run --%s ********************" % action, output,
            "******************** END of as_run --%s ********************" % action,
            all=True, prefix="    ")
    submission = parse_submission_result(output)
    if print_output:
        print("JOBID=%s QUEUE=%s STUDYID=%s" % submission)
    return iret
529
530
def create_issue(prof, args, **kwargs):
    """Call --create_issue on a server.

    The issue file (first item of `args`) is copied on the server. The
    data files of the study are copied too if the issue file defines
    'FICASS'. Return the code of the copy of the data files if it
    failed, else the exit code of the service.
    """
    run = magic.run
    num_job = kwargs.get('num_job', run['num_job'])
    jobid = '%s-%s' % (num_job, "create_issue")
    # read server informations
    sexec = build_server_from_profile(prof, TYPES.EXEC)
    scopy = build_server_from_profile(prof, TYPES.COPY_TO, jobid=jobid)

    # read the issue file and put it on the server
    issue_file = args[0]
    with open(issue_file, 'r') as fobj:
        dinf = parse_issue_file(fobj.read())
    scopy.copyto(issue_file)
    rfile = scopy.get_remote_filename(issue_file)

    # should we copy data files ?
    if dinf.get('FICASS') is None:
        profname = "no_attachment"
    else:
        fprof = osp.join(run['tmp_user'], "%s.export" % jobid)
        iret, remote_prof = copy_datafiles_on_server(prof, jobid, fprof)
        profname = remote_prof.get_filename()
        if iret != 0:
            return iret

    # first item is a placeholder: it is ignored by call_generic_service
    targs = (None, rfile, profname)
    iret, output, err = call_generic_service("create_issue", sexec, prof, targs)
    scopy.delete_proxy_dir()
    return iret
562
563
def insert_in_db(prof, args, **kwargs):
    """Call --insert_in_db on a server.

    The data files are copied on the server before calling the service.
    Return the code of the copy if it failed, else the exit code of the
    service.
    """
    run = magic.run
    num_job = kwargs.get('num_job', run['num_job'])
    jobid = '%s-%s' % (num_job, "insert_in_db")
    # read server informations
    sexec = build_server_from_profile(prof, TYPES.EXEC)
    scopy = build_server_from_profile(prof, TYPES.COPY_TO, jobid=jobid)

    # copy data files
    #XXX should result files be copied too ?
    #XXX should be run in foreground to keep display
    fprof = osp.join(run['tmp_user'], "%s.export" % jobid)
    copy_code, remote_prof = copy_datafiles_on_server(prof, jobid, fprof)
    if copy_code != 0:
        return copy_code

    # first item is a placeholder: it is ignored by call_generic_service
    targs = (None, remote_prof.get_filename())
    iret, output, err = call_generic_service("insert_in_db", sexec, prof, targs)
    # the remote copy is not needed anymore
    scopy.delete_proxy_dir()
    return iret
585