#!/usr/local/bin/python3.8
#
# Copyright (c) 2014 Thomas Heller
#
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

# This script is a simple startup script to start 1 or multiple HPX localities.
# It supports various startup wrappers for distributed runs.
#
# Usage:
#  hpxrun.py hpx-application [Options] [Additional options]
#
# Available options are:
#   -l Number of localities to start
#   -t Number of threads per locality
#   -p Parcelport to use
#   -r Which runwrapper to use
#   -e Expected return codes of all invoked processes
#   -v verbose output
#   -- delimiter for additional arguments

import sys, os, string, atexit
import functools, signal
import traceback
import time
import threading

from optparse import OptionParser

import subprocess
from subprocess import Popen

# Our global list of processes we started
procs = []


def subproc(cmd):
    """Start ``cmd`` as a child process and record it in ``procs``.

    Also installs ``cleanup`` (bound to the new child) as the handler for
    SIGINT, and for SIGBREAK on Windows or SIGTERM elsewhere.

    cmd -- the command line to execute, as a list of strings.
    """
    kwargs = {}
    if sys.platform == 'win32':
        # For some reason or another ... this seems to be non existent
        # sometimes, so a missing attribute is tolerated.
        try:
            kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
        except AttributeError:
            pass

    proc = Popen(cmd, **kwargs)
    procs.append(proc)

    # NOTE(review): every call replaces the previously installed handler, so
    # only the most recently started child is bound to the signal handler.
    handler = functools.partial(cleanup, proc)
    signal.signal(signal.SIGINT, handler)
    if sys.platform == 'win32':
        signal.signal(signal.SIGBREAK, handler)
    else:
        signal.signal(signal.SIGTERM, handler)


# Run with no run wrapper
# This is just starting "localities" processes on the local node
def run_none(cmd, localities, verbose):
    """Start ``localities`` copies of ``cmd`` directly on the local node.

    With more than one locality each copy gets its own ``--hpx:node=<i>``
    argument; a single locality is started with ``cmd`` unchanged.
    """
    if localities > 1:
        for locality in range(localities):
            exec_cmd = cmd + ['--hpx:node=' + str(locality)]
            if verbose:
                print('Executing command: ' + ' '.join(exec_cmd))
            subproc(exec_cmd)
    else:
        if verbose:
            print('Executing command: ' + ' '.join(cmd))
        subproc(cmd)


# Run with mpiexec
# This is executing mpiexec with the "-np" option set to the number of localities
def run_mpi(cmd, localities, verbose):
    """Start ``cmd`` through mpiexec with ``localities`` processes.

    The mpiexec path and the process-count flag are placeholders in
    ``@...@`` form; exits with status 1 if no mpiexec path is available.
    """
    mpiexec = '@MPIEXEC@'
    if mpiexec == '':
        mpiexec = '@MPIEXEC_EXECUTABLE@'
    if mpiexec == '':
        msg = 'mpiexec not available on this platform. '
        msg += 'Please rerun CMake with HPX_PARCELPORT_MPI=True.'
        # Diagnostics belong on stderr; passing the stream positionally
        # would only print its repr to stdout.
        print(msg, file=sys.stderr)
        sys.exit(1)
    exec_cmd = [mpiexec, '@MPIEXEC_NUMPROC_FLAG@', str(localities)] + cmd
    if verbose:
        print('Executing command: ' + ' '.join(exec_cmd))
    subproc(exec_cmd)


# Run with srun
# This is executing srun with the '-n' option set to the number of localities
def run_srun(cmd, localities, verbose):
    """Start ``cmd`` through ``srun -K -n <localities>``."""
    exec_cmd = ['srun', '-K', '-n', str(localities)] + cmd
    if verbose:
        print('Executing command: ' + ' '.join(exec_cmd))
    subproc(exec_cmd)


# Select the appropriate run function based on runwrapper
def run(cmd, runwrapper, localities, verbose):
    """Dispatch to the launcher selected by ``runwrapper``.

    Known wrappers are 'none', 'mpi' and 'srun'; any other value starts
    nothing.
    """
    launchers = {
        'none': run_none,
        'mpi': run_mpi,
        'srun': run_srun,
    }
    launcher = launchers.get(runwrapper)
    if launcher is not None:
        launcher(cmd, localities, verbose)


