3# Copyright (c) 2014 Thomas Heller
5# Distributed under the Boost Software License, Version 1.0. (See accompanying
6# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8# This script is a simple startup script to start 1 or multiple HPX localities.
9# It supports various startup wrappers for distributed runs.
11# Usage:
12#  hpxrun.py hpx-application [Options] [Additional options]
14# Available options are:
15#   -l Number of localities to start
16#   -t Number of threads per locality
17#   -p Parcelport to use
18#   -r Which runwrapper to use
19#   -e Expected return codes of all invoked processes
20#   -v verbose output
21#   -- delimiter for additional arguments
23import sys, os, string, atexit
24import functools, signal
25import traceback
26import time
27import threading
29from optparse import OptionParser
31import subprocess
32from subprocess import Popen
# Our global list of processes we started; subproc() appends to it
procs = []


def subproc(cmd):
    """Start ``cmd`` as a child process and register it for cleanup.

    The new process is appended to the module-level ``procs`` list. The
    SIGINT handler and the SIGTERM handler (SIGBREAK on Windows) are then
    set to ``cleanup`` bound to this process.

    cmd -- the command line to execute, as a list of strings
    """
    kwargs = {}
    if sys.platform == 'win32':
        # For some reason or another ... this seems to be non existent
        # sometimes, so only a missing attribute is tolerated here.
        try:
            kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
        except AttributeError:
            pass
    proc = Popen(cmd, **kwargs)
    procs.append(proc)

    # NOTE(review): signal.signal() replaces any previously installed
    # handler, so after several calls only the most recently started
    # process is bound to the handler -- confirm this is intended.
    handler = functools.partial(cleanup, proc)
    signal.signal(signal.SIGINT, handler)
    if sys.platform == 'win32':
        signal.signal(signal.SIGBREAK, handler)
    else:
        signal.signal(signal.SIGTERM, handler)
54# Run with no run wrapper
55# This is just starting "localities" processes on the local node
def run_none(cmd, localities, verbose):
    """Start the localities directly as processes on the local node.

    With more than one locality, each process gets its own
    ``--hpx:node=<index>`` argument appended; a single locality runs ``cmd``
    unchanged.

    cmd        -- base command line as a list of strings
    localities -- number of processes to start
    verbose    -- print each command line before it is started
    """
    if localities > 1:
        commands = [cmd + ['--hpx:node=' + str(node)]
                    for node in range(localities)]
    else:
        commands = [cmd]

    for command in commands:
        if verbose:
            print('Executing command: ' + ' '.join(command))
        subproc(command)
68# Run with mpiexec
69# This is executing mpiexec with the "-np" option set to the number of localities
def run_mpi(cmd, localities, verbose):
    """Launch ``cmd`` through mpiexec, one rank per locality.

    The mpiexec executable and its process-count flag come from template
    values in this script (presumably substituted by CMake when the script
    is configured -- confirm against the build system). If no mpiexec
    executable is configured, an error is written to stderr and the script
    exits with status 1.

    cmd        -- base command line as a list of strings
    localities -- number of ranks passed to mpiexec
    verbose    -- print the final command line before it is started
    """
    mpiexec = '@MPIEXEC@'
    if mpiexec == '':
        mpiexec = '@MPIEXEC_EXECUTABLE@'
    if mpiexec == '':
        msg = 'mpiexec not available on this platform. '
        msg += 'Please rerun CMake with HPX_PARCELPORT_MPI=True.'
        # Write to the stream directly: print(msg, sys.stderr) would send
        # the message (plus the stream's repr) to stdout instead.
        sys.stderr.write(msg + '\n')
        sys.exit(1)
    exec_cmd = [mpiexec, '@MPIEXEC_NUMPROC_FLAG@', str(localities)] + cmd
    if verbose:
        print('Executing command: ' + ' '.join(exec_cmd))
    subproc(exec_cmd)
84# Run with srun
85# This is executing srun with the '-n' option set to the number of localities
def run_srun(cmd, localities, verbose):
    """Launch ``cmd`` through srun, passing the locality count via ``-n``.

    cmd        -- base command line as a list of strings
    localities -- value given to srun's ``-n`` option
    verbose    -- print the final command line before it is started
    """
    srun_prefix = ['srun', '-K', '-n', str(localities)]
    launch = srun_prefix + cmd
    if verbose:
        print('Executing command: ' + ' '.join(launch))
    subproc(launch)
92# Select the appropriate run function based on runwrapper
def run(cmd, runwrapper, localities, verbose):
    """Dispatch to the launcher that matches ``runwrapper``.

    Recognised values are 'none', 'mpi' and 'srun'; any other value starts
    nothing.

    cmd        -- base command line as a list of strings
    runwrapper -- name of the run wrapper to use
    localities -- number of localities to start
    verbose    -- forwarded to the selected launcher
    """
    launch_args = (cmd, localities, verbose)
    if runwrapper == 'none':
        run_none(*launch_args)
    elif runwrapper == 'mpi':
        run_mpi(*launch_args)
    elif runwrapper == 'srun':
        run_srun(*launch_args)
