"""
Demonstrate the use of h5py in SWMR mode to write to a dataset (appending)
from one process while monitoring the growing dataset from another process.

Usage:
    swmr_multiprocess.py [FILENAME [DATASETNAME]]

    FILENAME:    name of file to monitor. Default: swmrmp.h5
    DATASETNAME: name of dataset to monitor in FILENAME. Default: data

This script will start up two processes: a writer and a reader. The writer
will open/create the file (FILENAME) in SWMR mode, create a dataset and start
appending data to it. After each append the dataset is flushed and an event
sent to the reader process. Meanwhile the reader process will wait for events
from the writer and when triggered it will refresh the dataset and read the
current shape of it.
"""

import sys
import h5py
import numpy as np
import logging
from multiprocessing import Process, Event


class SwmrReader(Process):
    """Reader process: waits for writer events and reports the dataset shape.

    Parameters:
        event:    multiprocessing Event shared with the writer; the writer
                  sets it, the reader waits on it and clears it.
        fname:    name of the HDF5 file to open for reading.
        dsetname: name of the dataset inside the file to monitor.
        timeout:  seconds to wait for each event. If the initial event does
                  not arrive in time the reader fails; if a later event does
                  not arrive in time the monitor loop ends.
    """

    def __init__(self, event, fname, dsetname, timeout=2.0):
        super(SwmrReader, self).__init__()
        self._event = event
        self._fname = fname
        self._dsetname = dsetname
        self._timeout = timeout

    def run(self):
        """Wait for the writer, then refresh and log the dataset shape per event.

        Raises:
            RuntimeError: if the initial event is not received within the
                timeout.
        """
        self.log = logging.getLogger('reader')
        self.log.info("Waiting for initial event")
        # The wait must not live inside an ``assert``: with ``python -O`` the
        # whole statement is removed and the reader would not wait at all.
        if not self._event.wait(self._timeout):
            raise RuntimeError(
                "Timed out after %s s waiting for the initial writer event"
                % self._timeout
            )
        self._event.clear()

        self.log.info("Opening file %s", self._fname)
        # The context manager closes the file even if the dataset lookup fails.
        with h5py.File(self._fname, 'r', libver='latest', swmr=True) as f:
            assert f.swmr_mode
            dset = f[self._dsetname]
            # Monitor and read loop: ends when no event arrives within the
            # timeout. A single Event is shared, so writer updates that happen
            # before the reader wakes up collapse into one notification.
            while self._event.wait(self._timeout):
                self._event.clear()
                self.log.debug("Refreshing dataset")
                dset.refresh()

                self.log.info("Read dset shape: %s", dset.shape)


class SwmrWriter(Process):
    """Writer process: creates the dataset and appends to it in SWMR mode.

    Parameters:
        event:    multiprocessing Event shared with the reader; set once after
                  SWMR mode is enabled and once after every flushed append.
        fname:    name of the HDF5 file to create (opened with mode 'w').
        dsetname: name of the dataset to create inside the file.
    """

    def __init__(self, event, fname, dsetname):
        super(SwmrWriter, self).__init__()
        self._event = event
        self._fname = fname
        self._dsetname = dsetname

    def run(self):
        """Create the file and dataset, switch to SWMR mode and append data."""
        self.log = logging.getLogger('writer')
        self.log.info("Creating file %s", self._fname)
        with h5py.File(self._fname, 'w', libver='latest') as f:
            arr = np.array([1, 2, 3, 4])
            # maxshape=(None,) makes the dataset resizable along its only axis.
            dset = f.create_dataset(
                self._dsetname, chunks=(2,), maxshape=(None,), data=arr
            )
            assert not f.swmr_mode

            self.log.info("SWMR mode")
            f.swmr_mode = True
            assert f.swmr_mode
            self.log.debug("Sending initial event")
            self._event.set()

            # Write loop: grow the dataset by one copy of ``arr`` per pass.
            # On the first pass the new shape equals the current one.
            block_len = len(arr)
            for i in range(5):
                new_shape = ((i + 1) * block_len,)
                self.log.info("Resizing dset shape: %s", new_shape)
                dset.resize(new_shape)
                self.log.debug("Writing data")
                dset[i * block_len:] = arr
                # Alternative way to write the same block:
                # dset.write_direct(arr, np.s_[:], np.s_[i * block_len:])
                self.log.debug("Flushing data")
                dset.flush()
                self.log.info("Sending event")
                self._event.set()


if __name__ == "__main__":
    logging.basicConfig(
        format='%(levelname)10s  %(asctime)s  %(name)10s  %(message)s',
        level=logging.INFO,
    )
    fname = 'swmrmp.h5'
    dsetname = 'data'
    if len(sys.argv) > 1:
        fname = sys.argv[1]
    if len(sys.argv) > 2:
        dsetname = sys.argv[2]

    event = Event()
    reader = SwmrReader(event, fname, dsetname)
    writer = SwmrWriter(event, fname, dsetname)

    logging.info("Starting reader")
    reader.start()
    logging.info("Starting writer")
    writer.start()

    logging.info("Waiting for writer to finish")
    writer.join()
    logging.info("Waiting for reader to finish")
    reader.join()