# Run 'dispycosnode.py' program to start processes to execute computations sent
# by this client, along with this program.

# This example illustrates in-memory processing with 'server_available' to read
# data in to memory at each (remote) server process. Remote tasks ('compute' in
# this case) then process the data in memory. This example works with POSIX
# (Linux, OS X etc.) and Windows. Note that, as data is read in to each server
# process, a node may have multiple copies of the same data in memory (one copy
# per server process on that node), so this approach is not practical /
# efficient when data is large. See 'dispycos_client9_node.py', which uses
# 'node_available' and 'node_setup' to read data in to memory at the node (so
# that only one copy is in memory per node).

# In this example different files are sent to remote servers to compute checksum
# of their data (thus there is no duplicate data in servers at a node in this
# case).
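
# A typical way to run this example (a sketch; exact commands and paths may
# differ on your setup): start 'dispycosnode.py' with the same Python version
# on one or more nodes, e.g., 'python dispycosnode.py', and then run this
# client program, e.g., 'python <this file>'.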

import pycos
import pycos.netpycos
from pycos.dispycos import *


def server_available(location, data_file, task=None):
    # 'server_available' is executed locally (at the client) when a server
    # process becomes available. 'location' is the Location instance of that
    # server. By the time this task is executed, the computation's 'depends'
    # would have been transferred. 'data_file' could have been sent with the
    # computation's 'depends'; however, to illustrate how files can be sent
    # separately (to distribute data fragments among servers), files are
    # transferred to individual servers in this example.

    print('  Sending %s to %s' % (data_file, location))
    if (yield pycos.Pycos().send_file(location, data_file, timeout=5, overwrite=True)) < 0:
        print('Could not send data file "%s" to %s' % (data_file, location))
        raise StopIteration(-1)

    # 'setup_server' is executed on remote server at 'location' with argument
    # data_file
    yield computation.enable_server(location, data_file)
    raise StopIteration(0)


# 'setup_server' is executed at remote server process to read the data in the
# given file (transferred by client) in to memory (global variable). 'compute'
# then uses the data in memory instead of reading from the file every time.
def setup_server(data_file, task=None):  # executed on remote server
    # variables declared as 'global' will be available (for read / write) to
    # all computation tasks executed at this server process.
    global hashlib, data, file_name
    import os, hashlib
    file_name = data_file
    print('%s processing %s' % (task.location, data_file))
    # note that files transferred to the server are in the directory where
    # computations are executed (cf 'node_setup' in dispycos_client9_node.py)
    with open(data_file, 'rb') as fd:
        data = fd.read()
    os.remove(data_file)  # data_file is not needed anymore
    # generator functions must have at least one 'yield'
    yield 0  # indicate successful initialization with exit value 0


# 'compute' is executed at remote server process repeatedly to compute checksum
# of data in memory, initialized by 'setup_server'
def compute(alg, n, task=None):
    global data, hashlib, file_name
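    # suspend this task for 'n' seconds ('n' is picked randomly by the client)
    # to simulate jobs that take varying amounts of time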
    yield task.sleep(n)
    checksum = getattr(hashlib, alg)()
    checksum.update(data)
    raise StopIteration((file_name, alg, checksum.hexdigest()))

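
# Not part of the distributed computation above: a minimal local sketch showing
# the same checksum calculation with just 'hashlib', e.g., to verify a result
# printed by 'client_proc' against a local copy of a data file. The name
# 'local_checksum' is illustrative and is not used elsewhere in this file.
def local_checksum(data_file, alg):
    import hashlib
    checksum = getattr(hashlib, alg)()
    with open(data_file, 'rb') as fd:
        checksum.update(fd.read())
    return (data_file, alg, checksum.hexdigest())
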

# local task to process status messages from scheduler
def status_proc(task=None):
    task.set_daemon()
    i = 0
    while 1:
        msg = yield task.receive()
        if not isinstance(msg, DispycosStatus):
            continue
        if msg.status == Scheduler.ServerDiscovered:
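            # a new server process has been discovered; send it the next data
            # file and enable it (servers start disabled because the
            # computation is created with 'disable_servers=True')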
            pycos.Task(server_available, msg.info, data_files[i])
            i += 1


def client_proc(computation, task=None):
    if (yield computation.schedule()):
        raise Exception('Could not schedule computation')

    # execute 15 jobs (tasks) and get their results. Note that the number of
    # jobs created can be more than the number of server processes available;
    # the scheduler will use as many processes as necessary / available,
    # running one job at a time at each server process
    algorithms = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
    args = [(algorithms[i % len(algorithms)], random.uniform(5, 10)) for i in range(15)]
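    # each tuple in 'args' is passed as arguments to one remote execution of
    # 'compute'; 'run_results' waits for all of them to finish and returns
    # their results (corresponding to the order of 'args')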
    results = yield computation.run_results(compute, args)
    for i, result in enumerate(results):
        if isinstance(result, tuple) and len(result) == 3:
            print('    %ssum for %s: %s' % (result[1], result[0], result[2]))
        else:
            print('  rtask failed for %s: %s' % (args[i][0], str(result)))

    yield computation.close()


if __name__ == '__main__':
    import sys, os, random, glob
    pycos.logger.setLevel(pycos.Logger.DEBUG)
    # PyPI / pip packaging adjusts assertion below for Python 3.7+
    if sys.version_info.major == 3:
        assert sys.version_info.minor < 7, \
            ('"%s" is not suitable for Python version %s.%s; use file installed by pip instead' %
             (__file__, sys.version_info.major, sys.version_info.minor))

    if os.path.dirname(sys.argv[0]):
        os.chdir(os.path.dirname(sys.argv[0]))
    # if scheduler is not already running (on a node as a program), start
    # private scheduler:
    Scheduler()

    data_files = glob.glob('dispycos_client*.py')
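    # 'status_proc' sends data_files[i] to the i-th discovered server, so this
    # assumes there are at least as many data files as server processes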

    # send 'compute' generator function; the client sends data files when a
    # server is discovered (to illustrate how the client can distribute data).
    computation = Computation([compute], status_task=pycos.Task(status_proc),
                              disable_servers=True, server_setup=setup_server)
    pycos.Task(client_proc, computation)