1"""Example showing how to access a PyTables file from multiple processes using 2queues.""" 3 4from __future__ import print_function 5import sys 6 7if sys.version < '3': 8 import Queue as queue 9else: 10 import queue 11 12import multiprocessing 13import os 14import random 15import time 16 17import numpy 18import tables 19 20 21# this creates an HDF5 file with one array containing n rows 22def make_file(file_path, n): 23 24 with tables.open_file(file_path, 'w') as fobj: 25 array = fobj.create_carray('/', 'array', tables.Int64Atom(), (n, n)) 26 for i in range(n): 27 array[i, :] = i 28 29 30# All access to the file goes through a single instance of this class. 31# It contains several queues that are used to communicate with other 32# processes. 33# The read_queue is used for requests to read data from the HDF5 file. 34# A list of result_queues is used to send data back to client processes. 35# The write_queue is used for requests to modify the HDF5 file. 36# One end of a pipe (shutdown) is used to signal the process to terminate. 37class FileAccess(multiprocessing.Process): 38 39 def __init__(self, h5_path, read_queue, result_queues, write_queue, 40 shutdown): 41 self.h5_path = h5_path 42 self.read_queue = read_queue 43 self.result_queues = result_queues 44 self.write_queue = write_queue 45 self.shutdown = shutdown 46 self.block_period = .01 47 super(FileAccess, self).__init__() 48 49 def run(self): 50 self.h5_file = tables.open_file(self.h5_path, 'r+') 51 self.array = self.h5_file.get_node('/array') 52 another_loop = True 53 while another_loop: 54 55 # Check if the process has received the shutdown signal. 56 if self.shutdown.poll(): 57 another_loop = False 58 59 # Check for any data requests in the read_queue. 
60 try: 61 row_num, proc_num = self.read_queue.get( 62 True, self.block_period) 63 # look up the appropriate result_queue for this data processor 64 # instance 65 result_queue = self.result_queues[proc_num] 66 print('processor {0} reading from row {1}'.format(proc_num, 67 row_num)) 68 result_queue.put(self.read_data(row_num)) 69 another_loop = True 70 except queue.Empty: 71 pass 72 73 # Check for any write requests in the write_queue. 74 try: 75 row_num, data = self.write_queue.get(True, self.block_period) 76 print('writing row', row_num) 77 self.write_data(row_num, data) 78 another_loop = True 79 except queue.Empty: 80 pass 81 82 # close the HDF5 file before shutting down 83 self.h5_file.close() 84 85 def read_data(self, row_num): 86 return self.array[row_num, :] 87 88 def write_data(self, row_num, data): 89 self.array[row_num, :] = data 90 91 92# This class represents a process that does work by reading and writing to the 93# HDF5 file. It does this by sending requests to the FileAccess class instance 94# through its read and write queues. The data results are sent back through 95# the result_queue. 96# Its actions are logged to a text file. 
# This class represents a process that does work by reading and writing to the
# HDF5 file. It does this by sending requests to the FileAccess class instance
# through its read and write queues. The data results are sent back through
# the result_queue.
# Its actions are logged to a text file.
class DataProcessor(multiprocessing.Process):
    """Worker process: one random read, one write, then a verifying read.

    Parameters
    ----------
    read_queue : multiprocessing.Queue
        Where ``(row_num, proc_num)`` read requests are sent.
    result_queue : multiprocessing.Queue
        Where the FileAccess process sends this worker's read results.
    write_queue : multiprocessing.Queue
        Where ``(row_num, data)`` write requests are sent.
    proc_num : int
        This worker's index into FileAccess's list of result queues.
    array_size : int
        Number of rows (and columns) in the shared array; bounds the
        random row selection.
    output_file : str
        Path of the text log file this worker writes.
    """

    def __init__(self, read_queue, result_queue, write_queue, proc_num,
                 array_size, output_file):
        self.read_queue = read_queue
        self.result_queue = result_queue
        self.write_queue = write_queue
        self.proc_num = proc_num
        self.array_size = array_size
        self.output_file = output_file
        super(DataProcessor, self).__init__()

    def run(self):
        # Open the log with a context manager so the file handle is
        # released even if a queue operation raises.
        with open(self.output_file, 'w') as log:
            # read a random row from the file
            row_num = random.randint(0, self.array_size - 1)
            self.read_queue.put((row_num, self.proc_num))
            log.write(str(row_num) + '\n')
            log.write(str(self.result_queue.get()) + '\n')

            # modify a random row to equal 11 * (self.proc_num + 1)
            row_num = random.randint(0, self.array_size - 1)
            new_data = (numpy.zeros((1, self.array_size), 'i8') +
                        11 * (self.proc_num + 1))
            self.write_queue.put((row_num, new_data))

            # pause so the write is likely serviced, then read the
            # modified row back
            time.sleep(0.015)
            self.read_queue.put((row_num, self.proc_num))
            log.write(str(row_num) + '\n')
            log.write(str(self.result_queue.get()) + '\n')


# this function starts the FileAccess class instance and
# sets up all the queues used to communicate with it
def make_queues(num_processors, h5_path=None):
    """Start a FileAccess process and return its communication channels.

    Parameters
    ----------
    num_processors : int
        Number of client result queues to create.
    h5_path : str, optional
        Path of the HDF5 file the FileAccess process should open. Defaults
        to the module-level ``file_path`` global for backward compatibility
        with the original one-argument call; pass it explicitly so this
        function does not depend on a global defined in ``__main__``.

    Returns
    -------
    tuple
        ``(read_queue, result_queues, write_queue, shutdown_send)``.
    """
    if h5_path is None:
        h5_path = file_path
    read_queue = multiprocessing.Queue()
    write_queue = multiprocessing.Queue()
    shutdown_recv, shutdown_send = multiprocessing.Pipe(False)
    result_queues = [multiprocessing.Queue() for _ in range(num_processors)]
    file_access = FileAccess(h5_path, read_queue, result_queues, write_queue,
                             shutdown_recv)
    file_access.start()
    return read_queue, result_queues, write_queue, shutdown_send


if __name__ == '__main__':

    file_path = 'test.h5'
    n = 10
    make_file(file_path, n)

    num_processors = 3
    (read_queue, result_queues,
     write_queue, shutdown_send) = make_queues(num_processors, file_path)

    processors = []
    output_files = []
    for i in range(num_processors):
        result_queue = result_queues[i]
        output_file = str(i)
        processor = DataProcessor(read_queue, result_queue, write_queue, i, n,
                                  output_file)
        processors.append(processor)
        output_files.append(output_file)

    # start all DataProcessor instances
    for processor in processors:
        processor.start()

    # wait for all DataProcessor instances to finish
    for processor in processors:
        processor.join()

    # shut down the FileAccess instance
    shutdown_send.send(0)

    # print out contents of log files and delete them
    print()
    for output_file in output_files:
        print()
        print('contents of log file {0}'.format(output_file))
        # context manager closes the log handle promptly (the original
        # leaked it until garbage collection)
        with open(output_file, 'r') as log:
            print(log.read())
        os.remove(output_file)

    # remove by the same variable used to create the file, not a
    # duplicated string literal
    os.remove(file_path)