# Simple program that distributes the 'delegate' function to execute different
# computations with jobs. In this example, the client sends a function with each
# job and 'delegate' executes that function.

# 'delegate' is sent to nodes with cluster initialization
def delegate(func_name, n):
    # get function with given name (this function would've been sent with
    # 'dispy_job_depends' and available in global scope)
    func = globals()[func_name]
    return func(n)

# in this case two different functions (the only difference is in return value)
# are sent with jobs and executed at nodes (with 'delegate')
def func1(n):
    import time
    time.sleep(n)
    return (dispy_node_name + ': func1', n)

def func2(n):
    import time
    time.sleep(n)
    return (dispy_node_name + ': func2', n)

if __name__ == '__main__':
    import dispy, random
    # above functions can be sent with 'depends' so they are always available
    # for jobs; instead, here, the required function is sent with
    # 'dispy_job_depends' to illustrate how to send functions with 'submit'
    # (dynamically)
    cluster = dispy.JobCluster(delegate, loglevel=dispy.logger.DEBUG)
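    # a minimal alternative (sketch): send both functions once, at cluster
    # setup, with 'depends' so no per-job transfer is needed:
    #   cluster = dispy.JobCluster(delegate, depends=[func1, func2])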
    jobs = []
    for i in range(4):
        # run above functions (computations) alternately
        if i % 2 == 0:
            func = func1
        else:
            func = func2
        # send function with 'dispy_job_depends'; this function is specific to
        # this job - it is discarded when the job is over
        job = cluster.submit(func.__name__, random.randint(5, 10), dispy_job_depends=[func])
        if not job:
            print('Failed to create job %s' % i)
            continue
        jobs.append(job)

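    # an alternative to collecting results one job at a time below (sketch):
    # 'JobCluster.wait()' blocks until all scheduled jobs finish, after which
    # each 'job.result' can be read directly:
    #   cluster.wait()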
    for job in jobs:
        host, n = job() # waits for job to finish and returns results
        print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
        # other fields of 'job' that may be useful:
        # print(job.stdout, job.stderr, job.exception, job.ip_addr, job.start_time, job.end_time)
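        # a cautious variant (sketch): 'job()' returns None if the computation
        # raised an exception, so check 'job.status' before unpacking:
        #   result = job()
        #   if job.status != dispy.DispyJob.Finished:
        #       print('job %s failed: %s' % (job.id, job.exception))
        #       continue
        #   host, n = result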
    cluster.print_status()
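    # once all jobs are done, 'close()' releases the nodes (cleanup)
    cluster.close()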