# Building the command line.
# This function concatenates the different options
def build_cmd(options, args):
    """Return the full command line for one HPX application run.

    options -- parsed options; reads ``localities``, ``threads`` and
               ``parcelport``.
    args    -- positional arguments; ``args[0]`` is the application and the
               rest are appended after the generated HPX options. The list
               is not modified.
    """
    # Selecting the parcelport for hpx via hpx ini configuration
    parcelport_flags = {
        'verbs': ['--hpx:ini=hpx.parcel.verbs.enable=1'],
        'ipc': ['--hpx:ini=hpx.parcel.ipc.enable=1'],
        'mpi': ['--hpx:ini=hpx.parcel.mpi.enable=1',
                '--hpx:ini=hpx.parcel.bootstrap=mpi'],
        'tcp': ['--hpx:ini=hpx.parcel.tcp.enable=1'],
    }

    cmd = [args[0]]

    # A parcelport is only selected for multi-locality runs
    if options.localities > 1:
        cmd += parcelport_flags.get(options.parcelport, [])

    # set number of threads (-1 and -2 are the symbolic values)
    if options.threads == -1:
        cmd += ['--hpx:threads=all']
    if options.threads == -2:
        cmd += ['--hpx:threads=cores']
    if options.threads >= 1:
        cmd += ['--hpx:threads=' + str(options.threads)]

    # set number of localities
    if options.localities > 1:
        cmd += ['--hpx:localities=' + str(options.localities)]

    # Append the remaining args without touching the caller's list
    cmd += args[1:]

    return cmd


def check_options(parser, options, args):
    """Validate the parsed options and exit with status 1 on the first error.

    Error messages go to stderr. For a missing application or an invalid
    parcelport/runwrapper the parser's help text is printed as well.
    """
    if not args:
        print('Error: You need to specify at least the application to start\n',
              file=sys.stderr)
        parser.print_help()
        sys.exit(1)

    if not os.path.exists(args[0]):
        print('Executable ' + args[0] + ' does not exist', file=sys.stderr)
        sys.exit(1)

    if options.localities < 1:
        print('Can not start less than one locality', file=sys.stderr)
        sys.exit(1)

    # -1 and -2 are accepted as symbolic thread counts
    if options.threads < 1 and options.threads not in (-1, -2):
        print('Can not start less than one thread per locality',
              file=sys.stderr)
        sys.exit(1)

    if options.parcelport not in ('verbs', 'ipc', 'mpi', 'tcp'):
        print('Error: Parcelport option not valid\n', file=sys.stderr)
        parser.print_help()
        sys.exit(1)

    if options.runwrapper not in ('none', 'mpi', 'srun'):
        print('Error: Runwrapper option not valid\n', file=sys.stderr)
        parser.print_help()
        sys.exit(1)


# Send a SIGTERM/SIGBREAK to proc and wait for it to terminate.
def term(proc):
    """Ask ``proc`` to stop and wait until it has exited.

    On Windows a CTRL_BREAK_EVENT is sent first, falling back to
    ``terminate()`` if that fails; elsewhere ``terminate()`` is used.
    """
    if sys.platform == 'win32':
        try:
            proc.send_signal(signal.CTRL_BREAK_EVENT)
        except Exception:
            proc.terminate()
    else:
        proc.terminate()
    proc.wait()


# Stop the sub-process child if signum is SIGTERM. Then terminate.
def cleanup(child, signum, frame):
    """Signal handler: stop ``child`` if required, then exit the script.

    child  -- the process to stop (may be None).
    signum -- the received signal number.
    frame  -- the current stack frame (unused).
    """
    try:
        if child and ((sys.platform == 'win32') or signum != signal.SIGINT):
            # Forward SIGTERM on Linux or any signal on Windows
            term(child)
    except Exception:
        traceback.print_exc()
    finally:
        sys.exit()