101# Building the command line. This function concatenates the different options
def build_cmd(options, args):
    """Assemble the HPX command line from the parsed options.

    options -- parsed options; ``localities``, ``threads`` and
               ``parcelport`` are read
    args    -- positional arguments: the application first, followed by any
               additional arguments to pass through (left unmodified)

    Returns the command as a new list of strings, in this order:
    application, parcelport settings (only for more than one locality),
    thread count, locality count (only for more than one locality), and
    finally the remaining arguments.
    """
    # Selecting the parcelport for hpx via hpx ini configuration
    parcelport_settings = {
        'verbs': ['--hpx:ini=hpx.parcel.verbs.enable=1'],
        'ipc': ['--hpx:ini=hpx.parcel.ipc.enable=1'],
        'mpi': ['--hpx:ini=hpx.parcel.mpi.enable=1',
                '--hpx:ini=hpx.parcel.bootstrap=mpi'],
        'tcp': ['--hpx:ini=hpx.parcel.tcp.enable=1'],
    }

    cmd = [args[0]]

    if options.localities > 1:
        # An unknown parcelport adds no settings
        cmd += parcelport_settings.get(options.parcelport, [])

    # set number of threads (-1 selects 'all', -2 selects 'cores')
    if options.threads == -1:
        cmd += ['--hpx:threads=all']
    elif options.threads == -2:
        cmd += ['--hpx:threads=cores']
    elif options.threads >= 1:
        cmd += ['--hpx:threads=' + str(options.threads)]

    # set number of localities
    if options.localities > 1:
        cmd += ['--hpx:localities=' + str(options.localities)]

    # Append the remaining args; slicing keeps the caller's list intact
    cmd += args[1:]

    return cmd
def _exit_with_error(message, parser=None):
    """Write ``message`` to stderr, optionally print the help, and exit(1)."""
    # Write to the stream directly: print(message, sys.stderr) would send
    # the message (plus the stream's repr) to stdout instead.
    sys.stderr.write(message + '\n')
    if parser is not None:
        parser.print_help()
    sys.exit(1)


def check_options(parser, options, args):
    """Validate the parsed command line and exit with status 1 if invalid.

    parser  -- the option parser; its help is printed for usage errors
    options -- parsed options; ``localities``, ``threads``, ``parcelport``
               and ``runwrapper`` are checked
    args    -- positional arguments; the first must be an existing path

    Returns None when everything is valid. Otherwise an error message is
    written to stderr and SystemExit(1) is raised.
    """
    if not args:
        _exit_with_error(
            'Error: You need to specify at least the application to start\n',
            parser)

    if not os.path.exists(args[0]):
        _exit_with_error('Executable ' + args[0] + ' does not exist')

    if options.localities < 1:
        _exit_with_error('Can not start less than one locality')

    # -1 and -2 are the special thread counts understood by build_cmd
    if options.threads < 1 and options.threads not in (-1, -2):
        _exit_with_error('Can not start less than one thread per locality')

    if options.parcelport not in ('verbs', 'ipc', 'mpi', 'tcp'):
        _exit_with_error('Error: Parcelport option not valid\n', parser)

    if options.runwrapper not in ('none', 'mpi', 'srun'):
        _exit_with_error('Error: Runwrapper option not valid\n', parser)
# Send a SIGTERM/SIGBREAK to proc and wait for it to terminate.
def term(proc):
    """Ask ``proc`` to stop and block until it has exited.

    On Windows a CTRL_BREAK_EVENT is sent first, falling back to
    ``terminate()`` if sending fails; elsewhere ``terminate()`` is called
    directly.

    proc -- the process object to stop
    """
    on_windows = (sys.platform == 'win32')
    if not on_windows:
        proc.terminate()
    else:
        try:
            proc.send_signal(signal.CTRL_BREAK_EVENT)
        except:
            # Any failure to deliver the break event falls back to terminate
            proc.terminate()
    proc.wait()
# Stop the sub-process child (on Windows for any signal, elsewhere for any
# signal except SIGINT), then exit the script.
def cleanup(child, signum, frame):
    """Signal handler: stop ``child`` if required, then exit the script.

    The child is stopped via ``term`` on Windows for every signal, and on
    other platforms for every signal except SIGINT. Errors while stopping
    are printed as a traceback; the script always exits afterwards.

    child  -- the process bound to this handler (may be falsy)
    signum -- number of the received signal
    frame  -- current stack frame (unused)
    """
    try:
        # Forward SIGTERM on Linux or any signal on Windows
        forward = (sys.platform == 'win32') or signum != signal.SIGINT
        if child and forward:
            term(child)
    except:
        traceback.print_exc()
    finally:
        sys.exit()
