1#!/usr/local/bin/python3.8
2#
3# Copyright (c) 2014 Thomas Heller
4#
5# Distributed under the Boost Software License, Version 1.0. (See accompanying
6# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7
8# This script is a simple startup script to start 1 or multiple HPX localities.
9# It supports various startup wrappers for distributed runs.
10#
11# Usage:
12#  hpxrun.py hpx-application [Options] [Additional options]
13#
14# Available options are:
15#   -l Number of localities to start
16#   -t Number of threads per locality
17#   -p Parcelport to use
18#   -r Which runwrapper to use
19#   -e Expected return codes of all invoked processes
20#   -v verbose output
21#   -- delimiter for additional arguments
22
23import sys, os, string, atexit
24import functools, signal
25import traceback
26import time
27import threading
28
29from optparse import OptionParser
30
31import subprocess
32from subprocess import Popen
33
# Every child process started through subproc(), in start order.
procs = []


def subproc(cmd):
    """Start *cmd* as a child process and register it in ``procs``.

    Also (re)installs the handler for SIGINT and for SIGTERM (SIGBREAK
    instead of SIGTERM on Windows). When the handler fires, it first stops
    every *other* process in ``procs`` under the same condition cleanup()
    uses (any signal on Windows, anything but SIGINT elsewhere). It then
    delegates to cleanup() for the process started by this call, which
    also exits the script. Binding the handler to a single process would
    leave the earlier localities of a multi-locality run orphaned.

    :param cmd: the command line as a list of strings, passed to Popen.
    """
    kwargs = {}
    if sys.platform == 'win32':
        # Give the child its own process group so that term() can target it
        # with CTRL_BREAK_EVENT. The constant is missing on some builds.
        flags = getattr(subprocess, 'CREATE_NEW_PROCESS_GROUP', None)
        if flags is not None:
            kwargs['creationflags'] = flags
    proc = Popen(cmd, **kwargs)
    procs.append(proc)

    def handler(signum, frame):
        if sys.platform == 'win32' or signum != signal.SIGINT:
            # Iterate over a snapshot: other code may pop from procs.
            for other in list(procs):
                if other is proc or other.poll() is not None:
                    continue
                try:
                    term(other)
                except Exception:
                    # Keep going so the remaining processes are still stopped.
                    traceback.print_exc()
        # cleanup() stops `proc` itself (when appropriate) and exits.
        cleanup(proc, signum, frame)

    signal.signal(signal.SIGINT, handler)
    if sys.platform == 'win32':
        signal.signal(signal.SIGBREAK, handler)
    else:
        signal.signal(signal.SIGTERM, handler)
53
def run_none(cmd, localities, verbose):
    """Start the application directly on this node, without any launcher.

    For more than one locality, one copy of *cmd* is started per locality,
    each tagged with its index through ``--hpx:node=<index>``. Otherwise
    *cmd* is started once, unchanged.

    :param cmd: base command line (list of strings).
    :param localities: number of localities to start.
    :param verbose: echo every command before starting it.
    """
    if localities > 1:
        launches = [cmd + ['--hpx:node=' + str(index)]
                    for index in range(localities)]
    else:
        launches = [cmd]

    for launch in launches:
        if verbose:
            print('Executing command: ' + ' '.join(launch))
        subproc(launch)
67
def run_mpi(cmd, localities, verbose):
    """Launch the application through mpiexec with one rank per locality.

    The launcher path and its process-count flag come from the ``@...@``
    placeholders, which are presumably substituted when CMake configures
    this script (the error message below asks the user to rerun CMake).
    If no launcher path is configured, the function prints an error to
    stderr and exits with status 1.

    :param cmd: base command line (list of strings).
    :param localities: number of MPI ranks / HPX localities.
    :param verbose: echo the command before starting it.
    """
    # Prefer @MPIEXEC@, fall back to @MPIEXEC_EXECUTABLE@ when it is empty.
    mpiexec = '@MPIEXEC@'
    if not mpiexec:
        mpiexec = '@MPIEXEC_EXECUTABLE@'
    if not mpiexec:
        msg = 'mpiexec not available on this platform. '
        msg += 'Please rerun CMake with HPX_PARCELPORT_MPI=True.'
        # file= is required: a positional sys.stderr would just be printed.
        print(msg, file=sys.stderr)
        sys.exit(1)
    exec_cmd = [mpiexec, '@MPIEXEC_NUMPROC_FLAG@', str(localities)] + cmd
    if verbose:
        print('Executing command: ' + ' '.join(exec_cmd))
    subproc(exec_cmd)
83
def run_srun(cmd, localities, verbose):
    """Launch the application through SLURM's ``srun``, one task per locality.

    The command is ``srun -K -n <localities>`` followed by *cmd*.
    NOTE(review): ``-K`` is presumably srun's --kill-on-bad-exit; confirm
    against the SLURM version in use.

    :param cmd: base command line (list of strings).
    :param localities: number of tasks / HPX localities.
    :param verbose: echo the command before starting it.
    """
    launcher = ['srun', '-K', '-n', str(localities)]
    full_cmd = launcher + cmd
    if verbose:
        print('Executing command: ' + ' '.join(full_cmd))
    subproc(full_cmd)
