1# When jobs are submitted, they are stored in scheduler's queue. Thus, jobs' 2# arguments are kept in memory until all references to jobs and DispyJob 3# instances (which also store arguments) are gone from both dispy scheduler and 4# client program (if it saves DispyJob instances). This can take up memory at 5# the client, causing issues especially when submitting many jobs, or jobs with 6# large arguments. 7 8# Instead of submitting all jobs at once, this program submits between 9# 'lower_bound' and 'upper_bound' jobs. If number of pending jobs drops below 10# 'lower_bound', more jobs are submitted, up to 'upper_bound'. Adjust bounds as 11# appropriate; e.g., 'lower_bound' should be at least as many CPUs available in 12# the cluster, and 'uppper_bound' to, say, 3x that. Use NodeAlloate or 13# cluster_status callback to dynamically update bounds depending on available 14# CPUs in cluster. 15 16# Note also that submitting even not that many jobs but with large arguemnts 17# (e.g., arrays, lists) can also be a problem with memory. In that case, 18# consider saving argument data in a file and use 'dispy_job_depends' to send 19# the file and have computation load from that file. 20 21def compute(n): # executed on nodes 22 import time 23 time.sleep(n) 24 return n 25 26# dispy calls this function to indicate change in job status 27def job_callback(job): # executed at the client 28 global pending_jobs, jobs_cond 29 if (job.status == dispy.DispyJob.Finished # most usual case 30 or job.status in (dispy.DispyJob.Terminated, dispy.DispyJob.Cancelled, 31 dispy.DispyJob.Abandoned)): 32 # 'pending_jobs' is shared between two threads, so access it with 33 # 'jobs_cond' (see below) 34 jobs_cond.acquire() 35 if job.id: # job may have finished before 'main' assigned id 36 pending_jobs.pop(job.id) 37 # dispy.logger.info('job "%s" done with %s: %s', job.id, job.result, len(pending_jobs)) 38 if len(pending_jobs) <= lower_bound: 39 jobs_cond.notify() 40 jobs_cond.release() 41 42if __name__ == '__main__': 43 import dispy, threading, random, logging 44 45 # set lower and upper bounds as appropriate; assuming there are 30 46 # processors in a cluster, bounds are set to 50 to 100 47 lower_bound, upper_bound = 50, 100 48 # use Condition variable to protect access to pending_jobs, as 49 # 'job_callback' is executed in another thread 50 jobs_cond = threading.Condition() 51 cluster = dispy.JobCluster(compute, callback=job_callback, loglevel=logging.INFO) 52 pending_jobs = {} 53 # submit 1000 jobs 54 for i in range(1000): 55 job = cluster.submit(random.uniform(3, 7)) 56 jobs_cond.acquire() 57 # there is a chance the job may have finished and job_callback called by 58 # this time, so put it in 'pending_jobs' only if job is pending 59 if job.status == dispy.DispyJob.Created or job.status == dispy.DispyJob.Running: 60 pending_jobs[i] = job 61 # dispy.logger.info('job "%s" submitted: %s', i, len(pending_jobs)) 62 if len(pending_jobs) >= upper_bound: 63 while len(pending_jobs) > lower_bound: 64 jobs_cond.wait() 65 jobs_cond.release() 66 67 cluster.wait() 68 cluster.print_status() 69 cluster.close() 70