if __name__ == '__main__':
    # Script entry point: parse the options, start the localities through
    # the selected run wrapper, wait for them, and exit with 0 if every
    # process ended as expected or 1 otherwise.

    # optparse adds the 'Usage: ' prefix itself and expands %prog to the
    # name of this script
    help_message = '%prog hpx-application [Options] [-- Additional options]\n'
    help_message += '\n'
    help_message += 'This script is a simple startup script to start '
    help_message += 'one or multiple HPX localities. It supports '
    help_message += 'various startup wrappers for distributed runs.'

    parser = OptionParser(usage=help_message)

    # Each option takes its default from an HPXRUN_* environment variable
    # when that variable is set
    default_env = os.environ.get

    parser.add_option('-l', '--localities',
        action='store', type='int',
        dest='localities', default=default_env('HPXRUN_LOCALITIES', '1'),
        help='Number of localities to run (environment variable '
             'HPXRUN_LOCALITIES)')

    parser.add_option('-t', '--threads',
        action='store', type='int',
        dest='threads', default=default_env('HPXRUN_THREADS', '1'),
        help='Number of threads per locality (environment variable '
             'HPXRUN_THREADS)')

    parser.add_option('-p', '--parcelport',
        action='store', type='string',
        dest='parcelport', default=default_env('HPXRUN_PARCELPORT', 'tcp'),
        help='Which parcelport to use (Options are: verbs, ipc, mpi, tcp) '
             '(environment variable HPXRUN_PARCELPORT)')

    parser.add_option('-r', '--runwrapper',
        action='store', type='string',
        dest='runwrapper', default=default_env('HPXRUN_RUNWRAPPER', 'none'),
        help='Which runwrapper to use (Options are: none, mpi, srun) '
             '(environment variable HPXRUN_RUNWRAPPER)')

    parser.add_option('-e', '--expected',
        action='store', type='int',
        dest='expected', default=default_env('HPXRUN_EXPECTED', '0'),
        help='Expected return codes of all invoked processes '
             '(environment variable HPXRUN_EXPECTED)')

    parser.add_option('-v', '--verbose',
        action='store_true',
        dest='verbose', default=(default_env('HPXRUN_VERBOSE', '0') != '0'),
        help='Verbose output (environment variable HPXRUN_VERBOSE)')

    (options, args) = parser.parse_args()

    check_options(parser, options, args)
    if 'HPXRUN_ARGS' in os.environ:
        args += os.environ['HPXRUN_ARGS'].split()

    cmd = build_cmd(options, args)

    if options.verbose:
        print('Base command is "' + ' '.join(cmd) + '"')

    run(cmd, options.runwrapper, options.localities, options.verbose)

    # An expected code of 0 requires a return code of 0; any other expected
    # code only requires a non-zero return code (the value is not compared)
    if options.expected == 0:
        ret_expected = (lambda ret: ret == 0)
    else:
        ret_expected = (lambda ret: ret != 0)

    def failure_message(which, ret):
        # Describe which process ended with an unexpected return code
        msg = 'Process ' + str(which) + ' failed with an unexpected error '
        msg += 'code of ' + str(ret) + ' (expected ' + str(options.expected)
        msg += ')'
        return msg

    if len(procs) == 1:
        procs[0].wait()
        ret = procs[0].returncode
        if not ret_expected(ret):
            # Output which process failed
            print(failure_message(0, ret))
            sys.exit(1)
        sys.exit(0)

    procs_lock = threading.Lock()
    returncode = 0

    def wait_on_proc(proc, which):
        # Watchdog body: wait for one process; on an unexpected return code
        # record the failure and terminate all other remaining processes
        global returncode
        proc.wait()
        ret = proc.returncode
        with procs_lock:
            if ret_expected(ret):
                return
            returncode = 1
            # Output which process failed
            print(failure_message(which, ret))
            while procs:
                nextproc = procs.pop(0)
                if nextproc != proc:
                    # One failing termination must not keep the remaining
                    # processes from being terminated
                    try:
                        term(nextproc)
                    except Exception:
                        traceback.print_exc()

    proc_watchdogs = []
    # Hold the lock while iterating so that no watchdog empties procs
    # underneath the loop
    with procs_lock:
        try:
            for which, proc in enumerate(procs):
                proc_watchdog = threading.Thread(target=wait_on_proc,
                                                 args=(proc, which))
                proc_watchdog.start()
                proc_watchdogs.append(proc_watchdog)
        except Exception:
            # Report the problem and still join the watchdogs that did start
            traceback.print_exc()

    for proc_watchdog in proc_watchdogs:
        proc_watchdog.join()

    sys.exit(returncode)