# When jobs are submitted, they are stored in the scheduler's queue. Thus,
# jobs' arguments are kept in memory until all references to the jobs and
# DispyJob instances (which also store the arguments) are gone from both the
# dispy scheduler and the client program (if it saves DispyJob instances).
# This can use up memory at the client, causing issues especially when
# submitting many jobs, or jobs with large arguments.

# Instead of submitting all jobs at once, this program keeps between
# 'lower_bound' and 'upper_bound' jobs submitted. If the number of pending
# jobs drops below 'lower_bound', more jobs are submitted, up to
# 'upper_bound'. Adjust the bounds as appropriate; e.g., 'lower_bound' should
# be at least the number of CPUs available in the cluster, and 'upper_bound',
# say, 3x that. Use NodeAllocate or the cluster_status callback to update the
# bounds dynamically as the CPUs available in the cluster change, as sketched
# below.
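
# A minimal sketch of such dynamic updates, using dispy's cluster_status
# notification; 'status_cb', 'cluster_cpus' and the 3x multiplier here are
# illustrative, not part of the original example. To use it, create the
# cluster below with 'cluster_status=status_cb'.
cluster_cpus = 0  # total CPUs of nodes currently in the cluster

def status_cb(status, node, job):  # executed at the client
    global cluster_cpus, lower_bound, upper_bound
    if status == dispy.DispyNode.Initialized:
        cluster_cpus += node.cpus
    elif status == dispy.DispyNode.Closed:
        cluster_cpus -= node.cpus
    else:  # ignore job status updates
        return
    # update bounds under 'jobs_cond', as 'main' reads them under it
    jobs_cond.acquire()
    lower_bound = max(1, cluster_cpus)
    upper_bound = 3 * lower_bound
    jobs_cond.release()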

# Note also that submitting even a modest number of jobs with large arguments
# (e.g., arrays, lists) can strain memory. In that case, consider saving the
# argument data in a file, sending the file with 'dispy_job_depends', and
# having the computation load the data from that file, as sketched below.
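
# A minimal sketch of that approach; 'compute_from_file' and 'data.pkl' are
# illustrative names. Only the file name is passed as the job argument;
# 'dispy_job_depends' transfers the file itself to the node for the job.
def compute_from_file(path):  # executed on nodes
    import pickle
    # a file sent with 'dispy_job_depends' is in the job's working directory
    with open(path, 'rb') as fd:
        data = pickle.load(fd)
    return sum(data)

# usage, assuming a cluster created with 'compute_from_file' instead:
#   import pickle
#   with open('data.pkl', 'wb') as fd:
#       pickle.dump(list(range(1000000)), fd)
#   job = cluster.submit('data.pkl', dispy_job_depends=['data.pkl'])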

def compute(n):  # executed on nodes
    import time
    time.sleep(n)
    return n

# dispy calls this function to indicate a change in job status
def job_callback(job): # executed at the client
    global pending_jobs, jobs_cond
    if (job.status == dispy.DispyJob.Finished  # most usual case
        or job.status in (dispy.DispyJob.Terminated, dispy.DispyJob.Cancelled,
                          dispy.DispyJob.Abandoned)):
        # 'pending_jobs' is shared between two threads, so access it with
        # 'jobs_cond' (see below)
        jobs_cond.acquire()
        if job.id: # job may have finished before 'main' assigned id
            pending_jobs.pop(job.id)
            # dispy.logger.info('job "%s" done with %s: %s', job.id, job.result, len(pending_jobs))
            if len(pending_jobs) <= lower_bound:
                jobs_cond.notify()
        jobs_cond.release()
if __name__ == '__main__':
    import dispy, threading, random, logging

    # set lower and upper bounds as appropriate; assuming there are 30
    # processors in the cluster, bounds are set to 50 and 100
    lower_bound, upper_bound = 50, 100
    # use a Condition variable to protect access to 'pending_jobs', as
    # 'job_callback' is executed in another thread
    jobs_cond = threading.Condition()
    cluster = dispy.JobCluster(compute, callback=job_callback, loglevel=logging.INFO)
    pending_jobs = {}
    # submit 1000 jobs
    for i in range(1, 1001):  # job ids start at 1, as 0 tests false in 'job_callback'
        job = cluster.submit(random.uniform(3, 7))
        jobs_cond.acquire()
        job.id = i  # associate an id to the job; 'job_callback' uses it
        # there is a chance the job may have finished and 'job_callback' called
        # by this time, so put it in 'pending_jobs' only if the job is pending
        if job.status == dispy.DispyJob.Created or job.status == dispy.DispyJob.Running:
            pending_jobs[i] = job
            # dispy.logger.info('job "%s" submitted: %s', i, len(pending_jobs))
            if len(pending_jobs) >= upper_bound:
                while len(pending_jobs) > lower_bound:
                    jobs_cond.wait()
        jobs_cond.release()

    cluster.wait()
    cluster.print_status()
    cluster.close()