##################################################################
##  (c) Copyright 2015-  by Jaron T. Krogel                     ##
##################################################################


#====================================================================#
#  machines.py                                                       #
#    Representations of local machine environments including         #
#    workstations and supercomputers and the jobs that will be       #
#    executed on them.                                               #
#                                                                    #
#  Content summary:                                                  #
#    Job                                                             #
#      Class to represent a generic simulation job.                  #
#                                                                    #
#    Machine                                                         #
#      Represents a generic machine.                                 #
#      Base class for specific machine types.                        #
#                                                                    #
#    Workstation                                                     #
#      Represents a workstation with a fixed number of cores.        #
#                                                                    #
#    InteractiveCluster                                              #
#      Represents a supercomputer in interactive mode.               #
#      Similar to a workstation with many cores.                     #
#                                                                    #
#    Supercomputer                                                   #
#      Represents a generic supercomputer with a batch queue.        #
#      Base class for specific supercomputers.                       #
#      See Jaguar, Kraken, Golub, OIC5, Hopper, Edison,              #
#      BlueWatersXE, BlueWatersXK, Titan, EOS, Vesta, Cetus, Mira,   #
#      Lonestar, Matisse, Komodo, and Amos.                          #
#                                                                    #
#    cpu_count                                                       #
#      Function to return the number of cores on the local machine.  #
#                                                                    #
#    Options                                                         #
#      Class representing command line options for a simulation      #
#      job, including arguments to the simulation executable,        #
#      the run launcher (aprun/mpirun, etc), and the job submitter   #
#      (e.g. qsub).                                                  #
#                                                                    #
#====================================================================#


import os
import time
#from multiprocessing import cpu_count
from socket import gethostname
from subprocess import Popen,PIPE
from numpy import array,mod,floor,ceil,round,log,empty
from generic import obj
from developer import DevBase,to_str
from nexus_base import NexusCore,nexus_core
from execute import execute
from debug import *
from imp import load_source


import re,subprocess
def cpu_count():
    """Number of virtual or physical CPUs on this system, i.e.
    user/real as output by time(1) when called with an optimally
    scaling userspace-only program."""

    # Python 2.6+
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError,NotImplementedError):
        None
    #end try

    # POSIX
    try:
        res = int(os.sysconf('SC_NPROCESSORS_ONLN'))

        if res > 0:
            return res
        #end if
    except (AttributeError,ValueError):
        None
    #end try
#end def cpu_count


class Options(DevBase):
    def __init__(self,**kwargs):
        self.add(**kwargs)
    #end def __init__


    def add(self,**kwargs):
        self.transfer_from(kwargs)
    #end def add


    def read(self,options):
        nopts  = 0
        intext = False
        nstart = -2
        n      = 0
        for c in options:
            if c=='-' and nstart!=n-1 and not intext:
                if nopts>0:
                    opt  = options[nstart:n].strip()
                    name = opt.replace('-','').replace('=',' ').split()[0]
                    self[name] = opt
                #end if
                nopts += 1
                nstart = n
            elif c=='"' or c=="'":
                intext = not intext
            #end if
            n += 1
        #end for
        if nopts>0:
            opt  = options[nstart:n].strip()
            name = opt.replace('-','').replace('=',' ').split()[0]
            self[name] = opt
        #end if
    #end def read


    def write(self):
        s = ''
        for k in sorted(self.keys()):
            s += ' '+str(self[k])
        #end for
        return s
    #end def write
#end class Options
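

# Illustrative sketch (not executed): Options.read() splits a command line
# fragment at each dash that begins a new option and stores each option
# string under a name derived from its first token, while Options.write()
# emits the stored options in sorted-name order. The option strings below
# are hypothetical.
#
#   opts = Options()
#   opts.read('-np 4 -machinefile hosts')
#   # opts.np          -> '-np 4'
#   # opts.machinefile -> '-machinefile hosts'
#   opts.write()       # returns ' -machinefile hosts -np 4'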


job_defaults_assign = obj(
    name               = 'jobname',
    type               = None,
    directory          = None,
    subdir             = None,
    app_name           = None,  # name of/path to application
    app_command        = None,  # command used to launch application
    app_props          = None,
    full_command       = None,  # custom command including e.g. mpirun
    outfile            = None,
    errfile            = None,
    env                = None,
    user_env           = True,  # import user environment
    presub             = '',    # shell text executed just prior to submission
    postsub            = '',    # shell text executed just after submission
    queue              = None,
    bundled_jobs       = None,
    relative           = False,
    cores              = None,  # number of cores for the job
    nodes              = None,  # number of nodes for the job
    threads            = 1,     # number of openmp threads for the job
    hyperthreads       = None,
    ppn                = None,
    gpus               = None,  # number of gpus per node
    serial             = False, # run job serially, no mpi
    local              = False, # run job locally, no queue submission
    days               = 0,
    hours              = 0,
    minutes            = 0,
    seconds            = 0,
    subfile            = None,
    grains             = None,
    procs              = None,
    processes          = None,
    processes_per_proc = None,
    processes_per_node = None,
    account            = None,
    email              = None,
    constraint         = None,  # slurm specific, Cori
    core_spec          = None,  # slurm specific, Cori
    switches           = None,  # slurm specific, SuperMUC-NG
    alloc_flags        = None,  # lsf specific, Summit
    qos                = None,
    group_list         = None,
    default_cpus_per_task = False, # optionally bypass typical nexus processing for supermucng
    ntasks_per_core    = None,
    cpus_per_task      = None,
    )

# these are not assigned directly
job_defaults_nonassign = obj(
    fake         = False,
    app          = None,  # name of/path to application
    machine      = None,
    options      = None,
    app_flags    = None,
    app_options  = None,
    run_options  = None,
    sub_options  = None,
    skip_machine = False,
    )

job_defaults = obj(job_defaults_assign,job_defaults_nonassign)

class Job(NexusCore):

    machine = None #default machine if none is specified in settings

    states = obj(
        none      = 0,
        waiting   = 1,
        submitted = 2,
        running   = 3,
        finished  = 4
        )
    state_names = states.inverse()

    job_count = 0


    @staticmethod
    def restore_default_settings():
        Job.machine = None
    #end def restore_default_settings


    @staticmethod
    def generate_jobid():
        Job.job_count += 1
        return Job.job_count
    #end def generate_jobid


    @classmethod
    def zero_time(cls):
        time = obj(days=0,hours=0,minutes=0,seconds=0)
        return time
    #end def zero_time


    def __init__(self,**kwargs):
        # rewrap keyword arguments
        kw = obj(**kwargs)

        # save information used to initialize job object
        self.init_info = kw.copy()

        # set defaults
        kw.set_optional(**job_defaults)

        # extract keywords not assigned
        app          = kw.delete('app')
        machine      = kw.delete('machine')
        options      = kw.delete('options')
        app_flags    = kw.delete('app_flags')
        app_options  = kw.delete('app_options')
        run_options  = kw.delete('run_options')
        sub_options  = kw.delete('sub_options')
        env          = kw.delete('env')
        fake         = kw.delete('fake')
        skip_machine = kw.delete('skip_machine')

        # assign keywords
        self.set(**kw)

        # assign fake job
        self.fake_job = fake

        # initialize other internal variables
        self.app_options = Options()
        self.run_options = Options()
        self.sub_options = Options()
        self.env         = None
        self.internal_id = None
        self.system_id   = None
        self.tot_cores   = None
        self.identifier  = None
        self.submitted   = False
        self.status      = self.states.none
        self.crashed     = False
        self.overtime    = False
        self.successful  = False
        self.finished    = False

        self.user_app_command = self.app_command is not None

        if app is not None:
            self.app_name = app
        #end if
        if app_options is not None:
            self.app_options.read(app_options)
        #end if
        if run_options is not None:
            self.run_options.read(run_options)
        #end if
        if sub_options is not None:
            self.sub_options.read(sub_options)
        #end if
        if app_flags is not None:
            self.app_options.read(app_flags)
        #end if
        if options is not None:
            self.run_options.read(options)
        #end if

        if self.app_props is None:
            self.app_props = []
        #end if

        if self.serial:
            self.cores = 1
            self.nodes = None
        #end if

        if skip_machine:
            self.machine = machine
        else:
            if machine is not None:
                self.machine = machine
            #end if
            if env is not None:
                self.set_environment(**env)
            #end if
            # check that the machine exists and have it complete the job info
            self.process()

            machine = self.get_machine()
            self.batch_mode = machine.in_batch_mode()

            if self.bundled_jobs is not None and not machine.batch_capable:
                self.error('running batched/bundled jobs on {0} is either not possible or not yet implemented, sorry.'.format(machine.name))
            #end if
        #end if

        self.normalize_time()
    #end def __init__


    def get_machine(self):
        return Machine.get(self.machine)
    #end def get_machine


    def process(self,machine=None):
        if machine is None:
            machine = self.get_machine()
        #end if
        machine.process_job(self)
    #end def process


    # test needed
    def process_options(self,machine=None):
        if machine is None:
            machine = self.get_machine()
        #end if
        machine.process_job_options(self)
    #end def process_options


    # test needed
    def initialize(self,sim):
        self.set_id()
        self.identifier = sim.identifier
        machine = self.get_machine()
        if machine.prefixed_output:
            sim.outfile = sim.identifier + machine.outfile_extension
            sim.errfile = sim.identifier + machine.errfile_extension
        #end if
        if self.directory is None:
            self.directory = sim.locdir
            self.abs_dir   = os.path.abspath(sim.locdir)
        elif self.abs_dir is None:
            self.abs_dir = os.path.abspath(self.directory)
        #end if
        if self.subdir is None:
            if machine.local_directory!=None:
                self.subdir = os.path.join(machine.local_directory,nexus_core.runs,sim.path)
                self.abs_subdir = self.subdir
            else:
                self.subdir = self.directory
                self.abs_subdir = self.abs_dir
            #end if
        #end if
        if self.app_name is None:
            app_name = sim.app_name
        else:
            app_name = self.app_name
        #end if
        if app_name!=None and not '/' in app_name:
            ads = machine.app_directories
            ad  = machine.app_directory
            new_app_name = None
            if ads!=None and self.app_name in ads:
                new_app_name = os.path.join(ads[self.app_name],app_name)
            elif ad!=None:
                new_app_name = os.path.join(ad,app_name)
            #end if
            if new_app_name!=None:
                self.app_name = new_app_name
            #end if
        #end if
        sim.set_app_name(app_name)
        self.set(
            name    = sim.identifier,
            simid   = sim.simid,
            outfile = sim.outfile,
            errfile = sim.errfile
            )
        if self.app_command is None:
            self.app_command = sim.app_command()
        #end if
        if self.app_props==None:
            self.app_props = list(sim.app_props)
        #end if
        # ensure job is processed properly by this initialization stage
        self.process()
    #end def initialize


    # test needed
    def renew_app_command(self,sim):
        if not self.user_app_command:
            self.app_command = sim.app_command()
        #end if
    #end def renew_app_command


    def set_id(self):
        self.internal_id = Job.generate_jobid()
    #end def set_id


    # remove?
    def set_processes(self):
        if self.processes is None:
            self.error('processes should have been set before now\ncontact the developers and have them fix this','Developer')
            self.processes = int(ceil(float(self.cores)/self.threads))
        #end if
    #end def set_processes


    def set_environment(self,limited_env=False,clear_env=False,**env):
        machine = self.get_machine()
        if isinstance(machine,Supercomputer):
            limited_env = True
        #end if
        if self.env is None:
            self.env = os.environ.copy()
            if limited_env:
                self.env.clear()
            #end if
        #end if
        if clear_env:
            self.env.clear()
        #end if
        for n,v in env.items():
            self.env[n]=str(v)
        #end for
    #end def set_environment


    def divert_out_err(self):
        self.identifier += '_divert'
    #end def divert_out_err


    def get_time(self):
        time = obj(
            days    = self.days,
            hours   = self.hours,
            minutes = self.minutes,
            seconds = self.seconds
            )
        return time
    #end def get_time


    def max_time(self,time):
        t  = time.seconds + 60*(time.minutes+60*(time.hours+24*time.days))
        ts = self.seconds + 60*(self.minutes+60*(self.hours+24*self.days))
        if ts>t:
            time.days    = self.days
            time.hours   = self.hours
            time.minutes = self.minutes
            time.seconds = self.seconds
        #end if
        return time
    #end def max_time


    def serial_only(self):
        return 'serial' in self.app_props and len(self.app_props)==1
    #end def serial_only


    # remove?
    def determine_end_status(self,status):
        if not nexus_core.generate_only:
            self.successful = False # not really implemented yet
        #end if
    #end def determine_end_status


    # test needed
    def write(self,file=False):
        machine = self.get_machine()
        return machine.write_job(self,file=file)
    #end def write


    # test needed
    def submit(self):
        machine = self.get_machine()
        machine.add_job(self)
        self.submitted = True
    #end def submit


    # test needed
    def reenter_queue(self):
        machine = self.get_machine()
        machine.requeue_job(self)
    #end def reenter_queue


    def run_command(self,launcher=None,redirect=False,serial=False):
        machine = self.get_machine()
        if launcher is None:
            launcher = machine.app_launcher
        #end if
        c = ''
        if self.bundled_jobs is None:
            if self.full_command is not None:
                c = self.full_command
            else:
                if self.app_command is None:
                    self.error('app_command has not been provided')
                #end if
                if launcher=='runjob':
                    separator = ' : '
                else:
                    separator = ' '
                #end if
                if self.serial and self.processes==1:
                    c = ''
                else:
                    c = launcher + self.run_options.write() + separator
                #end if
                c+=self.app_command+self.app_options.write()
                if redirect:
                    c+=' >'+self.outfile+' 2>'+self.errfile
                    if not serial:
                        c+='&'
                    #end if
                elif machine.redirect_output and self.outfile is not None:
                    c+=' >'+self.outfile+' 2>'+self.errfile
                #end if
            #end if
        elif self.relative:
            cdir = self.abs_subdir
            c+='\n'
            for job in self.bundled_jobs:
                c+='\ncd '+os.path.relpath(job.abs_subdir,cdir)+'\n'
                c+=job.run_command(launcher,redirect=True,serial=serial)+'\n'
                cdir = job.abs_subdir
            #end for
            c+='\nwait\n'
        else:
            c+='\n'
            for job in self.bundled_jobs:
                c+='\ncd '+job.abs_subdir+'\n'
                c+=job.run_command(launcher,redirect=True,serial=serial)+'\n'
            #end for
            c+='\nwait\n'
        #end if
        return c
    #end def run_command


    def pbs_walltime(self):
        walltime=\
            str(int(self.hours  )).zfill(2)+':'\
            +str(int(self.minutes)).zfill(2)+':'\
            +str(int(self.seconds)).zfill(2)
        if self.days!=0:
            walltime = str(self.days)+':'+walltime
        #end if
        return walltime
    #end def pbs_walltime


    def sbatch_walltime(self):
        walltime=\
            str(int(24*self.days+self.hours)).zfill(2)+':'\
            +str(int(self.minutes)).zfill(2)+':'\
            +str(int(self.seconds)).zfill(2)
        return walltime
    #end def sbatch_walltime


    def ll_walltime(self):
        walltime=\
            str(int(24*self.days+self.hours)).zfill(2)+':'\
            +str(int(self.minutes)).zfill(2)+':'\
            +str(int(self.seconds)).zfill(2)
        return walltime
    #end def ll_walltime


    def lsf_walltime(self):
        walltime=\
            str(int(24*self.days+self.hours)).zfill(2)+':'\
            +str(int(self.minutes)).zfill(2)
        return walltime
    #end def lsf_walltime


    def normalize_time(self):
        t = self.total_seconds()
        d = int(t/(24*3600))
        t -= d*24*3600
        h = int(t/3600)
        t -= h*3600
        m = int(t/60)
        t -= m*60
        s = int(t)
        self.days    = d
        self.hours   = h
        self.minutes = m
        self.seconds = s
    #end def normalize_time
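

    # Illustrative sketch (not executed): normalize_time() folds the raw
    # time keywords into canonical days/hours/minutes/seconds, and the
    # *_walltime() methods format the result for each scheduler. The
    # machine name 'ws16' is a hypothetical workstation assumed to have
    # been defined already.
    #
    #   job = Job(machine='ws16',app_command='app.x',cores=1,minutes=150)
    #   # normalize_time() is called in __init__:
    #   #   job.hours   -> 2
    #   #   job.minutes -> 30
    #   # job.pbs_walltime()    -> '02:30:00'
    #   # job.sbatch_walltime() -> '02:30:00'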


    def total_seconds(self):
        return self.seconds+60*(self.minutes+60*(self.hours+24*self.days))
    #end def total_seconds


    def total_minutes(self):
        return int(self.total_seconds()/60)
    #end def total_minutes


    def total_hours(self):
        return int(self.total_seconds()/3600)
    #end def total_hours


    def total_days(self):
        return int(self.total_seconds()/(24*3600))
    #end def total_days


    def clone(self):
        job = self.copy()
        job.set_id()
        return job
    #end def clone


    def serial_clone(self):
        kw = self.init_info.copy()
        kw.serial=True
        return Job(**kw)
    #end def serial_clone


    def split_nodes(self,n):
        run_options = self.run_options
        if not isinstance(n,int):
            self.error('cannot split job by nodes\nrequested split value must be an integer\nreceived type: {0}\nwith value: {1}'.format(n.__class__.__name__,n))
        elif n<1 or n>=self.nodes:
            self.error('cannot split job by nodes\nrequested split must be in the range [1,{0})\nrequested split: {1}'.format(self.nodes,n))
        #end if
        m = self.get_machine()
        if m.app_launcher=='srun':
            self.error('splitting jobs by nodes is not currently supported on machine "{0}" (SLURM)'.format(m.name))
        #end if
        job1 = self.clone()
        job2 = self.clone()
        job1.nodes = n
        job2.nodes = self.nodes - n
        m.process_job(job1)
        m.process_job(job2)
        return job1,job2
    #end def split_nodes
#end class Job
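

# Illustrative sketch (not executed): a typical Job is created with an
# application command plus resource and time keywords drawn from
# job_defaults_assign; the target machine then completes the remaining
# fields via process_job(). The account and command below are
# hypothetical and assume the named machine has been defined.
#
#   job = Job(
#       app_command = 'pw.x -input scf.in',
#       machine     = 'titan',
#       account     = 'ABC123',
#       nodes       = 2,
#       threads     = 4,
#       hours       = 1,
#       minutes     = 30,
#       )
#   job.split_nodes(1)  # -> two jobs with nodes=1 each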




class Machine(NexusCore):

    machines = obj()

    modes = obj(
        none        = 0,
        interactive = 1,
        batch       = 2
        )
    mode = modes.none

    batch_capable       = False
    requires_account    = False
    executable_subfile  = False
    redirect_output     = False
    query_with_username = False

    prefixed_output   = False
    outfile_extension = None
    errfile_extension = None

    allow_warnings = True

    @staticmethod
    def get_hostname():
        hostname = gethostname()
        if '.' in hostname:
            machine_name = hostname.split('.')[0]
        else:
            machine_name = hostname
        #end if
        return machine_name.lower()
    #end def get_hostname


    @staticmethod
    def exists(machine_name):
        return machine_name in Machine.machines
    #end def exists


    @staticmethod
    def is_unique(machine):
        return id(machine)==id(Machine.machines[machine.name])
    #end def is_unique


    @staticmethod
    def add(machine):
        if not isinstance(machine,Machine):
            Machine.class_error('attempted to add non-machine instance')
        #end if
        if not 'name' in machine:
            Machine.class_error('attempted to add a machine without a name')
        #end if
        name = machine.name
        if not name in Machine.machines:
            Machine.machines[name] = machine
        else:
            Machine.class_error('attempted to create machine {0}, but it already exists'.format(name))
        #end if
    #end def add


    @staticmethod
    def get(machine_name):
        if isinstance(machine_name,str):
            machine_name = machine_name.lower()
        else:
            Machine.class_error('machine name must be a string, you provided a '+machine_name.__class__.__name__)
        #end if
        if Machine.exists(machine_name):
            machine = Machine.machines[machine_name]
        else:
            machs = sorted(Machine.machines.keys())
            Machine.class_error('attempted to get machine '+machine_name+', but it is unknown\nknown options are '+str(machs))
        #end if
        return machine
    #end def get


    def warn(self,*args,**kwargs):
        if Machine.allow_warnings:
            NexusCore.warn(self,*args,**kwargs)
        #end if
    #end def warn


    def validate(self):
        if Machine.exists(self.name):
            if not Machine.is_unique(self):
                self.error('duplicate instance of machine '+self.name+' encountered\n this is either a developer error, or you have created a duplicate machine')
            #end if
        else:
            self.error('machine {0} id {1} was created without calling Machine.__init__() and is therefore invalid'.format(self.name,id(self)))
        #end if
    #end def validate


    def in_batch_mode(self):
        return self.mode==self.modes.batch
    #end def in_batch_mode


    def query_queue(self):
        self.not_implemented()
    #end def query_queue

    def submit_jobs(self):
        self.not_implemented()
    #end def submit_jobs

    # update all job information, must be idempotent
    def process_job(self,job):
        self.not_implemented()
    #end def process_job

    def process_job_options(self,job):
        self.not_implemented()
    #end def process_job_options

    def write_job(self,job,file=False):
        self.not_implemented()
    #end def write_job

    def submit_job(self,job):
        self.not_implemented()
    #end def submit_job


    def __init__(self,name,queue_size=0):
        self.name       = name
        self.queue_size = queue_size
        self.processes  = obj()
        self.jobs       = obj()
        self.waiting    = set()
        self.running    = set()
        self.finished   = set()

        # user defined variables
        self.account         = None
        self.user            = None
        self.local_directory = None
        self.app_directory   = None
        self.app_directories = None

        if not isinstance(name,str):
            self.error('machine name must be a string\nyou provided '+str(name))
        #end if

        Machine.add(self)
    #end def __init__


    def restore_default_settings(self):
        self.account         = None
        self.user            = None
        self.local_directory = None
        self.app_directory   = None
        self.app_directories = None
    #end def restore_default_settings


    def add_job(self,job):
        if isinstance(job,Job):
            self.process_job(job)
            self.write_job(job)
            jid = job.internal_id
            self.jobs[jid] = job
            job.status = job.states.waiting
            self.waiting.add(jid)
            #self.write_job_states('add_job')
        else:
            self.error('add_job received non-Job instance '+job.__class__.__name__)
        #end if
    #end def add_job


    def requeue_job(self,job):
        None
    #end def requeue_job


    allowed_user_info = set(['account','local_directory','app_directory','app_directories'])
    def incorporate_user_info(self,infoin):
        info = obj(**infoin)
        vars = set(info.keys())
        invalid = vars-self.allowed_user_info
        if len(invalid)>0:
            self.error('invalid inputs encountered in incorporate_user_info\nallowed inputs: {0}\n invalid inputs: {1}'.format(list(self.allowed_user_info),list(invalid)))
        #end if
        if 'app_directories' in info:
            ad = info.app_directories
            if not isinstance(ad,dict) and not isinstance(ad,obj):
                self.error('app_directories must be of type dict or obj\nyou provided '+ad.__class__.__name__)
            #end if
        #end if
        self.transfer_from(info)
    #end def incorporate_user_info
#end class Machine




class Workstation(Machine):

    mode = Machine.modes.interactive

    batch_capable = False

    def __init__(self,
                 name                = 'workstation',
                 cores               = None,
                 app_launcher        = 'mpirun',
                 process_granularity = 1
                 ):
        Machine.__init__(self,name)
        self.app_launcher = app_launcher
        if cores==None:
            self.cores = cpu_count()
        else:
            self.cores = cores
        #end if
        self.queue_size = cores
        self.process_granularity = process_granularity
    #end def __init__


    def process_job(self,job):
        if job.serial_only():
            job.cores=1
        elif job.cores==None:
            if job.processes!=None:
                job.cores = job.processes*job.threads
            else:
                job.cores = self.cores
            #end if
        #end if
        job.processes = max(1,int(floor(float(job.cores)/job.threads)))
        grains = int(ceil(float(job.cores)/self.process_granularity))
        if abs(grains-1-float(job.cores)/self.process_granularity)<1e-6:
            grains-=1
        #end if
        job.grains = grains
        job.cores  = grains*self.process_granularity

        self.process_job_options(job)
    #end def process_job


    def process_job_options(self,job):
        job.run_options.add(np='-np '+str(job.processes))
    #end def process_job_options


    def write_job_states(self,title=''):
        self.log(title,n=2)
        n=3
        self.log('{0} {1} {2} job states'.format(self.__class__.__name__,self.name,id(self)),n=n)
        self.log('processes',n=n+1)
        for process in self.processes:
            job = process.job
            self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
        #end for
        self.log('jobs',n=n+1)
        jobids = list(self.jobs.keys())
        jobids.sort()
        for jobid in jobids:
            job = self.jobs[jobid]
            self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
        #end for
        self.log('waiting',n=n+1)
        jobids = list(self.waiting)
        jobids.sort()
        for jobid in jobids:
            job = self.jobs[jobid]
            self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
        #end for
        self.log('running',n=n+1)
        jobids = list(self.running)
        jobids.sort()
        for jobid in jobids:
            job = self.jobs[jobid]
            self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
        #end for
        self.log('finished',n=n+1)
        jobids = list(self.finished)
        jobids.sort()
        for jobid in jobids:
            job = self.jobs[jobid]
            self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
        #end for
        self.log('end job states',n=1)
    #end def write_job_states


    def query_queue(self):
        #self.write_job_states('query queue')
        self.validate()
        done = []
        for pid,process in self.processes.items():
            if nexus_core.generate_only or not nexus_core.monitor:
                qpid,status = pid,0
            else:
                qpid,status = os.waitpid(pid,os.WNOHANG)
            #end if
            if pid==qpid:
                job = process.job
                job.status = job.states.finished
                job.finished = True
                job.determine_end_status(status)
                iid = job.internal_id
                self.running.remove(iid)
                self.finished.add(iid)
                done.append(pid)
                if not nexus_core.generate_only:
                    job.out.close()
                    job.err.close()
                #end if
            #end if
        #end for
        for pid in done:
            del self.processes[pid]
        #end for
    #end def query_queue


    def submit_jobs(self):
        cores_used = 0
        for process in self.processes:
            cores_used += process.job.cores
        #end for
        cores_available = self.cores-cores_used

        core_req = []
        job_req  = []
        for iid in self.waiting:
            job = self.jobs[iid]
            job_req.append(job)
            core_req.append(job.cores)
        #end for
        core_req = array(core_req,dtype=int)

        # The following line does not work correctly under Numpy 1.10 or greater.
        # It should create an ndarray of Job objects from a list of Job objects.
        # Instead it creates nested ndarray's with inner type of bool.
        #job_req = array(job_req,dtype=object)

        job_req_tmp = job_req
        job_req = empty(len(job_req_tmp),dtype=object)
        for idx,job in enumerate(job_req_tmp):
            job_req[idx] = job
        #end for

        order   = core_req.argsort()
        job_req = job_req[order]

        for job in job_req:
            if job.cores>self.cores and not nexus_core.generate_only:
                self.error('job '+str(job.internal_id)+' is too large to run on this machine\ncores requested: '+str(job.cores)+'\nmachine cores: '+str(self.cores))
            #end if
            if job.cores<=cores_available:
                iid = job.internal_id
                self.waiting.remove(iid)
                self.running.add(iid)
                self.submit_job(job)
                cores_available-=job.cores
            elif job.cores>self.cores:
                self.error('job requested more cores than are present on '+self.name+'\ncores requested: {0}\ncores present: {1}'.format(job.cores,self.cores))
            else:
                break
            #end if
        #end for
    #end def submit_jobs


    def job_command(self,job,pad=None):
        command = 'export OMP_NUM_THREADS='+str(job.threads)+'\n'
        if len(job.presub)>0:
            command += job.presub+'\n'
        #end if
        if job.serial is not None:
            command += job.run_command(self.app_launcher,serial=job.serial)
        else:
            command += job.run_command(self.app_launcher)
        #end if
        if len(job.postsub)>0:
            command += job.postsub+'\n'
        #end if
        if pad!=None:
            command = ('\n'+command).replace('\n','\n '+pad)
        #end if
        return command
    #end def job_command


    def write_job(self,job,file=False):
        c = self.job_command(job)
        return c
    #end def write_job


    def submit_job(self,job):
        pad = self.enter(job.directory,msg=job.simid)
        command = self.job_command(job,pad=pad)
        job.status = job.states.running
        process = obj()
        process.job = job
        if nexus_core.generate_only:
            self.log(pad+'Would have executed: '+command)
            job.system_id = job.internal_id
        else:
            if nexus_core.monitor:
                self.log(pad+'Executing: '+command)
                job.out = open(job.outfile,'w')
                job.err = open(job.errfile,'w')
                p = Popen(command,env=job.env,stdout=job.out,stderr=job.err,shell=True)
                process.popen = p
                job.system_id = p.pid
            else:
                command+=' >'+job.outfile+' 2>'+job.errfile+'&'
                self.log(pad+'Executing: '+command)
                os.system(command)
                job.system_id = job.internal_id
            #end if
        #end if
        self.processes[job.system_id] = process
        self.leave()
    #end def submit_job
#end class Workstation
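

# Illustrative sketch (not executed): a Workstation schedules jobs against
# a fixed core count and launches them directly with its app_launcher.
# The machine name and core count below are hypothetical.
#
#   ws  = Workstation('ws16',16,'mpirun')
#   job = Job(machine='ws16',app_command='app.x',cores=8,threads=2)
#   # Workstation.process_job -> job.processes=4, run option '-np 4'
#   job.submit()  # queues the job; it runs once 8 of the 16 cores are free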




# test needed
class InteractiveCluster(Workstation):

    def __init__(self,*args,**kwargs):
        if len(args)==0 or not isinstance(args[0],Supercomputer):
            self.init_from_args(*args,**kwargs)
        else:
            super = args[0]
            cores = args[1]
            self.init_from_supercomputer(super,cores)
        #end if
        Machine.__init__(self,self.name,self.queue_size)
    #end def __init__


    def init_from_args(self,
                       name                = 'icluster',
                       nodes               = None,
                       procs_per_node      = None,
                       cores_per_proc      = None,
                       process_granularity = None,
                       ram_per_node        = None,
                       app_launcher        = None
                       ):
        self.name                = name
        self.nodes               = nodes
        self.procs_per_node      = procs_per_node
        self.cores_per_proc      = cores_per_proc
        self.process_granularity = process_granularity
        self.ram_per_node        = ram_per_node
        self.app_launcher        = app_launcher

        self.cores_per_node = self.cores_per_proc*self.procs_per_node
        if process_granularity is None:
            self.process_granularity = self.cores_per_node
        #end if

        self.procs = self.procs_per_node*self.nodes
        self.cores = self.cores_per_proc*self.procs
        self.ram   = self.ram_per_node*self.nodes

        self.queue_size = self.cores
    #end def init_from_args


    def init_from_supercomputer(self,super,cores):
        nodes = cores//super.cores_per_node
        if cores-nodes*super.cores_per_node!=0:
            self.error('interactive cores corresponds to a fractional number of nodes\n cores '+str(cores)+'\n cores per node '+str(super.cores_per_node))
        #end if
        self.init_from_args(super.name+'_interactive',nodes,super.procs_per_node,
                            super.cores_per_proc,super.cores_per_node,
                            super.ram_per_node,super.app_launcher)
    #end def init_from_supercomputer


    def process_job(self,job):
        job.cores = min(job.cores,self.cores)

        Workstation.process_job(self,job)

        job.nodes = job.grains
        job.procs = job.nodes*self.procs_per_node

        if mod(job.processes,job.nodes)!=0:
            job.processes_per_node = None
        else:
            job.processes_per_node = job.processes//job.nodes
        #end if

        if mod(job.processes,job.procs)!=0:
            job.processes_per_proc = None
        else:
            job.processes_per_proc = job.processes//job.procs
        #end if
    #end def process_job
#end class InteractiveCluster




class Supercomputer(Machine):
    mode = Machine.modes.batch
    name = 'supercomputer'

    batch_capable = False #only set to true for specific machines

    aprun_options = set(['n','d'])

    required_inputs = [
        'nodes',
        'procs_per_node',
        'cores_per_proc',
        'ram_per_node',
        'queue_size',
        'app_launcher',
        'sub_launcher',
        'queue_querier',
        'job_remover'
        ]

    def __init__(self,
                 nodes          = None,
                 procs_per_node = None,
                 cores_per_proc = None,
                 ram_per_node   = None,
                 queue_size     = 0,
                 app_launcher   = None,
                 sub_launcher   = None,
                 queue_querier  = None,
                 job_remover    = None,
                 name           = None,
                 ):
        if name is None:
            if self.name is not None:
                name = self.name
            else:
                name = self.__class__.__name__.lower()
            #end if
        #end if
        Machine.__init__(self,name)
        self.nodes          = nodes          # number of nodes
        self.procs_per_node = procs_per_node # number of processors/sockets on a node
        self.cores_per_proc = cores_per_proc # number of cores on a processor/socket
        self.ram_per_node   = ram_per_node
        self.queue_size     = queue_size
        self.app_launcher   = app_launcher
        self.sub_launcher   = sub_launcher
        self.queue_querier  = queue_querier
        self.job_remover    = job_remover

        for var in Supercomputer.required_inputs:
            if self[var] is None:
                self.error('input variable '+var+' is required to initialize Supercomputer object.')
            #end if
        #end for

        self.cores_per_node = self.cores_per_proc*self.procs_per_node

        self.procs = self.procs_per_node*self.nodes
        self.cores = self.cores_per_proc*self.procs
        self.ram   = self.ram_per_node*self.nodes

        # 'complete' is the only actively used status so far.
        # At least one queue state should correspond to 'complete',
        # though even this is not strictly necessary.
        # In general if a pid is missing from the queue,
        # that job is assumed to be complete.

        self.system_queue = obj()
        if self.queue_querier=='qstat':
            self.job_states=dict(E = 'exiting',
                                 H = 'held',
                                 Q = 'queued',
                                 R = 'running',
                                 S = 'suspended',
                                 T = 'transferring',
                                 W = 'waiting',
                                 C = 'complete'
                                 )
        elif self.queue_querier=='qstata':
            #already gives status as queued, running, etc.
            None
        elif self.queue_querier=='squeue':
            self.job_states=dict(CG = 'exiting',
                                 TO = 'timeout',
                                 CA = 'failed',
                                 F  = 'failed',
                                 NF = 'node_fail',
                                 PD = 'waiting',
                                 R  = 'running',
                                 S  = 'suspended',
                                 CD = 'complete',
                                 RD = 'held',
                                 BF = 'failed',
                                 CF = 'configuring',
                                 DL = 'deadline',
                                 OOM= 'out_of_memory',
                                 PR = 'preempted',
                                 RF = 'requeue_fed',
                                 RH = 'requeue_hold',
                                 RQ = 'requeued',
                                 RS = 'resizing',
                                 RV = 'revoked',
                                 SI = 'signaling',
                                 SE = 'special_exit',
                                 SO = 'stage_out',
                                 ST = 'stopped',
                                 )
        elif self.queue_querier=='sacct':
            self.job_states=dict(CANCELLED   = 'failed',   #long form
                                 COMPLETED   = 'complete',
                                 COMPLETING  = 'exiting',
                                 CONFIGURING = 'waiting',
                                 FAILED      = 'failed',
                                 PREEMPTED   = 'failed',
                                 PENDING     = 'waiting',
                                 NODE_FAIL   = 'failed',
                                 RESIZING    = 'resizing',
                                 RUNNING     = 'running',
                                 SUSPENDED   = 'suspended',
                                 TIMEOUT     = 'failed',
                                 CA = 'failed',             #short form
                                 CD = 'complete',
                                 CG = 'exiting',
                                 CF = 'waiting',
                                 F  = 'failed',
                                 PR = 'failed',
                                 PD = 'waiting',
                                 NF = 'failed',
                                 RS = 'resizing',
                                 R  = 'running',
                                 S  = 'suspended',
                                 TO = 'failed'
                                 )
        elif self.queue_querier=='llq':
            self.job_states=dict(I  = 'idle',
                                 NQ = 'not_queued',
                                 H  = 'user_hold',
                                 S  = 'system_hold',
                                 HS = 'user_system_hold',
                                 D  = 'deferred',
                                 R  = 'running',
                                 P  = 'pending',
                                 ST = 'starting',
                                 C  = 'complete',
                                 CA = 'canceled',
                                 E  = 'preempted',
                                 EP = 'preempt_pending',
                                 MP = 'resume_pending',
                                 )
        elif self.queue_querier=='bjobs':
            self.job_states=dict(PEND  = 'pending',
                                 RUN   = 'running',
                                 DONE  = 'complete',
                                 EXIT  = 'failed',
                                 PSUSP = 'suspended',
                                 USUSP = 'suspended',
                                 SSUSP = 'suspended',
                                 )
        elif self.queue_querier=='test_query':
            None
        else:
            self.error('ability to query queue with '+self.queue_querier+' has not yet been implemented')
        #end if

    #end def __init__


    # test needed
    def interactive_representation(self,cores):
        return InteractiveCluster(self,cores)
    #end def interactive_representation


    # test needed
    def requeue_job(self,job):
        if isinstance(job,Job):
            jid = job.internal_id
            pid = job.system_id
            if pid is None:
                self.error('job {0} does not have a process id issued by the scheduler'.format(jid))
            #end if
            self.process_job(job)
            self.jobs[jid] = job
            job.status = job.states.running
            self.running.add(jid)
            process = obj(job=job)
            self.processes[pid] = process
        else:
            self.error('requeue_job received non-Job instance '+job.__class__.__name__)
        #end if
    #end def requeue_job


    def process_job(self,job):
        if job.fake_job:
            return
        #end if

        self.pre_process_job(job)

        job.subfile = job.name+'.'+self.sub_launcher+'.in'
        no_cores = job.cores is None
        no_nodes = job.nodes is None
        if no_cores and no_nodes:
            self.error('job did not specify cores or nodes\nAt least one must be provided')
        elif no_cores:
            job.cores = self.cores_per_node*job.nodes
        elif no_nodes:
            job.nodes = int(ceil(float(job.cores)/self.cores_per_node))
            if abs(job.nodes-1-float(job.cores)/self.cores_per_node)<1e-6:
                job.nodes-=1
            #end if
        else:
            job.cores = min(job.cores,job.nodes*self.cores_per_node)
        #end if
        if job.processes_per_node is not None:
            job.processes = job.nodes*job.processes_per_node
        else:
            job.processes = max(1,int(float(job.cores)/job.threads))
        #end if
        job.tot_cores = job.nodes*self.cores_per_node
        job.procs     = job.nodes*self.procs_per_node

        if mod(job.processes,job.nodes)!=0:
            job.processes_per_node = None
        else:
            job.processes_per_node = job.processes//job.nodes
        #end if

        if mod(job.processes,job.procs)!=0:
            job.processes_per_proc = None
        else:
            job.processes_per_proc = job.processes//job.procs
        #end if

        if job.ppn is None:
            job.ppn = self.cores_per_node
        #end if

        if job.account is None:
            if self.account is not None:
                job.account = self.account
            elif self.requires_account:
                self.error('account not specified for job on '+self.name)
            #end if
        #end if

        self.post_process_job(job)

        job.set_environment(OMP_NUM_THREADS=job.threads)

        self.process_job_options(job)
    #end def process_job


    def process_job_options(self,job):
        launcher = self.app_launcher
        if launcher=='mpirun':
            job.run_options.add(np='-np '+str(job.processes))
        elif launcher=='mpiexec':
            job.run_options.add(n='-n '+str(job.processes))
        elif launcher=='aprun':
            if 'n' in self.aprun_options:
                job.run_options.add(n='-n '+str(job.processes))
            #end if
            if 'd' in self.aprun_options and job.threads>1:
                job.run_options.add(d='-d '+str(job.threads))
            #end if
            if 'N' in self.aprun_options and job.processes_per_node is not None:
                job.run_options.add(N='-N '+str(job.processes_per_node))
            #end if
            if 'S' in self.aprun_options and job.processes_per_proc is not None:
                job.run_options.add(S='-S '+str(job.processes_per_proc))
            #end if
        elif launcher=='runjob':
            #bypass setup_environment
            if job.env is not None:
                envs='--envs'
                for name,value in job.env.items():
                    envs+=' {0}={1}'.format(name,value)
                #end for
                job.env = None
            elif 'envs' in job.run_options:
                envs = job.run_options.envs
            else:
                self.error('failed to set env options for runjob')
            #end if
            job.run_options.add(
                np       = '--np '+str(job.processes),
                p        = '-p '+str(job.processes_per_node),
                xlocargs = '$LOCARGS',
                verbose  = '--verbose=INFO',
                envs     = envs
                )
        elif launcher=='srun':  # Amos contribution from Ryan McAvoy
            None
        elif launcher=='ibrun': # Lonestar contribution from Paul Young
            job.run_options.add(
                np = '-n '+str(job.processes),
                p  = '-o '+str(0),
                )
        elif launcher=='jsrun': # Summit
            None # Summit class takes care of this in post_process_job
        else:
            self.error(launcher+' is not yet implemented as an application launcher')
        #end if
    #end def process_job_options
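

    # Illustrative sketch (not executed): given only cores and threads,
    # process_job() derives the node count and MPI geometry, and
    # process_job_options() turns them into launcher flags. The numbers
    # assume a hypothetical machine with cores_per_node=16 and an aprun
    # launcher with the default aprun_options set(['n','d']).
    #
    #   job = Job(cores=32,threads=4,...)
    #   # process_job -> job.nodes=2, job.processes=8, job.processes_per_node=4
    #   # process_job_options -> run options '-n 8' and '-d 4'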


    def pre_process_job(self,job):
        None
    #end def pre_process_job


    def post_process_job(self,job):
        None
    #end def post_process_job


    def query_queue(self,out=None):
        self.system_queue.clear()
        if self.query_with_username and self.user is None:
            self.error('querying queue on machine "{}" requires user name\nplease provide username via the "user" keyword in settings'.format(self.name))
        #end if
        if self.queue_querier=='qstat':
            if out is None:
                out,err,rc = execute('qstat -a')
            #end if
            lines = out.splitlines()
            for line in lines:
                tokens=line.split()
                if len(tokens)>0:
                    if '.' in tokens[0]:
                        spid = tokens[0].split('.')[0]
                    else:
                        spid = tokens[0]
                    #end if
                    if spid.isdigit() and len(tokens)==11:
                        pid = int(spid)
                        jid,uname,queue,jname,sessid,nodes,tasks,mem,rtime,status,etime = tokens
                        if status in self.job_states:
                            self.system_queue[pid] = self.job_states[status]
                        else:
                            self.error('job state '+status+' is unrecognized')
                        #end if
                    #end if
                #end if
            #end for
        elif self.queue_querier=='qstata':
            if out is None:
                out,err,rc = execute('qstat')
            #end if
            lines = out.splitlines()
            for line in lines:
                tokens=line.split()
                if len(tokens)>0:
                    if '.' in tokens[0]:
                        spid = tokens[0].split('.')[0]
                    else:
                        spid = tokens[0]
                    #end if
                    if spid.isdigit() and len(tokens)==6:
                        pid = int(spid)
                        jid,uname,wtime,nodes,status,loc = tokens
                        self.system_queue[pid] = status
                    #end if
                #end if
            #end for
        elif self.queue_querier=='squeue': # contributed by Ryan McAvoy
            if out is None:
                extra = ''
                if self.user is not None:
                    extra = ' -u {}'.format(self.user)
                #end if
                out,err,rc = execute('squeue'+extra)
            #end if
            lines = out.splitlines()
            for line in lines:
                tokens=line.split()
                if len(tokens)>0:
                    if '.' in tokens[0]:
                        spid = tokens[0].split('.')[0]
                    else:
                        spid = tokens[0]
                    #end if
                    if spid.isdigit():
                        pid = int(spid)
                        status = None
                        jid,loc,name,uname,status,wtime,nodes,reason = tokens[:8]
                        if status is not None:
                            if status in self.job_states:
                                self.system_queue[pid] = self.job_states[status]
                            else:
                                self.error('job state '+status+' is unrecognized')
                            #end if
                        #end if
                    #end if
                #end if
            #end for
        elif self.queue_querier=='sacct': # contributed by Ryan McAvoy
            if out is None:
                out,err,rc = execute('sacct')
            #end if
            lines = out.splitlines()
            for line in lines:
                tokens=line.split()
                if len(tokens)>0:
                    if '.' in tokens[0]:
                        spid = tokens[0].split('.')[0]
                    else:
                        spid = tokens[0]
                    #end if
                    if spid.isdigit() and len(tokens)==6: # if account is empty, only 6 tokens
                        pid = int(spid)
                        jid,name,loc,cores,status,exit_code = tokens
                        status = status.split('+')[0] # strip any trailing '+'
                        if status in self.job_states:
                            self.system_queue[pid] = self.job_states[status]
                        else:
                            self.error('job state '+status+' is unrecognized')
                        #end if
                    elif spid.isdigit() and len(tokens)==7:
                        pid = int(spid)
                        jid,name,loc,uname,cores,status,exit_code = tokens
                        status = status.split('+')[0] # strip any trailing '+'
                        if status in self.job_states:
                            self.system_queue[pid] = self.job_states[status]
                        else:
                            self.error('job state '+status+' is unrecognized')
                        #end if
                    #end if
                #end if
            #end for
        elif self.queue_querier=='llq':
            if out is None:
                out,err,rc = execute('llq')
            #end if
            lines = out.splitlines()
            for line in lines:
                tokens=line.split()
                if len(tokens)>0:
                    if '.' in tokens[0]:
                        spid = tokens[0].split('.')[1]
                    else:
                        spid = tokens[0]
                    #end if
                    if spid.isdigit() and (len(tokens)==7 or len(tokens)==8):
                        pid = int(spid)
                        if len(tokens)==7:
                            jid,owner,subdate,subtime,status,pri,class_ = tokens
                        elif len(tokens)==8:
                            jid,owner,subdate,subtime,status,pri,class_,running_on = tokens
                        #end if
                        if status in self.job_states:
                            self.system_queue[pid] = self.job_states[status]
                        else:
                            self.error('job state '+status+' is unrecognized')
                        #end if
                    #end if
                #end if
            #end for
        elif self.queue_querier=='bjobs':
            if out is None:
                out,err,rc = execute('bjobs')
            #end if
            lines = out.splitlines()
            for line in lines:
                tokens=line.split()
                if len(tokens)>0:
                    spid = tokens[0]
                    if spid.isdigit() and len(tokens)==8:
                        pid = int(spid)
                        jid,uname,status,slots,queue,start,finish,jname = tokens
                        if status in self.job_states:
                            self.system_queue[pid] = self.job_states[status]
                        else:
                            self.error('job state '+status+' is unrecognized')
                        #end if
                    #end if
                #end if
            #end for
        elif self.queue_querier=='test_query': # for testing
            # pretend that all jobs have finished
            for pid in self.processes.keys():
                self.system_queue[pid] = 'complete'
            #end for
        else:
            self.error('ability to query queue with '+self.queue_querier+' has not yet been implemented')
        #end if
        done = []
        for pid,process in self.processes.items():
            if not pid in self.system_queue or self.system_queue[pid]=='complete' or nexus_core.generate_only:
                job = process.job
                job.status = job.states.finished
                job.finished = True
                iid = job.internal_id
                self.running.remove(iid)
                self.finished.add(iid)
                done.append(pid)
            #end if
        #end for
        for pid in done:
            del self.processes[pid]
        #end for
        return self.system_queue
    #end def query_queue
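

    # Illustrative sketch (not executed): query_queue() keys the parsed
    # scheduler state by the numeric job id. For the 'qstat -a' branch, a
    # hypothetical output line such as
    #
    #   1234.sdb  user  batch  myjob  5678  2  32  8gb  01:00  R  00:12
    #
    # has 11 tokens and a numeric leading id, so it is recorded as
    # self.system_queue[1234] = 'running'. Any tracked pid absent from
    # the queue output is treated as complete.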


    def submit_jobs(self):
        nprocesses_running = len(self.processes)
        queue_slots_available = self.queue_size-nprocesses_running
        remove = []
        for iid in self.waiting:
            if queue_slots_available>0:
                remove.append(iid)
                self.running.add(iid)
                job = self.jobs[iid]
                self.submit_job(job)
                queue_slots_available -= 1
            else:
                break
            #end if
        #end for
        for iid in remove:
            self.waiting.remove(iid)
        #end for
    #end def submit_jobs


    def submit_job(self,job):
        pad = self.enter(job.directory,msg=job.internal_id)
        if job.subfile==None:
            self.error('submission file not specified for job')
        elif not os.path.exists(job.subfile):
            self.error('job submission file was not written prior to submission\n submission file: '+os.path.join(job.directory,job.subfile))
        #end if
        command = self.sub_command(job)
        if nexus_core.generate_only:
            self.log(pad+'Would have executed: '+command)
            job.status = job.states.running
            process = obj()
            process.job = job
            self.processes[job.internal_id] = process
        else:
            self.log(pad+'Executing: '+command)
            job.status = job.states.running
            process = obj()
            process.job = job
            out,err,rc = execute(command)
            output = out+'\n'+err
            pid = self.read_process_id(output)
            if pid is None:
                self.error('process id could not be determined from submission output\n output:\n'+output)
            else:
                self.log(pad+' pid: {0}'.format(pid))
            #end if
            #pid = 'fakepid_'+str(job.internal_id)
            job.system_id = pid
            self.processes[pid] = process
        #end if
        self.leave()
    #end def submit_job


    def sub_command(self,job):
        return self.sub_launcher+job.sub_options.write()+' '+job.subfile
    #end def sub_command


    def remove_job(self,job):
        if self.job_remover=='qdel':
            command = 'qdel '+str(job.system_id)
        elif self.job_remover=='scancel':
            command = 'scancel '+str(job.system_id)
        else:
            self.error('ability to remove job using '+self.job_remover+' has not yet been implemented')
        #end if
        os.system(command)
    #end def remove_job


    def setup_environment(self,job):
        env = ''
        if job.env!=None:
            for name,val in job.env.items():
                env += 'export {0}={1}\n'.format(name,val)
            #end for
        #end if
        return env
    #end def setup_environment


    def write_job(self,job,file=False):
        job.subfile = job.name+'.'+self.sub_launcher+'.in'
        env = self.setup_environment(job)
        command = job.run_command(self.app_launcher,serial=job.serial)

        c = self.write_job_header(job)+'\n'
        if len(job.presub)>0:
            c+=job.presub+'\n'
        #end if
        c+=env
        c+=command+'\n'
        if len(job.postsub)>0:
            c+=job.postsub+'\n'
        #end if
        if file:
            filepath = os.path.join(job.directory,job.subfile)
            fobj = open(filepath,'w')
            fobj.write(c)
            fobj.close()
            if self.executable_subfile:
                os.system('chmod +x '+filepath)
            #end if
        #end if
        return c
    #end def write_job


    def write_job_header(self,job):
        self.not_implemented()
    #end def write_job_header


    def read_process_id(self,output):
        pid = None
        lines = output.splitlines()
        if self.sub_launcher=='llsubmit': # specialization for load leveler (SuperMUC)
            for line in lines:
                if 'llsubmit: The job' in line and '"' in line:
                    spid = line.split('"')[1].split('.')[1].strip()
                    if spid.isdigit():
                        pid = int(spid)
                        break
                    #end if
                #end if
            #end for
        else: # most other machines follow the pattern below
            for line in lines:
                ls = line.strip()
                if ls.isdigit():
                    pid = int(ls)
                    break
                elif '.' in line:
                    spid = line.split('.')[0]
                    if spid.isdigit():
                        pid = int(spid)
                        break
                    #end if
                elif ' ' in line: # specialized for Amos?
                    spid = line.split(' ')[-1]
                    if spid.isdigit():
                        pid = int(spid)
                        break
                    #end if
                #end if
            #end for
        #end if
        return pid
    #end def read_process_id

#end class Supercomputer



# Load local classes for local clusters' settings from ~/.nexus/local_machines.py
# The following is an example of this file (see machines.py for more examples):
'''
from machines import Supercomputer
class Clustername(Supercomputer):
    name = 'clustername'
    requires_account = False
    batch_capable    = True
    def write_job_header(self,job):
        if job.queue is None:
            job.queue='batch'
        #end if
        c= '#!/bin/bash\n'
        c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes,job.ppn)
        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
        c+='#PBS -N '+job.name+'\n'
        c+='#PBS -o '+job.outfile+'\n'
        c+='#PBS -e '+job.errfile+'\n'
        c+='#PBS -V\n'
        c+='#PBS -q '+job.queue+'\n'
        c+=' \n '
        return c
    #end def write_job_header
#end class Clustername
#           nodes sockets cores ram qslots  qlaunch   qsubmit  qstatus  qdelete
Clustername(    4,      1,   16,  24,     4, 'mpirun',  'qsub', 'qstat',  'qdel')
'''
try:
    load_source('*',os.path.expanduser('~/.nexus/local_machines.py'))
except IOError:
    pass
except:
    raise
#end try


class Kraken(Supercomputer):

    name = 'kraken'

    requires_account = True

    def write_job_header(self,job):
        c='#!/bin/bash\n'
        c+='#PBS -A '+str(job.account)+'\n'
        c+='#PBS -N '+str(job.name)+'\n'
        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
        c+='#PBS -l size='+str(job.tot_cores)+'\n'
        c+='#PBS -o '+job.outfile+'\n'
        c+='#PBS -e '+job.errfile+'\n'
        if job.user_env:
            c+='#PBS -V\n'
        #end if
        c+='''
cd $PBS_O_WORKDIR
export MPICH_PTL_SEND_CREDITS=-1
export MPICH_MAX_SHORT_MSG_SIZE=1024
export MPICH_PTL_UNEX_EVENTS=800000
export MPICH_UNEX_BUFFER_SIZE=16M
export MPI_MSGS_PER_PROC=32768
'''
        return c
    #end def write_job_header
#end class Kraken



class Jaguar(Supercomputer):
    name = 'jaguar'

    requires_account = True

    def write_job_header(self,job):
        if job.queue is None:
            job.queue='batch'
        #end if
        c='#!/bin/bash\n'
        c+='#PBS -A '+str(job.account)+'\n'
        c+='#PBS -q '+job.queue+'\n'
        c+='#PBS -N '+str(job.name)+'\n'
        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
        c+='#PBS -l size='+str(job.tot_cores)+'\n'
        c+='#PBS -l gres=widow2%widow3\n'
        c+='#PBS -o '+job.outfile+'\n'
        c+='#PBS -e '+job.errfile+'\n'
        if job.user_env:
            c+='#PBS -V\n'
        #end if
        c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
export MPICH_PTL_SEND_CREDITS=-1
export MPICH_MAX_SHORT_MSG_SIZE=1024
export MPICH_PTL_UNEX_EVENTS=800000
export MPICH_UNEX_BUFFER_SIZE=16M
export MPI_MSGS_PER_PROC=32768
'''
        return c
    #end def write_job_header
#end class Jaguar




class Golub(Supercomputer):

    name = 'golub'

    def write_job_header(self,job):
        if job.queue is None:
            job.queue='secondary'
        #end if
        c=''
        c+='#PBS -q '+job.queue+'\n'
        c+='#PBS -N '+job.name+'\n'
        c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes,job.ppn)
        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
        c+='#PBS -e '+job.errfile+'\n'
        c+='#PBS -o '+job.outfile+'\n'
        if job.user_env:
            c+='#PBS -V\n'
        #end if
        c+='''
cd ${PBS_O_WORKDIR}

'''
        return c
    #end def write_job_header

#end class Golub




class OIC5(Supercomputer):

    name = 'oic5'
    batch_capable = True

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'mstqmc13q'
        #end if

        ppn = job.processes_per_node
        #ppn = 32/job.threads
        #if ppn*job.threads!=32:
        #    self.error('ppn is not being set properly for OIC5\n perhaps the number of threads requested does not evenly divide the 32 cores\n you requested {0} threads'.format(job.threads))
        ##end if

        c='#!/bin/bash\n'
        c+='#PBS -q '+job.queue+'\n'
        c+='#PBS -N '+str(job.name)+'\n'
        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
        c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes,ppn)
        c+='#PBS -W x=\"NACCESSPOLICY:SINGLEJOB\"\n'
        c+='#PBS -o '+job.outfile+'\n'
        c+='#PBS -e '+job.errfile+'\n'
        if job.user_env:
            c+='#PBS -V\n'
        #end if
        c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
        return c
    #end def write_job_header


    def read_process_id(self,output):
        pid = None
        lines = output.splitlines()
        for line in lines:
            if 'oic.ornl.gov' in line:
                spid = line.split('.')[0]
                if spid.isdigit():
                    pid = int(spid)
                #end if
            #end if
        #end for
        return pid
    #end def read_process_id
#end class OIC5




class NerscMachine(Supercomputer):
    batch_capable = True

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'regular'
        #end if
        c='#!/bin/bash\n'
        c+='#SBATCH -p '+job.queue+'\n'
        c+='#SBATCH -J '+str(job.name)+'\n'
        c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
        c+='#SBATCH -N '+str(job.nodes)+'\n'
        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
        c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
        c+='#SBATCH -o '+job.outfile+'\n'
        c+='#SBATCH -e '+job.errfile+'\n'
        if job.user_env:
            c+='#SBATCH --export=ALL\n' # equiv to PBS -V
        else:
            c+='#SBATCH --export=NONE\n'
        #end if
        c+='''
echo $SLURM_SUBMIT_DIR
cd $SLURM_SUBMIT_DIR
'''
        return c
    #end def write_job_header
#end class NerscMachine


class Edison(NerscMachine):
    name = 'edison'
#end class Edison



class NerscMachine(Supercomputer):
    batch_capable = True

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'regular'
        #end if
        c='#!/bin/bash\n'
        c+='#SBATCH -p '+job.queue+'\n'
        c+='#SBATCH -J '+str(job.name)+'\n'
        c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
        c+='#SBATCH -N '+str(job.nodes)+'\n'
        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
        c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
        c+='#SBATCH -o '+job.outfile+'\n'
        c+='#SBATCH -e '+job.errfile+'\n'
        if job.user_env:
            c+='#SBATCH --export=ALL\n' # equiv to PBS -V
        else:
            c+='#SBATCH --export=NONE\n'
        #end if
        c+='''
echo $SLURM_SUBMIT_DIR
cd $SLURM_SUBMIT_DIR
'''
        return c
    #end def write_job_header
#end class NerscMachine


class Edison(NerscMachine):
    name = 'edison'
#end class Edison


class Cori(NerscMachine):
    name = 'cori'

    def pre_process_job(self,job):
        if job.queue is None:
            job.queue = 'regular'
        #end if
        if job.constraint is None:
            job.constraint = 'knl'
        #end if
        # account for dual nature of Cori
        if 'knl' in job.constraint:
            self.nodes          = 9688
            self.procs_per_node = 1
            self.cores_per_node = 68
            self.ram_per_node   = 96
        elif 'haswell' in job.constraint:
            self.nodes          = 2388
            self.procs_per_node = 2
            self.cores_per_node = 32
            self.ram_per_node   = 128
        elif 'amd' in job.constraint:
            self.nodes          = 20
            self.procs_per_node = 2
            self.cores_per_node = 32
            self.ram_per_node   = 2048
        else:
            self.error('SLURM input "constraint" must contain either "knl", "haswell", or "amd" on Cori\nyou provided: {0}'.format(job.constraint))
        #end if
        if job.core_spec is not None:
            self.cores_per_node -= job.core_spec
        #end if
    #end def pre_process_job

    def write_job_header(self,job):
        self.pre_process_job(job) # sync machine view with job
        if 'knl' in job.constraint:
            hyperthreads = 4
        elif 'haswell' in job.constraint:
            hyperthreads = 2
        elif 'amd' in job.constraint:
            hyperthreads = 2
        else:
            self.error('SLURM input "constraint" must contain either "knl", "haswell", or "amd" on Cori\nyou provided: {0}'.format(job.constraint))
        #end if
        cpus_per_task = int(floor(float(self.cores_per_node)/job.processes_per_node))*hyperthreads
        c='#!/bin/bash\n'
        if job.account is not None:
            c+= '#SBATCH -A '+job.account+'\n'
        #end if
        c+='#SBATCH -p '+job.queue+'\n'
        c+='#SBATCH -C '+str(job.constraint)+'\n'
        c+='#SBATCH -J '+str(job.name)+'\n'
        c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
        c+='#SBATCH -N '+str(job.nodes)+'\n'
        c+='#SBATCH --tasks-per-node={0}\n'.format(job.processes_per_node)
        c+='#SBATCH --cpus-per-task={0}\n'.format(cpus_per_task)
        c+='#SBATCH -o '+job.outfile+'\n'
        c+='#SBATCH -e '+job.errfile+'\n'
        if job.user_env:
            c+='#SBATCH --export=ALL\n' # equiv to PBS -V
        else:
            c+='#SBATCH --export=NONE\n'
        #end if
        c+='''
echo $SLURM_SUBMIT_DIR
cd $SLURM_SUBMIT_DIR
'''
        if job.threads>1:
            c+='''
export OMP_PROC_BIND=true
export OMP_PLACES=threads
'''
        #end if
        return c
    #end def write_job_header
#end class Cori
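
# Worked example (illustrative) of the cpus_per_task computed above: on the
# KNL partition (cores_per_node=68, 4 hyperthreads per core), a job with
# processes_per_node=16 yields cpus_per_task = floor(68/16)*4 = 16, i.e.
# each MPI task owns 4 physical cores (16 hardware threads).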



class BlueWatersXK(Supercomputer):

    name = 'bluewaters_xk'
    requires_account = False
    batch_capable    = True

    def write_job_header(self,job):
        c='#!/bin/bash\n'
        c+='#PBS -N '+str(job.name)+'\n'
        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
        c+='#PBS -l nodes={0}:ppn={1}:xk\n'.format(job.nodes,job.ppn)
        c+='#PBS -o '+job.outfile+'\n'
        c+='#PBS -e '+job.errfile+'\n'
        if job.user_env:
            c+='#PBS -V\n'
        #end if
        c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
        return c
    #end def write_job_header
#end class BlueWatersXK



class BlueWatersXE(Supercomputer):

    name = 'bluewaters_xe'
    requires_account = False
    batch_capable    = True

    def write_job_header(self,job):
        c='#!/bin/bash\n'
        c+='#PBS -N '+str(job.name)+'\n'
        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
        c+='#PBS -l nodes={0}:ppn={1}:xe\n'.format(job.nodes,job.ppn)
        c+='#PBS -o '+job.outfile+'\n'
        c+='#PBS -e '+job.errfile+'\n'
        if job.user_env:
            c+='#PBS -V\n'
        #end if
        c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
        return c
    #end def write_job_header
#end class BlueWatersXE



class Titan(Supercomputer):

    name = 'titan'
    requires_account = True
    batch_capable    = True

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'batch'
        #end if
        c= '#!/bin/bash\n'
        c+='#PBS -A {0}\n'.format(job.account)
        c+='#PBS -q {0}\n'.format(job.queue)
        c+='#PBS -N {0}\n'.format(job.name)
        c+='#PBS -o {0}\n'.format(job.outfile)
        c+='#PBS -e {0}\n'.format(job.errfile)
        c+='#PBS -l walltime={0}\n'.format(job.pbs_walltime())
        c+='#PBS -l nodes={0}\n'.format(job.nodes)
        #c+='#PBS -l gres=widow3\n'
        c+='#PBS -l gres=atlas1\n'
        if job.user_env:
            c+='#PBS -V\n'
        #end if
        c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
        return c
    #end def write_job_header
#end class Titan



class EOS(Supercomputer):

    name = 'eos'
    requires_account = True
    batch_capable    = True

    def post_process_job(self,job):
        if job.threads>1:
            if job.threads<=8:
                job.run_options.add(ss='-ss')
            #end if
            job.run_options.add(cc='-cc numa_node')
        #end if
    #end def post_process_job


    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'batch'
        #end if
        c= '#!/bin/bash\n'
        c+='#PBS -A {0}\n'.format(job.account)
        c+='#PBS -q {0}\n'.format(job.queue)
        c+='#PBS -N {0}\n'.format(job.name)
        c+='#PBS -o {0}\n'.format(job.outfile)
        c+='#PBS -e {0}\n'.format(job.errfile)
        c+='#PBS -l walltime={0}\n'.format(job.pbs_walltime())
        c+='#PBS -l nodes={0}\n'.format(job.nodes)
        c+='#PBS -l gres=atlas1\n'
        if job.user_env:
            c+='#PBS -V\n'
        #end if
        c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
        return c
    #end def write_job_header
#end class EOS



class ALCF_Machine(Supercomputer):
    requires_account   = True
    batch_capable      = True
    executable_subfile = True

    prefixed_output    = True
    outfile_extension  = '.output'
    errfile_extension  = '.error'

    base_partition = None

    def post_process_job(self,job):
        job.sub_options.add(
            env  = '--env BG_SHAREDMEMSIZE=32',
            mode = '--mode script'
            )
        #if job.processes<job.nodes: # seems like a good idea, but breaks idempotency
        #    job.processes_per_node=1
        ##end if
        if job.nodes<self.base_partition:
            self.warn('!!! ATTENTION !!!\n number of nodes on {0} should not be less than {1}\n you requested: {2}'.format(self.name,self.base_partition,job.nodes))
        else:
            partition = log(float(job.nodes)/self.base_partition)/log(2.)
            if abs(partition-int(partition))>1e-6:
                self.warn('!!! ATTENTION !!!\n number of nodes on {0} must be {1} times a power of two\n you requested: {2}\n nearby valid node count: {3}'.format(self.name,self.base_partition,job.nodes,self.base_partition*2**int(round(partition))))
            #end if
        #end if
        valid_ppn = (1,2,4,8,16,32,64)
        if job.processes_per_node is None:
            self.warn('job may not run properly\nplease specify processes_per_node in each job to be launched with runjob on {0}'.format(self.name))
        elif job.processes_per_node not in valid_ppn:
            self.warn('job may not run properly\nprocesses_per_node is not a valid value for {0}\nprocesses_per_node provided: {1}\nvalid options are: {2}'.format(self.name,job.processes_per_node,valid_ppn))
        #end if
    #end def post_process_job

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'default'
        #end if
        c= '#!/bin/bash\n'
        c+='#COBALT -q {0}\n'.format(job.queue)
        c+='#COBALT -A {0}\n'.format(job.account)
        c+='#COBALT -n {0}\n'.format(job.nodes)
        c+='#COBALT -t {0}\n'.format(job.total_minutes())
        c+='#COBALT -O {0}\n'.format(job.identifier)
        c+='\nLOCARGS="--block $COBALT_PARTNAME ${COBALT_CORNER:+--corner} $COBALT_CORNER ${COBALT_SHAPE:+--shape} $COBALT_SHAPE"\n'
        c+='echo "Cobalt location args: $LOCARGS" >&2\n\n'
        return c
    #end def write_job_header
#end class ALCF_Machine


class Vesta(ALCF_Machine):
    name = 'vesta'
    base_partition = 32
#end class Vesta

class Cetus(ALCF_Machine):
    name = 'cetus'
    base_partition = 128
#end class Cetus

class Mira(ALCF_Machine):
    name = 'mira'
    base_partition = 512
#end class Mira
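
# Worked example (illustrative) of the partition check in
# ALCF_Machine.post_process_job: on Mira (base_partition=512) a request of
# 768 nodes gives partition = log(768/512)/log(2) ~ 0.585, which is not an
# integer, so a warning is issued with the nearby valid node count
# 512*2**round(0.585) = 1024.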



class Cooley(Supercomputer):
    name = 'cooley'
    requires_account   = True
    batch_capable      = True
    executable_subfile = True

    prefixed_output    = True
    outfile_extension  = '.output'
    errfile_extension  = '.error'

    def post_process_job(self,job):
        #if job.processes_per_node is None and job.threads!=1:
        #    self.error('threads must be 1,2,3,4,6, or 12 on Cooley\nyou provided: {0}'.format(job.threads))
        ##end if

        #job.run_options.add(
        #    f   = '-f $COBALT_NODEFILE',
        #    ppn = '-ppn {0}'.format(job.processes_per_node),
        #    )
        return
    #end def post_process_job

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'default'
        #end if
        c= '#!/bin/bash\n'
        c+='#COBALT -q {0}\n'.format(job.queue)
        c+='#COBALT -A {0}\n'.format(job.account)
        c+='#COBALT -n {0}\n'.format(job.nodes)
        c+='#COBALT -t {0}\n'.format(job.total_minutes())
        c+='#COBALT -O {0}\n'.format(job.identifier)
        return c
    #end def write_job_header
#end class Cooley


class Theta(Supercomputer):
    name = 'theta'
    requires_account   = True
    batch_capable      = True
    executable_subfile = True

    prefixed_output    = True
    outfile_extension  = '.output'
    errfile_extension  = '.error'

    def post_process_job(self,job):
        if job.hyperthreads is None:
            job.hyperthreads = 1
        #end if
        job.run_options.add(
            N  = '-N {0}'.format(job.processes_per_node),
            cc = '-cc depth',
            d  = '-d {0}'.format(job.threads),
            j  = '-j {0}'.format(job.hyperthreads),
            e  = '-e OMP_NUM_THREADS={0}'.format(job.threads),
            )
    #end def post_process_job

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'default'
        #end if
        c= '#!/bin/bash\n'
        c+='#COBALT -q {0}\n'.format(job.queue)
        c+='#COBALT -A {0}\n'.format(job.account)
        c+='#COBALT -n {0}\n'.format(job.nodes)
        c+='#COBALT -t {0}\n'.format(job.total_minutes())
        c+='#COBALT -O {0}\n'.format(job.identifier)
        c+='#COBALT --attrs mcdram=cache:numa=quad\n'
        return c
    #end def write_job_header
#end class Theta
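
# Illustrative aprun options assembled by Theta.post_process_job: with
# processes_per_node=16, threads=4, and hyperthreads left at its default of 1,
# the launcher invocation carries
#   -N 16 -cc depth -d 4 -j 1 -e OMP_NUM_THREADS=4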



class Lonestar(Supercomputer): # Lonestar contribution from Paul Young

    name = 'lonestar' # will be converted to lowercase anyway
    requires_account = False
    batch_capable    = True

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'batch'
        #end if
        c= '#!/bin/bash\n'
        #c+='#$ -A {0}\n'.format(job.account)
        c+='#$ -q {0}\n'.format(job.queue)
        c+='#$ -N {0}\n'.format(job.name)
        c+='#$ -o {0}\n'.format(job.outfile)
        c+='#$ -e {0}\n'.format(job.errfile)
        c+='#$ -l h_rt={0}\n'.format(job.pbs_walltime())
        c+='#$ -pe 12way {0}\n'.format(job.nodes*12)
        c+='#$ -cwd\n'
        if job.user_env:
            c+='#$ -V\n'
        #end if
        return c
    #end def write_job_header


    def read_process_id(self,output):
        pid = None
        lines = output.splitlines()

        for line in lines:
            if 'Your job' in line:
                spid = line.split(' ')[2]
                if spid.isdigit():
                    pid = int(spid)
                #end if
            #end if
        #end for
        return pid
    #end def read_process_id
#end class Lonestar



class ICMP_Machine(Supercomputer): # ICMP and Amos contributions from Ryan McAvoy
    batch_capable      = True
    executable_subfile = True

    prefixed_output    = True
    outfile_extension  = '.output'
    errfile_extension  = '.error'

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'defq'
        #end if
        c= '#!/bin/bash -x\n'
        c+='#SBATCH --export=ALL\n'
        c+='#SBATCH -J {0}\n'.format(job.identifier)
        c+='#SBATCH -p {0}\n'.format(job.queue)
        c+='#SBATCH -o {0}\n'.format(job.outfile)
        c+='#SBATCH -e {0}\n'.format(job.errfile)
        c+='#SBATCH --nodes {0}\n'.format(job.nodes)
        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
        c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
        return c
    #end def write_job_header
#end class ICMP_Machine


class Komodo(ICMP_Machine):
    name = 'komodo'
#end class Komodo

class Matisse(ICMP_Machine):
    name = 'matisse'
#end class Matisse



class Amos(Supercomputer):
    name = 'amos'

    #requires_account  = True
    batch_capable      = True
    executable_subfile = True

    prefixed_output    = True
    outfile_extension  = '.output'
    errfile_extension  = '.error'

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'debug'
        #end if
        if job.queue == 'debug':
            base_partition = 1
            max_partition  = 32
            max_time       = 1
        elif job.queue == 'small':
            base_partition = 1
            max_partition  = 64
            max_time       = 24
        elif job.queue == 'medium':
            base_partition = 128
            max_partition  = 512
            max_time       = 12
        elif job.queue == 'large':
            base_partition = 1024
            max_partition  = 2048
            max_time       = 6
        elif job.queue == 'verylarge':
            base_partition = 3072
            max_partition  = 4096
            max_time       = 6
        else:
            self.error('queue {0} is unknown on {1}\nvalid options are: debug, small, medium, large, verylarge'.format(job.queue,self.name))
        #end if
        job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
        if job.total_hours > max_time:
            self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
            job.hours   = max_time
            job.minutes = 0
            job.seconds = 0
        #end if
        if job.nodes<base_partition:
            self.warn('!!! ATTENTION !!!\n number of nodes in {0} should not be less than {1}\n you requested: {2}'.format(job.queue,base_partition,job.nodes))
        elif job.nodes>max_partition:
            self.warn('!!! ATTENTION !!!\n number of nodes in {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_partition,job.nodes))
        else:
            if job.queue != 'verylarge':
                partition = log(float(job.nodes)/base_partition)/log(2.)
                if abs(partition-int(partition))>1e-6:
                    self.warn('!!! ATTENTION !!!\n number of nodes on {0} must be {1} times a power of two\n you requested: {2}\n nearby valid node count: {3}'.format(self.name,base_partition,job.nodes,base_partition*2**int(round(partition))))
            elif job.nodes != 3072 and job.nodes != 4096:
                self.warn('!!! ATTENTION !!!\n number of nodes on {0} must be 3072 or 4096\n you requested: {1}'.format(self.name,job.nodes))
            #end if
        #end if

        c= '#!/bin/bash -x\n'
        c+='#SBATCH --export=ALL\n'
        #c+='#SBATCH -D /gpfs/sb/data/<project>/<user>/\n'
        c+='#SBATCH -J {0}\n'.format(job.identifier)
        c+='#SBATCH -p {0}\n'.format(job.queue)
        c+='#SBATCH -o {0}\n'.format(job.outfile)
        c+='#SBATCH -e {0}\n'.format(job.errfile)
        c+='#SBATCH --nodes {0}\n'.format(job.nodes)
        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
        c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
        #c+='#SBATCH --mail-type=ALL\n'
        #c+='#SBATCH --mail-user=<{0}>\n'

        return c
    #end def write_job_header
#end class Amos


class SnlMachine(Supercomputer):
    requires_account = True
    batch_capable    = True
    #executable_subfile = True

    prefixed_output   = True
    outfile_extension = '.output'
    errfile_extension = '.error'

    def write_job_header(self,job):
        if job.queue is None:
            job.queue='batch'
        #end if

        cpus_per_task = int(floor(float(self.cores_per_node)/job.processes_per_node))

        if job.qos == 'long':
            max_time = 96
        elif 'short' in job.queue:
            max_time = 4
        else:
            max_time = 48
        #end if

        job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
        if job.total_hours > max_time: # warn if job exceeds the time limit for its queue/qos
            if job.qos == 'long':
                self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1} with --qos=\'long\'\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
            elif 'short' in job.queue:
                self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1} with -p short[,batch]\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
            else:
                self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
            #end if
            job.hours   = max_time
            job.minutes = 0
            job.seconds = 0
        #end if

        c='#!/bin/bash\n'
        c+='#SBATCH -p '+str(job.queue)+'\n'
        c+='#SBATCH --job-name '+str(job.name)+'\n'
        c+='#SBATCH --account='+str(job.account)+'\n'
        c+='#SBATCH -N '+str(job.nodes)+'\n'
        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
        c+='#SBATCH --cpus-per-task={0}\n'.format(cpus_per_task)
        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
        c+='#SBATCH -o {0}\n'.format(job.outfile)
        c+='#SBATCH -e {0}\n'.format(job.errfile)
        if job.qos:
            c+='#SBATCH --qos={}\n'.format(job.qos)
        #end if
        c+='\n'
        return c
    #end def write_job_header
#end class SnlMachine

class Chama(SnlMachine):
    name = 'chama'
#end class Chama

class Skybridge(SnlMachine):
    name = 'skybridge'
#end class Skybridge

class Eclipse(SnlMachine):
    name = 'eclipse'
#end class Eclipse

class Attaway(SnlMachine):
    name = 'attaway'
#end class Attaway

class Uno(SnlMachine):
    name = 'uno'
#end class Uno

class Solo(SnlMachine):
    name = 'solo'
#end class Solo

# machines at LRZ https://www.lrz.de/english/
class SuperMUC(Supercomputer):
    name = 'supermuc'
    requires_account    = False
    batch_capable       = True
    query_with_username = False

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'general'
        #end if
        if job.type is None:
            job.type = 'MPICH'
        else:
            job.type = job.type.lower()
            if job.type=='mpich':
                job.type=job.type.upper()
            #end if
        #end if
        ibm   = job.type=='parallel'
        intel = job.type=='MPICH'
        omp   = isinstance(job.threads,int) and job.threads>1
        if not ibm and not intel:
            self.error('the only types of MPI supported are "parallel" and "MPICH"\nreceived MPI with type: {0}'.format(job.type))
        #end if
        c ='#!/bin/bash\n'
        c+='#@ job_name         = {0}\n'.format(job.name)
        c+='#@ job_type         = {0}\n'.format(job.type)
        c+='#@ class            = {0}\n'.format(job.queue)
        c+='#@ node             = {0}\n'.format(job.nodes)
        if job.nodes<512:
            icmin = 1
            icmax = 1
        else:
            icmin = int(job.nodes//512)+1
            icmax = icmin+1
        #end if
        c+='#@ island_count     = {0},{1}\n'.format(icmin,icmax)
        if intel and omp:
            c+='#@ tasks_per_node   = {0}\n'.format(job.processes_per_node)
        else:
            c+='#@ total_tasks      = {0}\n'.format(job.processes)
        #end if
        c+='#@ wall_clock_limit = {0}\n'.format(job.ll_walltime())
        c+='#@ network.MPI      = sn_all,not_shared,us\n'
        c+='#@ initialdir       = {0}\n'.format(job.abs_dir)
        c+='#@ output           = {0}\n'.format(job.outfile)
        c+='#@ error            = {0}\n'.format(job.errfile)
        c+='#@ energy_policy_tag = my_energy_tag\n'
        c+='#@ minimize_time_to_solution = yes\n'
        if job.email is None:
            c+='#@ notification = never\n'
        else:
            c+='#@ notification = always\n'
            c+='#@ notify_user  = {0}\n'.format(job.email)
        #end if
        c+='#@ queue\n'
        c+='. /etc/profile\n'
        c+='. /etc/profile.d/modules.sh\n'
        if ibm and omp:
            c+='export MP_SINGLE_THREAD=no\n'
            c+='export MP_TASK_AFFINITY=core:{0}\n'.format(job.threads)
        elif intel and not omp:
            c+='module unload mpi.ibm\n'
            c+='module load mpi.intel\n'
        elif intel and omp:
            c+='module unload mpi.ibm\n'
            c+='module load mpi.intel\n'
            c+='export OMP_NUM_THREADS={0}\n'.format(job.threads)
            #c+='module load mpi_pinning/hybrid_blocked\n'
        #end if
        return c
    #end def write_job_header
#end class SuperMUC



class SuperMUC_NG(Supercomputer):
    name                = 'supermucng'
    requires_account    = True
    batch_capable       = True
    query_with_username = True

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'general'
        #end if
        if job.hyperthreads is None:
            job.hyperthreads = job.processes_per_node//48
            if job.hyperthreads==0:
                job.hyperthreads=None
            #end if
        #end if
        if job.constraint is None:
            job.constraint = 'scratch&work'
        #end if
        c ='#!/bin/bash\n'
        c+='#SBATCH --account={}\n'.format(job.account)
        c+='#SBATCH --partition={}\n'.format(job.queue)
        c+='#SBATCH -J {}\n'.format(job.name)
        c+='#SBATCH --time={}\n'.format(job.sbatch_walltime())
        c+='#SBATCH -o ./{}\n'.format(job.outfile)
        c+='#SBATCH -e ./{}\n'.format(job.errfile)
        if job.switches is not None:
            c+='#SBATCH --switches={}\n'.format(job.switches)
        #end if
        c+='#SBATCH --nodes={}\n'.format(job.nodes)
        c+='#SBATCH --ntasks-per-node={}\n'.format(job.processes_per_node)
        if job.ntasks_per_core is not None:
            c+='#SBATCH --ntasks-per-core={}\n'.format(job.ntasks_per_core)
        elif job.hyperthreads is not None:
            c+='#SBATCH --ntasks-per-core={}\n'.format(job.hyperthreads)
        #end if
        if not job.default_cpus_per_task:
            if job.cpus_per_task is None:
                c+='#SBATCH --cpus-per-task={}\n'.format(job.threads)
            else:
                c+='#SBATCH --cpus-per-task={}\n'.format(job.cpus_per_task)
            #end if
        #end if
        c+='#SBATCH -D ./\n'
        c+='#SBATCH --no-requeue\n'
        if job.constraint is not None:
            c+='#SBATCH --constraint="{}"\n'.format(job.constraint)
        #end if
        if job.email is not None:
            c+='#SBATCH --mail-type=ALL\n'
            c+='#SBATCH --mail-user={}\n'.format(job.email)
        #end if
        c+='#SBATCH --export=NONE\n'
        if job.user_env:
            c+='#SBATCH --get-user-env\n'
        #end if
        return c
    #end def write_job_header
#end class SuperMUC_NG
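
# Illustrative hyperthreading default from SuperMUC_NG.write_job_header: with
# processes_per_node=96 on the 48-core nodes, hyperthreads = 96//48 = 2 and
# the header gains '#SBATCH --ntasks-per-core=2'.  With fewer than 48
# processes per node the integer division gives 0, hyperthreads is reset to
# None, and the line is omitted unless ntasks_per_core is given explicitly.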



class Stampede2(Supercomputer):
    name = 'stampede2'

    requires_account = True
    batch_capable    = True
    #executable_subfile = True

    prefixed_output   = True
    outfile_extension = '.output'
    errfile_extension = '.error'

    def write_job_header(self,job):
        if job.queue is None:
            job.queue='normal'
        #end if

        if job.queue == 'development':
            max_nodes = 16
            max_time  = 2
        elif job.queue == 'normal':
            max_nodes = 256
            max_time  = 48
        elif job.queue == 'large':
            max_nodes = 2048
            max_time  = 48
        elif job.queue == 'long':
            max_nodes = 32
            max_time  = 96
        elif job.queue == 'flat_quadrant':
            max_nodes = 24
            max_time  = 48
        elif job.queue == 'skx-dev':
            max_nodes = 4
            max_time  = 2
        elif job.queue == 'skx-normal':
            max_nodes = 128
            max_time  = 48
        elif job.queue == 'skx-large':
            max_nodes = 868
            max_time  = 48
        else:
            self.error('queue {0} is unknown on {1}'.format(job.queue,self.name))
        #end if

        if 'skx' in job.queue:
            max_processes_per_node = 48
        else:
            max_processes_per_node = 68
        #end if
        job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
        if job.total_hours > max_time:
            self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
            job.hours   = max_time
            job.minutes = 0
            job.seconds = 0
        #end if

        if job.nodes > max_nodes:
            self.warn('!!! ATTENTION !!!\n the maximum nodes on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_nodes,job.nodes))
            job.nodes = max_nodes
        #end if

        if job.processes_per_node > max_processes_per_node:
            self.warn('!!! ATTENTION !!!\n the maximum number of processes per node on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_processes_per_node,job.processes_per_node))
            job.processes_per_node = max_processes_per_node
        #end if

        c='#!/bin/bash\n'
        c+='#SBATCH --job-name '+str(job.name)+'\n'
        c+='#SBATCH --account='+str(job.account)+'\n'
        c+='#SBATCH -N '+str(job.nodes)+'\n'
        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
        c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
        c+='#SBATCH -o {0}\n'.format(job.outfile)
        c+='#SBATCH -e {0}\n'.format(job.errfile)
        c+='#SBATCH -p {0}\n'.format(job.queue)
        c+='\n'
        return c
    #end def write_job_header
#end class Stampede2



class CadesMoab(Supercomputer):
    name             = 'cades_moab'
    requires_account = True
    batch_capable    = True

    def post_process_job(self,job):
        ppn = job.processes_per_node
        if job.threads>1 and ppn is not None and ppn>1:
            processes_per_socket = int(floor(job.processes_per_node/2))
            job.run_options.add(npersocket='--npersocket {0}'.format(processes_per_socket))
        #end if
    #end def post_process_job

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'skylake'
        #end if
        if job.qos is None:
            job.qos = 'std'
        #end if
        if job.group_list is None:
            job.group_list = 'cades-'+job.account
        #end if
        c= '#!/bin/bash\n'
        c+='#PBS -A {0}\n'.format(job.account)
        c+='#PBS -W group_list={0}\n'.format(job.group_list)
        c+='#PBS -q {0}\n'.format(job.queue)
        c+='#PBS -N {0}\n'.format(job.name)
        c+='#PBS -o {0}\n'.format(job.outfile)
        c+='#PBS -e {0}\n'.format(job.errfile)
        c+='#PBS -l qos={0}\n'.format(job.qos) # This could be qos=burst as well, but then it can be cancelled by others
        c+='#PBS -l walltime={0}\n'.format(job.pbs_walltime())
        c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes,job.ppn)
        c+='''
echo $PBS_O_WORKDIR
cd $PBS_O_WORKDIR
'''
        return c
    #end def write_job_header
#end class CadesMoab



class CadesSlurm(Supercomputer):
    name             = 'cades'
    requires_account = True
    batch_capable    = True

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'skylake'
        #end if

        c = '#!/bin/bash\n'
        c += '#SBATCH -A {}\n'.format(job.account)
        c += '#SBATCH -p {}\n'.format(job.queue)
        c += '#SBATCH -J {}\n'.format(job.name)
        c += '#SBATCH -t {}\n'.format(job.sbatch_walltime())
        c += '#SBATCH -N {}\n'.format(job.nodes)
        c += '#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
        c += '#SBATCH --cpus-per-task={0}\n'.format(job.threads)
        c += '#SBATCH --mem=0\n' # required on Cades
        c += '#SBATCH -o '+job.outfile+'\n'
        c += '#SBATCH -e '+job.errfile+'\n'
        c += '#SBATCH --exclusive\n'
        if job.user_env:
            c += '#SBATCH --export=ALL\n' # equiv to PBS -V
        else:
            c += '#SBATCH --export=NONE\n'
        #end if

        return c
    #end def write_job_header
#end class CadesSlurm
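
# Illustrative value from CadesMoab.post_process_job above: a threaded job
# with processes_per_node=16 is spread over the node's two sockets as
# floor(16/2) = 8 processes per socket, adding '--npersocket 8' to mpirun.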



class Summit(Supercomputer):

    name = 'summit'
    requires_account = True
    batch_capable    = True

    def post_process_job(self,job):
        # add the options only if the user has not supplied options
        if len(job.run_options)==0:
            opt = obj(
                launch_dist = '-d packed',
                bind        = '-b rs',
                )
            if job.gpus is None:
                job.gpus = 6 # gpus to use per node
            #end if
            if job.alloc_flags is None:
                job.alloc_flags = 'smt1'
            #end if
            if job.gpus==0:
                if job.processes%2==0:
                    resource_sets_per_node = 2
                else:
                    resource_sets_per_node = 1
                #end if
                nrs   = job.nodes*resource_sets_per_node
                pprs  = job.processes_per_node//resource_sets_per_node
                gpurs = 0
            else:
                ppn = job.processes_per_node
                if ppn is None:
                    self.warn('job may not run properly on Summit\nat least one mpi process should be present for each node\nplease check the generated bsub file for correctness')
                    ppn = 0
                #end if
                if ppn%job.gpus!=0:
                    self.warn('job may not run properly on Summit\nprocesses per node should divide evenly into number of gpus requested\nprocesses per node requested: {0}\ngpus per node requested: {1}\nplease check the generated bsub file for correctness'.format(job.processes_per_node,job.gpus))
                #end if
                resource_sets_per_node = job.gpus
                nrs   = job.nodes*resource_sets_per_node
                pprs  = ppn//resource_sets_per_node
                gpurs = 1
            #end if
            opt.set(
                resource_sets= '-n {0}'.format(nrs),
                rs_per_node  = '-r {0}'.format(resource_sets_per_node),
                tasks_per_rs = '-a {0}'.format(pprs),
                cpus_per_rs  = '-c {0}'.format(pprs*job.threads),
                gpus_per_rs  = '-g {0}'.format(gpurs),
                )
            job.run_options.add(**opt)
        #end if
    #end def post_process_job


    def write_job_header(self,job):
        c ='#!/bin/bash\n'
        c+='#BSUB -P {0}\n'.format(job.account)
        c+='#BSUB -J {0}\n'.format(job.name)
        c+='#BSUB -o {0}\n'.format(job.outfile)
        c+='#BSUB -e {0}\n'.format(job.errfile)
        c+='#BSUB -W {0}\n'.format(job.lsf_walltime())
        c+='#BSUB -nnodes {0}\n'.format(job.nodes)
        if job.alloc_flags is not None:
            c+='#BSUB -alloc_flags "{0}"\n'.format(job.alloc_flags)
        #end if
        return c
    #end def write_job_header


    def read_process_id(self,output):
        pid = None
        tokens = output.split()
        for t in tokens:
            if t.startswith('<'):
                spid = t.strip('<>').strip()
                if spid.isdigit():
                    pid = int(spid)
                    break
                #end if
            #end if
        #end for
        return pid
    #end def read_process_id
#end class Summit
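
# Worked example (illustrative) of Summit.post_process_job: nodes=2,
# processes_per_node=6, threads=7, and the default gpus=6 give one resource
# set per GPU, i.e. jsrun options '-n 12 -r 6 -a 1 -c 7 -g 1'.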


## Added 28/11/2019 by A Zen
class Rhea(Supercomputer):

    name = 'rhea'
    requires_account = True
    batch_capable    = True
    #executable_subfile = True
    prefixed_output   = True
    outfile_extension = '.output'
    errfile_extension = '.error'

    def post_process_job(self,job):
        job.run_options.add(
            N='-N {}'.format(job.nodes),
            n='-n {}'.format(job.processes),
            )
        if job.threads>1:
            job.run_options.add(
                c = '-c {}'.format(job.threads),
                )
            if 'cpu_bind' not in job.run_options:
                if job.processes_per_node==self.cores_per_node:
                    cpu_bind = '--cpu-bind=threads'
                else:
                    cpu_bind = '--cpu-bind=cores'
                #end if
                job.run_options.add(
                    cpu_bind = cpu_bind
                    )
            #end if
        #end if
    #end def post_process_job

    def write_job_header(self,job):
        if job.queue is None:
            job.queue='batch'
        #end if
        base_partition = None
        max_partition  = 384
        if job.nodes <= 16:
            max_time = 48
        elif job.nodes <= 64:
            max_time = 36
        else:
            max_time = 3
        #end if
        job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
        if job.total_hours > max_time: # warn if job exceeds the time limit for its node count
            self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
            job.hours   = max_time
            job.minutes = 0
            job.seconds = 0
        #end if

        c='#!/bin/bash\n'
        c+='#SBATCH --job-name '+str(job.name)+'\n'
        c+='#SBATCH --account='+str(job.account)+'\n'
        c+='#SBATCH -N '+str(job.nodes)+'\n'
        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
        c+='#SBATCH -o {0}\n'.format(job.outfile)
        c+='#SBATCH -e {0}\n'.format(job.errfile)
        if job.email is not None:
            c+='#SBATCH --mail-user {}\n'.format(job.email)
            c+='#SBATCH --mail-type ALL\n'
            #c+='#SBATCH --mail-type FAIL\n'
        #end if
        c+='\n'
        c+='cd $SLURM_SUBMIT_DIR\n'
        c+='\n'
        c+='echo JobID : $SLURM_JOBID \n'
        c+='echo Number of nodes requested: $SLURM_JOB_NUM_NODES \n'
        c+='echo List of nodes assigned to the job: $SLURM_NODELIST \n'
        c+='\n'
        return c
    #end def write_job_header
#end class Rhea


## Added 19/03/2021 by A Zen
class Andes(Supercomputer):

    name = 'andes'
    requires_account = True
    batch_capable    = True
    #executable_subfile = True
    prefixed_output   = True
    outfile_extension = '.output'
    errfile_extension = '.error'

    def post_process_job(self,job):
        job.run_options.add(
            N='-N {}'.format(job.nodes),
            n='-n {}'.format(job.processes),
            )
        if job.threads>1:
            job.run_options.add(
                c = '-c {}'.format(job.threads),
                )
            if 'cpu_bind' not in job.run_options:
                if job.processes_per_node==self.cores_per_node:
                    cpu_bind = '--cpu-bind=threads'
                else:
                    cpu_bind = '--cpu-bind=cores'
                #end if
                job.run_options.add(
                    cpu_bind = cpu_bind
                    )
            #end if
        #end if
    #end def post_process_job

    def write_job_header(self,job):
        if job.queue is None:
            job.queue='batch'
        #end if
        base_partition = None
        max_partition  = 384
        if job.nodes <= 16:
            max_time = 48
        elif job.nodes <= 64:
            max_time = 36
        else:
            max_time = 3
        #end if
        job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
        if job.total_hours > max_time: # warn if job exceeds the time limit for its node count
            self.warn('!!! ATTENTION !!!\n the maximum runtime on {0} should not be more than {1}\n you requested: {2}'.format(job.queue,max_time,job.total_hours))
            job.hours   = max_time
            job.minutes = 0
            job.seconds = 0
        #end if

        c='#!/bin/bash\n'
        c+='#SBATCH --job-name '+str(job.name)+'\n'
        c+='#SBATCH --account='+str(job.account)+'\n'
        c+='#SBATCH -N '+str(job.nodes)+'\n'
        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
        c+='#SBATCH -o {0}\n'.format(job.outfile)
        c+='#SBATCH -e {0}\n'.format(job.errfile)
        if job.email is not None:
            c+='#SBATCH --mail-user {}\n'.format(job.email)
            c+='#SBATCH --mail-type ALL\n'
            #c+='#SBATCH --mail-type FAIL\n'
        #end if
        c+='\n'
        c+='cd $SLURM_SUBMIT_DIR\n'
        c+='\n'
        c+='echo JobID : $SLURM_JOBID \n'
        c+='echo Number of nodes requested: $SLURM_JOB_NUM_NODES \n'
        c+='echo List of nodes assigned to the job: $SLURM_NODELIST \n'
        c+='\n'
        return c
    #end def write_job_header
#end class Andes


class Tomcat3(Supercomputer):
    name             = 'tomcat3'
    requires_account = False
    batch_capable    = True
    redirect_output  = True

    def write_job_header(self,job):
        if job.queue is None:
            job.queue = 'tomcat'
        #end if
        c = '#!/bin/bash -l\n'
        c+='#SBATCH -J {}\n'.format(job.name)
        c+='#SBATCH -N {}\n'.format(job.nodes)
        c+='#SBATCH -t {}\n'.format(job.sbatch_walltime())
        c+='#SBATCH -p {}\n'.format(job.queue)
        if job.email is not None:
            c+='#SBATCH --mail-user {}\n'.format(job.email)
            c+='#SBATCH --mail-type ALL\n'
        #end if
        c+='#. /home/rcohen/.bashrc\n'
        if len(job.presub)==0:
            c+='unalias cd; source /mnt/beegfs/intel/parallel_studio_xe_2019.3.062/bin/psxevars.sh\n'
            c+='ulimit -a\n'
        #end if
        return c
    #end def write_job_header
#end class Tomcat3




#Known machines
# workstations
for cores in range(1,128+1):
    Workstation('ws'+str(cores),cores,'mpirun')
#end for
# supercomputers and clusters
#             nodes sockets cores ram qslots   qlaunch    qsubmit   qstatus    qdelete
Jaguar(       18688,      2,    8,  32,  100,   'aprun',    'qsub',   'qstat',    'qdel')
Kraken(        9408,      2,    6,  16,  100,   'aprun',    'qsub',   'qstat',    'qdel')
Golub(          512,      2,    6,  32, 1000,  'mpirun',    'qsub',   'qstat',    'qdel')
OIC5(            28,      2,   16, 128, 1000,  'mpirun',    'qsub',   'qstat',    'qdel')
Edison(         664,      2,   12,  64,  100,    'srun',  'sbatch',  'squeue', 'scancel')
Cori(          9688,      1,   68,  96,  100,    'srun',  'sbatch',  'squeue', 'scancel')
BlueWatersXK(  3072,      1,   16,  32,  100,   'aprun',    'qsub',   'qstat',    'qdel')
BlueWatersXE( 22640,      2,   16,  64,  100,   'aprun',    'qsub',   'qstat',    'qdel')
Titan(        18688,      1,   16,  32,  100,   'aprun',    'qsub',   'qstat',    'qdel')
EOS(            744,      2,    8,  64, 1000,   'aprun',    'qsub',   'qstat',    'qdel')
Vesta(         2048,      1,   16,  16,   10,  'runjob',    'qsub',  'qstata',    'qdel')
Cetus(         1024,      1,   16,  16,   10,  'runjob',    'qsub',  'qstata',    'qdel')
Mira(         49152,      1,   16,  16,   10,  'runjob',    'qsub',  'qstata',    'qdel')
Cooley(         126,      2,    6, 384,   10,  'mpirun',    'qsub',  'qstata',    'qdel')
Theta(         4392,      1,   64, 192, 1000,   'aprun',    'qsub',  'qstata',    'qdel')
Lonestar(     22656,      2,    6,  12,  128,   'ibrun',    'qsub',   'qstat',    'qdel')
Matisse(         20,      2,    8,  64,    2,  'mpirun',  'sbatch',   'sacct', 'scancel')
Komodo(          24,      2,    6,  48,    2,  'mpirun',  'sbatch',   'sacct', 'scancel')
Amos(          5120,      1,   16,  16,  128,    'srun',  'sbatch',   'sacct', 'scancel')
Chama(         1232,      2,    8,  64, 1000,    'srun',  'sbatch',  'squeue', 'scancel')
Uno(            168,      2,    8, 128, 1000,    'srun',  'sbatch',  'squeue', 'scancel')
Eclipse(       1488,      2,   18, 128, 1000,    'srun',  'sbatch',  'squeue', 'scancel')
Attaway(       1488,      2,   18, 192, 1000,    'srun',  'sbatch',  'squeue', 'scancel')
Skybridge(     1848,      2,    8,  64, 1000,    'srun',  'sbatch',  'squeue', 'scancel')
Solo(           374,      2,   18, 128, 1000,    'srun',  'sbatch',  'squeue', 'scancel')
SuperMUC(       512,      1,   28, 256,    8, 'mpiexec','llsubmit',     'llq','llcancel')
Stampede2(     4200,      1,   68,  96,   50,   'ibrun',  'sbatch',  'squeue', 'scancel')
CadesMoab(      156,      2,   18, 128,  100,  'mpirun',    'qsub',   'qstat',    'qdel')
CadesSlurm(     156,      2,   18, 128,  100,  'mpirun',  'sbatch',  'squeue', 'scancel')
Summit(        4608,      2,   21, 512,  100,   'jsrun',    'bsub',   'bjobs',   'bkill')
Rhea(           512,      2,    8, 128, 1000,    'srun',  'sbatch',  'squeue', 'scancel')
Andes(          704,      2,   16, 256, 1000,    'srun',  'sbatch',  'squeue', 'scancel')
Tomcat3(          8,      1,   64, 192, 1000,  'mpirun',  'sbatch',   'sacct', 'scancel')
SuperMUC_NG(   6336,      1,   48,  96, 1000, 'mpiexec',  'sbatch',   'sacct', 'scancel')


#machine accessor functions
get_machine_name = Machine.get_hostname
get_machine      = Machine.get

#rename Job with lowercase
job = Job
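
# Example usage (a hedged sketch; 'ws16' is one of the workstations registered
# above and the application command is hypothetical):
#
#   m = get_machine('ws16')   # look up a registered machine by name
#   j = job(cores=16,threads=4,app_command='qmcpack input.xml')
#
# In a typical Nexus workflow the machine is selected via settings(machine=...)
# and job objects are passed to simulation generators rather than used directly.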