1"""Example showing how to access a PyTables file from multiple processes using
2queues."""
3
4from __future__ import print_function
5import sys
6
7if sys.version < '3':
8    import Queue as queue
9else:
10    import queue
11
12import multiprocessing
13import os
14import random
15import time
16
17import numpy
18import tables
19
20
# this creates an HDF5 file with one array containing n rows
def make_file(file_path, n):
    """Create an HDF5 file at *file_path* containing one n-by-n int64 CArray.

    Row ``i`` of the array is filled with the constant value ``i``.

    Parameters
    ----------
    file_path : str
        Where to create the HDF5 file (overwritten if it exists).
    n : int
        Number of rows and columns of the array.
    """
    with tables.open_file(file_path, 'w') as fobj:
        array = fobj.create_carray('/', 'array', tables.Int64Atom(), (n, n))
        # Build the data in memory and store it with a single HDF5 write
        # instead of issuing one write call per row.
        data = numpy.repeat(numpy.arange(n, dtype=numpy.int64), n)
        array[:] = data.reshape(n, n)
28
29
class FileAccess(multiprocessing.Process):
    """Single gatekeeper process for the HDF5 file.

    All access to the file goes through one instance of this class,
    which communicates with the other processes through queues:

    * ``read_queue``    -- incoming ``(row, processor)`` read requests.
    * ``result_queues`` -- one queue per client process, used to send
      the requested data back to the right client.
    * ``write_queue``   -- incoming ``(row, data)`` modification requests.
    * ``shutdown``      -- receive end of a pipe; any message on it asks
      this process to drain remaining requests and terminate.
    """

    def __init__(self, h5_path, read_queue, result_queues, write_queue,
                 shutdown):
        self.h5_path = h5_path
        self.read_queue = read_queue
        self.result_queues = result_queues
        self.write_queue = write_queue
        self.shutdown = shutdown
        # Seconds each queue.get() may block before reporting Empty.
        self.block_period = .01
        super(FileAccess, self).__init__()

    def run(self):
        self.h5_file = tables.open_file(self.h5_path, 'r+')
        self.array = self.h5_file.get_node('/array')
        keep_running = True
        while keep_running:
            # A shutdown message ends the loop -- unless work arrives
            # below, which keeps us alive so pending requests drain first.
            keep_running = not self.shutdown.poll()
            if self._serve_one_read():
                keep_running = True
            if self._serve_one_write():
                keep_running = True
        # close the HDF5 file before shutting down
        self.h5_file.close()

    def _serve_one_read(self):
        # Serve at most one pending read request; True if one was served.
        try:
            row_num, proc_num = self.read_queue.get(
                True, self.block_period)
        except queue.Empty:
            return False
        # look up the appropriate result_queue for this data processor
        # instance
        result_queue = self.result_queues[proc_num]
        print('processor {0} reading from row {1}'.format(proc_num,
                                                          row_num))
        result_queue.put(self.read_data(row_num))
        return True

    def _serve_one_write(self):
        # Apply at most one pending write request; True if one was applied.
        try:
            row_num, data = self.write_queue.get(True, self.block_period)
        except queue.Empty:
            return False
        print('writing row', row_num)
        self.write_data(row_num, data)
        return True

    def read_data(self, row_num):
        """Return row *row_num* of the array."""
        return self.array[row_num, :]

    def write_data(self, row_num, data):
        """Overwrite row *row_num* of the array with *data*."""
        self.array[row_num, :] = data
