1#!/usr/bin/env python3
2"""
3DMLC submission script by mesos
4
One needs to make sure all slave machines are ssh-able.
6"""
7from __future__ import absolute_import
8
9import os
10import sys
11import json
12import uuid
13import logging
14from threading import Thread
15from . import tracker
try:
    import pymesos.subprocess
    # Only let PyMesos log records at WARNING level or above through.
    logging.getLogger('pymesos').setLevel(logging.WARNING)

    def _run(prog, env, resources):
        """Run ``prog`` through ``pymesos.subprocess.check_call``.

        Parameters
        ----------
        prog : str
            Command line, passed with ``shell=True``.
        env : dict
            Environment variables handed to the task.
        resources : dict
            Must contain the ``'cpus'`` and ``'mem'`` entries, which are
            forwarded as the keyword arguments of the same names.

        The task's working directory is the submitter's current directory.
        """
        pymesos.subprocess.check_call(
            prog, shell=True, env=env, cwd=os.getcwd(),
            cpus=resources['cpus'], mem=resources['mem']
        )

    _USE_PYMESOS = True

except ImportError:
    import subprocess
    try:
        from shlex import quote as _shell_quote
    except ImportError:  # Python 2 keeps this helper in ``pipes``
        from pipes import quote as _shell_quote
    DEVNULL = open(os.devnull, 'w')

    def _mesos_execute_argv(master, name, prog, env, resources, cwd):
        """Build the ``mesos-execute`` argument list for a single task.

        Parameters
        ----------
        master : str
            Mesos master address; ``:5050`` is appended when it has no port.
        name : str
            Task name.
        prog : str
            Command line to run after changing into ``cwd``.
        env : dict
            Environment variables, serialized as JSON for ``--env``.
        resources : dict
            Resource amounts, rendered as ``key:value`` pairs joined by ``;``.
        cwd : str
            Directory the task changes into before running ``prog``.

        Returns
        -------
        list of str
            Arguments suitable for ``subprocess.check_call`` without a shell.
        """
        if ':' not in master:
            master += ':5050'
        # Only the directory needs shell quoting: it is embedded in the
        # ``cd ... && ...`` snippet, whereas every other value travels as
        # its own argv entry and is never parsed by a local shell.
        command = 'cd %s && %s' % (_shell_quote(cwd), prog)
        spec = ';'.join(
            '%s:%s' % (key, value) for key, value in resources.items())
        return [
            'mesos-execute',
            '--master=%s' % master,
            '--name=%s' % name,
            '--command=%s' % command,
            '--env=%s' % json.dumps(env),
            '--resources=%s' % spec,
        ]

    def _run(prog, env, resources):
        """Run ``prog`` as a Mesos task via the ``mesos-execute`` CLI.

        The master address is read from the ``MESOS_MASTER`` environment
        variable and the task gets a random UUID as its name.  All output
        of ``mesos-execute`` is discarded.

        Parameters
        ----------
        prog : str
            Command line to run in the submitter's current directory.
        env : dict
            Environment variables for the task (must be JSON-serializable).
        resources : dict
            Resource name to amount mapping.

        Raises
        ------
        KeyError
            If ``MESOS_MASTER`` is not set.
        subprocess.CalledProcessError
            If ``mesos-execute`` exits with a non-zero status.
        OSError
            If the ``mesos-execute`` executable cannot be started.
        """
        argv = _mesos_execute_argv(
            os.environ['MESOS_MASTER'], str(uuid.uuid4()),
            prog, env, resources, os.getcwd())
        # An argument list (no ``shell=True``) keeps quotes inside the
        # command, the JSON environment or the resources intact; escaping
        # them as \' inside single-quoted shell words does not work.
        subprocess.check_call(
            argv,
            stdout=DEVNULL,
            stderr=subprocess.STDOUT)

    _USE_PYMESOS = False
59
def get_env():
    """Collect the system environment variables forwarded to every task.

    Returns
    -------
    dict
        The entries of ``os.environ`` named ``OMP_NUM_THREADS``,
        ``KMP_AFFINITY`` or ``LD_LIBRARY_PATH``; variables that are not
        set are simply absent from the result.
    """
    forwarded = ('OMP_NUM_THREADS', 'KMP_AFFINITY', 'LD_LIBRARY_PATH')
    picked = {}
    for name, value in os.environ.items():
        if name in forwarded:
            picked[name] = value
    return picked
64
65
def submit(args):
    """Submit a DMLC job to Mesos through the tracker.

    Parameters
    ----------
    args : object
        Parsed options.  The attributes read here are ``command`` (a
        sequence of strings, joined with spaces into the task command
        line), ``num_workers``, ``num_servers``, ``worker_cores``,
        ``worker_memory_mb``, ``server_cores``, ``server_memory_mb`` and
        ``mesos_master`` (when truthy it overrides the ``MESOS_MASTER``
        environment variable).

    Raises
    ------
    AssertionError
        If no Mesos master is configured, neither through
        ``args.mesos_master`` nor through ``MESOS_MASTER``.
    """
    # The command line is the same for every task and for the tracker.
    prog = ' '.join(args.command)

    def mesos_submit(nworker, nserver, pass_envs):
        """Launch every server and worker task on its own daemon thread.

        The first ``nserver`` tasks are servers, the remaining ``nworker``
        tasks are workers.  ``pass_envs`` is left unmodified.
        """
        for i in range(nworker + nserver):
            # Work on a per-task copy: writing into ``pass_envs`` itself
            # would modify the caller's dict and leak the last server's
            # DMLC_SERVER_ID into every worker's environment.
            task_envs = dict(pass_envs)
            if i < nserver:
                task_envs['DMLC_ROLE'] = 'server'
                task_envs['DMLC_SERVER_ID'] = i
                resources = {
                    'cpus': args.server_cores,
                    'mem': args.server_memory_mb,
                }
            else:
                task_envs['DMLC_ROLE'] = 'worker'
                task_envs['DMLC_WORKER_ID'] = i - nserver
                resources = {
                    'cpus': args.worker_cores,
                    'mem': args.worker_memory_mb,
                }

            env = {str(k): str(v) for k, v in task_envs.items()}
            # Forwarded system variables take precedence over tracker ones.
            env.update(get_env())
            thread = Thread(target=_run, args=(prog, env, resources))
            # ``Thread.setDaemon`` is deprecated; set the attribute instead.
            thread.daemon = True
            thread.start()

        # NOTE(review): returning the function itself is kept from the
        # original; the tracker presumably ignores it -- confirm.
        return mesos_submit

    if not _USE_PYMESOS:
        logging.warning('No PyMesos found, use mesos-execute instead,'
                        ' no task output available')

    if args.mesos_master:
        os.environ['MESOS_MASTER'] = args.mesos_master

    if 'MESOS_MASTER' not in os.environ:
        # Raised explicitly instead of through ``assert`` so the check is
        # not stripped under ``python -O``; the exception type is kept for
        # backward compatibility.
        raise AssertionError('No mesos master configured!')

    tracker.submit(args.num_workers, args.num_servers,
                   fun_submit=mesos_submit,
                   pscmd=prog)
105