# Simple program that distributes 'delegate' function to execute different
# computations with jobs. In this example, client sends a function with each job
# and 'delegate' executes that function.


# 'delegate' is sent to nodes with cluster initialization
def delegate(func_name, n):
    """Look up the function named 'func_name' in global scope and call it with 'n'.

    The named function is expected to have been sent along with the job
    (with 'dispy_job_depends'), so that it is available in global scope when
    this runs. Raises KeyError if no global with that name exists.

    Returns whatever the looked-up function returns.
    """
    func = globals()[func_name]
    return func(n)


# in this case two different functions (the only difference is in return value)
# are sent with jobs and executed at nodes (with 'delegate')
def func1(n):
    """Sleep for 'n' seconds and return the tuple ('<node name>: func1', n).

    'dispy_node_name' is not defined in this file; it is assumed to be
    supplied by dispy in the scope where jobs execute.
    """
    # imported here (not at file level) so the function is self-contained
    # when it is sent to a node by itself
    import time
    time.sleep(n)
    return (dispy_node_name + ': func1', n)


def func2(n):
    """Sleep for 'n' seconds and return the tuple ('<node name>: func2', n).

    'dispy_node_name' is not defined in this file; it is assumed to be
    supplied by dispy in the scope where jobs execute.
    """
    # imported here (not at file level) so the function is self-contained
    # when it is sent to a node by itself
    import time
    time.sleep(n)
    return (dispy_node_name + ': func2', n)


if __name__ == '__main__':
    import dispy
    import random

    # above functions can be sent with 'depends' so they are available for jobs
    # always; instead, here, required function is sent with 'dispy_job_depends'
    # to illustrate how to send functions with 'submit' (dynamically)
    cluster = dispy.JobCluster(delegate, loglevel=dispy.logger.DEBUG)
    jobs = []
    for i in range(4):
        # run above functions (computations) alternately
        func = func1 if i % 2 == 0 else func2
        # send function with 'dispy_job_depends'; this function is specific to
        # this job - it is discarded when job is over
        job = cluster.submit(func.__name__, random.randint(5, 10),
                             dispy_job_depends=[func])
        if not job:
            print('Failed to create job %s' % i)
            continue
        # dispy does not assign 'id' itself; set it so the report below can
        # identify the job instead of printing 'None'
        job.id = i
        jobs.append(job)

    for job in jobs:
        result = job()  # waits for job to finish and returns results
        if job.status != dispy.DispyJob.Finished:
            # a job that did not finish has no (host, n) result to unpack;
            # report it and carry on with the remaining jobs
            print('job %s failed: %s' % (job.id, job.exception))
            continue
        host, n = result
        print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
        # other fields of 'job' that may be useful:
        # print(job.stdout, job.stderr, job.exception, job.ip_addr, job.start_time, job.end_time)
    cluster.print_status()
    # release the cluster (and the nodes it is using) now that all jobs are done
    cluster.close()