1# -*- coding: utf-8 -*- 2 3# ============================================================================== 4# COPYRIGHT (C) 1991 - 2015 EDF R&D WWW.CODE-ASTER.ORG 5# THIS PROGRAM IS FREE SOFTWARE; YOU CAN REDISTRIBUTE IT AND/OR MODIFY 6# IT UNDER THE TERMS OF THE GNU GENERAL PUBLIC LICENSE AS PUBLISHED BY 7# THE FREE SOFTWARE FOUNDATION; EITHER VERSION 2 OF THE LICENSE, OR 8# (AT YOUR OPTION) ANY LATER VERSION. 9# 10# THIS PROGRAM IS DISTRIBUTED IN THE HOPE THAT IT WILL BE USEFUL, BUT 11# WITHOUT ANY WARRANTY; WITHOUT EVEN THE IMPLIED WARRANTY OF 12# MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. SEE THE GNU 13# GENERAL PUBLIC LICENSE FOR MORE DETAILS. 14# 15# YOU SHOULD HAVE RECEIVED A COPY OF THE GNU GENERAL PUBLIC LICENSE 16# ALONG WITH THIS PROGRAM; IF NOT, WRITE TO EDF R&D CODE_ASTER, 17# 1 AVENUE DU GENERAL DE GAULLE, 92141 CLAMART CEDEX, FRANCE. 18# ============================================================================== 19 20""" 21This module defines the default schemes. 22 23Additionnal modules can be added to define others schemes to extend 24the capabilities of asrun. 25 26These plugins can be added in any directory listed in PYTHONPATH. 27But it's recommended to place them in etc/codeaster/plugins because 28the modules added in this directory will be kept during updates of asrun. 29""" 30 31import os 32import os.path as osp 33 34from asrun.common.i18n import _ 35from asrun.core import magic 36from asrun.calcul import parse_submission_result, parse_consbtc 37from asrun.job import parse_actu_result, print_actu_result 38from asrun.profil import AsterProfil 39from asrun.core.configuration import get_plt_exec_name 40from asrun.common_func import flash_filename, edit_file 41from asrun.common.utils import get_absolute_path, unique_basename 42from asrun.common.sysutils import local_user, local_full_host, get_home_directory 43from asrun.core.server import build_server_from_profile, TYPES 44from asrun.rex import parse_issue_file 45from asrun.profile_modifier import apply_special_service 46 47 48 49def serv(prof, args, print_output=True, **kwargs): 50 """Call --serv action on a server.""" 51 run = magic.run 52 num_job = kwargs.get('num_job', run['num_job']) 53 # decode special service 54 serv, prof = apply_special_service(prof, run, on_client_side=True) 55 if serv != "": 56 magic.log.info(_("special service : %s"), serv) 57 # set studyid (if already defined in prof, use this one) 58 studyid = prof['studyid'][0] 59 if studyid == '': 60 studyid = "%s-%s" % (num_job, prof['mclient'][0].split('.')[0]) 61 # read server informations 62 sexec = build_server_from_profile(prof, TYPES.EXEC) 63 scopy = build_server_from_profile(prof, TYPES.COPY_TO, jobid=studyid) 64 magic.log.info(_("prepare execution for %s@%s"), sexec.user, sexec.host) 65 66 # prepare export for execution with all files present in the remote directory 67 fprof = osp.join(run['tmp_user'], "%s.export" % studyid) 68 forig = osp.join(run['tmp_user'], "%s.orig.export" % studyid) 69 prof.set_filename(forig) 70 prof.WriteExportTo(forig) 71 run.ToDelete(fprof) 72 run.ToDelete(forig) 73 74 iret, remote_prof = copy_datafiles_on_server(prof, studyid, fprof) 75 if iret != 0: 76 return iret, '' 77 78 # launch the study 79 cmd = [osp.join(sexec.get_aster_root(), "bin", 80 get_plt_exec_name(remote_prof.get_platform(), "as_run")), ] 81 cmd.append("--serv") 82 cmd.append("--num_job=%s" % studyid) 83 cmd.extend(run.get_rcdir_arg()) 84 cmd.append(remote_prof.get_filename()) 85 86 iret, output, err = sexec.exec_command(cmd, display_forwarding=True) 87 run.DBG("******************** OUTPUT of as_run --serv ********************",output, 88 "******************** ERROR of as_run --serv ********************", err, 89 "******************** END of as_run --serv ********************", 90 all=True, prefix=" ") 91 jobid, queue, stid2 = parse_submission_result(output) 92 run.DBG("The server returns %s and studyid is set to %s" % (stid2, studyid)) 93 if print_output: 94 print("JOBID=%s QUEUE=%s STUDYID=%s" % (jobid, queue, studyid)) 95 btc = parse_consbtc(output) 96 if btc is not None and print_output: 97 print("BTCFILE=%s" % btc) 98 if iret != 0: 99 output += os.linesep.join([output, 100 "******************** ERROR of as_run --serv ********************", err]) 101 return iret, output 102 103 104def get_results(prof, args, **kwargs): 105 """Download result files from the server.""" 106 run = magic.run 107 108 magic.log.info(_("get result files from the server")) 109 # read studyid 110 study_prof = get_study_export(prof) 111 if study_prof is None: 112 return 4 113 studyid = study_prof['studyid'][0] 114 forig = osp.join(run['tmp_user'], "%s.orig.export" % studyid) 115 116 # read server informations 117 scopy = build_server_from_profile(study_prof, TYPES.COPY_FROM, jobid=studyid) 118 # get original export 119 iret = scopy.copyfrom(forig) 120 if iret != 0: 121 magic.log.warn(_("the results seem already downloaded.")) 122 return iret 123 oprof = AsterProfil(forig, run) 124 magic.run.DBG("original export :\n%s" % repr(oprof), all=True) 125 126 run_on_localhost = scopy.is_localhost() 127 # copy results files 128 if not run_on_localhost: 129 local_resu = oprof.get_result().get_on_serv(local_full_host) 130 local_nom, local_other = local_resu.get_type('nom', with_completion=True) 131 iret = scopy.copyfrom(convert=unique_basename, *local_other.topath()) 132 jret = scopy.copyfrom(*local_nom.topath()) 133 iret = max(iret, jret) 134 local_resu = local_resu.topath() 135 else: 136 local_resu = [] 137 138 remote_resu = oprof.get_result().get_on_serv(scopy.host, scopy.user).topath() 139 all = set(oprof.get_result().topath()) 140 all.difference_update(local_resu) 141 all.difference_update(remote_resu) 142 if len(all) > 0: 143 magic.log.warn(_("files on a third host should have been copied " 144 "at the end of the calculation (if possible) : %s"), 145 [e.repr() for e in all]) 146 147 # remove remote repository 148 if iret == 0: 149 scopy.delete_proxy_dir() 150 return iret 151 152 153def get_study_export(prof): 154 """Return the original export.""" 155 run = magic.run 156 # read server informations 157 serv = build_server_from_profile(prof, TYPES.COPY_FROM) 158 159 jobid = prof['jobid'][0] 160 jobname = prof['nomjob'][0] 161 mode = prof['mode'][0] 162 # flasheur is in the home directory 163 dirname, fname = osp.split(flash_filename("flasheur", jobname, jobid, "export", mode)) 164 run.DBG("export file in %s is named %s" % (dirname, fname)) 165 166 # copy export file locally 167 serv.set_proxy_dir(dirname) 168 dst = osp.join(run['tmp_user'], 'flasheur_%s' % serv.host, fname) 169 iret = serv.copyfrom(dst) 170 if iret != 0: 171 return None 172 173 # read studyid 174 study_prof = AsterProfil(dst, run) 175 return study_prof 176 177 178def call_generic_service(action, serv, prof, args, options={}): 179 """Generic function : call the service on the given server.""" 180 # BE CAREFULL : first argument of `args` is ignored (supposed to be equal to `prof`) 181 # the server is created by the caller essentially to give relevant progress informations 182 # launch the service 183 run = magic.run 184 cmd = [osp.join(serv.get_aster_root(), "bin", 185 get_plt_exec_name(prof.get_platform(), "as_run")), ] 186 cmd.append("--" + action) 187 cmd.extend(run.get_rcdir_arg()) 188 for key, val in list(options.items()): 189 if val is True: 190 val = "" 191 else: 192 val = "=%s" % val 193 cmd.append("--%s%s" % (key, val)) 194 cmd.extend(args[1:]) 195 196 iret, output, err = serv.exec_command(cmd) 197 run.DBG("******************** OUTPUT of as_run --%s ********************" % action, output, 198 #"******************** ERROR of as_run --%s ********************" % action, err, 199 "******************** END of as_run --%s ********************" % action, 200 all=True, prefix=" ") 201 if iret != 0: 202 output += os.linesep.join([output, 203 "******************** ERROR of as_run --%s ********************" % action, err]) 204 return iret, output, err 205 206 207def copy_datafiles_on_server(prof, studyid, fprof): 208 """Copy data files on the server, relocate the export and write it (locally) 209 into 'fprof'. Return the relocated export.""" 210 # read server informations 211 sexec = build_server_from_profile(prof, TYPES.EXEC) 212 scopy = build_server_from_profile(prof, TYPES.COPY_TO, jobid=studyid) 213 forig = prof.get_filename() 214 magic.log.debug("original export name is %s", forig) 215 magic.run.DBG("original export :\n%s" % repr(prof), all=True) 216 217 run_on_localhost = sexec.is_localhost() 218 remote_prof = prof.copy() 219 if not run_on_localhost: 220 remote_prof.relocate(local_full_host, scopy.get_proxy_dir(), 221 convert=unique_basename) 222 remote_prof.relocate(sexec.host, convert=unique_basename) 223 remote_prof['studyid'] = studyid 224 remote_prof.set_filename(fprof) 225 remote_prof.WriteExportTo(fprof) 226 magic.run.DBG("remote export :\n%s" % repr(remote_prof), all=True) 227 228 # copy data files 229 # - local_data : files which are on localhost and to copy on the compute server 230 all_data = prof.get_data() 231 if not run_on_localhost: 232 local_data = all_data.get_on_serv(local_full_host) 233 else: 234 local_data = [] 235 # - remote_data : these files are already on the server, use them directly 236 remote_data = all_data.get_on_serv(sexec.host, sexec.user).topath() 237 # - foreign_data : if they are on a foreign server (not localhost or 238 # compute server), we can : 239 # - first copy them locally, and send them to the compute server. 240 # - or try to copy them from the compute server (<<< let this choice right now). 241 all = set(all_data.topath()) 242 if local_data: 243 all.difference_update(local_data.topath()) 244 all.difference_update(remote_data) 245 if len(all) > 0: 246 magic.log.warn(_("files on a third host may be unavailable " 247 "for calculation : %s"), all) 248 249 # copy files on the server 250 magic.log.info(_("copy export files...")) 251 iret = scopy.copyto(fprof, forig) 252 magic.log.info(_("copy data files...")) 253 if local_data: 254 local_nom, local_other = local_data.get_type('nom', with_completion=True) 255 iret = scopy.copyto(convert=unique_basename, *local_other.topath()) 256 jret = scopy.copyto(*local_nom.topath()) 257 iret = max(iret, jret) 258 259 # set remote filename 260 remote_prof.set_filename(scopy.get_remote_filename(fprof)) 261 return iret, remote_prof 262 263 264def actu(prof, args, **kwargs): 265 """Default schema for 'actu' action.""" 266 return actu_and_results(prof, args, **kwargs) 267 268 269def actu_and_results(prof, args, print_output=True, **kwargs): 270 """Call --actu action on a server 271 + call automatically --get_results if the job is ended.""" 272 iret, output = _call_actu(prof, args) 273 result = parse_actu_result(output) 274 if print_output: 275 print_actu_result(*result) 276 if iret == 0 and result[0] == "ENDED": 277 iret2 = get_results(prof, args) 278 return iret, output 279 280 281def actu_simple(prof, args, print_output=True, **kwargs): 282 """Call --actu action on a server""" 283 iret, output = _call_actu(prof, args) 284 result = parse_actu_result(output) 285 if print_output: 286 print_actu_result(*result) 287 return iret, output 288 289 290def _call_actu(prof, args): 291 """Call --actu action on a server""" 292 jobid = prof['jobid'][0] 293 jobname = prof['nomjob'][0] 294 mode = prof['mode'][0] 295 targs = (None, jobid, jobname, mode) 296 297 # read server informations 298 serv = build_server_from_profile(prof, TYPES.EXEC) 299 300 magic.log.info(_("ask the server for the job status")) 301 iret, output, err = call_generic_service("actu", serv, prof, targs) 302 magic.log.debug("server returns %s", output) 303 result = parse_actu_result(output) 304 magic.log.info(_("job status is %s"), result[0]) 305 return iret, output 306 307 308def stop_del(prof, args, **kwargs): 309 """Call --del action on a server""" 310 # retreive the study export before it would be removed 311 study_prof = get_study_export(prof) 312 313 jobid = prof['jobid'][0] 314 jobname = prof['nomjob'][0] 315 mode = prof['mode'][0] 316 node = prof['noeud'][0] 317 targs = (None, jobid, jobname, mode, node) 318 signal = kwargs.get('signal', magic.run['signal']) 319 320 # read server informations 321 serv = build_server_from_profile(prof, TYPES.EXEC) 322 magic.log.info(_("ask the server to cancel the job and remove its " 323 "files from 'flasheur'")) 324 iret, output, err = call_generic_service("del", serv, prof, targs, 325 { 'signal' : signal }) 326 327 # stop here if signal!=KILL or the study id have not been retreived 328 if signal != 'KILL' or study_prof is None: 329 return iret 330 331 # remove remote repository 332 studyid = study_prof['studyid'][0] 333 scopy = build_server_from_profile(study_prof, TYPES.COPY_FROM, jobid=studyid) 334 scopy.delete_proxy_dir() 335 return iret 336 337 338def purge_flash(prof, args, **kwargs): 339 """Call --purge_flash action on a server""" 340 # read server informations 341 serv = build_server_from_profile(prof, TYPES.EXEC) 342 343 iret, output, err = call_generic_service("purge_flash", serv, prof, args) 344 return iret 345 346 347def tail(prof, args, print_output=True, **kwargs): 348 """Call --tail action on a server""" 349 jobid = prof['jobid'][0] 350 jobname = prof['nomjob'][0] 351 mode = prof['mode'][0] 352 nbline = prof['tail_nbline'][0] 353 regexp = prof['tail_regexp'][0] 354 targs = (None, jobid, jobname, mode, 'None', nbline, regexp) 355 356 # read server informations 357 serv = build_server_from_profile(prof, TYPES.EXEC) 358 359 options = { 'result_to_output' : True } 360 iret, output, err = call_generic_service("tail", serv, prof, targs, options) 361 if print_output: 362 print(output) 363 # not expected in output 364 magic.run.PrintExitCode = False 365 return iret, output 366 367 368def info(prof, args, print_output=True, **kwargs): 369 """Call --info action on a server""" 370 # read server informations 371 serv = build_server_from_profile(prof, TYPES.EXEC) 372 373 magic.log.info(_("retreive configuration informations of the server")) 374 iret, output, err = call_generic_service("info", serv, prof, args) 375 if print_output: 376 print(output) 377 # already in output 378 magic.run.PrintExitCode = False 379 return iret, output 380 381 382def edit(prof, args, **kwargs): 383 """Default schema for 'edit' action.""" 384 return local_edit(prof, args, **kwargs) 385 386 387def remote_edit(prof, args, **kwargs): 388 """Call --edit action on a server by opening an editor on the 389 server.""" 390 #XXX --edit does not yet support display argument: remote_edit may not work 391 # maybe by adding a sleeping time...? 392 action = "edit" 393 jobid = prof['jobid'][0] 394 jobname = prof['nomjob'][0] 395 mode = prof['mode'][0] 396 typ = prof['edit_type'][0] 397 displ = prof['display'][0] 398 399 # read server informations 400 serv = build_server_from_profile(prof, TYPES.EXEC) 401 402 if serv.support_display_forwarding(): 403 # --edit will use os.environ['DISPLAY'] 404 # but this will not work if ssh is run in background... 405 displ = "None:0" 406 407 # launch the service 408 cmd = [osp.join(serv.get_aster_root(), "bin", 409 get_plt_exec_name(prof.get_platform(), "as_run")), ] 410 cmd.append("--" + action) 411 cmd.extend(magic.run.get_rcdir_arg()) 412 targs = (jobid, jobname, mode, typ) 413 cmd.extend(targs) 414 415 iret, output, err = serv.exec_command(cmd, display_forwarding=True) 416 magic.run.DBG("******************** OUTPUT of as_run --%s ********************" % action, output, 417 "******************** END of as_run --%s ********************" % action, 418 all=True, prefix=" ") 419 return iret 420 421 422def local_edit(prof, args, **kwargs): 423 """Call --edit action on a server by using a local editor after 424 copying file if it's remote.""" 425 run = magic.run 426 jobid = prof['jobid'][0] 427 jobname = prof['nomjob'][0] 428 mode = prof['mode'][0] 429 typ = prof['edit_type'][0] 430 to_output = kwargs.get('result_to_output', run['result_to_output']) 431 432 iret = 0 433 # read server informations 434 serv = build_server_from_profile(prof, TYPES.COPY_FROM) 435 436 # flasheur is in the home directory 437 dirname, fname = osp.split(flash_filename("flasheur", jobname, jobid, typ, mode)) 438 run.DBG("file to edit is in %s named %s" % (dirname, fname)) 439 440 # copy from dirname if not on localhost 441 if not serv.is_localhost(): 442 serv.set_proxy_dir(dirname) 443 dst = osp.join(run['tmp_user'], 'flasheur_%s' % serv.host, fname) 444 is_agla_astout = prof['nomjob'][0].startswith('pre_eda') \ 445 or prof['nomjob'][0].startswith('asrest') 446 if not osp.exists(dst) or is_agla_astout: 447 iret = serv.copyfrom(dst) 448 else: 449 iret = 0 450 dst = osp.join(get_home_directory(), dirname, fname) 451 452 if iret == 0 and not to_output: 453 magic.log.info(_("edit file %s"), dst) 454 edit_file(run, dst) 455 return iret 456 457 458def sendmail(prof, args, **kwargs): 459 """Call --sendmail action on a server. 460 Allow to send a mail even if it's not configured on localhost.""" 461 # put the file on the server 462 run = magic.run 463 num_job = kwargs.get('num_job', run['num_job']) 464 to = kwargs.get('report_to', run['report_to']) 465 jobid = '%s-%s' % (num_job, "sendmail") 466 # read server informations 467 sexec = build_server_from_profile(prof, TYPES.EXEC) 468 scopy = build_server_from_profile(prof, TYPES.COPY_TO, jobid=jobid) 469 470 # copy text file 471 scopy.copyto(args[1]) 472 rfile = scopy.get_remote_filename(args[1]) 473 targs = (None, rfile, ) 474 475 iret, output, err = call_generic_service("sendmail", sexec, prof, targs, 476 { 'report_to' : to }) 477 scopy.delete_proxy_dir() 478 return iret 479 480 481def get_export(prof, args, print_output=True, **kwargs): 482 """Call --get_export action on a server.""" 483 # read server informations 484 serv = build_server_from_profile(prof, TYPES.EXEC) 485 vers = kwargs.get('vers', magic.run['aster_vers']) 486 487 iret, output, err = call_generic_service("get_export", serv, prof, args, 488 { 'vers' : vers }) 489 if print_output: 490 print(output) 491 return iret, output 492 493 494def serv_with_reverse_access(prof, args, print_output=True, **kwargs): 495 """The old way to call a server : 496 - the server is called directly through ssh/rsh 497 - reverse access to the client from the server is required to 498 read the export file and other data files. 499 """ 500 action = "serv" 501 # read server informations 502 serv = build_server_from_profile(prof, TYPES.EXEC) 503 504 filename = get_absolute_path(prof.get_filename()) 505 if not serv.is_localhost(): 506 prof.from_remote_server() 507 prof.WriteExportTo(filename) 508 509 filename = "%s:%s" % (local_full_host, filename) 510 if local_user != '': 511 filename = "%s@%s" % (local_user, filename) 512 513 cmd = [osp.join(serv.get_aster_root(), "bin", 514 get_plt_exec_name(prof.get_platform(), "as_run")), ] 515 cmd.append("--" + action) 516 cmd.append(filename) 517 518 # for compatibility with old asrun server 519 cmd.extend(magic.run.get_as_run_args()) 520 521 iret, output, err = serv.exec_command(cmd, display_forwarding=True) 522 magic.run.DBG("******************** OUTPUT of as_run --%s ********************" % action, output, 523 "******************** END of as_run --%s ********************" % action, 524 all=True, prefix=" ") 525 res = parse_submission_result(output) 526 if print_output: 527 print("JOBID=%s QUEUE=%s STUDYID=%s" % res) 528 return iret 529 530 531def create_issue(prof, args, **kwargs): 532 """Call --create_issue on a server.""" 533 # put the files on the server 534 run = magic.run 535 num_job = kwargs.get('num_job', run['num_job']) 536 jobid = '%s-%s' % (num_job, "create_issue") 537 # read server informations 538 sexec = build_server_from_profile(prof, TYPES.EXEC) 539 scopy = build_server_from_profile(prof, TYPES.COPY_TO, jobid=jobid) 540 541 # read issue file 542 issue_file = args[0] 543 with open(issue_file, 'r') as f: 544 content = f.read() 545 dinf = parse_issue_file(content) 546 scopy.copyto(issue_file) 547 rfile = scopy.get_remote_filename(issue_file) 548 549 # should we copy data files ? 550 profname = "no_attachment" 551 if dinf.get('FICASS') is not None: 552 fprof = osp.join(run['tmp_user'], "%s.export" % jobid) 553 iret, remote_prof = copy_datafiles_on_server(prof, jobid, fprof) 554 profname = remote_prof.get_filename() 555 if iret != 0: 556 return iret 557 558 targs = (None, rfile, profname) 559 iret, output, err = call_generic_service("create_issue", sexec, prof, targs) 560 scopy.delete_proxy_dir() 561 return iret 562 563 564def insert_in_db(prof, args, **kwargs): 565 """Call --insert_in_db on a server.""" 566 run = magic.run 567 num_job = kwargs.get('num_job', run['num_job']) 568 jobid = '%s-%s' % (num_job, "insert_in_db") 569 # read server informations 570 sexec = build_server_from_profile(prof, TYPES.EXEC) 571 scopy = build_server_from_profile(prof, TYPES.COPY_TO, jobid=jobid) 572 573 # copy data files 574 #XXX should result files be copied too ? 575 #XXX should be run in foreground to keep display 576 fprof = osp.join(run['tmp_user'], "%s.export" % jobid) 577 iret, remote_prof = copy_datafiles_on_server(prof, jobid, fprof) 578 if iret != 0: 579 return iret 580 581 targs = (None, remote_prof.get_filename(), ) 582 iret, output, err = call_generic_service("insert_in_db", sexec, prof, targs) 583 scopy.delete_proxy_dir() 584 return iret 585