91
def run(cmd, runwrapper, localities, verbose):
    """Start *cmd* with the launcher selected by *runwrapper*.

    Recognized wrappers are 'none', 'mpi' and 'srun'. Any other value
    starts nothing (check_options() rejects such values earlier).
    """
    launch_args = (cmd, localities, verbose)
    if runwrapper == 'none':
        run_none(*launch_args)
    elif runwrapper == 'mpi':
        run_mpi(*launch_args)
    elif runwrapper == 'srun':
        run_srun(*launch_args)
100
def build_cmd(options, args):
    """Assemble the HPX command line from the parsed options.

    The result is ``args[0]`` (the application) followed, in this order, by:
    the parcelport selection flags (only when more than one locality is
    requested), the thread-count flag, the locality-count flag (again only
    for more than one locality) and finally ``args[1:]`` verbatim.

    :param options: parsed options; reads ``localities``, ``threads`` and
        ``parcelport``.
    :param args: positional arguments, ``args[0]`` being the application.
        The list is not modified.
    :returns: the command as a new list of strings.
    """
    # hpx ini switches enabling each supported parcelport; unknown names
    # contribute no flags.
    parcelport_flags = {
        'verbs': ['--hpx:ini=hpx.parcel.verbs.enable=1'],
        'ipc': ['--hpx:ini=hpx.parcel.ipc.enable=1'],
        'mpi': ['--hpx:ini=hpx.parcel.mpi.enable=1',
                '--hpx:ini=hpx.parcel.bootstrap=mpi'],
        'tcp': ['--hpx:ini=hpx.parcel.tcp.enable=1'],
    }

    cmd = [args[0]]

    if options.localities > 1:
        # Select the parcelport for hpx via hpx ini configuration.
        cmd += parcelport_flags.get(options.parcelport, [])

    # Set the number of threads: -1 maps to 'all', -2 to 'cores', and any
    # positive count is passed through.
    if options.threads == -1:
        cmd.append('--hpx:threads=all')
    elif options.threads == -2:
        cmd.append('--hpx:threads=cores')
    elif options.threads >= 1:
        cmd.append('--hpx:threads=' + str(options.threads))

    # Set the number of localities.
    if options.localities > 1:
        cmd.append('--hpx:localities=' + str(options.localities))

    # Append the remaining args without consuming the caller's list.
    cmd.extend(args[1:])
    return cmd
133
def check_options(parser, options, args):
    """Validate the parsed command line and exit with status 1 on an error.

    Error messages go to stderr. For a missing application, an invalid
    parcelport or an invalid runwrapper, the parser's help is printed too.

    :param parser: the OptionParser, used for printing help.
    :param options: parsed options; reads ``localities``, ``threads``,
        ``parcelport`` and ``runwrapper``.
    :param args: positional arguments, ``args[0]`` being the application.
    """
    if not args:
        print('Error: You need to specify at least the application to start\n',
              file=sys.stderr)
        parser.print_help()
        sys.exit(1)

    if not os.path.exists(args[0]):
        print('Executable ' + args[0] + ' does not exist', file=sys.stderr)
        sys.exit(1)

    if options.localities < 1:
        print('Can not start less than one locality', file=sys.stderr)
        sys.exit(1)

    # -1 and -2 are the special "all" / "cores" thread selections.
    if options.threads < 1 and options.threads not in (-1, -2):
        print('Can not start less than one thread per locality', file=sys.stderr)
        sys.exit(1)

    if options.parcelport not in ('verbs', 'ipc', 'mpi', 'tcp'):
        print('Error: Parcelport option not valid\n', file=sys.stderr)
        parser.print_help()
        sys.exit(1)

    if options.runwrapper not in ('none', 'mpi', 'srun'):
        print('Error: Runwrapper option not valid\n', file=sys.stderr)
        parser.print_help()
        sys.exit(1)
165
def term(proc):
    """Ask *proc* to stop (SIGTERM, or CTRL_BREAK on Windows) and wait for it.

    On Windows a CTRL_BREAK_EVENT is sent first. If that raises for any
    reason, the function falls back to ``terminate()``. On other platforms
    ``terminate()`` is used directly. In all cases the function blocks
    until the process has exited.
    """
    if sys.platform != 'win32':
        proc.terminate()
    else:
        try:
            proc.send_signal(signal.CTRL_BREAK_EVENT)
        except BaseException:
            proc.terminate()
    proc.wait()
