# Benchmark four methods of using PyTables with multiple processes, where data
# is read from a PyTables file in one process and then sent to another process:
#
# 1. using multiprocessing.Pipe
# 2. using a memory mapped file that's shared between two processes, passed as
#    the out argument to tables.Array.read.
# 3. using a Unix domain socket (this uses the "abstract namespace" and will
#    work only on Linux).
# 4. using an IPv4 socket
#
# In all four cases, an array is loaded from a file in one process, sent to
# another, and then modified by incrementing each array element.  This is meant
# to simulate retrieving data and then modifying it.
14
15from __future__ import division
16from __future__ import print_function
17from __future__ import unicode_literals
18
19import multiprocessing
20import os
21import random
22import select
23import socket
24import time
25
26import numpy as np
27import tables
28
29
def create_file(array_size):
    """Write ``test.h5`` containing a single int64 array of ones.

    The array has ``array_size`` elements and is stored at node ``/test``.
    Any existing ``test.h5`` is overwritten (mode ``'w'``). The array's
    on-disk size is printed in MB.
    """
    ones = np.ones(array_size, dtype='i8')
    with tables.open_file('test.h5', 'w') as h5file:
        node = h5file.create_array('/', 'test', ones)
        size_mb = node.size_on_disk / 1e6
        print('file created, size: {0} MB'.format(size_mb))
37
38
class PipeReceive(multiprocessing.Process):
    """Child process that receives an array through a multiprocessing pipe.

    ``run`` blocks on ``receiver_pipe.recv()``, increments every element of
    the received array in place, and checks that every element now equals 2.
    It then sends ``(recv_timestamp, finish_timestamp)`` back on
    ``result_send``.
    """

    def __init__(self, receiver_pipe, result_send):
        super(PipeReceive, self).__init__()
        # connection the array arrives on
        self.receiver_pipe = receiver_pipe
        # connection the two timestamps are reported on
        self.result_send = result_send

    def run(self):
        received = self.receiver_pipe.recv()
        t_received = time.time()
        # the operation being timed: an in-place increment
        received += 1
        t_done = time.time()
        assert np.all(received == 2)
        timing = (t_received, t_done)
        self.result_send.send(timing)
57
58
def read_and_send_pipe(send_type, array_size):
    """Benchmark sending an array read from ``test.h5`` over a Pipe.

    A ``PipeReceive`` child is started. The first ``array_size`` elements of
    node ``/test`` are read and sent to it. The child's timestamps are then
    printed with ``print_results``, labelled ``send_type``.
    """
    # two one-way pipes: one carries the array, the other the timings
    data_in, data_out = multiprocessing.Pipe(False)
    timing_in, timing_out = multiprocessing.Pipe(False)
    receiver = PipeReceive(data_in, timing_out)
    receiver.start()
    # give the child a moment to start before the timed section
    time.sleep(0.15)
    with tables.open_file('test.h5', 'r') as h5file:
        node = h5file.get_node('/', 'test')
        t_start = time.time()
        loaded = node.read(0, array_size, 1)
        data_out.send(loaded)
        # the child increments its own copy, so the local array should
        # still be all ones
        assert np.all(loaded + 1 == 2)
        t_received, t_done = timing_in.recv()
    print_results(send_type, t_start, t_received, t_done)
    receiver.join()
79
80
class MemmapReceive(multiprocessing.Process):
    """Child process that increments an array held in a shared memmap file.

    ``run`` waits for a file path on ``path_recv`` and maps that file
    read-write (``'r+'``) as int64. It increments every element in place,
    checks that they all equal 2, and reports
    ``(recv_timestamp, finish_timestamp)`` on ``result_send``.

    For real use, some protocol would be needed to communicate the array's
    data type and shape. Here the dtype is fixed and the shape is inferred
    from the file size.
    """

    def __init__(self, path_recv, result_send):
        super(MemmapReceive, self).__init__()
        # connection the memmap file path arrives on
        self.path_recv = path_recv
        # connection the two timestamps are reported on
        self.result_send = result_send

    def run(self):
        shared_path = self.path_recv.recv()
        shared = np.memmap(shared_path, 'i8', 'r+')
        t_received = time.time()
        # writes go straight into the file mapping shared with the sender
        shared += 1
        t_done = time.time()
        assert np.all(shared == 2)
        self.result_send.send((t_received, t_done))