if __name__ == '__main__':

    # optparse prepends "Usage: " and expands %prog on its own.
    help_message = (
        '%prog hpx-application [Options] [-- Additional options]\n'
        '\n'
        'This script is a simple startup script to start '
        'one or multiple HPX localities. It supports '
        'various startup wrappers for distributed runs.'
    )

    parser = OptionParser(usage=help_message)

    def default_env(env, default):
        """Return environment variable ``env``, or ``default`` if unset."""
        return os.environ.get(env, default)

    parser.add_option('-l', '--localities'
      , action='store', type='int'
      , dest='localities', default=default_env('HPXRUN_LOCALITIES', '1')
      , help='Number of localities to run (environment variable '
             'HPXRUN_LOCALITIES)')

    parser.add_option('-t', '--threads'
      , action='store', type='int'
      , dest='threads', default=default_env('HPXRUN_THREADS', '1')
      , help='Number of threads per locality (environment variable '
             'HPXRUN_THREADS)')

    parser.add_option('-p', '--parcelport'
      , action='store', type='string'
      , dest='parcelport', default=default_env('HPXRUN_PARCELPORT', 'tcp')
      , help='Which parcelport to use (Options are: verbs, ipc, mpi, tcp) '
             '(environment variable HPXRUN_PARCELPORT)')

    parser.add_option('-r', '--runwrapper'
      , action='store', type='string'
      , dest='runwrapper', default=default_env('HPXRUN_RUNWRAPPER', 'none')
      , help='Which runwrapper to use (Options are: none, mpi, srun) '
             '(environment variable HPXRUN_RUNWRAPPER)')

    parser.add_option('-e', '--expected'
      , action='store', type='int'
      , dest='expected', default=default_env('HPXRUN_EXPECTED', '0')
      , help='Expected return codes of all invoked processes '
             '(environment variable HPXRUN_EXPECTED)')

    parser.add_option('-v', '--verbose'
      , action='store_true'
      , dest='verbose'
      , default=default_env('HPXRUN_VERBOSE', '0') != '0'
      , help='Verbose output (environment variable HPXRUN_VERBOSE)')

    (options, args) = parser.parse_args()

    check_options(parser, options, args)
    if 'HPXRUN_ARGS' in os.environ:
        args += os.environ['HPXRUN_ARGS'].split()

    cmd = build_cmd(options, args)

    if options.verbose:
        print('Base command is "' + ' '.join(cmd) + '"')

    run(cmd, options.runwrapper, options.localities, options.verbose)

    def ret_expected(ret):
        """Tell whether ``ret`` matches the expected outcome.

        Only success versus failure is compared: with an expected code of 0
        the process must return 0, otherwise any non-zero code is accepted.
        """
        return (ret == 0) == (options.expected == 0)

    if len(procs) == 1:
        procs[0].wait()
        ret = procs[0].returncode
        if not ret_expected(ret):
            # Output which process failed
            msg = 'Process 0 failed with an unexpected error '
            msg += 'code of ' + str(ret) + ' (expected ' + str(options.expected)
            msg += ')'
            print(msg)
            sys.exit(1)
        sys.exit(0)

    procs_lock = threading.Lock()
    returncode = 0

    def wait_on_proc(proc, which):
        """Wait for ``proc``; on an unexpected exit code stop the others.

        proc  -- the process to wait for.
        which -- index of the process, used in the failure message.
        """
        global returncode
        proc.wait()
        ret = proc.returncode
        with procs_lock:
            try:
                if not ret_expected(ret):
                    returncode = 1
                    # Output which process failed
                    msg = 'Process ' + str(which) + ' failed with an unexpected error '
                    msg += 'code of ' + str(ret) + ' (expected ' + str(options.expected)
                    msg += ')'
                    print(msg)
                    # Drain the list so every other process is stopped once
                    while procs:
                        nextproc = procs.pop(0)
                        if nextproc != proc:
                            term(nextproc)
            except Exception:
                # Best effort: report the problem, keep the watchdog alive
                traceback.print_exc()

    proc_watchdogs = []
    # Hold the lock so no watchdog modifies procs while it is iterated
    with procs_lock:
        try:
            for which, proc in enumerate(procs):
                proc_watchdog = threading.Thread(target=wait_on_proc,
                                                 args=(proc, which))
                proc_watchdog.start()
                proc_watchdogs.append(proc_watchdog)
        except Exception:
            traceback.print_exc()

    for proc_watchdog in proc_watchdogs:
        proc_watchdog.join()

    sys.exit(returncode)