=========
Threading
=========

.. py:currentmodule:: tables


Background
==========

Several bug reports have been filed by users in the past about
problems with using PyTables in multi-threaded programs.

The problem was mainly related to an internal registry that forced the
sharing of HDF5 file handles across multiple threads.

In PyTables 3.1.0 the code for file handle management has been completely
redesigned (see the *Backward incompatible changes* section in
:doc:`../release-notes/RELEASE_NOTES_v3.1.x`) to be simpler and more
transparent, and to allow the use of PyTables in multi-threaded programs.

Citing the :doc:`../release-notes/RELEASE_NOTES_v3.1.x`::

    It is important to stress that the new implementation still has an
    internal registry (implementation detail) and it is still
    **not thread safe**.
    Just now a smart enough developer should be able to use PyTables in a
    multi-thread program without too many headaches.


A common schema for concurrency
===============================

Although it is probably not the most efficient or elegant way to solve
a certain class of problems, many users like the possibility of loading
a portion of data and processing it inside a *thread function*, using
multiple threads to process the entire dataset.

Each thread is responsible for:

* opening the (same) HDF5 file for reading,
* loading data from it, and
* closing the HDF5 file itself.

Each file handle is for the exclusive use of the thread that opened it;
file handles are never shared across threads.

In order to do this safely with PyTables, some care is needed when
opening and closing HDF5 files, to ensure the correct behaviour of the
internal machinery that manages HDF5 file handles.
Very simple solution
====================

A very simple solution for this kind of scenario is to use a
:class:`threading.Lock` around the parts of the code that are considered
critical, e.g. the :func:`open_file` function and the :meth:`File.close`
method::

    import threading

    lock = threading.Lock()

    def synchronized_open_file(*args, **kwargs):
        with lock:
            return tb.open_file(*args, **kwargs)

    def synchronized_close_file(self, *args, **kwargs):
        with lock:
            return self.close(*args, **kwargs)


The :func:`synchronized_open_file` and :func:`synchronized_close_file`
functions can be used in the *thread function* to open and close the HDF5
file::

    import numpy as np
    import tables as tb

    def run(filename, path, inqueue, outqueue):
        h5file = None
        try:
            yslice = inqueue.get()
            h5file = synchronized_open_file(filename, mode='r')
            h5array = h5file.get_node(path)
            data = h5array[yslice, ...]
            psum = np.sum(data)
        except Exception as e:
            outqueue.put(e)
        else:
            outqueue.put(psum)
        finally:
            if h5file is not None:
                synchronized_close_file(h5file)


Finally, the main function of the program:

* instantiates the input and output :class:`queue.Queue`,
* starts all threads,
* sends the processing requests on the input :class:`queue.Queue`,
* collects results reading from the output :class:`queue.Queue`, and
* performs finalization actions (:meth:`threading.Thread.join`).

.. code-block:: python

    import os
    import queue
    import threading

    import numpy as np
    import tables as tb

    SIZE = 100
    NTHREADS = 5
    FILENAME = 'simple_threading.h5'
    H5PATH = '/array'

    def create_test_file(filename):
        data = np.random.rand(SIZE, SIZE)

        with tb.open_file(filename, 'w') as h5file:
            h5file.create_array('/', 'array', title="Test Array", obj=data)

    def chunk_generator(data_size, nchunks):
        chunk_size = int(np.ceil(data_size / nchunks))
        for start in range(0, data_size, chunk_size):
            yield slice(start, start + chunk_size)

    def main():
        # generate the test data
        if not os.path.exists(FILENAME):
            create_test_file(FILENAME)

        threads = []
        inqueue = queue.Queue()
        outqueue = queue.Queue()

        # start all threads
        for i in range(NTHREADS):
            thread = threading.Thread(
                target=run, args=(FILENAME, H5PATH, inqueue, outqueue))
            thread.start()
            threads.append(thread)

        # push requests in the input queue
        for yslice in chunk_generator(SIZE, len(threads)):
            inqueue.put(yslice)

        # collect results
        try:
            mean_ = 0.

            for i in range(len(threads)):
                out = outqueue.get()
                if isinstance(out, Exception):
                    raise out
                else:
                    mean_ += out

            mean_ /= SIZE * SIZE

        finally:
            for thread in threads:
                thread.join()

        # print results
        print('Mean: {}'.format(mean_))

    if __name__ == '__main__':
        main()

The program in the example computes the mean value of a potentially huge
dataset, splitting the computation across :data:`NTHREADS` (5 in this case)
threads.

The complete and working code of this example (Python 3 is required) can be
found in the :file:`examples` directory:
:download:`simple_threading.py <../../../examples/simple_threading.py>`.
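For clarity, this is how the ``chunk_generator`` helper from the example
partitions the row range: it yields one contiguous slice per processing
request pushed on the input queue (the last slice may be shorter when
``data_size`` is not divisible by ``nchunks``):

```python
import numpy as np

# same helper as in the example above
def chunk_generator(data_size, nchunks):
    chunk_size = int(np.ceil(data_size / nchunks))
    for start in range(0, data_size, chunk_size):
        yield slice(start, start + chunk_size)

# SIZE = 100 rows split across NTHREADS = 5 threads
print(list(chunk_generator(100, 5)))
# [slice(0, 20, None), slice(20, 40, None), slice(40, 60, None),
#  slice(60, 80, None), slice(80, 100, None)]
```

A slice extending past the end of the array is harmless here, since NumPy
(and PyTables) clip out-of-range slice bounds when indexing.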
The approach presented in this section is very simple and readable, but it
has the **drawback** that the user code has to be modified to replace
:func:`open_file` and :meth:`File.close` calls with their safe versions
(:func:`synchronized_open_file` and :func:`synchronized_close_file`).

Also, the solution shown in the example does not cover the entire PyTables
API (e.g., although it is not recommended, HDF5 files can also be opened
using the :class:`File` constructor) and it makes it impossible to use
*pythonic* constructs like the *with* statement::

    with tb.open_file(filename) as h5file:
        do_something(h5file)


Monkey-patching PyTables
========================

An alternative to the `Very simple solution`_ presented in the previous
section is to monkey-patch the PyTables package, replacing some of its
components with more thread-safe versions::

    import functools
    import threading

    import tables as tb
    import tables.file as _tables_file

    class ThreadsafeFileRegistry(_tables_file._FileRegistry):
        lock = threading.RLock()

        @property
        def handlers(self):
            return self._handlers.copy()

        def add(self, handler):
            with self.lock:
                return super().add(handler)

        def remove(self, handler):
            with self.lock:
                return super().remove(handler)

        def close_all(self):
            with self.lock:
                return super().close_all()

    class ThreadsafeFile(_tables_file.File):
        def __init__(self, *args, **kwargs):
            with ThreadsafeFileRegistry.lock:
                super().__init__(*args, **kwargs)

        def close(self):
            with ThreadsafeFileRegistry.lock:
                super().close()

    @functools.wraps(tb.open_file)
    def synchronized_open_file(*args, **kwargs):
        with ThreadsafeFileRegistry.lock:
            return _tables_file._original_open_file(*args, **kwargs)

    # monkey patch the tables package
    _tables_file._original_open_file = _tables_file.open_file
    _tables_file.open_file = synchronized_open_file
    tb.open_file = synchronized_open_file

    _tables_file._original_File = _tables_file.File
    _tables_file.File = ThreadsafeFile
    tb.File = ThreadsafeFile

    _tables_file._open_files = ThreadsafeFileRegistry()


At this point PyTables can be used transparently in the example program
presented in the previous section.
In particular, the standard PyTables API (including *with* statements) can
be used in the *thread function*::

    def run(filename, path, inqueue, outqueue):
        try:
            yslice = inqueue.get()
            with tb.open_file(filename, mode='r') as h5file:
                h5array = h5file.get_node(path)
                data = h5array[yslice, ...]
                psum = np.sum(data)
        except Exception as e:
            outqueue.put(e)
        else:
            outqueue.put(psum)


The complete code of this version of the example can be found in the
:file:`examples` folder:
:download:`threading_monkeypatch.py <../../../examples/threading_monkeypatch.py>`.
Python 3 is required.
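If monkey-patching feels too invasive, a possible middle ground (a sketch,
not part of PyTables; the ``locked_open_file`` name is illustrative) is to
wrap the lock-protected open and close calls from the `Very simple
solution`_ in a context manager, which restores *with* statement ergonomics
without patching the package:

```python
import contextlib
import threading

import tables as tb

lock = threading.Lock()

@contextlib.contextmanager
def locked_open_file(*args, **kwargs):
    # serialize the critical open and close calls through the same lock;
    # the actual reads still run concurrently, one handle per thread
    with lock:
        h5file = tb.open_file(*args, **kwargs)
    try:
        yield h5file
    finally:
        with lock:
            h5file.close()
```

A thread function can then read its slice with
``with locked_open_file(filename, mode='r') as h5file: ...``, keeping one
exclusive file handle per thread, as before.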