1"""
2    Demonstrate the use of h5py in SWMR mode to write to a dataset (appending)
3    from one process while monitoring the growing dataset from another process.
4
5    Usage:
6            swmr_multiprocess.py [FILENAME [DATASETNAME]]
7
8              FILENAME:    name of file to monitor. Default: swmrmp.h5
              DATASETNAME: name of dataset to monitor in FILENAME. Default: data
10
11    This script will start up two processes: a writer and a reader. The writer
12    will open/create the file (FILENAME) in SWMR mode, create a dataset and start
13    appending data to it. After each append the dataset is flushed and an event
14    sent to the reader process. Meanwhile the reader process will wait for events
15    from the writer and when triggered it will refresh the dataset and read the
16    current shape of it.
17"""
18
19import sys
20import h5py
21import numpy as np
22import logging
23from multiprocessing import Process, Event
24
class SwmrReader(Process):
    """Reader process that follows a growing dataset in an SWMR file.

    After an initial event has been received, the file is opened read-only
    with ``swmr=True``. Each further event triggers a refresh of the dataset
    and its current shape is logged. The process ends when no event arrives
    within ``timeout`` seconds.

    Parameters:
        event: event object providing ``wait(timeout)`` and ``clear()``;
            it is expected to be set by the writer side.
        fname: name of the HDF5 file to open.
        dsetname: name of the dataset inside the file to monitor.
        timeout: seconds to wait for each event (default 2.0).
    """

    def __init__(self, event, fname, dsetname, timeout=2.0):
        super(SwmrReader, self).__init__()
        self._event = event
        self._fname = fname
        self._dsetname = dsetname
        self._timeout = timeout

    def run(self):
        """Wait for the initial event, then monitor the dataset until timeout.

        Raises:
            RuntimeError: if the initial event is not received within
                ``timeout`` seconds.
        """
        self.log = logging.getLogger('reader')
        self.log.info("Waiting for initial event")
        # Explicit check instead of ``assert``: under ``python -O`` an assert
        # is removed together with the wait() call it contains, and the file
        # would then be opened without waiting for the writer.
        if not self._event.wait(self._timeout):
            raise RuntimeError(
                "No initial event received within %s seconds" % self._timeout)
        # NOTE: an event set again between the wait() above and this clear()
        # is dropped here; the next dataset read happens on the next event.
        self._event.clear()

        self.log.info("Opening file %s", self._fname)
        # The context manager closes the file even if the dataset lookup
        # or a refresh raises.
        with h5py.File(self._fname, 'r', libver='latest', swmr=True) as f:
            assert f.swmr_mode
            dset = f[self._dsetname]
            # Monitor and read loop: leaves as soon as a wait times out.
            while self._event.wait(self._timeout):
                # Clear before refreshing so that an event set during the
                # refresh triggers another iteration.
                self._event.clear()
                self.log.debug("Refreshing dataset")
                dset.refresh()
                self.log.info("Read dset shape: %s", dset.shape)
54
class SwmrWriter(Process):
    """Writer process that creates a file and appends to a dataset in SWMR mode.

    The file is created, a resizable 1-D dataset is written with an initial
    block of data, and SWMR mode is switched on. The dataset is then grown
    block by block; after every flush the event is set.

    Parameters:
        event: event object providing ``set()``; set once after SWMR mode
            is enabled and once after every flushed append.
        fname: name of the HDF5 file to create (opened with mode ``'w'``).
        dsetname: name of the dataset to create in the file.
    """

    def __init__(self, event, fname, dsetname):
        super(SwmrWriter, self).__init__()
        self._event = event
        self._fname = fname
        self._dsetname = dsetname

    def run(self):
        """Create the file and dataset, enable SWMR mode and append data."""
        self.log = logging.getLogger('writer')
        self.log.info("Creating file %s", self._fname)
        # The context manager closes the file on success and on error.
        with h5py.File(self._fname, 'w', libver='latest') as f:
            arr = np.array([1, 2, 3, 4])
            block_len = len(arr)
            # maxshape=(None,) makes the dataset resizable along its only axis.
            dset = f.create_dataset(self._dsetname, chunks=(2,), maxshape=(None,), data=arr)
            assert not f.swmr_mode

            self.log.info("SWMR mode")
            f.swmr_mode = True
            assert f.swmr_mode
            self.log.debug("Sending initial event")
            self._event.set()

            # Write loop: iteration i resizes the dataset to (i+1) blocks and
            # writes the block at offset i*block_len. The first iteration
            # keeps the size and rewrites the initial block.
            for i in range(5):
                new_shape = ((i + 1) * block_len,)
                self.log.info("Resizing dset shape: %s", new_shape)
                dset.resize(new_shape)
                self.log.debug("Writing data")
                dset[i * block_len:] = arr
                self.log.debug("Flushing data")
                dset.flush()
                self.log.info("Sending event")
                self._event.set()
91
92
if __name__ == "__main__":
    # Script entry point: start one reader and one writer process that share
    # a single event, then wait for both to finish.
    logging.basicConfig(format='%(levelname)10s  %(asctime)s  %(name)10s  %(message)s',level=logging.INFO)

    # Optional positional arguments override the defaults: [FILENAME [DATASETNAME]]
    fname = 'swmrmp.h5'
    dsetname = 'data'
    if len(sys.argv) > 1:
        fname = sys.argv[1]
    if len(sys.argv) > 2:
        dsetname = sys.argv[2]

    event = Event()
    reader = SwmrReader(event, fname, dsetname)
    writer = SwmrWriter(event, fname, dsetname)

    # The reader is started first; it blocks on the event until the writer
    # signals.
    logging.info("Starting reader")
    reader.start()
    logging.info("Starting writer")
    writer.start()

    logging.info("Waiting for writer to finish")
    writer.join()
    logging.info("Waiting for reader to finish")
    reader.join()
115