1# Example program that uses 'setup' and 'cleanup' functions to 2# initialize/de-initialize global variables on each node before 3# computations are executed. Computations use data in global variables 4# instead of reading input for each job. 5 6# Under Windows global variables must be serializable, so modules 7# can't be global variables: See 8# https://docs.python.org/2/library/multiprocessing.html#windows for 9# details. 10 11def setup(data_file): 12 # read data in file to global variable 13 global data, algorithms, hashlib 14 15 import hashlib 16 data = open(data_file).read() # read file in to memory; data_file can now be deleted 17 if sys.version_info.major > 2: 18 data = data.encode() # convert to bytes 19 algorithms = list(hashlib.algorithms_guaranteed) 20 else: 21 algorithms = hashlib.algorithms 22 # if running under Windows, modules can't be global, as they are not 23 # serializable; instead, they must be loaded in 'compute' (jobs); under 24 # Posix (Linux, OS X and other Unix variants), modules declared global in 25 # 'setup' will be available in 'compute' 26 27 # 'os' module is already available (loaded by dispynode) 28 if os.name == 'nt': # remove modules under Windows 29 del hashlib 30 return 0 31 32def cleanup(): 33 global data, algorithms, hashlib 34 del data, algorithms 35 if os.name != 'nt': 36 del hashlib 37 38def compute(n): 39 global hashlib 40 if os.name == 'nt': # Under Windows modules must be loaded in jobs 41 import hashlib 42 # 'data' and 'algorithms' global variables are initialized in 'setup' 43 alg = algorithms[n % len(algorithms)] 44 csum = getattr(hashlib, alg)() 45 csum.update(data) 46 return (alg, csum.hexdigest()) 47 48if __name__ == '__main__': 49 import dispy, sys, functools 50 # if no data file name is given, use this file as data file 51 data_file = sys.argv[1] if len(sys.argv) > 1 else sys.argv[0] 52 cluster = dispy.JobCluster(compute, depends=[data_file], 53 setup=functools.partial(setup, data_file), cleanup=cleanup) 54 jobs = [] 55 for n in range(10): 56 job = cluster.submit(n) 57 jobs.append(job) 58 59 for job in jobs: 60 job() 61 if job.status == dispy.DispyJob.Finished: 62 print('%s: %s : %s' % (job.id, job.result[0], job.result[1])) 63 else: 64 print(job.exception) 65 cluster.print_status() 66 cluster.close() 67