103
104
def read_and_send_memmap(send_type, array_size):
    """Benchmark sharing an array through a memory-mapped temporary file.

    A ``MemmapReceive`` child is started. The first ``array_size`` elements
    of node ``/test`` in ``test.h5`` are read directly into an int64 memmap
    backed by a temporary file, and the file's path is sent to the child.
    The child increments the shared array in place. The child's timestamps
    are printed with ``print_results``, labelled ``send_type``.

    The temporary file is created in the platform's temp directory with a
    unique name, so concurrent runs cannot collide. It is removed
    afterwards on a best-effort basis.
    """
    import tempfile

    # one pipe sends the memmap file path to the child, the other returns
    # the timing results
    path_recv, path_send = multiprocessing.Pipe(False)
    result_recv, result_send = multiprocessing.Pipe(False)
    # start the receiving process and pause to allow it to start up
    recv_process = MemmapReceive(path_recv, result_send)
    recv_process.start()
    time.sleep(0.15)
    # unique temp file instead of a fixed '/tmp/...' path: portable, safe
    # for concurrent runs, and deleted below instead of being left behind
    fd, memmap_path = tempfile.mkstemp(suffix='.i8')
    os.close(fd)
    try:
        with tables.open_file('test.h5', 'r') as fobj:
            array = fobj.get_node('/', 'test')
            start_timestamp = time.time()
            # memmap the file as a NumPy array in 'overwrite' mode
            output = np.memmap(memmap_path, 'i8', 'w+', shape=(array_size, ))
            # read the array from the PyTables file into the memory mapped
            # array
            array.read(0, array_size, 1, out=output)
            # send only the file's path; the data itself is shared via the
            # mapping
            path_send.send(memmap_path)
            recv_timestamp, finish_timestamp = result_recv.recv()
            # because 'output' is shared between processes, all elements
            # should now be equal to 2
            assert np.all(output == 2)
            # release the mapping so the file can be deleted (Windows refuses
            # to remove a file that is still mapped)
            del output
        print_results(send_type, start_timestamp, recv_timestamp,
                      finish_timestamp)
        recv_process.join()
    finally:
        try:
            os.remove(memmap_path)
        except OSError:
            # best-effort cleanup: the file may still be mapped if an error
            # occurred above; do not mask the original exception
            pass
131
132
class SocketReceive(multiprocessing.Process):
    """Child process that receives a raw int64 buffer over a stream socket.

    ``run`` binds a ``SOCK_STREAM`` socket of family ``socket_family`` to
    ``address``, listens, and accepts a single connection. It reads exactly
    ``array_nbytes`` bytes, views them as an int64 NumPy array, increments
    every element, and sends ``(recv_timestamp, finish_timestamp)`` on
    ``result_send``.

    For real use, some protocol would be needed to communicate the array's
    data type and shape. Here the dtype is fixed and the size is passed in.

    Raises (inside the child) ``EOFError`` if the peer closes the connection
    before ``array_nbytes`` bytes have arrived.
    """

    def __init__(self, socket_family, address, result_send, array_nbytes):
        super(SocketReceive, self).__init__()
        self.socket_family = socket_family
        self.address = address
        self.result_send = result_send
        self.array_nbytes = array_nbytes

    def run(self):
        sock = socket.socket(self.socket_family, socket.SOCK_STREAM)
        try:
            if self.socket_family == socket.AF_INET:
                # allow binding a port left in TIME_WAIT by an earlier
                # benchmark iteration
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(self.address)
            sock.listen(1)
            # accept() blocks until the sending process connects
            connection = sock.accept()[0]
            try:
                # read the sent data into a preallocated bytearray
                recv_buffer = bytearray(self.array_nbytes)
                view = memoryview(recv_buffer)
                bytes_recv = 0
                while bytes_recv < self.array_nbytes:
                    nbytes = connection.recv_into(view[bytes_recv:])
                    if nbytes == 0:
                        # recv_into returns 0 once the peer has closed; without
                        # this check the loop would spin forever
                        raise EOFError(
                            'connection closed after {0} of {1} bytes'.format(
                                bytes_recv, self.array_nbytes))
                    bytes_recv += nbytes
                # convert the bytearray into a NumPy array (no copy)
                array = np.frombuffer(recv_buffer, dtype='i8')
                recv_timestamp = time.time()
                # perform an operation on the received array
                array += 1
                finish_timestamp = time.time()
                assert np.all(array == 2)
                # send the timestamps back to the originating process
                self.result_send.send((recv_timestamp, finish_timestamp))
            finally:
                connection.close()
        finally:
            sock.close()
