# Run 'dispycosnode.py' program to start processes to execute computations sent
# by this client, along with this program.

# This example illustrates in-memory processing with 'server_available' to read
# data into memory by each (remote) server process. Remote tasks ('compute' in
# this case) then process data in memory. This example works with POSIX (Linux,
# OS X etc.) and Windows. Note that, as data is read into each server process,
# a node may hold multiple copies of the same data (one in the memory of each
# server process on that node), so this approach is not practical / efficient
# when data is large. See 'dispycos_client9_node.py', which uses
# 'node_available' and 'node_setup' to read data into memory at the node (and
# thus only one copy is in memory).

# In this example different files are sent to remote servers to compute
# checksums of their data (thus there is no duplicate data in servers at a node
# in this case).

import glob
import os
import random
import sys

import pycos
import pycos.netpycos
from pycos.dispycos import *


def server_available(location, data_file, task=None):
    # 'server_available' is executed locally (at client) when a server process
    # is available. 'location' is Location instance of server. When this task is
    # executed, 'depends' of computation would've been transferred. data_file
    # could've been sent with the computation 'depends'; however, to illustrate
    # how files can be sent separately (to distribute data fragments among
    # servers), files are transferred to servers in this example.
    # Returns 0 on success, or -1 if the file could not be sent (in which case
    # the server is left disabled).

    print(' Sending %s to %s' % (data_file, location))
    if (yield pycos.Pycos().send_file(location, data_file, timeout=5, overwrite=True)) < 0:
        print('Could not send data file "%s" to %s' % (data_file, location))
        return -1

    # 'setup_server' is executed on remote server at 'location' with argument
    # data_file
    yield computation.enable_server(location, data_file)
    return 0


# 'setup_server' is executed at remote server process to read the data in given
# file (transferred by client) into memory (global variable). 'compute' then
# uses the data in memory instead of reading from file every time.
# NOTE: this function is executed remotely, so it must import what it needs
# itself rather than rely on this module's imports.
def setup_server(data_file, task=None):  # executed on remote server
    # variables declared as 'global' will be available in tasks for read/write
    # to all computations on a server.
    global hashlib, data, file_name
    import os, hashlib
    file_name = data_file
    print('%s processing %s' % (task.location, data_file))
    # note that files transferred to server are in the directory where
    # computations are executed (cf 'node_setup' in dispycos_client9_node.py)
    with open(data_file, 'rb') as fd:
        data = fd.read()
    os.remove(data_file)  # data_file is not needed anymore
    # generator functions must have at least one 'yield'
    yield 0  # indicate successful initialization with exit value 0


# 'compute' is executed at remote server process repeatedly to compute checksum
# of data in memory, initialized by 'setup_server'. Returns a tuple
# (file_name, alg, hex digest).
def compute(alg, n, task=None):
    global data, hashlib, file_name
    yield task.sleep(n)
    checksum = getattr(hashlib, alg)()
    checksum.update(data)
    return (file_name, alg, checksum.hexdigest())


# local task to process status messages from scheduler; each newly discovered
# server is handed the next unassigned data file
def status_proc(task=None):
    task.set_daemon()
    i = 0
    while 1:
        msg = yield task.receive()
        if not isinstance(msg, DispycosStatus):
            continue
        if msg.status == Scheduler.ServerDiscovered:
            # Each data file goes to exactly one server. Servers discovered
            # after all files have been handed out are never enabled (the
            # computation is created with 'disable_servers=True'), instead of
            # crashing this task with IndexError.
            if i < len(data_files):
                pycos.Task(server_available, msg.info, data_files[i])
                i += 1


def client_proc(computation, task=None):
    if (yield computation.schedule()):
        raise Exception('Could not schedule computation')

    # execute 'num_jobs' jobs (tasks) and get their results. Note that number
    # of jobs created can be more than number of server processes available;
    # the scheduler will use as many processes as necessary/available, running
    # one job at a server process
    num_jobs = 15
    algorithms = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
    args = [(algorithms[i % len(algorithms)], random.uniform(5, 10)) for i in range(num_jobs)]
    results = yield computation.run_results(compute, args)
    for i, result in enumerate(results):
        if isinstance(result, tuple) and len(result) == 3:
            print(' %ssum for %s: %s' % (result[1], result[0], result[2]))
        else:
            print(' rtask failed for %s: %s' % (args[i][0], str(result)))

    yield computation.close()


if __name__ == '__main__':
    pycos.logger.setLevel(pycos.Logger.DEBUG)
    # PyPI / pip packaging adjusts assertion below for Python 3.7+
    if sys.version_info.major == 3:
        assert sys.version_info.minor >= 7, \
            ('"%s" is not suitable for Python version %s.%s; use file installed by pip instead' %
             (__file__, sys.version_info.major, sys.version_info.minor))

    if os.path.dirname(sys.argv[0]):
        os.chdir(os.path.dirname(sys.argv[0]))

    # Each data file is sent to (at most) one server. Servers start disabled and
    # are enabled only after receiving a file, so with no files the jobs could
    # never be scheduled; stop early instead of waiting forever.
    data_files = glob.glob('dispycos_client*.py')
    if not data_files:
        sys.exit('No data files matching "dispycos_client*.py" found in %s' % os.getcwd())

    # if scheduler is not already running (on a node as a program), start
    # private scheduler:
    Scheduler()

    # send 'compute' generator function; the client sends data files when server
    # is discovered (to illustrate how client can distribute data).
    computation = Computation([compute], status_task=pycos.Task(status_proc),
                              disable_servers=True, server_setup=setup_server)
    pycos.Task(client_proc, computation)