1# Benchmark three methods of using PyTables with multiple processes, where data 2# is read from a PyTables file in one process and then sent to another 3# 4# 1. using multiprocessing.Pipe 5# 2. using a memory mapped file that's shared between two processes, passed as 6# out argument to tables.Array.read. 7# 3. using a Unix domain socket (this uses the "abstract namespace" and will 8# work only on Linux). 9# 4. using an IPv4 socket 10# 11# In all three cases, an array is loaded from a file in one process, sent to 12# another, and then modified by incrementing each array element. This is meant 13# to simulate retrieving data and then modifying it. 14 15from __future__ import division 16from __future__ import print_function 17from __future__ import unicode_literals 18 19import multiprocessing 20import os 21import random 22import select 23import socket 24import time 25 26import numpy as np 27import tables 28 29 30# create a PyTables file with a single int64 array with the specified number of 31# elements 32def create_file(array_size): 33 array = np.ones(array_size, dtype='i8') 34 with tables.open_file('test.h5', 'w') as fobj: 35 array = fobj.create_array('/', 'test', array) 36 print('file created, size: {0} MB'.format(array.size_on_disk / 1e6)) 37 38 39# process to receive an array using a multiprocessing.Pipe connection 40class PipeReceive(multiprocessing.Process): 41 42 def __init__(self, receiver_pipe, result_send): 43 super(PipeReceive, self).__init__() 44 self.receiver_pipe = receiver_pipe 45 self.result_send = result_send 46 47 def run(self): 48 # block until something is received on the pipe 49 array = self.receiver_pipe.recv() 50 recv_timestamp = time.time() 51 # perform an operation on the received array 52 array += 1 53 finish_timestamp = time.time() 54 assert(np.all(array == 2)) 55 # send the measured timestamps back to the originating process 56 self.result_send.send((recv_timestamp, finish_timestamp)) 57 58 59def read_and_send_pipe(send_type, array_size): 
60 # set up Pipe objects to send the actual array to the other process 61 # and receive the timing results from the other process 62 array_recv, array_send = multiprocessing.Pipe(False) 63 result_recv, result_send = multiprocessing.Pipe(False) 64 # start the other process and pause to allow it to start up 65 recv_process = PipeReceive(array_recv, result_send) 66 recv_process.start() 67 time.sleep(0.15) 68 with tables.open_file('test.h5', 'r') as fobj: 69 array = fobj.get_node('/', 'test') 70 start_timestamp = time.time() 71 # read an array from the PyTables file and send it to the other process 72 output = array.read(0, array_size, 1) 73 array_send.send(output) 74 assert(np.all(output + 1 == 2)) 75 # receive the timestamps from the other process 76 recv_timestamp, finish_timestamp = result_recv.recv() 77 print_results(send_type, start_timestamp, recv_timestamp, finish_timestamp) 78 recv_process.join() 79 80 81# process to receive an array using a shared memory mapped file 82# for real use, this would require creating some protocol to specify the 83# array's data type and shape 84class MemmapReceive(multiprocessing.Process): 85 86 def __init__(self, path_recv, result_send): 87 super(MemmapReceive, self).__init__() 88 self.path_recv = path_recv 89 self.result_send = result_send 90 91 def run(self): 92 # block until the memmap file path is received from the other process 93 path = self.path_recv.recv() 94 # create a memmap array using the received file path 95 array = np.memmap(path, 'i8', 'r+') 96 recv_timestamp = time.time() 97 # perform an operation on the array 98 array += 1 99 finish_timestamp = time.time() 100 assert(np.all(array == 2)) 101 # send the timing results back to the other process 102 self.result_send.send((recv_timestamp, finish_timestamp)) 103 104 105def read_and_send_memmap(send_type, array_size): 106 # create a multiprocessing Pipe that will be used to send the memmap 107 # file path to the receiving process 108 path_recv, path_send = 
multiprocessing.Pipe(False) 109 result_recv, result_send = multiprocessing.Pipe(False) 110 # start the receiving process and pause to allow it to start up 111 recv_process = MemmapReceive(path_recv, result_send) 112 recv_process.start() 113 time.sleep(0.15) 114 with tables.open_file('test.h5', 'r') as fobj: 115 array = fobj.get_node('/', 'test') 116 start_timestamp = time.time() 117 # memmap a file as a NumPy array in 'overwrite' mode 118 output = np.memmap('/tmp/array1', 'i8', 'w+', shape=(array_size, )) 119 # read an array from a PyTables file into the memmory mapped array 120 array.read(0, array_size, 1, out=output) 121 # use a multiprocessing.Pipe to send the file's path to the receiving 122 # process 123 path_send.send('/tmp/array1') 124 # receive the timestamps from the other process 125 recv_timestamp, finish_timestamp = result_recv.recv() 126 # because 'output' is shared between processes, all elements should now 127 # be equal to 2 128 assert(np.all(output == 2)) 129 print_results(send_type, start_timestamp, recv_timestamp, finish_timestamp) 130 recv_process.join() 131 132 133# process to receive an array using a socket 134# for real use, this would require creating some protocol to specify the 135# array's data type and shape 136class SocketReceive(multiprocessing.Process): 137 138 def __init__(self, socket_family, address, result_send, array_nbytes): 139 super(SocketReceive, self).__init__() 140 self.socket_family = socket_family 141 self.address = address 142 self.result_send = result_send 143 self.array_nbytes = array_nbytes 144 145 def run(self): 146 # create the socket, listen for a connection and use select to block 147 # until a connection is made 148 sock = socket.socket(self.socket_family, socket.SOCK_STREAM) 149 sock.bind(self.address) 150 sock.listen(1) 151 readable, _, _ = select.select([sock], [], []) 152 # accept the connection and read the sent data into a bytearray 153 connection = sock.accept()[0] 154 recv_buffer = 
bytearray(self.array_nbytes) 155 view = memoryview(recv_buffer) 156 bytes_recv = 0 157 while bytes_recv < self.array_nbytes: 158 bytes_recv += connection.recv_into(view[bytes_recv:]) 159 # convert the bytearray into a NumPy array 160 array = np.frombuffer(recv_buffer, dtype='i8') 161 recv_timestamp = time.time() 162 # perform an operation on the received array 163 array += 1 164 finish_timestamp = time.time() 165 assert(np.all(array == 2)) 166 # send the timestamps back to the originating process 167 self.result_send.send((recv_timestamp, finish_timestamp)) 168 connection.close() 169 sock.close() 170 171 172def unix_socket_address(): 173 # create a Unix domain address in the abstract namespace 174 # this will only work on Linux 175 return b'\x00' + os.urandom(5) 176 177 178def ipv4_socket_address(): 179 # create an IPv4 socket address 180 return ('127.0.0.1', random.randint(9000, 10000)) 181 182 183def read_and_send_socket(send_type, array_size, array_bytes, address_func, 184 socket_family): 185 address = address_func() 186 # start the receiving process and pause to allow it to start up 187 result_recv, result_send = multiprocessing.Pipe(False) 188 recv_process = SocketReceive(socket_family, address, result_send, 189 array_bytes) 190 recv_process.start() 191 time.sleep(0.15) 192 with tables.open_file('test.h5', 'r') as fobj: 193 array = fobj.get_node('/', 'test') 194 start_timestamp = time.time() 195 # connect to the receiving process' socket 196 sock = socket.socket(socket_family, socket.SOCK_STREAM) 197 sock.connect(address) 198 # read the array from the PyTables file and send its 199 # data buffer to the receiving process 200 output = array.read(0, array_size, 1) 201 sock.send(output.data) 202 assert(np.all(output + 1 == 2)) 203 # receive the timestamps from the other process 204 recv_timestamp, finish_timestamp = result_recv.recv() 205 sock.close() 206 print_results(send_type, start_timestamp, recv_timestamp, finish_timestamp) 207 recv_process.join() 208 209 
def print_results(send_type, start_timestamp, recv_timestamp,
                  finish_timestamp):
    """Print the timings of one benchmark run.

    ``receive`` is ``recv_timestamp - start_timestamp``, ``add`` is
    ``finish_timestamp - recv_timestamp`` and ``total`` is
    ``finish_timestamp - start_timestamp``, all in seconds.
    """
    msg = 'type: {0}\t receive: {1:5.5f}, add:{2:5.5f}, total: {3:5.5f}'
    print(msg.format(send_type,
                     recv_timestamp - start_timestamp,
                     finish_timestamp - recv_timestamp,
                     finish_timestamp - start_timestamp))


if __name__ == '__main__':
    import sys

    random.seed(os.urandom(2))
    array_num_bytes = [int(x) for x in [1e5, 1e6, 1e7, 1e8]]
    # abstract-namespace Unix domain sockets exist only on Linux, so that
    # benchmark is skipped elsewhere instead of crashing
    run_unix_socket = sys.platform.startswith('linux')

    for array_bytes in array_num_bytes:
        array_size = int(array_bytes // 8)

        create_file(array_size)
        read_and_send_pipe('multiproc.Pipe', array_size)
        read_and_send_memmap('memmap ', array_size)
        if run_unix_socket:
            read_and_send_socket('Unix socket', array_size, array_bytes,
                                 unix_socket_address, socket.AF_UNIX)
        read_and_send_socket('IPv4 socket', array_size, array_bytes,
                             ipv4_socket_address, socket.AF_INET)
        print()