1##################################################################
2##  (c) Copyright 2015-  by Jaron T. Krogel                     ##
3##################################################################
4
5
6#====================================================================#
7#  machines.py                                                       #
8#    Representations of local machine environments including         #
9#    workstations and supercomputers and the jobs that will be       #
10#    executed on them.                                               #
11#                                                                    #
12#  Content summary:                                                  #
13#    Job                                                             #
14#      Class to represent a generic simulation job.                  #
15#                                                                    #
16#    Machine                                                         #
17#      Represents a generic machine.                                 #
18#      Base class for specific machine types.                        #
19#                                                                    #
20#    Workstation                                                     #
21#      Represents a workstation with a fixed number of cores.        #
22#                                                                    #
23#    InteractiveCluster                                              #
24#      Represents a supercomputer in interactive mode.               #
25#      Similar to a workstation with many cores.                     #
26#                                                                    #
27#    Supercomputer                                                   #
28#      Represents a generic supercomputer with a batch queue.        #
29#      Base class for specific supercomputers.                       #
30#      See Jaguar, Kraken, Golub, OIC5, Hopper, Edison, BlueWatersXE,#
31#        BlueWatersXK, Titan, EOS, Vesta, Cetus, Mira, Lonestar,     #
32#        Matisse, Komodo, and Amos                                   #
33#                                                                    #
34#    cpu_count                                                       #
35#      Function to return the number of cores on the local machine.  #
36#                                                                    #
37#    Options                                                         #
38#      Class representing command line options for a simulation job, #
39#      including arguments to the simulation executable,             #
40#      the run launcher (aprun/mpirun, etc), and the job submitter   #
41#      (e.g. qsub).                                                  #
42#                                                                    #
43#====================================================================#
44
45
46import os
47import time
48#from multiprocessing import cpu_count
49from socket import gethostname
50from subprocess import Popen,PIPE
51from numpy import array,mod,floor,ceil,round,log,empty
52from generic import obj
53from developer import DevBase,to_str
54from nexus_base import NexusCore,nexus_core
55from execute import execute
56from debug import *
57from imp import load_source
58
59
60import re,subprocess
61def cpu_count():
62    """ Number of virtual or physical CPUs on this system, i.e.
63    user/real as output by time(1) when called with an optimally scaling
64    userspace-only program"""
65
66    # Python 2.6+
67    try:
68        import multiprocessing
69        return multiprocessing.cpu_count()
70    except (ImportError,NotImplementedError):
71        None
72    #end try
73
74    # POSIX
75    try:
76        res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
77
78        if res > 0:
79            return res
80        #end if
81    except (AttributeError,ValueError):
82        None
83    #end try
84#end def cpu_count
85
86
87
88class Options(DevBase):
89    def __init__(self,**kwargs):
90        self.add(**kwargs)
91    #end def __init__
92
93
94    def add(self,**kwargs):
95        self.transfer_from(kwargs)
96    #end def add
97
98
99    def read(self,options):
100        nopts = 0
101        intext = False
102        nstart = -2
103        nend   = -2
104        n = 0
105        for c in options:
106            if c=='-' and nstart!=n-1 and not intext:
107                prevdash=True
108                if nopts>0:
109                    opt  = options[nstart:n].strip()
110                    name = opt.replace('-','').replace('=',' ').split()[0]
111                    self[name]=opt
112                #end if
113                nopts+=1
114                nstart = n
115            elif c=='"' or c=="'":
116                intext=not intext
117            #end if
118            n+=1
119        #end for
120        if nopts>0:
121            opt  = options[nstart:n].strip()
122            name = opt.replace('-','').replace('=',' ').split()[0]
123            self[name]=opt
124        #end if
125    #end def read
126
127
128    def write(self):
129        s = ''
130        for k in sorted(self.keys()):
131            s += ' '+str(self[k])
132        #end for
133        return s
134    #end def write
135#end class Options
136
137
138
139job_defaults_assign = obj(
140    name               = 'jobname',
141    type               = None,
142    directory          = None,
143    subdir             = None,
144    app_name           = None, # name of/path to application
145    app_command        = None, # command used to launch application
146    app_props          = None,
147    full_command       = None, # custom command including e.g. mpirun
148    outfile            = None,
149    errfile            = None,
150    env                = None,
151    user_env           = True, # import user environment
152    presub             = '',   # shell text executed just prior to submission
153    postsub            = '',   # shell text executed just after submission
154    queue              = None,
155    bundled_jobs       = None,
156    relative           = False,
157    cores              = None, # number of cores for the job
158    nodes              = None, # number of nodes for the job
159    threads            = 1,    # number of openmp threads for the job
160    hyperthreads       = None,
161    ppn                = None,
162    gpus               = None, # number of gpus per node
163    serial             = False, # run job serially, no mpi
164    local              = False, # run job locally, no queue submission
165    days               = 0,
166    hours              = 0,
167    minutes            = 0,
168    seconds            = 0,
169    subfile            = None,
170    grains             = None,
171    procs              = None,
172    processes          = None,
173    processes_per_proc = None,
174    processes_per_node = None,
175    account            = None,
176    email              = None,
177    constraint         = None, # slurm specific, Cori
178    core_spec          = None, # slurm specific, Cori
179    switches           = None, # slurm specific, SuperMUC-NG
180    alloc_flags        = None, # lsf specific, Summit
181    qos                = None,
182    group_list         = None,
183    default_cpus_per_task = False, # optionally bypass typical nexus processing for supermucng
184    ntasks_per_core    = None,
185    cpus_per_task      = None,
186    )
187
188    # these are not assigned directly
189job_defaults_nonassign = obj(
190    fake               = False,
191    app                = None, # name of/path to application
192    machine            = None,
193    options            = None,
194    app_flags          = None,
195    app_options        = None,
196    run_options        = None,
197    sub_options        = None,
198    skip_machine       = False,
199    )
200
201job_defaults = obj(job_defaults_assign,job_defaults_nonassign)
202
203class Job(NexusCore):
204
205    machine = None #default machine if none is specified in settings
206
207    states = obj(
208        none      = 0,
209        waiting   = 1,
210        submitted = 2,
211        running   = 3,
212        finished  = 4
213        )
214    state_names = states.inverse()
215
216    job_count = 0
217
218
219    @staticmethod
220    def restore_default_settings():
221        Job.machine = None
222    #end def restore_default_settings
223
224
225    @staticmethod
226    def generate_jobid():
227        Job.job_count += 1
228        return Job.job_count
229    #end def generate_jobid
230
231
232    @classmethod
233    def zero_time(cls):
234        time = obj(days=0,hours=0,minutes=0,seconds=0)
235        return time
236    #end def zero_time
237
238
239    def __init__(self,**kwargs):
240        # rewrap keyword arguments
241        kw = obj(**kwargs)
242
243        # save information used to initialize job object
244        self.init_info = kw.copy()
245
246        # set defaults
247        kw.set_optional(**job_defaults)
248
249        # extract keywords not assigned
250        app          = kw.delete('app')
251        machine      = kw.delete('machine')
252        options      = kw.delete('options')
253        app_flags    = kw.delete('app_flags')
254        app_options  = kw.delete('app_options')
255        run_options  = kw.delete('run_options')
256        sub_options  = kw.delete('sub_options')
257        env          = kw.delete('env')
258        fake         = kw.delete('fake')
259        skip_machine = kw.delete('skip_machine')
260
261        # assign keywords
262        self.set(**kw)
263
264        # assign fake job
265        self.fake_job           = fake
266
267        # initialize other internal variables
268        self.app_options        = Options()
269        self.run_options        = Options()
270        self.sub_options        = Options()
271        self.env                = None
272        self.internal_id        = None
273        self.system_id          = None
274        self.tot_cores          = None
275        self.identifier         = None
276        self.submitted          = False
277        self.status             = self.states.none
278        self.crashed            = False
279        self.overtime           = False
280        self.successful         = False
281        self.finished           = False
282
283        self.user_app_command = self.app_command is not None
284
285        if app is not None:
286            self.app_name = app
287        #end if
288        if app_options is not None:
289            self.app_options.read(app_options)
290        #end if
291        if run_options is not None:
292            self.run_options.read(run_options)
293        #end if
294        if sub_options is not None:
295            self.sub_options.read(sub_options)
296        #end if
297        if app_flags is not None:
298            self.app_options.read(app_flags)
299        #end if
300        if options is not None:
301            self.run_options.read(options)
302        #end if
303
304        if self.app_props is None:
305            self.app_props = []
306        #end if
307
308        if self.serial:
309            self.cores = 1
310            self.nodes = None
311        #end if
312
313        if skip_machine:
314            self.machine = machine
315        else:
316            if machine is not None:
317                self.machine = machine
318            #end if
319            if env is not None:
320                self.set_environment(**env)
321            #end if
322            #check that the machine exists and have it complete the job info
323            self.process()
324
325            machine = self.get_machine()
326            self.batch_mode = machine.in_batch_mode()
327
328            if self.bundled_jobs is not None and not machine.batch_capable:
329                self.error('running batched/bundled jobs on {0} is either not possible or not yet implemented, sorry.'.format(machine.name))
330            #end if
331        #end if
332
333        self.normalize_time()
334    #end def __init__
335
336
337    def get_machine(self):
338        return Machine.get(self.machine)
339    #end def get_machine
340
341
342    def process(self,machine=None):
343        if machine is None:
344            machine = self.get_machine()
345        #end if
346        machine.process_job(self)
347    #end def process
348
349
350    # test needed
351    def process_options(self,machine=None):
352        if machine is None:
353            machine = self.get_machine()
354        #end if
355        machine.process_job_options(self)
356    #end def process_options
357
358
359    # test needed
360    def initialize(self,sim):
361        self.set_id()
362        self.identifier = sim.identifier
363        machine = self.get_machine()
364        if machine.prefixed_output:
365            sim.outfile = sim.identifier + machine.outfile_extension
366            sim.errfile = sim.identifier + machine.errfile_extension
367        #end if
368        if self.directory is None:
369            self.directory = sim.locdir
370            self.abs_dir   = os.path.abspath(sim.locdir)
371        elif self.abs_dir is None:
372            self.abs_dir = os.path.abspath(self.directory)
373        #end if
374        if self.subdir is None:
375            if machine.local_directory!=None:
376                self.subdir = os.path.join(machine.local_directory,nexus_core.runs,sim.path)
377                self.abs_subdir = self.subdir
378            else:
379                self.subdir = self.directory
380                self.abs_subdir = self.abs_dir
381            #end if
382        #end if
383        if self.app_name is None:
384            app_name = sim.app_name
385        else:
386            app_name = self.app_name
387        #end if
388        if app_name!=None and not '/' in app_name:
389            ads = machine.app_directories
390            ad  = machine.app_directory
391            new_app_name = None
392            if ads!=None and self.app_name in ads:
393                new_app_name = os.path.join(ads[self.app_name],app_name)
394            elif ad!=None:
395                new_app_name = os.path.join(ad,app_name)
396            #end if
397            if new_app_name!=None:
398                self.app_name = new_app_name
399            #end if
400        #end if
401        sim.set_app_name(app_name)
402        self.set(
403            name    = sim.identifier,
404            simid   = sim.simid,
405            outfile = sim.outfile,
406            errfile = sim.errfile
407            )
408        if self.app_command is None:
409            self.app_command = sim.app_command()
410        #end if
411        if self.app_props==None:
412            self.app_props   = list(sim.app_props)
413        #end if
414        # ensure job is processed properly by this initialization stage
415        self.process()
416    #end def initialize
417
418
419    # test needed
420    def renew_app_command(self,sim):
421        if not self.user_app_command:
422            self.app_command = sim.app_command()
423        #end if
424    #end def renew_app_command
425
426
427    def set_id(self):
428        self.internal_id = Job.generate_jobid()
429    #end def set_id
430
431
432    # remove?
433    def set_processes(self):
434        if self.processes is None:
435            self.error('processes should have been set before now\ncontact the developers and have them fix this','Developer')
436            self.processes = int(ceil(float(self.cores)/self.threads))
437        #end if
438    #end def set_processes
439
440
441    def set_environment(self,limited_env=False,clear_env=False,**env):
442        machine = self.get_machine()
443        if isinstance(machine,Supercomputer):
444            limited_env = True
445        #end if
446        if self.env is None:
447            self.env = os.environ.copy()
448            if limited_env:
449                self.env.clear()
450            #end if
451        #end if
452        if clear_env:
453            self.env.clear()
454        #end if
455        for n,v in env.items():
456            self.env[n]=str(v)
457        #end for
458    #end def set_environment
459
460
461    def divert_out_err(self):
462        self.identifier += '_divert'
463    #end def divert_out_err
464
465
466    def get_time(self):
467        time = obj(
468            days = self.days,
469            hours = self.hours,
470            minutes = self.minutes,
471            seconds = self.seconds
472            )
473        return time
474    #end def get_time
475
476
477    def max_time(self,time):
478        t  = time.seconds + 60*(time.minutes+60*(time.hours+24*time.days))
479        ts = self.seconds + 60*(self.minutes+60*(self.hours+24*self.days))
480        if ts>t:
481            time.days    = self.days
482            time.hours   = self.hours
483            time.minutes = self.minutes
484            time.seconds = self.seconds
485        #end if
486        return time
487    #end def max_time
488
489
490    def serial_only(self):
491        return 'serial' in self.app_props and len(self.app_props)==1
492    #end if
493
494
495    # remove?
496    def determine_end_status(self,status):
497        if not nexus_core.generate_only:
498            self.successful = False # not really implemented yet
499        #end if
500    #end def determine_end_status
501
502
503    # test needed
504    def write(self,file=False):
505        machine = self.get_machine()
506        return machine.write_job(self,file=file)
507    #end def write
508
509
510    # test needed
511    def submit(self):
512        machine = self.get_machine()
513        machine.add_job(self)
514        self.submitted = True
515    #end def submit
516
517
518    # test needed
519    def reenter_queue(self):
520        machine = self.get_machine()
521        machine.requeue_job(self)
522    #end def reenter_queue
523
524
525    def run_command(self,launcher=None,redirect=False,serial=False):
526        machine = self.get_machine()
527        if launcher is None:
528            launcher = machine.app_launcher
529        #end if
530        c = ''
531        if self.bundled_jobs is None:
532            if self.full_command is not None:
533                c = self.full_command
534            else:
535                if self.app_command is None:
536                    self.error('app_command has not been provided')
537                #end if
538                if launcher=='runjob':
539                    separator = ' : '
540                else:
541                    separator = ' '
542                #end if
543                if self.serial and self.processes==1:
544                    c = ''
545                else:
546                    c = launcher + self.run_options.write() + separator
547                #end if
548                c+=self.app_command+self.app_options.write()
549                if redirect:
550                    c+=' >'+self.outfile+' 2>'+self.errfile
551                    if not serial:
552                        c+='&'
553                    #end if
554                elif machine.redirect_output and self.outfile is not None:
555                    c+=' >'+self.outfile+' 2>'+self.errfile
556                #end if
557            #end if
558        elif self.relative:
559            cdir = self.abs_subdir
560            c+='\n'
561            for job in self.bundled_jobs:
562                c+='\ncd '+os.path.relpath(job.abs_subdir,cdir)+'\n'
563                c+=job.run_command(launcher,redirect=True,serial=serial)+'\n'
564                cdir = job.abs_subdir
565            #end for
566            c+='\nwait\n'
567        else:
568            c+='\n'
569            for job in self.bundled_jobs:
570                c+='\ncd '+job.abs_subdir+'\n'
571                c+=job.run_command(launcher,redirect=True,serial=serial)+'\n'
572            #end for
573            c+='\nwait\n'
574        #end if
575        return c
576    #end def run_command
577
578
579    def pbs_walltime(self):
580        walltime=\
581            str(int(self.hours   )).zfill(2)+':'\
582            +str(int(self.minutes)).zfill(2)+':'\
583            +str(int(self.seconds)).zfill(2)
584        if self.days!=0:
585            walltime = str(self.days)+':'+walltime
586        #end if
587        return walltime
588    #end def pbs_walltime
589
590
591    def sbatch_walltime(self):
592        walltime=\
593            str(int(24*self.days+self.hours)).zfill(2)+':'\
594            +str(int(self.minutes)).zfill(2)+':'\
595            +str(int(self.seconds)).zfill(2)
596        return walltime
597    #end def sbatch_walltime
598
599
600    def ll_walltime(self):
601        walltime=\
602            str(int(24*self.days+self.hours)).zfill(2)+':'\
603            +str(int(self.minutes)).zfill(2)+':'\
604            +str(int(self.seconds)).zfill(2)
605        return walltime
606    #end def ll_walltime
607
608
609    def lsf_walltime(self):
610        walltime=\
611            str(int(24*self.days+self.hours)).zfill(2)+':'\
612            +str(int(self.minutes)).zfill(2)
613        return walltime
614    #end def lsf_walltime
615
616
617    def normalize_time(self):
618        t = self.total_seconds()
619        d = int(t/(24*3600))
620        t -= d*24*3600
621        h = int(t/3600)
622        t -= h*3600
623        m = int(t/60)
624        t -= m*60
625        s = int(t)
626        self.days    = d
627        self.hours   = h
628        self.minutes = m
629        self.seconds = s
630    #end def normalize_time
631
632
633    def total_seconds(self):
634        return self.seconds+60*(self.minutes+60*(self.hours+24*self.days))
635    #end def total_seconds
636
637
638    def total_minutes(self):
639        return int(self.total_seconds()/60)
640    #end def total_minutes
641
642
643    def total_hours(self):
644        return int(self.total_seconds()/3600)
645    #end def total_hours
646
647
648    def total_days(self):
649        return int(self.total_seconds()/(24*3600))
650    #end def total_days
651
652
653    def clone(self):
654        job = self.copy()
655        job.set_id()
656        return job
657    #end def clone
658
659
660    def serial_clone(self):
661        kw = self.init_info.copy()
662        kw.serial=True
663        return Job(**kw)
664    #end def serial_clone
665
666
667    def split_nodes(self,n):
668        run_options = self.run_options
669        if not isinstance(n,int):
670            self.error('cannot split job by nodes\nrequested split value must be an integer\nreceived type: {0}\nwith value: {1}'.format(n.__class__.__name__,n))
671        elif n<1 or n>=self.nodes:
672            self.error('cannot split job by nodes\nrequested split must be in the range [1,{0})\nrequested split: {1}'.format(self.nodes,n))
673        #end if
674        m = self.get_machine()
675        if m.app_launcher=='srun':
676            self.error('splitting jobs by nodes is not currently supported on machine "{0}" (SLURM)'.format(m.name))
677        #end if
678        job1 = self.clone()
679        job2 = self.clone()
680        job1.nodes = n
681        job2.nodes = self.nodes - n
682        m.process_job(job1)
683        m.process_job(job2)
684        return job1,job2
685    #end def split_nodes
686#end class Job
687
688
689
690
691class Machine(NexusCore):
692
693    machines = obj()
694
695    modes = obj(
696        none        = 0,
697        interactive = 1,
698        batch       = 2
699        )
700    mode = modes.none
701
702    batch_capable       = False
703    requires_account    = False
704    executable_subfile  = False
705    redirect_output     = False
706    query_with_username = False
707
708    prefixed_output    = False
709    outfile_extension  = None
710    errfile_extension  = None
711
712    allow_warnings = True
713
714    @staticmethod
715    def get_hostname():
716        hostname = gethostname()
717        if '.' in hostname:
718            machine_name = hostname.split('.')[0]
719        else:
720            machine_name = hostname
721        #end if
722        return machine_name.lower()
723    #end def get_hostname
724
725
726    @staticmethod
727    def exists(machine_name):
728        return machine_name in Machine.machines
729    #end def exists
730
731
732    @staticmethod
733    def is_unique(machine):
734        return id(machine)==id(Machine.machines[machine.name])
735    #end def is_unique
736
737
738    @staticmethod
739    def add(machine):
740        if not isinstance(machine,Machine):
741            Machine.class_error('attempted to add non-machine instance')
742        #end if
743        if not 'name' in machine:
744            Machine.class_error('attempted to add a machine without a name')
745        #end if
746        name = machine.name
747        if not name in Machine.machines:
748            Machine.machines[name] = machine
749        else:
750            Machine.class_error('attempted to create machine {0}, but it already exists'.format(name))
751        #end if
752    #end def add
753
754
755    @staticmethod
756    def get(machine_name):
757        if isinstance(machine_name,str):
758            machine_name = machine_name.lower()
759        else:
760            Machine.class_error('machine name must be a string, you provided a '+machine_name.__class__.__name__)
761        #end if
762        if Machine.exists(machine_name):
763            machine = Machine.machines[machine_name]
764        else:
765            machs = sorted(Machine.machines.keys())
766            Machine.class_error('attempted to get machine '+machine_name+', but it is unknown\nknown options are '+str(machs))
767        #end if
768        return machine
769    #end def get
770
771
772    def warn(self,*args,**kwargs):
773        if Machine.allow_warnings:
774            NexusCore.warn(self,*args,**kwargs)
775        #end if
776    #end def warn
777
778
779    def validate(self):
780        if Machine.exists(self.name):
781            if not Machine.is_unique(self):
782                self.error('duplicate instance of machine '+self.name+' encountered\n  this is either a developer error, or you have created a duplicate machine')
783            #end if
784        else:
785            self.error('machine {0} id {1} was created without calling Machine.__init__() and is therefore invalid'.format(self.name,id(self)))
786        #end if
787    #end def validate
788
789
790    def in_batch_mode(self):
791        return self.mode==self.modes.batch
792    #end def in_batch_mode
793
794
795    def query_queue(self):
796        self.not_implemented()
797    #end def query_queue
798
799    def submit_jobs(self):
800        self.not_implemented()
801    #end def submit_jobs
802
803    # update all job information, must be idempotent
804    def process_job(self,job):
805        self.not_implemented()
806    #end def process_job
807
808    def process_job_options(self,job):
809        self.not_implemented()
810    #end def process_job_options
811
812    def write_job(self,job,file=False):
813        self.not_implemented()
814    #end def write_job
815
816    def submit_job(self,job):
817        self.not_implemented()
818    #end def submit_job
819
820
821    def __init__(self,name,queue_size=0):
822        self.name = name
823        self.queue_size = queue_size
824        self.processes = obj()
825        self.jobs = obj()
826        self.waiting = set()
827        self.running = set()
828        self.finished= set()
829
830        #user defined variables
831        self.account         = None
832        self.user            = None
833        self.local_directory = None
834        self.app_directory   = None
835        self.app_directories = None
836
837        if not isinstance(name,str):
838            self.error('machine name must be a string\nyou provided '+str(name))
839        #end if
840
841        Machine.add(self)
842    #end def __init__
843
844
845    def restore_default_settings(self):
846        self.account         = None
847        self.user            = None
848        self.local_directory = None
849        self.app_directory   = None
850        self.app_directories = None
851    #end def restore_default_settings
852
853
854    def add_job(self,job):
855        if isinstance(job,Job):
856            self.process_job(job)
857            self.write_job(job)
858            jid = job.internal_id
859            self.jobs[jid] = job
860            job.status = job.states.waiting
861            self.waiting.add(jid)
862            #self.write_job_states('add_job')
863        else:
864            self.error('add_job received non-Job instance '+job.__class__.__name__)
865        #end if
866    #end def add_job
867
868
869    def requeue_job(self,job):
870        None
871    #end def requeue_job
872
873
874    allowed_user_info = set(['account','local_directory','app_directory','app_directories'])
875    def incorporate_user_info(self,infoin):
876        info = obj(**infoin)
877        vars = set(info.keys())
878        invalid = vars-self.allowed_user_info
879        if len(invalid)>0:
880            self.error('invalid inputs encountered in incorporate_user_info\nallowed inputs: {0}\n  invalid inputs: {1}'.format(list(self.allowed_user_info),list(invalid)))
881        #end if
882        if 'app_directories' in info:
883            ad = info.app_directories
884            if not isinstance(ad,dict) and not isinstance(ad,obj):
885                self.error('app_directories must be of type dict or obj\nyou provided '+ad.__class__.__name__)
886            #end if
887        #end if
888        self.transfer_from(info)
889    #end def incorporate_user_info
890#end class Machine
891
892
893
894
895class Workstation(Machine):
896
897    mode = Machine.modes.interactive
898
899    batch_capable = False
900
901    def __init__(self,
902                 name                = 'workstation',
903                 cores               = None,
904                 app_launcher        = 'mpirun',
905                 process_granularity = 1
906                 ):
907        Machine.__init__(self,name)
908        self.app_launcher = app_launcher
909        if cores==None:
910            self.cores = cpu_count()
911        else:
912            self.cores = cores
913        #end if
914        self.queue_size = cores
915        self.process_granularity = process_granularity
916    #end def __init__
917
918
919    def process_job(self,job):
920        if job.serial_only():
921            job.cores=1
922        elif job.cores==None:
923            if job.processes!=None:
924                job.cores = job.processes*job.threads
925            else:
926                job.cores = self.cores
927            #end if
928        #end if
929        job.processes = max(1,int(floor(float(job.cores)/job.threads)))
930        grains = int(ceil(float(job.cores)/self.process_granularity))
931        if abs(grains-1-float(job.cores)/self.process_granularity)<1e-6:
932            grains-=1
933        #end if
934        job.grains = grains
935        job.cores = grains*self.process_granularity
936
937        self.process_job_options(job)
938    #end def process_job
939
940
941    def process_job_options(self,job):
942        job.run_options.add(np='-np '+str(job.processes))
943    #end def process_job_options
944
945
946    def write_job_states(self,title=''):
947        self.log(title,n=2)
948        n=3
949        self.log('{0} {1} {2} job states'.format(self.__class__.__name__,self.name,id(self)),n=n )
950        self.log('processes',n=n+1)
951        for process in self.processes:
952            job = process.job
953            self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
954        #end for
955        self.log('jobs',n=n+1)
956        jobids = list(self.jobs.keys())
957        jobids.sort()
958        for jobid in jobids:
959            job = self.jobs[jobid]
960            self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
961        #end for
962        self.log('waiting',n=n+1)
963        jobids = list(self.waiting)
964        jobids.sort()
965        for jobid in jobids:
966            job = self.jobs[jobid]
967            self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
968        #end for
969        self.log('running',n=n+1)
970        jobids = list(self.running)
971        jobids.sort()
972        for jobid in jobids:
973            job = self.jobs[jobid]
974            self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
975        #end for
976        self.log('finished',n=n+1)
977        jobids = list(self.finished)
978        jobids.sort()
979        for jobid in jobids:
980            job = self.jobs[jobid]
981            self.log('{0:>4} {1:>10} {2:>4} {3}'.format(job.internal_id,job.name,job.simid,job.directory),n=n+2)
982        #end for
983        self.log('end job states',n=1)
984    #end def write_job_states
985
986
987    def query_queue(self):
988        #self.write_job_states('query queue')
989        self.validate()
990        done = []
991        for pid,process in self.processes.items():
992            if nexus_core.generate_only or not nexus_core.monitor:
993                qpid,status = pid,0
994            else:
995                qpid,status = os.waitpid(pid,os.WNOHANG)
996            #end if
997            if pid==qpid:
998                job = process.job
999                job.status = job.states.finished
1000                job.finished = True
1001                job.determine_end_status(status)
1002                iid = job.internal_id
1003                self.running.remove(iid)
1004                self.finished.add(iid)
1005                done.append(pid)
1006                if not nexus_core.generate_only:
1007                    job.out.close()
1008                    job.err.close()
1009                #end if
1010            #end if
1011        #end for
1012        for pid in done:
1013            del self.processes[pid]
1014        #end for
1015    #end def query_queue
1016
1017
1018    def submit_jobs(self):
1019        cores_used = 0
1020        for process in self.processes:
1021            cores_used += process.job.cores
1022        #end for
1023        cores_available = self.cores-cores_used
1024
1025        core_req = []
1026        job_req  = []
1027        for iid in self.waiting:
1028            job = self.jobs[iid]
1029            job_req.append(job)
1030            core_req.append(job.cores)
1031        #end for
1032        core_req = array(core_req,dtype=int)
1033
1034        # The following line does not work correctly under Numpy 1.10 or greater.
1035        # It should create an ndarray of Job objects from a list of Job objects.
1036        # Instead it creates nested ndarray's with inner type of bool
1037        #job_req  = array(job_req ,dtype=object)
1038
1039        job_req_tmp = job_req
1040        job_req  = empty(len(job_req_tmp) ,dtype=object)
1041        for idx,job in enumerate(job_req_tmp):
1042            job_req[idx] = job
1043        #end for
1044
1045        order    = core_req.argsort()
1046        job_req  = job_req[order]
1047
1048        for job in job_req:
1049            if job.cores>self.cores and not nexus_core.generate_only:
1050                self.error('job '+str(job.internal_id)+' is too large to run on this machine\ncores requested: '+str(job.cores)+'\nmachine cores: '+str(self.cores))
1051            #end if
1052            if job.cores<=cores_available:
1053                iid = job.internal_id
1054                self.waiting.remove(iid)
1055                self.running.add(iid)
1056                self.submit_job(job)
1057                cores_available-=job.cores
1058            elif job.cores>self.cores:
1059                self.error('job requested more cores than are present on '+self.name+'\ncores requested: {0}\ncores present: {1}'.format(job.cores,self.cores))
1060            else:
1061                break
1062            #end if
1063        #end for
1064    #end def submit_jobs
1065
1066
1067    def job_command(self,job,pad=None):
1068        command = 'export OMP_NUM_THREADS='+str(job.threads)+'\n'
1069        if len(job.presub)>0:
1070            command += job.presub+'\n'
1071        #end if
1072        if job.serial is not None:
1073            command += job.run_command(self.app_launcher,serial=job.serial)
1074        else:
1075            command += job.run_command(self.app_launcher)
1076        #end if
1077        if len(job.postsub)>0:
1078            command += job.postsub+'\n'
1079        #end if
1080        if pad!=None:
1081            command = ('\n'+command).replace('\n','\n  '+pad)
1082        #end if
1083        return command
1084    #end def job_command
1085
1086
1087    def write_job(self,job,file=False):
1088        c = self.job_command(job)
1089        return c
1090    #end def write_job
1091
1092
1093    def submit_job(self,job):
1094        pad = self.enter(job.directory,msg=job.simid)
1095        command = self.job_command(job,pad=pad)
1096        job.status = job.states.running
1097        process = obj()
1098        process.job = job
1099        if nexus_core.generate_only:
1100            self.log(pad+'Would have executed:  '+command)
1101            job.system_id = job.internal_id
1102        else:
1103            if nexus_core.monitor:
1104                self.log(pad+'Executing:  '+command)
1105                job.out = open(job.outfile,'w')
1106                job.err = open(job.errfile,'w')
1107                p = Popen(command,env=job.env,stdout=job.out,stderr=job.err,shell=True)
1108                process.popen = p
1109                job.system_id = p.pid
1110            else:
1111                command+=' >'+job.outfile+' 2>'+job.errfile+'&'
1112                self.log(pad+'Executing:  '+command)
1113                os.system(command)
1114                job.system_id = job.internal_id
1115            #end if
1116        #end if
1117        self.processes[job.system_id] = process
1118        self.leave()
1119    #end def submit_job
1120#end class Workstation
1121
1122
1123
1124
1125# test needed
1126class InteractiveCluster(Workstation):
1127
1128    def __init__(self,*args,**kwargs):
1129        if len(args)==0 or not isinstance(args[0],Supercomputer):
1130            self.init_from_args(*args,**kwargs)
1131        else:
1132            super = args[0]
1133            cores = args[1]
1134            self.init_from_supercomputer(super,cores)
1135        #end if
1136        Machine.__init__(self,self.name,self.queue_size)
1137    #end def __init__
1138
1139
1140    def init_from_args(self,
1141                       name                = 'icluster',
1142                       nodes               = None,
1143                       procs_per_node      = None,
1144                       cores_per_proc      = None,
1145                       process_granularity = None,
1146                       ram_per_node        = None,
1147                       app_launcher        = None
1148                       ):
1149        self.name           = name
1150        self.nodes          = nodes
1151        self.procs_per_node = procs_per_node
1152        self.cores_per_proc = cores_per_proc
1153        self.process_granularity = process_granularity
1154        self.ram_per_node   = ram_per_node
1155        self.app_launcher   = app_launcher
1156
1157        self.cores_per_node = self.cores_per_proc*self.procs_per_node
1158        if process_granularity is None:
1159            self.process_granularity = self.cores_per_node
1160        #end if
1161
1162        self.procs = self.procs_per_node*self.nodes
1163        self.cores = self.cores_per_proc*self.procs
1164        self.ram   = self.ram_per_node*self.nodes
1165
1166        self.queue_size = self.cores
1167    #end def init_from_args
1168
1169
1170    def init_from_supercomputer(self,super,cores):
1171        nodes = cores//super.cores_per_node
1172        if cores-nodes*super.cores_per_node!=0:
1173            self.error('interactive cores corresponds to a fractional number of nodes\n  cores '+str(cores)+'\n  cores per node '+str(super.cores_per_node))
1174        #end if
1175        self.init_from_args(super.name+'_interactive',nodes,super.procs_per_node,
1176                            super.cores_per_proc,super.cores_per_node,
1177                            super.ram_per_node,super.app_launcher)
1178    #end def init_from_supercomputer
1179
1180
1181    def process_job(self,job):
1182        job.cores = min(job.cores,self.cores)
1183
1184        Workstation.process_job(self,job)
1185
1186        job.nodes = job.grains
1187        job.procs = job.nodes*self.procs_per_node
1188
1189        if mod(job.processes,job.nodes)!=0:
1190            job.processes_per_node = None
1191        else:
1192            job.processes_per_node = job.processes//job.nodes
1193        #end if
1194
1195        if mod(job.processes,job.procs)!=0:
1196            job.processes_per_proc = None
1197        else:
1198            job.processes_per_proc = job.processes//job.procs
1199        #end if
1200    #end def process_job
1201#end class InteractiveCluster
1202
1203
1204
1205
1206class Supercomputer(Machine):
1207    mode = Machine.modes.batch
1208    name = 'supercomputer'
1209
1210    batch_capable = False #only set to true for specific machines
1211
1212    aprun_options = set(['n','d'])
1213
1214    required_inputs = [
1215        'nodes',
1216        'procs_per_node',
1217        'cores_per_proc',
1218        'ram_per_node',
1219        'queue_size',
1220        'app_launcher',
1221        'sub_launcher',
1222        'queue_querier',
1223        'job_remover'
1224        ]
1225
1226    def __init__(self,
1227                 nodes          = None,
1228                 procs_per_node = None,
1229                 cores_per_proc = None,
1230                 ram_per_node   = None,
1231                 queue_size     = 0,
1232                 app_launcher   = None,
1233                 sub_launcher   = None,
1234                 queue_querier  = None,
1235                 job_remover    = None,
1236                 name           = None,
1237                 ):
1238        if name is None:
1239            if self.name is not None:
1240                name = self.name
1241            else:
1242                name = self.__class__.__name__.lower()
1243            #end if
1244        #end if
1245        Machine.__init__(self,name)
1246        self.nodes          = nodes          #  # of nodes
1247        self.procs_per_node = procs_per_node #  # of processors/sockets on a node
1248        self.cores_per_proc = cores_per_proc #  # of cores on a processor/socket
1249        self.ram_per_node   = ram_per_node
1250        self.queue_size     = queue_size
1251        self.app_launcher   = app_launcher
1252        self.sub_launcher   = sub_launcher
1253        self.queue_querier  = queue_querier
1254        self.job_remover    = job_remover
1255
1256        for var in Supercomputer.required_inputs:
1257            if self[var] is None:
1258                self.error('input variable '+var+' is required to initialize Supercomputer object.')
1259            #end if
1260        #end for
1261
1262        self.cores_per_node = self.cores_per_proc*self.procs_per_node
1263
1264        self.procs = self.procs_per_node*self.nodes
1265        self.cores = self.cores_per_proc*self.procs
1266        self.ram   = self.ram_per_node*self.nodes
1267
1268        # 'complete' is the only actively used status so far
1269        #   At least one queue state should correspond to 'complete',
1270        #   though even this is not strictly necessary.
1271        #   In general if a pid is missing from the queue,
1272        #   that job is assumed to be complete.
1273
1274        self.system_queue = obj()
1275        if self.queue_querier=='qstat':
1276            self.job_states=dict(E = 'exiting',
1277                                 H = 'held',
1278                                 Q = 'queued',
1279                                 R = 'running',
1280                                 S = 'suspended',
1281                                 T = 'transferring',
1282                                 W = 'waiting',
1283                                 C = 'complete'
1284                                 )
1285        elif self.queue_querier=='qstata':
1286            #already gives status as queued, running, etc.
1287            None
1288        elif  self.queue_querier=='squeue':
1289            self.job_states=dict(CG = 'exiting',
1290                                 TO = 'timeout',
1291                                 CA = 'failed',
1292                                 F = 'failed',
1293                                 NF = 'node_fail',
1294                                 PD = 'waiting',
1295                                 R = 'running',
1296                                 S = 'suspended',
1297                                 CD = 'complete',
1298                                 RD = 'held',
1299                                 BF = 'failed',
1300                                 CF = 'configuring',
1301                                 DL = 'deadline',
1302                                 OOM= 'out_of_memory',
1303                                 PR = 'preempted',
1304                                 RF = 'requeue_fed',
1305                                 RH = 'requeue_hold',
1306                                 RQ = 'requeued',
1307                                 RS = 'resizing',
1308                                 RV = 'revoked',
1309                                 SI = 'signaling',
1310                                 SE = 'special_exit',
1311                                 SO = 'stage_out',
1312                                 ST = 'stopped',
1313                                 )
1314        elif self.queue_querier=='sacct':
1315            self.job_states=dict(CANCELLED = 'failed',  #long form
1316                                 COMPLETED = 'complete',
1317                                 COMPLETING = 'exiting',
1318                                 CONFIGURING = 'waiting',
1319                                 FAILED = 'failed',
1320                                 PREMEEMPTED = 'failed',
1321                                 PENDING = 'waiting',
1322                                 NODE_FAIL = 'failed',
1323                                 RESIZING = 'resizing',
1324                                 RUNNING = 'running',
1325                                 SUSPENDED = 'suspended',
1326                                 TIMEOUT = 'failed',
1327                                 CA = 'failed',        #short form
1328                                 CD = 'complete',
1329                                 CG = 'exiting',
1330                                 CF = 'waiting',
1331                                 F = 'failed',
1332                                 PR = 'failed',
1333                                 PD = 'waiting',
1334                                 NF = 'failed',
1335                                 RS = 'resizing',
1336                                 R = 'running',
1337                                 S = 'suspended',
1338                                 TO = 'failed'
1339                                 )
1340        elif self.queue_querier=='llq':
1341            self.job_states=dict(I  = 'idle',
1342                                 NQ = 'not_queued',
1343                                 H  = 'user_hold',
1344                                 S  = 'system_hold',
1345                                 HS = 'user_system_hold',
1346                                 D  = 'deferred',
1347                                 R  = 'running',
1348                                 P  = 'pending',
1349                                 ST = 'starting',
1350                                 C  = 'complete',
1351                                 CA = 'canceled',
1352                                 E  = 'preempted',
1353                                 EP = 'preempt_pending',
1354                                 MP = 'resume_pending',
1355                                 )
1356        elif self.queue_querier=='bjobs':
1357            self.job_states=dict(PEND  = 'pending',
1358                                 RUN   = 'running',
1359                                 DONE  = 'complete',
1360                                 EXIT  = 'failed',
1361                                 PSUSP = 'suspended',
1362                                 USUSP = 'suspended',
1363                                 SSUSP = 'suspended',
1364                                 )
1365        elif self.queue_querier=='test_query':
1366            None
1367        else:
1368            self.error('ability to query queue with '+self.queue_querier+' has not yet been implemented')
1369        #end if
1370
1371    #end def __init__
1372
1373
1374    # test needed
1375    def interactive_representation(self,cores):
1376        return InteractiveCluster(self,cores)
1377    #end def interactive_representation
1378
1379
1380    # test needed
1381    def requeue_job(self,job):
1382        if isinstance(job,Job):
1383            jid = job.internal_id
1384            pid = job.system_id
1385            if pid is None:
1386                self.error('job {0} does not have a process id issued by the scheduler'.format(jid))
1387            #end if
1388            self.process_job(job)
1389            self.jobs[jid] = job
1390            job.status = job.states.running
1391            self.running.add(jid)
1392            process = obj(job=job)
1393            self.processes[pid] = process
1394        else:
1395            self.error('requeue_job received non-Job instance '+job.__class__.__name__)
1396        #end if
1397    #end def requeue_job
1398
1399
1400    def process_job(self,job):
1401        if job.fake_job:
1402            return
1403        #end if
1404
1405        self.pre_process_job(job)
1406
1407        job.subfile = job.name+'.'+self.sub_launcher+'.in'
1408        no_cores = job.cores is None
1409        no_nodes = job.nodes is None
1410        if no_cores and no_nodes:
1411            self.error('job did not specify cores or nodes\nAt least one must be provided')
1412        elif no_cores:
1413            job.cores = self.cores_per_node*job.nodes
1414        elif no_nodes:
1415            job.nodes = int(ceil(float(job.cores)/self.cores_per_node))
1416            if abs(job.nodes-1-float(job.cores)/self.cores_per_node)<1e-6:
1417                job.nodes-=1
1418            #end if
1419        else:
1420            job.cores = min(job.cores,job.nodes*self.cores_per_node)
1421        #end if
1422        if job.processes_per_node is not None:
1423            job.processes = job.nodes*job.processes_per_node
1424        else:
1425            job.processes = max(1,int(float(job.cores)/job.threads))
1426        #end if
1427        job.tot_cores = job.nodes*self.cores_per_node
1428        job.procs = job.nodes*self.procs_per_node
1429
1430        if mod(job.processes,job.nodes)!=0:
1431            job.processes_per_node = None
1432        else:
1433            job.processes_per_node = job.processes//job.nodes
1434        #end if
1435
1436        if mod(job.processes,job.procs)!=0:
1437            job.processes_per_proc = None
1438        else:
1439            job.processes_per_proc = job.processes//job.procs
1440        #end if
1441
1442        if job.ppn is None:
1443            job.ppn = self.cores_per_node
1444        #end if
1445
1446        if job.account is None:
1447            if self.account is not None:
1448                job.account = self.account
1449            elif self.requires_account:
1450                self.error('account not specified for job on '+self.name)
1451            #end if
1452        #end if
1453
1454        self.post_process_job(job)
1455
1456        job.set_environment(OMP_NUM_THREADS=job.threads)
1457
1458        self.process_job_options(job)
1459    #end def process_job
1460
1461
1462    def process_job_options(self,job):
1463        launcher = self.app_launcher
1464        if launcher=='mpirun':
1465            job.run_options.add(np='-np '+str(job.processes))
1466        elif launcher=='mpiexec':
1467            job.run_options.add(n='-n '+str(job.processes))
1468        elif launcher=='aprun':
1469            if 'n' in self.aprun_options:
1470                job.run_options.add(n='-n '+str(job.processes))
1471            #end if
1472            if 'd' in self.aprun_options and job.threads>1:
1473                job.run_options.add(d='-d '+str(job.threads))
1474            #end if
1475            if 'N' in self.aprun_options and job.processes_per_node is not None:
1476                job.run_options.add(N='-N '+str(job.processes_per_node))
1477            #end if
1478            if 'S' in self.aprun_options and job.processes_per_proc is not None:
1479                job.run_options.add(S='-S '+str(job.processes_per_proc))
1480            #end if
1481        elif launcher=='runjob':
1482            #bypass setup_environment
1483            if job.env is not None:
1484                envs='--envs'
1485                for name,value in job.env.items():
1486                    envs+=' {0}={1}'.format(name,value)
1487                #end for
1488                job.env = None
1489            elif 'envs' in job.run_options:
1490                envs = job.run_options.envs
1491            else:
1492                self.error('failed to set env options for runjob')
1493            #end if
1494            job.run_options.add(
1495                np       = '--np '+str(job.processes),
1496                p        = '-p '+str(job.processes_per_node),
1497                xlocargs = '$LOCARGS',
1498                verbose  = '--verbose=INFO',
1499                envs     = envs
1500                )
1501        elif launcher=='srun':  # Amos contribution from Ryan McAvoy
1502            None
1503        elif launcher=='ibrun': # Lonestar contribution from Paul Young
1504            job.run_options.add(
1505	        np	= '-n '+str(job.processes),
1506	        p	= '-o '+str(0),
1507	        )
1508        elif launcher=='jsrun': # Summit
1509            None # Summit class takes care of this in post_process_job
1510        else:
1511            self.error(launcher+' is not yet implemented as an application launcher')
1512        #end if
1513    #end def process_job_options
1514
1515
1516    def pre_process_job(self,job):
1517        None
1518    #end def pre_process_job
1519
1520
1521    def post_process_job(self,job):
1522        None
1523    #end def post_process_job
1524
1525
1526    def query_queue(self,out=None):
1527        self.system_queue.clear()
1528        if self.query_with_username and self.user is None:
1529            self.error('querying queue on machine "{}" requires user name\nplease provide username via the "user" keyword in settings'.format(self.name))
1530        #end if
1531        if self.queue_querier=='qstat':
1532            if out is None:
1533                out,err,rc = execute('qstat -a')
1534            #end if
1535            lines = out.splitlines()
1536            for line in lines:
1537                tokens=line.split()
1538                if len(tokens)>0:
1539                    if '.' in tokens[0]:
1540                        spid = tokens[0].split('.')[0]
1541                    else:
1542                        spid = tokens[0]
1543                    #endif
1544                    if spid.isdigit() and len(tokens)==11:
1545                        pid = int(spid)
1546                        jid,uname,queue,jname,sessid,nodes,tasks,mem,rtime,status,etime = tokens
1547                        if status in self.job_states:
1548                            self.system_queue[pid] = self.job_states[status]
1549                        else:
1550                            self.error('job state '+status+' is unrecognized')
1551                        #end if
1552                    #end if
1553                #end if
1554            #end for
1555        elif self.queue_querier=='qstata':
1556            if out is None:
1557                out,err,rc = execute('qstat')
1558            #end if
1559            lines = out.splitlines()
1560            for line in lines:
1561                tokens=line.split()
1562                if len(tokens)>0:
1563                    if '.' in tokens[0]:
1564                        spid = tokens[0].split('.')[0]
1565                    else:
1566                        spid = tokens[0]
1567                    #endif
1568                    if spid.isdigit() and len(tokens)==6:
1569                        pid = int(spid)
1570                        jid,uname,wtime,nodes,status,loc = tokens
1571                        self.system_queue[pid] = status
1572                    #end if
1573                #end if
1574            #end for
1575        elif self.queue_querier=='squeue': # contributed by Ryan McAvoy
1576            if out is None:
1577                extra = ''
1578                if self.user is not None:
1579                    extra = ' -u {}'.format(self.user)
1580                #end if
1581                out,err,rc = execute('squeue'+extra)
1582            #end if
1583            lines = out.splitlines()
1584            for line in lines:
1585                tokens=line.split()
1586                if len(tokens)>0:
1587                    if '.' in tokens[0]:
1588                        spid = tokens[0].split('.')[0]
1589                    else:
1590                        spid = tokens[0]
1591                    #endif
1592                    if spid.isdigit():
1593                        pid = int(spid)
1594                        status = None
1595                        jid,loc,name,uname,status,wtime,nodes,reason = tokens[:8]
1596                        if status is not None:
1597                            if status in self.job_states:
1598                                self.system_queue[pid] = self.job_states[status]
1599                            else:
1600                                self.error('job state '+status+' is unrecognized')
1601                            #end if
1602                        #end if
1603                    #end if
1604                #end if
1605            #end for
1606        elif self.queue_querier=='sacct': # contributed by Ryan McAvoy
1607            if out is None:
1608                out,err,rc = execute('sacct')
1609            #end if
1610            lines = out.splitlines()
1611            for line in lines:
1612                tokens=line.split()
1613                if len(tokens)>0:
1614                    if '.' in tokens[0]:
1615                        spid = tokens[0].split('.')[0]
1616                    else:
1617                        spid = tokens[0]
1618                    #endif
1619                    if spid.isdigit() and len(tokens)==6:  #if account is empty, only 6 tokens.
1620
1621                        pid = int(spid)
1622                        jid,name,loc,cores,status,exit_code = tokens
1623                        status = status.split('+')[0]  ## get rid of '+' in the end
1624                        if status in self.job_states:
1625                            self.system_queue[pid] = self.job_states[status]
1626                        else:
1627                            self.error('job state '+status+' is unrecognized')
1628                        #end if
1629                    elif spid.isdigit() and len(tokens)==7:
1630
1631                        pid = int(spid)
1632                        jid,name,loc,uname,cores,status,exit_code = tokens
1633                        status = status.split('+')[0]  ## get rid of '+' in the end
1634                        if status in self.job_states:
1635                            self.system_queue[pid] = self.job_states[status]
1636                        else:
1637                            self.error('job state '+status+' is unrecognized')
1638                        #end if
1639                    #end if
1640                #end if
1641            #end for
1642        elif self.queue_querier=='llq':
1643            if out is None:
1644                out,err,rc = execute('sacct')
1645            #end if
1646            lines = out.splitlines()
1647            for line in lines:
1648                tokens=line.split()
1649                if len(tokens)>0:
1650                    if '.' in tokens[0]:
1651                        spid = tokens[0].split('.')[1]
1652                    else:
1653                        spid = tokens[0]
1654                    #endif
1655                    if spid.isdigit() and (len(tokens)==7 or len(tokens)==8):
1656                        pid = int(spid)
1657                        if len(tokens)==7:
1658                            jid,owner,subdate,subtime,status,pri,class_ = tokens
1659                        elif len(tokens)==8:
1660                            jid,owner,subdate,subtime,status,pri,class_,running_on = tokens
1661                        #end if
1662                        if status in self.job_states:
1663                            self.system_queue[pid] = self.job_states[status]
1664                        else:
1665                            self.error('job state '+status+' is unrecognized')
1666                        #end if
1667                    #end if
1668                #end if
1669            #end for
1670        elif self.queue_querier=='bjobs':
1671            if out is None:
1672                out,err,rc = execute('bjobs')
1673            #end if
1674            lines = out.splitlines()
1675            for line in lines:
1676                tokens=line.split()
1677                if len(tokens)>0:
1678                    spid = tokens[0]
1679                    if spid.isdigit() and len(tokens)==8:
1680                        pid = int(spid)
1681                        jid,uname,status,slots,queue,start,finish,jname = tokens
1682                        if status in self.job_states:
1683                            self.system_queue[pid] = self.job_states[status]
1684                        else:
1685                            self.error('job state '+status+' is unrecognized')
1686                        #end if
1687                    #end if
1688                #end if
1689            #end for
1690        elif self.queue_querier=='test_query': # for testing
1691            # pretend that all jobs have finished
1692            for pid in self.processes.keys():
1693                self.system_queue[pid] = 'complete'
1694            #end for
1695        else:
1696            self.error('ability to query queue with '+self.queue_querier+' has not yet been implemented')
1697        #end if
1698        done = []
1699        for pid,process in self.processes.items():
1700            if not pid in self.system_queue or self.system_queue[pid]=='complete' or nexus_core.generate_only:
1701                job = process.job
1702                job.status = job.states.finished
1703                job.finished = True
1704                iid = job.internal_id
1705                self.running.remove(iid)
1706                self.finished.add(iid)
1707                done.append(pid)
1708            #end if
1709        #end for
1710        for pid in done:
1711            del self.processes[pid]
1712        #end for
1713        return self.system_queue
1714    #end def query_queue
1715
1716
1717    def submit_jobs(self):
1718        nprocesses_running = len(self.processes)
1719        queue_slots_available = self.queue_size-nprocesses_running
1720        remove = []
1721        for iid in self.waiting:
1722            if queue_slots_available>0:
1723                remove.append(iid)
1724                self.running.add(iid)
1725                job = self.jobs[iid]
1726                self.submit_job(job)
1727                queue_slots_available -= 1
1728            else:
1729                break
1730            #end if
1731        #end for
1732        for iid in remove:
1733            self.waiting.remove(iid)
1734        #end for
1735    #end def submit_jobs
1736
1737
1738    def submit_job(self,job):
1739        pad = self.enter(job.directory,msg=job.internal_id)
1740        if job.subfile==None:
1741            self.error('submission file not specified for job')
1742        elif not os.path.exists(job.subfile):
1743            self.error('job submission file was not written prior to submission\n  submission file: '+os.path.join(job.directory,job.subfile))
1744        #end if
1745        command = self.sub_command(job)
1746        if nexus_core.generate_only:
1747            self.log(pad+'Would have executed:  '+command)
1748            job.status = job.states.running
1749            process = obj()
1750            process.job = job
1751            self.processes[job.internal_id] = process
1752        else:
1753            self.log(pad+'Executing:  '+command)
1754            job.status = job.states.running
1755            process = obj()
1756            process.job = job
1757            out,err,rc = execute(command)
1758            output=out+'\n'+err
1759            pid = self.read_process_id(output)
1760            if pid is None:
1761                self.error('process id could not be determined from submission output\n  output:\n'+output)
1762            else:
1763                self.log(pad+'  pid: {0}'.format(pid))
1764            #end if
1765            #pid = 'fakepid_'+str(job.internal_id)
1766            job.system_id = pid
1767            self.processes[pid] = process
1768        #end if
1769        self.leave()
1770    #end def submit_job
1771
1772
1773    def sub_command(self,job):
1774        return self.sub_launcher+job.sub_options.write()+' '+job.subfile
1775    #end def sub_command
1776
1777
1778    def remove_job(self,job):
1779        if self.job_remover=='qdel':
1780            command = 'qdel '+str(job.system_id)
1781        elif self.job_remover=='scancel':
1782            command = 'scancel '+str(job.system_id)
1783        else:
1784            self.error('ability to remove job using '+self.job_remover+' has not yet been implemented')
1785        #endif
1786        os.system(command)
1787    #end def remove_job
1788
1789
1790    def setup_environment(self,job):
1791        env = ''
1792        if job.env!=None:
1793            for name,val in job.env.items():
1794                env +='export {0}={1}\n'.format(name,val)
1795            #end for
1796        #end if
1797        return env
1798    #end def setup_environment
1799
1800
1801    def write_job(self,job,file=False):
1802        job.subfile = job.name+'.'+self.sub_launcher+'.in'
1803        env = self.setup_environment(job)
1804        command = job.run_command(self.app_launcher,serial=job.serial)
1805
1806        c = self.write_job_header(job)+'\n'
1807        if len(job.presub)>0:
1808            c+=job.presub+'\n'
1809        #end if
1810        c+=env
1811        c+=command+'\n'
1812        if len(job.postsub)>0:
1813            c+=job.postsub+'\n'
1814        #end if
1815        if file:
1816            filepath = os.path.join(job.directory,job.subfile)
1817            fobj = open(filepath,'w')
1818            fobj.write(c)
1819            fobj.close()
1820            if self.executable_subfile:
1821                os.system('chmod +x '+filepath)
1822            #end if
1823        #end if
1824        return c
1825    #end def write_job
1826
1827
1828    def write_job_header(self,job):
1829        self.not_implemented()
1830    #end def write_job_header
1831
1832
1833    def read_process_id(self,output):
1834        pid = None
1835        lines = output.splitlines()
1836        if self.sub_launcher=='llsubmit': # specialization for load leveler (SuperMUC)
1837            for line in lines:
1838                if 'llsubmit: The job' in line and '"' in line:
1839                    spid = line.split('"')[1].split('.')[1].strip()
1840                    if spid.isdigit():
1841                        pid = int(spid)
1842                        break
1843                    #end if
1844                #end if
1845            #end for
1846        else:  # most other machines follow the pattern below
1847            for line in lines:
1848                ls = line.strip()
1849                if ls.isdigit():
1850                    pid = int(ls)
1851                    break
1852                elif '.' in line:
1853                    spid = line.split('.')[0]
1854                    if spid.isdigit():
1855                        pid = int(spid)
1856                        break
1857                    #end if
1858                elif ' ' in line: # specialized for Amos?
1859                    spid = line.split(' ')[-1]
1860                    if spid.isdigit():
1861                        pid = int(spid)
1862                        break
1863                    #end if
1864                #end if
1865            #end for
1866        #end if
1867        return pid
1868    #end def read_process_id
1869
1870#end class Supercomputer
1871
1872
1873
1874# Load local class for local cluster's setting from ~/.nexus/local.py
1875# The following is an example of this file (see machines.py for more examples):
1876'''
1877from machines import Supercomputer
1878class Clustername(Supercomputer):
1879    name = 'clustername'
1880    requires_account = False
1881    batch_capable    = True
1882    def write_job_header(self,job):
1883        if job.queue is None:
1884            job.queue='batch'
1885        #end if
1886        c= '#!/bin/bash\n'
1887        c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes,job.ppn)
1888        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
1889        c+='#PBS -N '+job.name +'\n'
1890        c+='#PBS -o '+job.outfile+'\n'
1891        c+='#PBS -e '+job.errfile+'\n'
1892        c+='#PBS -V\n'
1893        c+='#PBS -q '+job.queue+'\n'
1894        c+=' \n '
1895        return c
1896    #end def write_job_header
1897#end class Clustername
1898#            nodes sockets cores ram qslots  qlaunch  qsubmit     qstatus   qdelete
1899Clustername(      4,   1,    16,   24,    4, 'mpirun',     'qsub',   'qstat',    'qdel')
1900'''
1901try:
1902    load_source('*',os.path.expanduser('~/.nexus/local_machines.py'))
1903except IOError:
1904    pass
1905except:
1906    raise
1907#end try
1908
1909
1910class Kraken(Supercomputer):
1911
1912    name = 'kraken'
1913
1914    requires_account = True
1915
1916    def write_job_header(self,job):
1917        c='#!/bin/bash\n'
1918        c+='#PBS -A '+str(job.account)+'\n'
1919        c+='#PBS -N '+str(job.name)+'\n'
1920        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
1921        c+='#PBS -l size='+str(job.tot_cores)+'\n'
1922        c+='#PBS -o '+job.outfile+'\n'
1923        c+='#PBS -e '+job.errfile+'\n'
1924        if job.user_env:
1925            c+='#PBS -V\n'
1926        #end if
1927        c+='''
1928cd $PBS_O_WORKDIR
1929export MPICH_PTL_SEND_CREDITS=-1
1930export MPICH_MAX_SHORT_MSG_SIZE=1024
1931export MPICH_PTL_UNEX_EVENTS=800000
1932export MPICH_UNEX_BUFFER_SIZE=16M
1933export MPI_MSGS_PER_PROC=32768
1934'''
1935        return c
1936    #end def write_job_header
1937#end class Kraken
1938
1939
1940
1941class Jaguar(Supercomputer):
1942    name = 'jaguar'
1943
1944    requires_account = True
1945
1946    def write_job_header(self,job):
1947        if job.queue is None:
1948            job.queue='batch'
1949        #end if
1950        c='#!/bin/bash\n'
1951        c+='#PBS -A '+str(job.account)+'\n'
1952        c+='#PBS -q '+job.queue+'\n'
1953        c+='#PBS -N '+str(job.name)+'\n'
1954        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
1955        c+='#PBS -l size='+str(job.tot_cores)+'\n'
1956        c+='#PBS -l gres=widow2%widow3\n'
1957        c+='#PBS -o '+job.outfile+'\n'
1958        c+='#PBS -e '+job.errfile+'\n'
1959        if job.user_env:
1960            c+='#PBS -V\n'
1961        #end if
1962        c+='''
1963echo $PBS_O_WORKDIR
1964cd $PBS_O_WORKDIR
1965export MPICH_PTL_SEND_CREDITS=-1
1966export MPICH_MAX_SHORT_MSG_SIZE=1024
1967export MPICH_PTL_UNEX_EVENTS=800000
1968export MPICH_UNEX_BUFFER_SIZE=16M
1969export MPI_MSGS_PER_PROC=32768
1970'''
1971        return c
1972    #end def write_job_header
1973#end class Jaguar
1974
1975
1976
1977
1978class Golub(Supercomputer):
1979
1980    name = 'golub'
1981
1982    def write_job_header(self,job):
1983        if job.queue is None:
1984            job.queue='secondary'
1985        #end if
1986        c=''
1987        c+='#PBS -q '+job.queue+'\n'
1988        c+='#PBS -N '+job.name+'\n'
1989        c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes,job.ppn)
1990        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
1991        c+='#PBS -e '+job.errfile+'\n'
1992        c+='#PBS -o '+job.outfile+'\n'
1993        if job.user_env:
1994            c+='#PBS -V\n'
1995        #end if
1996        c+='''
1997cd ${PBS_O_WORKDIR}
1998
1999'''
2000        return c
2001    #end def write_job_header
2002
2003#end class Taub
2004
2005
2006
2007
2008class OIC5(Supercomputer):
2009
2010    name = 'oic5'
2011    batch_capable = True
2012
2013    def write_job_header(self,job):
2014        if job.queue is None:
2015            job.queue = 'mstqmc13q'
2016        #end if
2017
2018        ppn = job.processes_per_node
2019        #ppn = 32/job.threads
2020        #if ppn*job.threads!=32:
2021        #    self.error('ppn is not being set properly for OIC5\n  perhaps the number of threads requested does not evenly divide the 32 cores\n  you requested {0} threads'.format(job.threads))
2022        ##end if
2023
2024        c='#!/bin/bash\n'
2025        c+='#PBS -q '+job.queue+'\n'
2026        c+='#PBS -N '+str(job.name)+'\n'
2027        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
2028        c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes,ppn)
2029        c+='#PBS -W x=\"NACCESSPOLICY:SINGLEJOB\"\n'
2030        c+='#PBS -o '+job.outfile+'\n'
2031        c+='#PBS -e '+job.errfile+'\n'
2032        if job.user_env:
2033            c+='#PBS -V\n'
2034        #end if
2035        c+='''
2036echo $PBS_O_WORKDIR
2037cd $PBS_O_WORKDIR
2038'''
2039        return c
2040    #end def write_job_header
2041
2042
2043    def read_process_id(self,output):
2044        pid = None
2045        lines = output.splitlines()
2046        for line in lines:
2047            if 'oic.ornl.gov' in line:
2048                spid = line.split('.')[0]
2049                if spid.isdigit():
2050                    pid = int(spid)
2051                #end if
2052            #end if
2053        #end for
2054        return pid
2055    #end def read_process_id
2056#end class OIC5
2057
2058
2059
2060
2061class NerscMachine(Supercomputer):
2062    batch_capable = True
2063
2064    def write_job_header(self,job):
2065        if job.queue is None:
2066            job.queue = 'regular'
2067        #end if
2068        c='#!/bin/bash\n'
2069        c+='#SBATCH -p '+job.queue+'\n'
2070        c+='#SBATCH -J '+str(job.name)+'\n'
2071        c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
2072        c+='#SBATCH -N '+str(job.nodes)+'\n'
2073        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
2074        c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
2075        c+='#SBATCH -o '+job.outfile+'\n'
2076        c+='#SBATCH -e '+job.errfile+'\n'
2077        if job.user_env:
2078            c+='#SBATCH --export=ALL\n'   # equiv to PBS -V
2079        else:
2080            c+='#SBATCH --export=NONE\n'
2081        #end if
2082        c+='''
2083echo $SLURM_SUBMIT_DIR
2084cd $SLURM_SUBMIT_DIR
2085'''
2086        return c
2087    #end def write_job_header
2088#end class NerscMachine
2089
2090
2091class Edison(NerscMachine):
2092    name = 'edison'
2093#end class Edison
2094
2095
2096class Cori(NerscMachine):
2097    name = 'cori'
2098
2099    def pre_process_job(self,job):
2100        if job.queue is None:
2101            job.queue = 'regular'
2102        #end if
2103        if job.constraint is None:
2104            job.constraint = 'knl'
2105        #end if
2106        # account for dual nature of Cori
2107        if 'knl' in job.constraint:
2108            self.nodes          = 9688
2109            self.procs_per_node = 1
2110            self.cores_per_node = 68
2111            self.ram_per_node   = 96
2112        elif 'haswell' in job.constraint:
2113            self.nodes = 2388
2114            self.procs_per_node = 2
2115            self.cores_per_node = 32
2116            self.ram_per_node   = 128
2117        elif 'amd' in job.constraint:
2118            self.nodes = 20
2119            self.procs_per_node = 2
2120            self.cores_per_node = 32
2121            self.ram_per_node   = 2048
2122        else:
2123            self.error('SLURM input "constraint" must contain either "knl", "haswell", or "amd" on Cori\nyou provided: {0}'.format(job.constraint))
2124        #end if
2125        if job.core_spec is not None:
2126            self.cores_per_node -= job.core_spec
2127        #end if
2128    #end def pre_process_job
2129
2130    def write_job_header(self,job):
2131        self.pre_process_job(job) # sync machine view with job
2132        if 'knl' in job.constraint:
2133            hyperthreads   = 4
2134        elif 'haswell' in job.constraint:
2135            hyperthreads   = 2
2136        elif 'amd' in job.constraint:
2137            hyperthreads   = 2
2138        else:
2139            self.error('SLURM input "constraint" must contain either "knl", "haswell" or "amd" on Cori\nyou provided: {0}'.format(job.constraint))
2140        #end if
2141        cpus_per_task = int(floor(float(self.cores_per_node)/job.processes_per_node))*hyperthreads
2142        c='#!/bin/bash\n'
2143        if job.account is not None:
2144            c+= '#SBATCH -A '+job.account+'\n'
2145        #end if
2146        c+='#SBATCH -p '+job.queue+'\n'
2147        c+='#SBATCH -C '+str(job.constraint)+'\n'
2148        c+='#SBATCH -J '+str(job.name)+'\n'
2149        c+='#SBATCH -t '+job.sbatch_walltime()+'\n'
2150        c+='#SBATCH -N '+str(job.nodes)+'\n'
2151        c+='#SBATCH --tasks-per-node={0}\n'.format(job.processes_per_node)
2152        c+='#SBATCH --cpus-per-task={0}\n'.format(cpus_per_task)
2153        c+='#SBATCH -o '+job.outfile+'\n'
2154        c+='#SBATCH -e '+job.errfile+'\n'
2155        if job.user_env:
2156            c+='#SBATCH --export=ALL\n'   # equiv to PBS -V
2157        else:
2158            c+='#SBATCH --export=NONE\n'
2159        #end if
2160        c+='''
2161echo $SLURM_SUBMIT_DIR
2162cd $SLURM_SUBMIT_DIR
2163'''
2164        if job.threads>1:
2165            c+='''
2166export OMP_PROC_BIND=true
2167export OMP_PLACES=threads
2168'''
2169        #end if
2170        return c
2171    #end def write_job_header
2172#end class Cori
2173
2174
2175
2176class BlueWatersXK(Supercomputer):
2177
2178    name = 'bluewaters_xk'
2179    requires_account = False
2180    batch_capable    = True
2181
2182    def write_job_header(self,job):
2183        c='#!/bin/bash\n'
2184        c+='#PBS -N '+str(job.name)+'\n'
2185        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
2186        c+='#PBS -l nodes={0}:ppn={1}:xk\n'.format(job.nodes,job.ppn)
2187        c+='#PBS -o '+job.outfile+'\n'
2188        c+='#PBS -e '+job.errfile+'\n'
2189        if job.user_env:
2190            c+='#PBS -V\n'
2191        #end if
2192        c+='''
2193echo $PBS_O_WORKDIR
2194cd $PBS_O_WORKDIR
2195'''
2196        return c
2197    #end def write_job_header
2198#end class BlueWatersXK
2199
2200
2201
2202
2203class BlueWatersXE(Supercomputer):
2204
2205    name = 'bluewaters_xe'
2206    requires_account = False
2207    batch_capable    = True
2208
2209    def write_job_header(self,job):
2210        c='#!/bin/bash\n'
2211        c+='#PBS -N '+str(job.name)+'\n'
2212        c+='#PBS -l walltime='+job.pbs_walltime()+'\n'
2213        c+='#PBS -l nodes={0}:ppn={1}:xe\n'.format(job.nodes,job.ppn)
2214        c+='#PBS -o '+job.outfile+'\n'
2215        c+='#PBS -e '+job.errfile+'\n'
2216        if job.user_env:
2217            c+='#PBS -V\n'
2218        #end if
2219        c+='''
2220echo $PBS_O_WORKDIR
2221cd $PBS_O_WORKDIR
2222'''
2223        return c
2224    #end def write_job_header
2225#end class BlueWatersXE
2226
2227
2228
2229
2230class Titan(Supercomputer):
2231
2232    name = 'titan'
2233    requires_account = True
2234    batch_capable    = True
2235
2236    def write_job_header(self,job):
2237        if job.queue is None:
2238            job.queue = 'batch'
2239        #end if
2240        c= '#!/bin/bash\n'
2241        c+='#PBS -A {0}\n'.format(job.account)
2242        c+='#PBS -q {0}\n'.format(job.queue)
2243        c+='#PBS -N {0}\n'.format(job.name)
2244        c+='#PBS -o {0}\n'.format(job.outfile)
2245        c+='#PBS -e {0}\n'.format(job.errfile)
2246        c+='#PBS -l walltime={0}\n'.format(job.pbs_walltime())
2247        c+='#PBS -l nodes={0}\n'.format(job.nodes)
2248        #c+='#PBS -l gres=widow3\n'
2249        c+='#PBS -l gres=atlas1\n'
2250        if job.user_env:
2251            c+='#PBS -V\n'
2252        #end if
2253        c+='''
2254echo $PBS_O_WORKDIR
2255cd $PBS_O_WORKDIR
2256'''
2257        return c
2258    #end def write_job_header
2259#end class Titan
2260
2261
2262
2263class EOS(Supercomputer):
2264
2265    name = 'eos'
2266    requires_account = True
2267    batch_capable    = True
2268
2269    def post_process_job(self,job):
2270        if job.threads>1:
2271            if job.threads<=8:
2272                job.run_options.add(ss='-ss')
2273            #end if
2274            job.run_options.add(cc='-cc numa_node')
2275        #end if
2276    #end def post_process_job
2277
2278
2279    def write_job_header(self,job):
2280        if job.queue is None:
2281            job.queue = 'batch'
2282        #end if
2283        c= '#!/bin/bash\n'
2284        c+='#PBS -A {0}\n'.format(job.account)
2285        c+='#PBS -q {0}\n'.format(job.queue)
2286        c+='#PBS -N {0}\n'.format(job.name)
2287        c+='#PBS -o {0}\n'.format(job.outfile)
2288        c+='#PBS -e {0}\n'.format(job.errfile)
2289        c+='#PBS -l walltime={0}\n'.format(job.pbs_walltime())
2290        c+='#PBS -l nodes={0}\n'.format(job.nodes)
2291        c+='#PBS -l gres=atlas1\n'
2292        if job.user_env:
2293            c+='#PBS -V\n'
2294        #end if
2295        c+='''
2296echo $PBS_O_WORKDIR
2297cd $PBS_O_WORKDIR
2298'''
2299        return c
2300    #end def write_job_header
2301#end class EOS
2302
2303
2304
2305class ALCF_Machine(Supercomputer):
2306    requires_account   = True
2307    batch_capable      = True
2308    executable_subfile = True
2309
2310    prefixed_output    = True
2311    outfile_extension  = '.output'
2312    errfile_extension  = '.error'
2313
2314    base_partition = None
2315
2316    def post_process_job(self,job):
2317        job.sub_options.add(
2318            env  = '--env BG_SHAREDMEMSIZE=32',
2319            mode = '--mode script'
2320            )
2321        #if job.processes<job.nodes: # seems like a good idea, but breaks idempotency
2322        #    job.processes_per_node=1
2323        ##end if
2324        if job.nodes<self.base_partition:
2325            self.warn('!!! ATTENTION !!!\n  number of nodes on {0} should not be less than {1}\n  you requested: {2}'.format(self.name,self.base_partition,job.nodes))
2326        else:
2327            partition = log(float(job.nodes)/self.base_partition)/log(2.)
2328            if abs(partition-int(partition))>1e-6:
2329                self.warn('!!! ATTENTION !!!\n  number of nodes on {0} must be {1} times a power of two\n  you requested: {2}\n  nearby valid node count: {3}'.format(self.name,self.base_partition,job.nodes,self.base_partition*2**int(round(partition))))
2330            #end if
2331        #end if
2332        valid_ppn = (1,2,4,8,16,32,64)
2333        if job.processes_per_node is None:
2334            self.warn('job may not run properly\nplease specify processes_per_node in each job to be launched with runjob on {0}'.format(self.name))
2335        elif job.processes_per_node not in valid_ppn:
2336            self.warn('job may not run properly\nprocesses_per_node is not a valid value for {0}\nprocesses_per_node provided: {1}\nvalid options are: {2}'.format(self.name,job.processes_per_node,valid_ppn))
2337        #end if
2338    #end def post_process_job
2339
2340    def write_job_header(self,job):
2341        if job.queue is None:
2342            job.queue = 'default'
2343        #end if
2344        c= '#!/bin/bash\n'
2345        c+='#COBALT -q {0}\n'.format(job.queue)
2346        c+='#COBALT -A {0}\n'.format(job.account)
2347        c+='#COBALT -n {0}\n'.format(job.nodes)
2348        c+='#COBALT -t {0}\n'.format(job.total_minutes())
2349        c+='#COBALT -O {0}\n'.format(job.identifier)
2350        c+='\nLOCARGS="--block $COBALT_PARTNAME ${COBALT_CORNER:+--corner} $COBALT_CORNER ${COBALT_SHAPE:+--shape} $COBALT_SHAPE"\n'
2351        c+='echo "Cobalt location args: $LOCARGS" >&2\n\n'
2352        return c
2353    #end def write_job_header
2354#end class ALCF_Machine
2355
2356
2357class Vesta(ALCF_Machine):
2358    name = 'vesta'
2359    base_partition = 32
2360#end class Vesta
2361
2362class Cetus(ALCF_Machine):
2363    name = 'cetus'
2364    base_partition = 128
2365#end class Cetus
2366
2367class Mira(ALCF_Machine):
2368    name = 'mira'
2369    base_partition = 512
2370#end class Mira
2371
2372
2373
2374class Cooley(Supercomputer):
2375    name = 'cooley'
2376    requires_account   = True
2377    batch_capable      = True
2378    executable_subfile = True
2379
2380    prefixed_output    = True
2381    outfile_extension  = '.output'
2382    errfile_extension  = '.error'
2383
2384    def post_process_job(self,job):
2385        #if job.processes_per_node is None and job.threads!=1:
2386        #    self.error('threads must be 1,2,3,4,6, or 12 on Cooley\nyou provided: {0}'.format(job.threads))
2387        ##end if
2388
2389        #job.run_options.add(
2390            #f   = '-f $COBALT_NODEFILE',
2391            #ppn = '-ppn {0}'.format(job.processes_per_node),
2392            #)
2393        return
2394    #end def post_process_job
2395
2396    def write_job_header(self,job):
2397        if job.queue is None:
2398            job.queue = 'default'
2399        #end if
2400        c= '#!/bin/bash\n'
2401        c+='#COBALT -q {0}\n'.format(job.queue)
2402        c+='#COBALT -A {0}\n'.format(job.account)
2403        c+='#COBALT -n {0}\n'.format(job.nodes)
2404        c+='#COBALT -t {0}\n'.format(job.total_minutes())
2405        c+='#COBALT -O {0}\n'.format(job.identifier)
2406        return c
2407    #end def write_job_header
2408#end class Cooley
2409
2410
2411class Theta(Supercomputer):
2412    name = 'theta'
2413    requires_account   = True
2414    batch_capable      = True
2415    executable_subfile = True
2416
2417    prefixed_output    = True
2418    outfile_extension  = '.output'
2419    errfile_extension  = '.error'
2420
2421    def post_process_job(self,job):
2422        if job.hyperthreads is None:
2423            job.hyperthreads = 1
2424        #end if
2425        job.run_options.add(
2426            N  = '-N {0}'.format(job.processes_per_node),
2427            cc = '-cc depth',
2428            d  = '-d {0}'.format(job.threads),
2429            j  = '-j {0}'.format(job.hyperthreads),
2430            e  = '-e OMP_NUM_THREADS={0}'.format(job.threads),
2431            )
2432    #end def post_process_job
2433
2434    def write_job_header(self,job):
2435        if job.queue is None:
2436            job.queue = 'default'
2437        #end if
2438        c= '#!/bin/bash\n'
2439        c+='#COBALT -q {0}\n'.format(job.queue)
2440        c+='#COBALT -A {0}\n'.format(job.account)
2441        c+='#COBALT -n {0}\n'.format(job.nodes)
2442        c+='#COBALT -t {0}\n'.format(job.total_minutes())
2443        c+='#COBALT -O {0}\n'.format(job.identifier)
2444        c+='#COBALT --attrs mcdram=cache:numa=quad\n'
2445        return c
2446    #end def write_job_header
2447#end class Theta
2448
2449
2450
2451class Lonestar(Supercomputer):  # Lonestar contribution from Paul Young
2452
2453    name = 'lonestar' # will be converted to lowercase anyway
2454    requires_account = False
2455    batch_capable    = True
2456
2457    def write_job_header(self,job):
2458        if job.queue is None:
2459            job.queue = 'batch'
2460        #end if
2461        c= '#!/bin/bash\n'
2462        #c+='#$ -A {0}\n'.format(job.account)
2463        c+='#$ -q {0}\n'.format(job.queue)
2464        c+='#$ -N {0}\n'.format(job.name)
2465        c+='#$ -o {0}\n'.format(job.outfile)
2466        c+='#$ -e {0}\n'.format(job.errfile)
2467        c+='#$ -l h_rt={0}\n'.format(job.pbs_walltime())
2468        c+='#$ -pe 12way {0}\n'.format(job.nodes*12)
2469        c+='#$ -cwd\n'
2470        if job.user_env:
2471            c+='#$ -V\n'
2472        #end if
2473        return c
2474    #end def write_job_header
2475
2476
2477    def read_process_id(self,output):
2478        pid = None
2479        lines = output.splitlines()
2480
2481        for line in lines:
2482            if 'Your job' in line:
2483                spid = line.split(' ')[2]
2484                if spid.isdigit():
2485                    pid = int(spid)
2486                #end if
2487            #end if
2488        #end for
2489        return pid
2490    #end def read_process_id
2491#end class Lonestar
2492
2493
2494
2495class ICMP_Machine(Supercomputer): # ICMP and Amos contributions from Ryan McAvoy
2496    batch_capable      = True
2497    executable_subfile = True
2498
2499    prefixed_output    = True
2500    outfile_extension  = '.output'
2501    errfile_extension  = '.error'
2502
2503    def write_job_header(self,job):
2504        if job.queue is None:
2505            job.queue = 'defq'
2506        #end if
2507        c= '#!/bin/bash -x\n'
2508        c+='#SBATCH --export=ALL\n'
2509        c+='#SBATCH -J {0}\n'.format(job.identifier)
2510        c+='#SBATCH -p {0}\n'.format(job.queue)
2511        c+='#SBATCH -o {0}\n'.format(job.outfile)
2512        c+='#SBATCH -e {0}\n'.format(job.errfile)
2513        c+='#SBATCH --nodes {0}\n'.format(job.nodes)
2514        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
2515        c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
2516        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
2517        return c
2518    #end def write_job_header
2519#end class ICMP_Machine
2520
2521
2522class Komodo(ICMP_Machine):
2523    name = 'komodo'
2524#end class Komodo
2525
2526class Matisse(ICMP_Machine):
2527    name = 'matisse'
2528#end class Matisse
2529
2530
2531
2532class Amos(Supercomputer):
2533    name = 'amos'
2534
2535    #requires_account   = True
2536    batch_capable      = True
2537    executable_subfile = True
2538
2539    prefixed_output    = True
2540    outfile_extension  = '.output'
2541    errfile_extension  = '.error'
2542
2543    def write_job_header(self,job):
2544        if job.queue is None:
2545            job.queue = 'debug'
2546        #end if
2547        if job.queue == 'debug':
2548            base_partition = 1
2549            max_partition = 32
2550            max_time =1
2551        elif job.queue == 'small':
2552            base_partition = 1
2553            max_partition = 64
2554            max_time =24
2555        elif job.queue == 'medium':
2556            base_partition = 128
2557            max_partition = 512
2558            max_time =12
2559        elif job.queue == 'large':
2560            base_partition = 1024
2561            max_partition = 2048
2562            max_time =6
2563        elif job.queue == 'verylarge':
2564            base_partition = 3072
2565            max_partition = 4096
2566            max_time =6
2567        #end if
2568        job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
2569        if job.total_hours > max_time:
2570            self.warn('!!! ATTENTION !!!\n  the maximum runtime on {0} should not be more than {1}\n  you requested: {2}'.format(job.queue,max_time,job.total_hours))
2571            job.hours   = max_time
2572            job.minutes =0
2573            job.seconds =0
2574        #end if
2575        if job.nodes<base_partition:
2576            self.warn('!!! ATTENTION !!!\n  number of nodes in {0} should not be less than {1}\n  you requested: {2}'.format(job.queue,base_partition,job.nodes))
2577        elif job.nodes>max_partition:
2578            self.warn('!!! ATTENTION !!!\n  number of nodes in {0} should not be more than {1}\n  you requested: {2}'.format(job.queue,max_partition,job.nodes))
2579        else:
2580            if job.queue != 'verylarge':
2581                partition = log(float(job.nodes)/base_partition)/log(2.)
2582                if abs(partition-int(partition))>1e-6:
2583                    self.warn('!!! ATTENTION !!!\n  number of nodes on {0} must be {1} times a power of two\n  you requested: {2}\n  nearby valid node count: {3}'.format(self.name,base_partition,job.nodes,base_partition*2**int(round(partition))))
2584            elif job.nodes != 3072 and job.nodes != 4096:
2585                self.warn('!!! ATTENTION !!!\n  number of nodes on {0} must be 3072 or 4096 you requested {1}'.format(self.name,job.nodes))
2586            #end if
2587        #end if
2588
2589        c= '#!/bin/bash -x\n'
2590        c+='#SBATCH --export=ALL\n'
2591        #c+=#SBATCH -D /gpfs/sb/data/<project>/<user>/
2592        c+='#SBATCH -J {0}\n'.format(job.identifier)
2593        c+='#SBATCH -p {0}\n'.format(job.queue)
2594        c+='#SBATCH -o {0}\n'.format(job.outfile)
2595        c+='#SBATCH -e {0}\n'.format(job.errfile)
2596        c+='#SBATCH --nodes {0}\n'.format(job.nodes)
2597        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
2598        c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
2599        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
2600        # c+='#SBATCH --mail-type=ALL'
2601        # c+='#SBATCH --mail-user=<{0}>'
2602
2603        return c
2604    #end def write_job_header
2605#end class Amos
2606
2607
2608class SnlMachine(Supercomputer):
2609    requires_account   = True
2610    batch_capable      = True
2611    #executable_subfile = True
2612
2613    prefixed_output    = True
2614    outfile_extension  = '.output'
2615    errfile_extension  = '.error'
2616
2617    def write_job_header(self,job):
2618        if job.queue is None:
2619            job.queue='batch'
2620        #end if
2621
2622        cpus_per_task = int(floor(float(self.cores_per_node)/job.processes_per_node))
2623
2624        if job.qos == 'long':
2625            max_time = 96
2626        elif 'short' in job.queue:
2627            max_time = 4
2628        else:
2629            max_time = 48
2630        #end if
2631
2632        job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
2633        if job.total_hours > max_time:   # warn if job will take more than 48 hrs.
2634            if job.qos == 'long':
2635                self.warn('!!! ATTENTION !!!\n  the maximum runtime on {0} should not be more than {1} with --qos=\'long\'\n  you requested: {2}'.format(job.queue,max_time,job.total_hours))
2636            elif 'short' in job.queue:
2637                self.warn('!!! ATTENTION !!!\n  the maximum runtime on {0} should not be more than {1} with -p short[,batch]\n  you requested: {2}'.format(job.queue,max_time,job.total_hours))
2638            else:
2639                self.warn('!!! ATTENTION !!!\n  the maximum runtime on {0} should not be more than {1}\n  you requested: {2}'.format(job.queue,max_time,job.total_hours))
2640            #end if
2641            job.hours   = max_time
2642            job.minutes = 0
2643            job.seconds = 0
2644        #end if
2645
2646        c='#!/bin/bash\n'
2647        c+='#SBATCH -p '+str(job.queue)+'\n'
2648        c+='#SBATCH --job-name '+str(job.name)+'\n'
2649        c+='#SBATCH --account='+str(job.account)+'\n'
2650        c+='#SBATCH -N '+str(job.nodes)+'\n'
2651        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
2652        c+='#SBATCH --cpus-per-task={0}\n'.format(cpus_per_task)
2653        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
2654        c+='#SBATCH -o {0}\n'.format(job.outfile)
2655        c+='#SBATCH -e {0}\n'.format(job.errfile)
2656        if job.qos:
2657            c+='#SBATCH --qos={}\n'.format(job.qos)
2658        c+='\n'
2659        return c
2660    #end def write_job_header
2661#end class SnlMachine
2662
2663class Chama(SnlMachine):
2664    name = 'chama'
2665#end class Chama
2666
2667class Skybridge(SnlMachine):
2668    name = 'skybridge'
2669#end class Skybridge
2670
2671class Eclipse(SnlMachine):
2672    name = 'eclipse'
2673#end class Eclipse
2674
2675class Attaway(SnlMachine):
2676    name = 'attaway'
2677#end class Attaway
2678
2679class Uno(SnlMachine):
2680    name = 'uno'
2681#end class Uno
2682
2683class Solo(SnlMachine):
2684    name = 'solo'
2685#end class Solo
2686
2687# machines at LRZ  https://www.lrz.de/english/
2688class SuperMUC(Supercomputer):
2689    name = 'supermuc'
2690    requires_account    = False
2691    batch_capable       = True
2692    query_with_username = False
2693
2694    def write_job_header(self,job):
2695        if job.queue is None:
2696            job.queue = 'general'
2697        #end if
2698        if job.type is None:
2699            job.type = 'MPICH'
2700        else:
2701            job.type = job.type.lower()
2702            if job.type=='mpich':
2703                job.type=job.type.upper()
2704            #end if
2705        #end if
2706        ibm   = job.type=='parallel'
2707        intel = job.type=='MPICH'
2708        omp   = isinstance(job.threads,int) and job.threads>1
2709        if not ibm and not intel:
2710            self.error('the only types of MPI supported are "parallel" and "MPICH"\nreceived MPI with type: {0}'.format(job.type))
2711        #end if
2712        c ='#!/bin/bash\n'
2713        c+='#@ job_name         = {0}\n'.format(job.name)
2714        c+='#@ job_type         = {0}\n'.format(job.type)
2715        c+='#@ class            = {0}\n'.format(job.queue)
2716        c+='#@ node             = {0}\n'.format(job.nodes)
2717        if job.nodes<512:
2718            icmin = 1
2719            icmax = 1
2720        else:
2721            icmin = int(job.nodes//512)+1
2722            icmax = icmin+1
2723        #end if
2724        c+='#@ island_count     = {0},{1}\n'.format(icmin,icmax)
2725        if intel and omp:
2726            c+='#@ tasks_per_node   = {0}\n'.format(job.processes_per_node)
2727        else:
2728            c+='#@ total_tasks      = {0}\n'.format(job.processes)
2729        #end if
2730        c+='#@ wall_clock_limit = {0}\n'.format(job.ll_walltime())
2731        c+='#@ network.MPI      = sn_all,not_shared,us\n'
2732        c+='#@ initialdir       = {0}\n'.format(job.abs_dir)
2733        c+='#@ output           = {0}\n'.format(job.outfile)
2734        c+='#@ error            = {0}\n'.format(job.errfile)
2735        c+='#@ energy_policy_tag = my_energy_tag\n'
2736        c+='#@ minimize_time_to_solution = yes\n'
2737        if job.email is None:
2738            c+='#@ notification     = never\n'
2739        else:
2740            c+='#@ notification     = always\n'
2741            c+='#@ notify_user      = {0}\n'.format(job.email)
2742        #end if
2743        c+='#@ queue\n'
2744        c+='. /etc/profile\n'
2745        c+='. /etc/profile.d/modules.sh\n'
2746        if ibm and omp:
2747            c+='export MP_SINGLE_THREAD=no\n'
2748            c+='export MP_TASK_AFFINITY=core:{0}\n'.format(job.threads)
2749        elif intel and not omp:
2750            c+='module unload mpi.ibm\n'
2751            c+='module load mpi.intel\n'
2752        elif intel and omp:
2753            c+='module unload mpi.ibm\n'
2754            c+='module load mpi.intel\n'
2755            c+='export OMP_NUM_THREADS={0}\n'.format(job.threads)
2756            #c+='module load mpi_pinning/hybrid_blocked\n'
2757        #end if
2758        return c
2759    #end def write_job_header
2760#end class SuperMUC
2761
2762
2763
2764class SuperMUC_NG(Supercomputer):
2765    name                = 'supermucng'
2766    requires_account    = True
2767    batch_capable       = True
2768    query_with_username = True
2769
2770    def write_job_header(self,job):
2771        if job.queue is None:
2772            job.queue = 'general'
2773        #end if
2774        if job.hyperthreads is None:
2775            job.hyperthreads = job.processes_per_node//48
2776            if job.hyperthreads==0:
2777                job.hyperthreads=None
2778            #end if
2779        #end if
2780        if job.constraint is None:
2781            job.contraint = 'scratch&work'
2782        #end if
2783        c ='#!/bin/bash\n'
2784        c+='#SBATCH --account={}\n'.format(job.account)
2785        c+='#SBATCH --partition={}\n'.format(job.queue)
2786        c+='#SBATCH -J {}\n'.format(job.name)
2787        c+='#SBATCH --time={}\n'.format(job.sbatch_walltime())
2788        c+='#SBATCH -o ./{}\n'.format(job.outfile)
2789        c+='#SBATCH -e ./{}\n'.format(job.errfile)
2790        if job.switches is not None:
2791            c+='#SBATCH --switches={}\n'.format(job.switches)
2792        #end if
2793        c+='#SBATCH --nodes={}\n'.format(job.nodes)
2794        c+='#SBATCH --ntasks-per-node={}\n'.format(job.processes_per_node)
2795        if job.ntasks_per_core is not None:
2796            c+='#SBATCH --ntasks-per-core={}\n'.format(job.ntasks_per_core)
2797        elif job.hyperthreads is not None:
2798            c+='#SBATCH --ntasks-per-core={}\n'.format(job.hyperthreads)
2799        #end if
2800        if not job.default_cpus_per_task:
2801            if job.cpus_per_task is None:
2802                c+='#SBATCH --cpus-per-task={}\n'.format(job.threads)
2803            else:
2804                c+='#SBATCH --cpus-per-task={}\n'.format(job.cpus_per_task)
2805            #end if
2806        #end if
2807        c+='#SBATCH -D ./\n'
2808        c+='#SBATCH --no-requeue\n'
2809        if job.constraint is not None:
2810            c+='#--constraint="{}"\n'.format(job.constraint)
2811        #end if
2812        if job.email is not None:
2813            c+='#SBATCH --mail-type=ALL\n'
2814            c+='#SBATCH --mail-user={}\n'.format(job.email)
2815        #end if
2816        c+='#SBATCH --export=NONE\n'
2817        if job.user_env:
2818            c+='#SBATCH --get-user-env\n'
2819        #end if
2820        return c
2821    #end def write_job_header
2822#end class SuperMUC_NG
2823
2824
2825
2826class Stampede2(Supercomputer):
2827    name = 'stampede2'
2828
2829    requires_account   = True
2830    batch_capable      = True
2831    #executable_subfile = True
2832
2833    prefixed_output    = True
2834    outfile_extension  = '.output'
2835    errfile_extension  = '.error'
2836
2837    def write_job_header(self,job):
2838        if job.queue is None:
2839            job.queue='normal'
2840        #end if
2841
2842        if job.queue == 'development':
2843            max_nodes = 16
2844            max_time = 2
2845        elif job.queue == 'normal':
2846            max_nodes = 256
2847            max_time = 48
2848        elif job.queue == 'large':
2849            max_nodes = 2048
2850            max_time = 48
2851        elif job.queue == 'long':
2852            max_nodes = 32
2853            max_time = 96
2854        elif job.queue == 'flat_quadrant':
2855            max_nodes = 24
2856            max_time = 48
2857        elif job.queue == 'skx-dev':
2858            max_nodes = 4
2859            max_time = 2
2860        elif job.queue == 'skx-normal':
2861            max_nodes = 128
2862            max_time = 48
2863        elif job.queue == 'skx-large':
2864            max_nodes = 868
2865            max_time = 48
2866        #end if
2867
2868        if 'skx' in job.queue:
2869            max_processes_per_node = 48
2870        else:
2871            max_processes_per_node = 68
2872        #end if
2873        job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
2874        if job.total_hours > max_time:
2875            self.warn('!!! ATTENTION !!!\n  the maximum runtime on {0} should not be more than {1}\n  you requested: {2}'.format(job.queue,max_time,job.total_hours))
2876            job.hours   = max_time
2877            job.minutes =0
2878            job.seconds =0
2879        #end if
2880
2881        if job.nodes > max_nodes:
2882            self.warn('!!! ATTENTION !!!\n  the maximum nodes on {0} should not be more than {1}\n  you requested: {2}'.format(job.queue,max_nodes,job.nodes))
2883            job.nodes = max_nodes
2884        #end if
2885
2886        if job.processes_per_node > max_processes_per_node:
2887            self.warn('!!! ATTENTION !!!\n  the maximum number of processes per node on {0} should not be more than {1}\n  you requested: {2}'.format(job.queue,max_processes_per_node,job.processes_per_node))
2888            job.processes_per_node = max_processes_per_node
2889        #end if
2890
2891        c='#!/bin/bash\n'
2892        c+='#SBATCH --job-name '+str(job.name)+'\n'
2893        c+='#SBATCH --account='+str(job.account)+'\n'
2894        c+='#SBATCH -N '+str(job.nodes)+'\n'
2895        c+='#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
2896        c+='#SBATCH --cpus-per-task={0}\n'.format(job.threads)
2897        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
2898        c+='#SBATCH -o {0}\n'.format(job.outfile)
2899        c+='#SBATCH -e {0}\n'.format(job.errfile)
2900        c+='#SBATCH -p {0}\n'.format(job.queue)
2901        c+='\n'
2902        return c
2903    #end def write_job_header
2904#end class Stampede2
2905
2906
2907
2908class CadesMoab(Supercomputer):
2909    name = 'cades_moab'
2910    requires_account = True
2911    batch_capable    = True
2912
2913    def post_process_job(self,job):
2914        ppn = job.processes_per_node
2915        if job.threads>1 and ppn is not None and ppn>1:
2916            processes_per_socket = int(floor(job.processes_per_node/2))
2917            job.run_options.add(npersocket='--npersocket {0}'.format(processes_per_socket))
2918        #end if
2919    #end def post_process_job
2920
2921    def write_job_header(self,job):
2922        if job.queue is None:
2923            job.queue = 'skylake'
2924        #end if
2925        if job.qos is None:
2926            job.qos = 'std'
2927        #end if
2928        if job.group_list is None:
2929            job.group_list = 'cades-'+job.account
2930        #end if
2931        c= '#!/bin/bash\n'
2932        c+='#PBS -A {0}\n'.format(job.account)
2933        c+='#PBS -W group_list={0}\n'.format(job.group_list)
2934        c+='#PBS -q {0}\n'.format(job.queue)
2935        c+='#PBS -N {0}\n'.format(job.name)
2936        c+='#PBS -o {0}\n'.format(job.outfile)
2937        c+='#PBS -e {0}\n'.format(job.errfile)
2938        c+='#PBS -l qos={0}\n'.format(job.qos) # This could be qos=burst as well, but then it can be cancelled by others
2939        c+='#PBS -l walltime={0}\n'.format(job.pbs_walltime())
2940        c+='#PBS -l nodes={0}:ppn={1}\n'.format(job.nodes, job.ppn)
2941        c+='''
2942echo $PBS_O_WORKDIR
2943cd $PBS_O_WORKDIR
2944'''
2945        return c
2946    #end def write_job_header
2947#end class CadesMoab
2948
2949
2950
2951class CadesSlurm(Supercomputer):
2952    name = 'cades'
2953    requires_account = True
2954    batch_capable    = True
2955
2956    def write_job_header(self,job):
2957        if job.queue is None:
2958            job.queue = 'skylake'
2959        #end if
2960
2961        c  = '#!/bin/bash\n'
2962        c += '#SBATCH -A {}\n'.format(job.account)
2963        c += '#SBATCH -p {}\n'.format(job.queue)
2964        c += '#SBATCH -J {}\n'.format(job.name)
2965        c += '#SBATCH -t {}\n'.format(job.sbatch_walltime())
2966        c += '#SBATCH -N {}\n'.format(job.nodes)
2967        c += '#SBATCH --ntasks-per-node={0}\n'.format(job.processes_per_node)
2968        c += '#SBATCH --cpus-per-task={0}\n'.format(job.threads)
2969        c += '#SBATCH --mem=0\n' # required on Cades
2970        c += '#SBATCH -o '+job.outfile+'\n'
2971        c += '#SBATCH -e '+job.errfile+'\n'
2972        c += '#SBATCH --exclusive\n'
2973        if job.user_env:
2974            c += '#SBATCH --export=ALL\n'   # equiv to PBS -V
2975        else:
2976            c += '#SBATCH --export=NONE\n'
2977        #end if
2978
2979        return c
2980    #end def write_job_header
2981#end class CadesSlurm
2982
2983
2984
2985class Summit(Supercomputer):
2986
2987    name = 'summit'
2988    requires_account = True
2989    batch_capable    = True
2990
2991    def post_process_job(self,job):
2992        # add the options only if the user has not supplied options
2993        if len(job.run_options)==0:
2994            opt = obj(
2995                launch_dist = '-d packed',
2996                bind        = '-b rs',
2997                )
2998            if job.gpus is None:
2999                job.gpus = 6 # gpus to use per node
3000            #end if
3001            if job.alloc_flags is None:
3002                job.alloc_flags = 'smt1'
3003            #end if
3004            if job.gpus==0:
3005                if job.processes%2==0:
3006                    resource_sets_per_node = 2
3007                else:
3008                    resource_sets_per_node = 1
3009                #end if
3010                nrs   = job.nodes*resource_sets_per_node
3011                pprs  = job.processes_per_node//resource_sets_per_node
3012                gpurs = 0
3013            else:
3014                ppn = job.processes_per_node
3015                if ppn is None:
3016                    self.warn('job may not run properly on Summit\nat least one mpi process should be present for each node\nplease check the generated bsub file for correctness')
3017                    ppn = 0
3018                #end if
3019                if ppn%job.gpus!=0:
3020                    self.warn('job may not run properly on Summit\nprocesses per node should divide evenly into number of gpus requested\nprocesses per node requested: {0}\ngpus per node requested: {1}\nplease check the generated bsub file for correctness'.format(job.processes_per_node,job.gpus))
3021                #end if
3022                resource_sets_per_node = job.gpus
3023                nrs   = job.nodes*resource_sets_per_node
3024                pprs  = ppn//resource_sets_per_node
3025                gpurs = 1
3026            #end if
3027            opt.set(
3028                resource_sets= '-n {0}'.format(nrs),
3029                rs_per_node  = '-r {0}'.format(resource_sets_per_node),
3030                tasks_per_rs = '-a {0}'.format(pprs),
3031                cpus_per_rs  = '-c {0}'.format(pprs*job.threads),
3032                gpus_per_rs  = '-g {0}'.format(gpurs),
3033                )
3034            job.run_options.add(**opt)
3035        #end if
3036    #end def post_process_job
3037
3038
3039    def write_job_header(self,job):
3040        c ='#!/bin/bash\n'
3041        c+='#BSUB -P {0}\n'.format(job.account)
3042        c+='#BSUB -J {0}\n'.format(job.name)
3043        c+='#BSUB -o {0}\n'.format(job.outfile)
3044        c+='#BSUB -e {0}\n'.format(job.errfile)
3045        c+='#BSUB -W {0}\n'.format(job.lsf_walltime())
3046        c+='#BSUB -nnodes {0}\n'.format(job.nodes)
3047        if job.alloc_flags is not None:
3048            c+='#BSUB -alloc_flags "{0}"\n'.format(job.alloc_flags)
3049        #end if
3050        return c
3051    #end def write_job_header
3052
3053
3054    def read_process_id(self,output):
3055        pid = None
3056        tokens = output.split()
3057        for t in tokens:
3058            if t.startswith('<'):
3059                spid = t.strip('<>').strip()
3060                if spid.isdigit():
3061                    pid = int(spid)
3062                    break
3063                #end if
3064            #end if
3065        #end for
3066        return pid
3067    #end def read_process_id
3068#end class Summit
3069
3070
3071## Added 28/11/2019 by A Zen
3072class Rhea(Supercomputer):
3073
3074    name = 'rhea'
3075    requires_account   = True
3076    batch_capable      = True
3077    #executable_subfile = True
3078    prefixed_output    = True
3079    outfile_extension  = '.output'
3080    errfile_extension  = '.error'
3081
3082    def post_process_job(self,job):
3083        job.run_options.add(
3084            N='-N {}'.format(job.nodes),
3085            n='-n {}'.format(job.processes),
3086            )
3087        if job.threads>1:
3088            job.run_options.add(
3089                c = '-c {}'.format(job.threads),
3090                )
3091            if 'cpu_bind' not in job.run_options:
3092                if job.processes_per_node==self.cores_per_node:
3093                    cpu_bind = '--cpu-bind=threads'
3094                else:
3095                    cpu_bind = '--cpu-bind=cores'
3096                #end if
3097                job.run_options.add(
3098                    cpu_bind = cpu_bind
3099                    )
3100            #end if
3101        #end if
3102    #end def post_process_job
3103
3104    def write_job_header(self,job):
3105        if job.queue is None:
3106            job.queue='batch'
3107        #end if
3108        base_partition = None
3109        max_partition = 384
3110        if job.nodes <= 16:
3111            max_time = 48
3112        elif job.nodes <= 64:
3113            max_time = 36
3114        else:
3115            max_time = 3
3116        job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
3117        if job.total_hours > max_time:   # warn if job will take more than 96 hrs.
3118            self.warn('!!! ATTENTION !!!\n  the maximum runtime on {0} should not be more than {1}\n  you requested: {2}'.format(job.queue,max_time,job.total_hours))
3119            job.hours   = max_time
3120            job.minutes =0
3121            job.seconds =0
3122        #end if
3123
3124        c='#!/bin/bash\n'
3125        c+='#SBATCH --job-name '+str(job.name)+'\n'
3126        c+='#SBATCH --account='+str(job.account)+'\n'
3127        c+='#SBATCH -N '+str(job.nodes)+'\n'
3128        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
3129        c+='#SBATCH -o {0}\n'.format(job.outfile)
3130        c+='#SBATCH -e {0}\n'.format(job.errfile)
3131        if job.email is not None:
3132            c+='#SBATCH --mail-user {}\n'.format(job.email)
3133            c+='#SBATCH --mail-type ALL\n'
3134            #c+='#SBATCH --mail-type FAIL\n'
3135        #end if
3136        c+='\n'
3137        c+='cd $SLURM_SUBMIT_DIR\n'
3138        c+='\n'
3139        c+='echo JobID : $SLURM_JOBID \n'
3140        c+='echo Number of nodes requested: $SLURM_JOB_NUM_NODES \n'
3141        c+='echo List of nodes assigned to the job: $SLURM_NODELIST \n'
3142        c+='\n'
3143        return c
3144    #end def write_job_header
3145#end class Rhea
3146
3147
3148## Added 19/03/2021 by A Zen
3149class Andes(Supercomputer):
3150
3151    name = 'andes'
3152    requires_account   = True
3153    batch_capable      = True
3154    #executable_subfile = True
3155    prefixed_output    = True
3156    outfile_extension  = '.output'
3157    errfile_extension  = '.error'
3158
3159    def post_process_job(self,job):
3160        job.run_options.add(
3161            N='-N {}'.format(job.nodes),
3162            n='-n {}'.format(job.processes),
3163            )
3164        if job.threads>1:
3165            job.run_options.add(
3166                c = '-c {}'.format(job.threads),
3167                )
3168            if 'cpu_bind' not in job.run_options:
3169                if job.processes_per_node==self.cores_per_node:
3170                    cpu_bind = '--cpu-bind=threads'
3171                else:
3172                    cpu_bind = '--cpu-bind=cores'
3173                #end if
3174                job.run_options.add(
3175                    cpu_bind = cpu_bind
3176                    )
3177            #end if
3178        #end if
3179    #end def post_process_job
3180
3181    def write_job_header(self,job):
3182        if job.queue is None:
3183            job.queue='batch'
3184        #end if
3185        base_partition = None
3186        max_partition = 384
3187        if job.nodes <= 16:
3188            max_time = 48
3189        elif job.nodes <= 64:
3190            max_time = 36
3191        else:
3192            max_time = 3
3193        job.total_hours = job.days*24 + job.hours + job.minutes/60.0 + job.seconds/3600.0
3194        if job.total_hours > max_time:   # warn if job will take more than 96 hrs.
3195            self.warn('!!! ATTENTION !!!\n  the maximum runtime on {0} should not be more than {1}\n  you requested: {2}'.format(job.queue,max_time,job.total_hours))
3196            job.hours   = max_time
3197            job.minutes =0
3198            job.seconds =0
3199        #end if
3200
3201        c='#!/bin/bash\n'
3202        c+='#SBATCH --job-name '+str(job.name)+'\n'
3203        c+='#SBATCH --account='+str(job.account)+'\n'
3204        c+='#SBATCH -N '+str(job.nodes)+'\n'
3205        c+='#SBATCH -t {0}:{1}:{2}\n'.format(str(job.hours+24*job.days).zfill(2),str(job.minutes).zfill(2),str(job.seconds).zfill(2))
3206        c+='#SBATCH -o {0}\n'.format(job.outfile)
3207        c+='#SBATCH -e {0}\n'.format(job.errfile)
3208        if job.email is not None:
3209            c+='#SBATCH --mail-user {}\n'.format(job.email)
3210            c+='#SBATCH --mail-type ALL\n'
3211            #c+='#SBATCH --mail-type FAIL\n'
3212        #end if
3213        c+='\n'
3214        c+='cd $SLURM_SUBMIT_DIR\n'
3215        c+='\n'
3216        c+='echo JobID : $SLURM_JOBID \n'
3217        c+='echo Number of nodes requested: $SLURM_JOB_NUM_NODES \n'
3218        c+='echo List of nodes assigned to the job: $SLURM_NODELIST \n'
3219        c+='\n'
3220        return c
3221    #end def write_job_header
3222#end class Andes
3223
3224
3225class Tomcat3(Supercomputer):
3226    name             = 'tomcat3'
3227    requires_account = False
3228    batch_capable    = True
3229    redirect_output  = True
3230
3231    def write_job_header(self,job):
3232        if job.queue is None:
3233            job.queue = 'tomcat'
3234        #end if
3235        c = '#!/bin/bash -l\n'
3236        c+='#SBATCH -J {}\n'.format(job.name)
3237        c+='#SBATCH -N {}\n'.format(job.nodes)
3238        c+='#SBATCH -t {}\n'.format(job.sbatch_walltime())
3239        c+='#SBATCH -p {}\n'.format(job.queue)
3240        if job.email is not None:
3241            c+='#SBATCH --mail-user {}\n'.format(job.email)
3242            c+='#SBATCH --mail-type ALL\n'
3243        #end if
3244        c+='#. /home/rcohen/.bashrc\n'
3245        if len(job.presub)==0:
3246            c+='unalias cd; source /mnt/beegfs/intel/parallel_studio_xe_2019.3.062/bin/psxevars.sh\n'
3247            c+='ulimit -a\n'
3248        #end if
3249        return c
3250    #end def write_job_header
3251#end class Tomcat3
3252
3253
3254
3255
3256#Known machines
3257#  workstations
3258for cores in range(1,128+1):
3259    Workstation('ws'+str(cores),cores,'mpirun'),
3260#end for
3261#  supercomputers and clusters
3262#            nodes sockets cores ram qslots  qlaunch  qsubmit     qstatus    qdelete
3263Jaguar(      18688,   2,     8,   32,  100,  'aprun',     'qsub',   'qstat',    'qdel')
3264Kraken(       9408,   2,     6,   16,  100,  'aprun',     'qsub',   'qstat',    'qdel')
3265Golub(          512,  2,     6,   32, 1000, 'mpirun',     'qsub',   'qstat',    'qdel')
3266OIC5(           28,   2,    16,  128, 1000, 'mpirun',     'qsub',   'qstat',    'qdel')
3267Edison(        664,   2,    12,   64,  100,   'srun',   'sbatch',  'squeue', 'scancel')
3268Cori(         9688,   1,    68,   96,  100,   'srun',   'sbatch',  'squeue', 'scancel')
3269BlueWatersXK( 3072,   1,    16,   32,  100,  'aprun',     'qsub',   'qstat',    'qdel')
3270BlueWatersXE(22640,   2,    16,   64,  100,  'aprun',     'qsub',   'qstat',    'qdel')
3271Titan(       18688,   1,    16,   32,  100,  'aprun',     'qsub',   'qstat',    'qdel')
3272EOS(           744,   2,     8,   64, 1000,  'aprun',     'qsub',   'qstat',    'qdel')
3273Vesta(        2048,   1,    16,   16,   10, 'runjob',     'qsub',  'qstata',    'qdel')
3274Cetus(        1024,   1,    16,   16,   10, 'runjob',     'qsub',  'qstata',    'qdel')
3275Mira(        49152,   1,    16,   16,   10, 'runjob',     'qsub',  'qstata',    'qdel')
3276Cooley(        126,   2,     6,  384,   10, 'mpirun',     'qsub',  'qstata',    'qdel')
3277Theta(        4392,   1,    64,  192, 1000,  'aprun',     'qsub',  'qstata',    'qdel')
3278Lonestar(    22656,   2,     6,   12,  128,  'ibrun',     'qsub',   'qstat',    'qdel')
3279Matisse(        20,   2,     8,   64,    2, 'mpirun',   'sbatch',   'sacct', 'scancel')
3280Komodo(         24,   2,     6,   48,    2, 'mpirun',   'sbatch',   'sacct', 'scancel')
3281Amos(         5120,   1,    16,   16,  128,   'srun',   'sbatch',   'sacct', 'scancel')
3282Chama(        1232,   2,     8,   64, 1000,   'srun',   'sbatch',  'squeue', 'scancel')
3283Uno(           168,   2,     8,  128, 1000,   'srun',   'sbatch',  'squeue', 'scancel')
3284Eclipse(      1488,   2,    18,  128, 1000,   'srun',   'sbatch',  'squeue', 'scancel')
3285Attaway(      1488,   2,    18,  192, 1000,   'srun',   'sbatch',  'squeue', 'scancel')
3286Skybridge(    1848,   2,     8,   64, 1000,   'srun',   'sbatch',  'squeue', 'scancel')
3287Solo(          374,   2,    18,  128, 1000,   'srun',   'sbatch',  'squeue', 'scancel')
3288SuperMUC(      512,   1,    28,  256,    8,'mpiexec', 'llsubmit',     'llq','llcancel')
3289Stampede2(    4200,   1,    68,   96,   50,  'ibrun',   'sbatch',  'squeue', 'scancel')
3290CadesMoab(     156,   2,    18,  128,  100, 'mpirun',     'qsub',   'qstat',    'qdel')
3291CadesSlurm(    156,   2,    18,  128,  100, 'mpirun',   'sbatch',  'squeue', 'scancel')
3292Summit(       4608,   2,    21,  512,  100,  'jsrun',     'bsub',   'bjobs',   'bkill')
3293Rhea(          512,   2,     8,  128, 1000,   'srun',   'sbatch',  'squeue', 'scancel')
3294Andes(         704,   2,    16,  256, 1000,   'srun',   'sbatch',  'squeue', 'scancel')
3295Tomcat3(         8,   1,    64,  192, 1000, 'mpirun',   'sbatch',   'sacct', 'scancel')
3296SuperMUC_NG(  6336,   1,    48,   96, 1000,'mpiexec',   'sbatch',   'sacct', 'scancel')
3297
3298
3299#machine accessor functions
3300get_machine_name = Machine.get_hostname
3301get_machine      = Machine.get
3302
3303#rename Job with lowercase
3304job=Job
3305
3306
3307
3308
3309