1#!/usr/bin/env python3 2""" 3DMLC submission script by mesos 4 5One need to make sure all slaves machines are ssh-able. 6""" 7from __future__ import absolute_import 8 9import os 10import sys 11import json 12import uuid 13import logging 14from threading import Thread 15from . import tracker 16try: 17 import pymesos.subprocess 18 logging.getLogger('pymesos').setLevel(logging.WARNING) 19 20 def _run(prog, env, resources): 21 cwd = os.getcwd() 22 pymesos.subprocess.check_call( 23 prog, shell=True, env=env, cwd=cwd, 24 cpus=resources['cpus'], mem=resources['mem'] 25 ) 26 27 _USE_PYMESOS = True 28 29except ImportError: 30 import subprocess 31 DEVNULL = open(os.devnull, 'w') 32 33 def _run(prog, env, resources): 34 master = os.environ['MESOS_MASTER'] 35 if ':' not in master: 36 master += ':5050' 37 38 name = str(uuid.uuid4()) 39 cwd = os.getcwd() 40 prog = "cd %s && %s" % (cwd, prog) 41 42 resources = ';'.join('%s:%s' % (k, v) for k, v in resources.items()) 43 prog = prog.replace('\'', '\\\'') 44 env = json.dumps(env).replace('\'', '\\\'') 45 resources = resources.replace('\'', '\\\'') 46 cmd = ( 47 'mesos-execute --master=%s --name=\'%s\'' 48 ' --command=\'%s\' --env=\'%s\' --resources=\'%s\'' % 49 (master, name, prog, env, resources) 50 ) 51 52 subprocess.check_call( 53 cmd, 54 shell=True, 55 stdout=DEVNULL, 56 stderr=subprocess.STDOUT) 57 58 _USE_PYMESOS = False 59 60def get_env(): 61 # get system envs 62 keys = set(['OMP_NUM_THREADS', 'KMP_AFFINITY', 'LD_LIBRARY_PATH']) 63 return {k: v for k, v in os.environ.items() if k in keys} 64 65 66def submit(args): 67 def mesos_submit(nworker, nserver, pass_envs): 68 """ 69 customized submit script 70 """ 71 # launch jobs 72 for i in range(nworker + nserver): 73 resources = {} 74 pass_envs['DMLC_ROLE'] = 'server' if i < nserver else 'worker' 75 if i < nserver: 76 pass_envs['DMLC_SERVER_ID'] = i 77 resources['cpus'] = args.server_cores 78 resources['mem'] = args.server_memory_mb 79 else: 80 pass_envs['DMLC_WORKER_ID'] = i - nserver 81 resources['cpus'] = args.worker_cores 82 resources['mem'] = args.worker_memory_mb 83 84 env = {str(k): str(v) for k, v in pass_envs.items()} 85 env.update(get_env()) 86 prog = ' '.join(args.command) 87 thread = Thread(target=_run, args=(prog, env, resources)) 88 thread.setDaemon(True) 89 thread.start() 90 91 return mesos_submit 92 93 if not _USE_PYMESOS: 94 logging.warning('No PyMesos found, use mesos-execute instead,' 95 ' no task output available') 96 97 if args.mesos_master: 98 os.environ['MESOS_MASTER'] = args.mesos_master 99 100 assert 'MESOS_MASTER' in os.environ, 'No mesos master configured!' 101 102 tracker.submit(args.num_workers, args.num_servers, 103 fun_submit=mesos_submit, 104 pscmd=(' '.join(args.command))) 105