90
91
# This class represents a process that does work by reading and writing to the
# HDF5 file.  It does this by sending requests to the FileAccess class instance
# through its read and write queues.  The data results are sent back through
# the result_queue.
# Its actions are logged to a text file.
class DataProcessor(multiprocessing.Process):
    """Worker process that exercises the shared HDF5 file.

    It never touches the file directly; it posts requests on
    ``read_queue``/``write_queue`` and receives read results on its own
    ``result_queue``.  Everything it does is logged to ``output_file``.
    """

    def __init__(self, read_queue, result_queue, write_queue, proc_num,
                 array_size, output_file):
        self.read_queue = read_queue
        self.result_queue = result_queue
        self.write_queue = write_queue
        # Index identifying this worker; FileAccess uses it to pick the
        # matching result queue.
        self.proc_num = proc_num
        self.array_size = array_size
        # Path of the text file this worker's actions are logged to.
        self.output_file = output_file
        super(DataProcessor, self).__init__()

    def run(self):
        # Manage the log with a context manager so the handle is closed
        # even if a queue operation raises.  (Previously the path in
        # self.output_file was rebound to the open file object, which was
        # confusing and leaked the handle on error.)
        with open(self.output_file, 'w') as log:
            # read a random row from the file
            row_num = random.randint(0, self.array_size - 1)
            self.read_queue.put((row_num, self.proc_num))
            log.write(str(row_num) + '\n')
            log.write(str(self.result_queue.get()) + '\n')

            # modify a random row to equal 11 * (self.proc_num + 1)
            row_num = random.randint(0, self.array_size - 1)
            new_data = (numpy.zeros((1, self.array_size), 'i8') +
                        11 * (self.proc_num + 1))
            self.write_queue.put((row_num, new_data))

            # pause, then read the modified row
            time.sleep(0.015)
            self.read_queue.put((row_num, self.proc_num))
            log.write(str(row_num) + '\n')
            log.write(str(self.result_queue.get()) + '\n')
129
130
# this function starts the FileAccess class instance and
# sets up all the queues used to communicate with it
def make_queues(num_processors, h5_path=None):
    """Start a FileAccess process and build the queues used to talk to it.

    Parameters
    ----------
    num_processors : int
        Number of client processes; one result queue is created per client.
    h5_path : str, optional
        Path of the HDF5 file to serve.  When omitted, falls back to the
        module-level ``file_path`` global (set in the ``__main__`` block),
        preserving the original implicit-global behavior.

    Returns
    -------
    tuple
        ``(read_queue, result_queues, write_queue, shutdown_send)``, where
        ``shutdown_send`` is the send end of a pipe; send anything on it to
        ask the FileAccess process to terminate.
    """
    if h5_path is None:
        # Backward-compatible fallback to the global used by __main__.
        h5_path = file_path
    read_queue = multiprocessing.Queue()
    write_queue = multiprocessing.Queue()
    # duplex=False: FileAccess only ever reads from shutdown_recv.
    shutdown_recv, shutdown_send = multiprocessing.Pipe(False)
    result_queues = [multiprocessing.Queue() for _ in range(num_processors)]
    file_access = FileAccess(h5_path, read_queue, result_queues, write_queue,
                             shutdown_recv)
    file_access.start()
    return read_queue, result_queues, write_queue, shutdown_send
142
143
if __name__ == '__main__':

    # Build the shared HDF5 file first, before any other process starts.
    file_path = 'test.h5'
    n = 10
    make_file(file_path, n)

    num_processors = 3
    (read_queue, result_queues,
     write_queue, shutdown_send) = make_queues(num_processors)

    # Create one DataProcessor per result queue; each logs to its own file
    # named after its processor index.
    processors = []
    output_files = []
    for i in range(num_processors):
        result_queue = result_queues[i]
        output_file = str(i)
        processor = DataProcessor(read_queue, result_queue, write_queue, i, n,
                                  output_file)
        processors.append(processor)
        output_files.append(output_file)

    # start all DataProcessor instances
    for processor in processors:
        processor.start()

    # wait for all DataProcessor instances to finish
    for processor in processors:
        processor.join()

    # shut down the FileAccess instance
    shutdown_send.send(0)

    # print out contents of log files and delete them
    print()
    for output_file in output_files:
        print()
        print('contents of log file {0}'.format(output_file))
        # Close each log promptly instead of leaking the handle until
        # interpreter exit.
        with open(output_file, 'r') as log:
            print(log.read())
        os.remove(output_file)

    # NOTE(review): make_queues does not expose the FileAccess process, so
    # it cannot be joined here; it may still have test.h5 open when this
    # remove runs.  That is fine on POSIX but could fail on Windows.
    os.remove('test.h5')
184