"""Submit jobs to Sun Grid Engine."""
# pylint: disable=invalid-name
from __future__ import absolute_import

import os
import subprocess
from . import tracker


def submit(args):
    """Job submission script for SGE.

    Writes a small wrapper script (``rundmlc.sh``) into the log directory and
    hands a ``qsub``-based launcher to ``tracker.submit``.

    Parameters
    ----------
    args : object
        Parsed options. The attributes read here are ``jobname``,
        ``num_workers``, ``num_servers``, ``command`` (list of str),
        ``sge_log_dir``, ``queue`` and ``vcores``; ``logdir`` is honoured
        when present. ``jobname`` and ``sge_log_dir`` are filled in with
        defaults (and written back onto ``args``) when they are ``None``.

    Raises
    ------
    RuntimeError
        If ``args.sge_log_dir`` exists but is not a directory.
    """
    if args.jobname is None:
        args.jobname = ('dmlc%d.' % args.num_workers) + args.command[0].split('/')[-1]
    if args.sge_log_dir is None:
        args.sge_log_dir = args.jobname + '.log'

    if os.path.exists(args.sge_log_dir):
        if not os.path.isdir(args.sge_log_dir):
            raise RuntimeError('specified --sge-log-dir %s is not a dir' % args.sge_log_dir)
    else:
        os.mkdir(args.sge_log_dir)

    # Keep using ``args.logdir`` when the caller provides one; otherwise fall
    # back to the directory that was just validated/created above, so the
    # wrapper script and the SGE stdout/stderr files have a place to live.
    log_dir = getattr(args, 'logdir', None) or args.sge_log_dir

    runscript = '%s/rundmlc.sh' % log_dir
    # Context manager guarantees the script is flushed and closed even if a
    # write fails.
    with open(runscript, 'w') as fo:
        fo.write('source ~/.bashrc\n')
        fo.write('export DMLC_TASK_ID=${SGE_TASK_ID}\n')
        fo.write('export DMLC_JOB_CLUSTER=sge\n')
        # The wrapper simply executes whatever arguments qsub passes to it.
        fo.write('"$@"\n')

    def sge_submit(nworker, nserver, pass_envs):
        """Internal submission function.

        Builds and runs a single ``qsub`` array job with one task per worker
        and per server, forwarding ``pass_envs`` (a dict of environment
        variables) through ``-v``.
        """
        env_arg = ','.join(['%s="%s"' % (k, str(v)) for k, v in pass_envs.items()])
        cmd = 'qsub -cwd -t 1-%d -S /bin/bash' % (nworker + nserver)
        if args.queue != 'default':
            # Leading space is required: without it the option is glued onto
            # the preceding '/bin/bash' token.
            cmd += ' -q %s' % args.queue
        cmd += ' -N %s ' % args.jobname
        cmd += ' -e %s -o %s' % (log_dir, log_dir)
        cmd += ' -pe orte %d' % (args.vcores)
        # ${PATH} is expanded by the shell at submission time, which is why
        # the command is run with shell=True below.
        cmd += ' -v %s,PATH=${PATH}:.' % env_arg
        cmd += ' %s %s' % (runscript, ' '.join(args.command))
        print(cmd)
        subprocess.check_call(cmd, shell=True)
        print('Waiting for the jobs to get up...')

    # Hand the worker/server counts, the job command line and the launcher
    # defined above to the tracker.
    tracker.submit(args.num_workers, args.num_servers,
                   fun_submit=sge_submit,
                   pscmd=' '.join(args.command))