176
def cleanup(child, signum, frame):
    """Signal handler: optionally stop *child*, then exit this script.

    *child* is stopped through term() when it is set and either this is
    Windows (any handled signal is forwarded) or the signal is not SIGINT
    (on other platforms only non-SIGINT signals are forwarded).
    NOTE(review): presumably SIGINT is not forwarded on POSIX because a
    terminal Ctrl+C already reaches children in the same process group;
    confirm.

    Any error while stopping *child* is reported with a traceback. The
    script always finishes with sys.exit().
    """
    try:
        forward = sys.platform == 'win32' or signum != signal.SIGINT
        if child and forward:
            term(child)
    except BaseException:
        traceback.print_exc()
    finally:
        sys.exit()
187
188if __name__ == '__main__':
189
190    help_message = 'Usage %proc hpx-application [Options] [-- Additional options]\n'
191    help_message = help_message + '\n'
192    help_message = help_message + 'This script is a simple startup script to start '
193    help_message = help_message + 'one or multiple HPX localities. It supports '
194    help_message = help_message + 'various startup wrappers for distributed runs.'
195
196    parser = OptionParser(usage = help_message)
197
198    default_env = (lambda env, default:
199        os.environ[env] if env in os.environ else default)
200
201    parser.add_option('-l', '--localities'
202      , action='store', type='int'
203      , dest='localities', default=default_env('HPXRUN_LOCALITIES', '1')
204      , help='Number of localities to run (environment variable '
205              'HPXRUN_LOCALITIES')
206
207    parser.add_option('-t', '--threads'
208      , action='store', type='int'
209      , dest='threads', default=default_env('HPXRUN_THREADS', '1')
210      , help='Number of threads per locality (environment variable '
211             'HPXRUN_THREADS)')
212
213    parser.add_option('-p', '--parcelport'
214      , action='store', type='string'
215      , dest='parcelport', default=default_env('HPXRUN_PARCELPORT', 'tcp')
216      , help='Which parcelport to use (Options are: verbs, ipc, mpi, tcp) '
217             '(environment variable HPXRUN_PARCELPORT')
218
219    parser.add_option('-r', '--runwrapper'
220      , action='store', type='string'
221      , dest='runwrapper', default=default_env('HPXRUN_RUNWRAPPER', 'none')
222      , help='Which runwrapper to use (Options are: none, mpi, srun) '
223             '(environment variable HPXRUN_ (environment variable '
224             'HPXRUN_RUNWRAPPER)')
225
226    parser.add_option('-e', '--expected'
227      , action='store', type='int'
228      , dest='expected', default=default_env('HPXRUN_EXPECTED', '0')
229      , help='Expected return codes of all invoked processes '
230             '(environment variable HPXRUN_EXPECTED)')
231
232    parser.add_option('-v', '--verbose'
233      , action='store_true'
234      , dest='verbose', default=False
235            if default_env('HPXRUN_VERBOSE', '0') == '0' else True
236      , help='Verbose output (environment variable HPXRUN_VERBOSE)')
237
238    (options, args) = parser.parse_args()
239
240    check_options(parser, options, args)
241    if 'HPXRUN_ARGS' in os.environ:
242        args += os.environ['HPXRUN_ARGS'].split()
243
244    cmd = build_cmd(options, args)
245
246    if options.verbose:
247        print('Base command is "' + ' '.join(cmd) + '"')
248
249    run(cmd, options.runwrapper, options.localities, options.verbose)
250
251    if options.expected == 0:
252        ret_expected = (lambda ret : True if ret == 0 else False)
253    else:
254        ret_expected = (lambda ret : False if ret == 0 else True)
255
256    if len(procs) == 1:
257        procs[0].wait()
258        ret = procs[0].returncode
259        if not ret_expected(ret):
260            # Output which process failed
261            msg = 'Process 0 failed with an unexpected error '
262            msg += 'code of ' + str(ret) + ' (expected ' + str(options.expected)
263            msg += ')'
264            sys.exit(1)
265        sys.exit(0)
266
267    procs_lock = threading.Lock()
268    returncode = 0
269
270    def wait_on_proc(proc, which):
271        global returncode
272        proc.wait()
273        ret = proc.returncode
274        procs_lock.acquire()
275        try:
276            if not ret_expected(ret):
277                returncode = 1
278                # Output which process failed
279                msg = 'Process ' + str(which) + ' failed with an unexpected error '
280                msg += 'code of ' + str(ret) + ' (expected ' + str(options.expected)
281                msg += ')'
282                print(msg)
283                while procs:
284                    nextproc = procs.pop(0)
285                    if nextproc != proc:
286                        term(nextproc)
287        except:
288            pass
289        finally: procs_lock.release()
290
291    which = 0
292    proc_watchdogs = []
293    procs_lock.acquire()
294    try:
295        for proc in procs:
296            proc_watchdog = threading.Thread(target=wait_on_proc, args=(proc, which))
297            proc_watchdog.start()
298            proc_watchdogs.append(proc_watchdog)
299            which = which + 1
300    except:
301        pass
302    finally: procs_lock.release()
303
304    for proc_watchdog in proc_watchdogs:
305        proc_watchdog.join()
306
307    sys.exit(returncode)
308
309