170
171
def unix_socket_address():
    """Return a random Unix-domain socket address in the abstract namespace.

    The leading NUL byte selects Linux's abstract namespace, so this only
    works on Linux. It is followed by five random bytes from ``os.urandom``.
    """
    abstract_prefix = b'\x00'
    random_suffix = os.urandom(5)
    return abstract_prefix + random_suffix
176
177
def ipv4_socket_address():
    """Return a loopback IPv4 ``(host, port)`` address.

    The port is drawn with ``random.randint(9000, 10000)`` (both ends
    inclusive).
    """
    port = random.randint(9000, 10000)
    host = '127.0.0.1'
    return (host, port)
181
182
def read_and_send_socket(send_type, array_size, array_bytes, address_func,
                         socket_family):
    """Benchmark sending an array's raw data buffer over a stream socket.

    Parameters
    ----------
    send_type : str
        Label printed with the results.
    array_size : int
        Number of elements to read from node ``/test`` in ``test.h5``.
    array_bytes : int
        Number of bytes the receiving process waits for. It should equal the
        byte size of the array that is read.
    address_func : callable
        Zero-argument function returning a socket address for
        ``socket_family``.
    socket_family : int
        Address family for both sockets, e.g. ``socket.AF_UNIX`` or
        ``socket.AF_INET``.
    """
    address = address_func()
    # start the receiving process and pause to allow it to start up
    result_recv, result_send = multiprocessing.Pipe(False)
    recv_process = SocketReceive(socket_family, address, result_send,
                                 array_bytes)
    recv_process.start()
    time.sleep(0.15)
    with tables.open_file('test.h5', 'r') as fobj:
        array = fobj.get_node('/', 'test')
        start_timestamp = time.time()
        # connect to the receiving process' socket
        sock = socket.socket(socket_family, socket.SOCK_STREAM)
        try:
            sock.connect(address)
            # read the array from the PyTables file and send its data buffer
            # to the receiving process. send() may transmit only part of the
            # buffer; sendall() keeps going until every byte is written,
            # otherwise the receiver would wait forever for the remainder.
            output = array.read(0, array_size, 1)
            sock.sendall(output.data)
            assert np.all(output + 1 == 2)
            # receive the timestamps from the other process
            recv_timestamp, finish_timestamp = result_recv.recv()
        finally:
            sock.close()
    print_results(send_type, start_timestamp, recv_timestamp, finish_timestamp)
    recv_process.join()
208
209
def print_results(send_type, start_timestamp, recv_timestamp,
                  finish_timestamp):
    """Print one tab-separated timing line for a benchmark run.

    The line reports three intervals in seconds:
    receive = recv - start, add = finish - recv, total = finish - start.
    """
    receive_time = recv_timestamp - start_timestamp
    add_time = finish_timestamp - recv_timestamp
    total_time = finish_timestamp - start_timestamp
    template = 'type: {0}\t receive: {1:5.5f}, add:{2:5.5f}, total: {3:5.5f}'
    print(template.format(send_type, receive_time, add_time, total_time))
217
218
if __name__ == '__main__':
    import sys

    random.seed(os.urandom(2))
    array_num_bytes = [int(x) for x in [1e5, 1e6, 1e7, 1e8]]
    # the Unix-socket benchmark uses an abstract-namespace address (see
    # unix_socket_address), which only Linux supports; skip it elsewhere
    # instead of requiring the line to be commented out by hand
    run_unix_socket = (hasattr(socket, 'AF_UNIX') and
                       sys.platform.startswith('linux'))

    for array_bytes in array_num_bytes:
        array_size = int(array_bytes // 8)

        create_file(array_size)
        read_and_send_pipe('multiproc.Pipe', array_size)
        read_and_send_memmap('memmap     ', array_size)
        if run_unix_socket:
            read_and_send_socket('Unix socket', array_size, array_bytes,
                                 unix_socket_address, socket.AF_UNIX)
        read_and_send_socket('IPv4 socket', array_size, array_bytes,
                             ipv4_socket_address, socket.AF_INET)
        print()
236