=========
Threading
=========

.. py:currentmodule:: tables


Background
==========

Several bug reports have been filed in the past regarding the
impossibility of using PyTables in multi-threaded programs.

The problem was mainly related to an internal registry that forced the
sharing of HDF5 file handles across multiple threads.

In PyTables 3.1.0 the code for file handle management has been completely
redesigned (see the *Backward incompatible changes* section in
:doc:`../release-notes/RELEASE_NOTES_v3.1.x`) to be simpler and more
transparent, and to allow the use of PyTables in multi-threaded programs.

Citing the :doc:`../release-notes/RELEASE_NOTES_v3.1.x`::

    It is important to stress that the new implementation still has an
    internal registry (implementation detail) and it is still
    **not thread safe**.
    However, a smart enough developer should now be able to use PyTables
    in a multi-threaded program without too many headaches.


A common schema for concurrency
===============================

Although it is probably not the most efficient or elegant way to solve a
certain class of problems, many users seem to like the possibility of
loading a portion of data and processing it inside a *thread function*,
using multiple threads to process the entire dataset.

Each thread is responsible for:

* opening the (same) HDF5 file for reading,
* loading data from it, and
* closing the HDF5 file itself.

Each file handle is for the exclusive use of the thread that opened it,
and file handles are never shared across threads.

In order to follow this schema safely with PyTables, some care must be
taken when opening and closing HDF5 files, to ensure the correct behaviour
of the internal machinery used to manage HDF5 file handles.
A bare-bones sketch of the schema is shown below.


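For illustration only, the following is a minimal (and deliberately
*unsafe*) sketch of the schema; the :func:`read_chunk` name is
hypothetical, and the plain :func:`open_file` and :meth:`File.close`
calls are exactly the ones that the next sections show how to
synchronize::

    import numpy as np
    import tables as tb

    def read_chunk(filename, path, yslice):
        # Each thread runs this function with its own slice: it opens a
        # private file handle, reads its portion of the dataset and
        # closes the handle again.  The handle is never shared.
        h5file = tb.open_file(filename, mode='r')   # needs synchronization
        try:
            data = h5file.get_node(path)[yslice, ...]
        finally:
            h5file.close()                          # needs synchronization
        return np.sum(data)

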
Very simple solution
====================

A very simple solution for this kind of scenario is to use a
:class:`threading.Lock` around the parts of the code that are considered
critical, e.g. the :func:`open_file` function and the :meth:`File.close`
method::

    import threading

    import tables as tb

    lock = threading.Lock()

    def synchronized_open_file(*args, **kwargs):
        with lock:
            return tb.open_file(*args, **kwargs)

    def synchronized_close_file(h5file, *args, **kwargs):
        with lock:
            return h5file.close(*args, **kwargs)


The :func:`synchronized_open_file` and :func:`synchronized_close_file`
functions can be used in the *thread function* to open and close the HDF5
file::

    import numpy as np
    import tables as tb

    def run(filename, path, inqueue, outqueue):
        h5file = None
        try:
            yslice = inqueue.get()
            h5file = synchronized_open_file(filename, mode='r')
            h5array = h5file.get_node(path)
            data = h5array[yslice, ...]
            psum = np.sum(data)
        except Exception as e:
            outqueue.put(e)
        else:
            outqueue.put(psum)
        finally:
            # guard against the case in which synchronized_open_file()
            # fails and h5file is never assigned
            if h5file is not None:
                synchronized_close_file(h5file)


Finally, the main function of the program:

* instantiates the input and output :class:`queue.Queue` objects,
* starts all threads,
* sends the processing requests on the input :class:`queue.Queue`,
* collects results reading from the output :class:`queue.Queue`, and
* performs finalization actions (:meth:`threading.Thread.join`).

.. code-block:: python

    import os
    import queue
    import threading

    import numpy as np
    import tables as tb

    SIZE = 100
    NTHREADS = 5
    FILENAME = 'simple_threading.h5'
    H5PATH = '/array'

    def create_test_file(filename):
        data = np.random.rand(SIZE, SIZE)

        with tb.open_file(filename, 'w') as h5file:
            h5file.create_array('/', 'array', title="Test Array", obj=data)

    def chunk_generator(data_size, nchunks):
        chunk_size = int(np.ceil(data_size / nchunks))
        for start in range(0, data_size, chunk_size):
            yield slice(start, start + chunk_size)

    def main():
        # generate the test data
        if not os.path.exists(FILENAME):
            create_test_file(FILENAME)

        threads = []
        inqueue = queue.Queue()
        outqueue = queue.Queue()

        # start all threads
        for i in range(NTHREADS):
            thread = threading.Thread(
                target=run, args=(FILENAME, H5PATH, inqueue, outqueue))
            thread.start()
            threads.append(thread)

        # push requests in the input queue
        for yslice in chunk_generator(SIZE, len(threads)):
            inqueue.put(yslice)

        # collect results
        try:
            mean_ = 0.

            for i in range(len(threads)):
                out = outqueue.get()
                if isinstance(out, Exception):
                    raise out
                else:
                    mean_ += out

            mean_ /= SIZE * SIZE

        finally:
            for thread in threads:
                thread.join()

        # print results
        print('Mean: {}'.format(mean_))

    if __name__ == '__main__':
        main()

The program in the example computes the mean value of a potentially huge
dataset by splitting the computation across :data:`NTHREADS` (5 in this
case) threads.

The complete and working code of this example (Python 3 is required) can
be found in the :file:`examples` directory:
:download:`simple_threading.py <../../../examples/simple_threading.py>`.

The approach presented in this section is very simple and readable, but it
has the **drawback** that the user code has to be modified to replace the
:func:`open_file` and :meth:`File.close` calls with their safe versions
(:func:`synchronized_open_file` and :func:`synchronized_close_file`).

Also, the solution shown in the example does not cover the entire PyTables
API (e.g. HDF5 files can also be opened, although it is not recommended,
using the :class:`File` constructor directly) and makes it impossible to
use *pythonic* constructs like the *with* statement (a possible workaround
is sketched below)::

    with tb.open_file(filename) as h5file:
        do_something(h5file)


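A possible workaround for the latter limitation is to wrap the two
synchronized helpers in a small context manager. The following is a
minimal sketch (the :func:`synchronized_open` name is hypothetical and
not part of the example files)::

    import contextlib

    @contextlib.contextmanager
    def synchronized_open(*args, **kwargs):
        # open and close the file through the lock-protected helpers;
        # the reads inside the "with" block run without holding the lock
        h5file = synchronized_open_file(*args, **kwargs)
        try:
            yield h5file
        finally:
            synchronized_close_file(h5file)

With this helper the *thread function* can use a *with* statement again::

    with synchronized_open(filename, mode='r') as h5file:
        data = h5file.get_node(path)[yslice, ...]

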
Monkey-patching PyTables
========================

An alternative to the `Very simple solution`_ presented in the previous
section consists of monkey-patching the PyTables package to replace some
of its components with thread-safe versions of themselves::

    import functools
    import threading

    import tables as tb
    import tables.file as _tables_file

    class ThreadsafeFileRegistry(_tables_file._FileRegistry):
        # a re-entrant lock is needed because closing a file, done with
        # the lock held, internally triggers the registry's remove(),
        # which acquires the same lock again
        lock = threading.RLock()

        @property
        def handlers(self):
            return self._handlers.copy()

        def add(self, handler):
            with self.lock:
                return super().add(handler)

        def remove(self, handler):
            with self.lock:
                return super().remove(handler)

        def close_all(self):
            with self.lock:
                return super().close_all()

    class ThreadsafeFile(_tables_file.File):
        def __init__(self, *args, **kwargs):
            with ThreadsafeFileRegistry.lock:
                super().__init__(*args, **kwargs)

        def close(self):
            with ThreadsafeFileRegistry.lock:
                super().close()

    @functools.wraps(tb.open_file)
    def synchronized_open_file(*args, **kwargs):
        with ThreadsafeFileRegistry.lock:
            return _tables_file._original_open_file(*args, **kwargs)

    # monkey patch the tables package
    _tables_file._original_open_file = _tables_file.open_file
    _tables_file.open_file = synchronized_open_file
    tb.open_file = synchronized_open_file

    _tables_file._original_File = _tables_file.File
    _tables_file.File = ThreadsafeFile
    tb.File = ThreadsafeFile

    _tables_file._open_files = ThreadsafeFileRegistry()


At this point PyTables can be used transparently in the example program
presented in the previous section.
In particular, the standard PyTables API (including *with* statements) can
be used in the *thread function*::

    def run(filename, path, inqueue, outqueue):
        try:
            yslice = inqueue.get()
            with tb.open_file(filename, mode='r') as h5file:
                h5array = h5file.get_node(path)
                data = h5array[yslice, ...]
            psum = np.sum(data)
        except Exception as e:
            outqueue.put(e)
        else:
            outqueue.put(psum)


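Note that, compared to the `Very simple solution`_, the explicit
``finally`` clause is no longer needed in the *thread function*: the
*with* statement takes care of closing the file even if an error occurs
while reading.
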
The complete code of this version of the example can be found in the
:file:`examples` folder:
:download:`threading_monkeypatch.py <../../../examples/threading_monkeypatch.py>`.
Python 3 is required.