1 #ifndef LIBGEODECOMP_IO_HPXWRITERSINKSERVER_H
2 #define LIBGEODECOMP_IO_HPXWRITERSINKSERVER_H
3 
4 #include <libgeodecomp/config.h>
5 #ifdef LIBGEODECOMP_WITH_HPX
6 
7 #include <libgeodecomp/io/parallelwriter.h>
8 #include <libgeodecomp/io/writer.h>
9 #include <libgeodecomp/storage/gridvecconv.h>
10 
11 #include <hpx/hpx_fwd.hpp>
12 #include <hpx/include/components.hpp>
13 #include <hpx/lcos/local/spinlock.hpp>
14 
15 namespace LibGeoDecomp {
16 
17 template<typename CELL_TYPE>
18 class DistributedSimulator;
19 
20 // fixme: can we kill this?
21 template <typename CELL_TYPE>
22 class IdentityConverter
23 {
24 public:
25     typedef CELL_TYPE CellType;
26     typedef typename APITraits::SelectTopology<CELL_TYPE>::Value Topology;
27 
operator()28     const CellType& operator()(
29         const CellType& cell,
30         const Coord<Topology::DIM>& globalDimensions,
31         unsigned step,
32         std::size_t rank)
33     {
34         return cell;
35     }
36 };
37 
38 template<typename CELL_TYPE, typename CONVERTER = IdentityConverter<CELL_TYPE> >
39 class HpxWriterSinkServer
40   : public hpx::components::managed_component_base<
41         HpxWriterSinkServer<CELL_TYPE, CONVERTER>
42     >
43 {
44 
45     class RegionInfo;
46 
47 public:
48     typedef typename CONVERTER::CellType CellType;
49     typedef typename APITraits::SelectTopology<CellType>::Value Topology;
50     static const int DIM = Topology::DIM;
51     typedef DisplacedGrid<CellType, Topology> GridType;
52     typedef Region<Topology::DIM> RegionType;
53     typedef Coord<Topology::DIM> CoordType;
54     typedef std::vector<CellType> BufferType;
55     typedef std::map<unsigned, std::vector<RegionInfo> > RegionInfoMapType;
56     typedef std::map<unsigned, std::size_t> StepCountMapType;
57     typedef std::map<unsigned, GridType> GridMapType;
58     typedef
59         std::map<std::size_t, boost::shared_ptr<ParallelWriter<CellType> > >
60         ParallelWritersMap;
61     typedef
62         std::map<std::size_t, boost::shared_ptr<Writer<CellType> > >
63         SerialWritersMap;
64 
65     typedef hpx::lcos::local::spinlock MutexType;
66 
HpxWriterSinkServer()67     HpxWriterSinkServer()
68     {}
69 
HpxWriterSinkServer(std::size_t numUpdateGroups)70     HpxWriterSinkServer(
71         std::size_t numUpdateGroups) :
72         numUpdateGroups(numUpdateGroups),
73         nextId(0)
74     {
75     }
76 
HpxWriterSinkServer(boost::shared_ptr<ParallelWriter<CellType>> parallelWriter,std::size_t numUpdateGroups)77     HpxWriterSinkServer(
78         boost::shared_ptr<ParallelWriter<CellType> > parallelWriter,
79         std::size_t numUpdateGroups) :
80         numUpdateGroups(numUpdateGroups),
81         nextId(0)
82     {
83         connectParallelWriter(parallelWriter);
84     }
85 
HpxWriterSinkServer(boost::shared_ptr<Writer<CellType>> serialWriter,std::size_t numUpdateGroups)86     HpxWriterSinkServer(
87         boost::shared_ptr<Writer<CellType> > serialWriter,
88         std::size_t numUpdateGroups) :
89         numUpdateGroups(numUpdateGroups),
90         nextId(0)
91     {
92         connectSerialWriter(serialWriter);
93     }
94 
stepFinished(boost::shared_ptr<BufferType> buffer,const RegionType & validRegion,const CoordType & globalDimensions,unsigned step,WriterEvent event,std::size_t rank,bool lastCall)95     void stepFinished(
96         boost::shared_ptr<BufferType> buffer,
97         const RegionType& validRegion,
98         const CoordType& globalDimensions,
99         unsigned step,
100         WriterEvent event,
101         std::size_t rank,
102         bool lastCall)
103     {
104         typedef typename RegionInfoMapType::iterator RegionMapIterator;
105         typedef typename StepCountMapType::iterator StepCountMapIterator;
106         typedef typename GridMapType::iterator GridIterator;
107 
108         MutexType::scoped_lock l(mutex);
109         GridIterator kt = gridMap.find(step);
110         if (kt == gridMap.end())
111         {
112             CoordBox<DIM> coordBox(CoordType(), globalDimensions);
113             kt = gridMap.insert(
114                 kt,
115                 std::make_pair(
116                     step,
117                     GridType(coordBox)));
118         }
119 
120         GridVecConv::vectorToGrid(*buffer, &kt->second, validRegion);
121 
122         RegionMapIterator it = regionInfoMap.find(step);
123         if (it == regionInfoMap.end()) {
124             it = regionInfoMap.insert(
125                     it,
126                     std::make_pair(step, std::vector<RegionInfo>())
127                 );
128         }
129 
130         it->second.push_back(
131             RegionInfo(validRegion, globalDimensions, event, rank, lastCall)
132         );
133 
134         if (lastCall) {
135             StepCountMapIterator jt = stepCountMap.find(step);
136             if (jt == stepCountMap.end())
137             {
138                 jt = stepCountMap.insert(jt, std::make_pair(step, 1));
139             }
140             else
141             {
142                 ++jt->second;
143             }
144 
145             if (jt->second == numUpdateGroups)
146             {
147                 {
148                     hpx::util::scoped_unlock<MutexType::scoped_lock> ull(l);
149                     notifyWriters(kt->second, step, event);
150                 }
151                 regionInfoMap.erase(it);
152                 stepCountMap.erase(jt);
153                 gridMap.erase(kt);
154             }
155         }
156     }
157     HPX_DEFINE_COMPONENT_ACTION_TPL(HpxWriterSinkServer, stepFinished, StepFinishedAction);
158 
connectParallelWriter(boost::shared_ptr<ParallelWriter<CellType>> parallelWriter)159     std::size_t connectParallelWriter(
160         boost::shared_ptr<ParallelWriter<CellType> > parallelWriter)
161     {
162         MutexType::scoped_lock l(mutex);
163         std::size_t id = getNextId();
164         parallelWriters.insert(std::make_pair(id, parallelWriter));
165         return id;
166     }
167     HPX_DEFINE_COMPONENT_ACTION_TPL(HpxWriterSinkServer, connectParallelWriter, ConnectParallelWriterAction);
168 
connectSerialWriter(boost::shared_ptr<Writer<CellType>> serialWriter)169     std::size_t connectSerialWriter(
170         boost::shared_ptr<Writer<CellType> > serialWriter)
171     {
172         MutexType::scoped_lock l(mutex);
173         std::size_t id = getNextId();
174         serialWriters.insert(std::make_pair(id, serialWriter));
175         return id;
176     }
177     HPX_DEFINE_COMPONENT_ACTION_TPL(HpxWriterSinkServer, connectSerialWriter, ConnectSerialWriterAction);
178 
disconnectWriter(std::size_t id)179     void disconnectWriter(std::size_t id)
180     {
181         MutexType::scoped_lock l(mutex);
182         {
183             typename ParallelWritersMap::iterator it = parallelWriters.find(id);
184             if (it != parallelWriters.end()) {
185                 parallelWriters.erase(it);
186                 freeIds.push_back(id);
187                 return;
188             }
189         }
190         {
191             typename SerialWritersMap::iterator it = serialWriters.find(id);
192             if (it != serialWriters.end()) {
193                 serialWriters.erase(it);
194                 freeIds.push_back(id);
195                 return;
196             }
197         }
198     }
199     HPX_DEFINE_COMPONENT_ACTION_TPL(HpxWriterSinkServer, disconnectWriter, DisconnectWriterAction);
200 
getNumUpdateGroups()201     std::size_t getNumUpdateGroups()
202     {
203         return numUpdateGroups;
204     }
205     HPX_DEFINE_COMPONENT_ACTION_TPL(HpxWriterSinkServer, getNumUpdateGroups, NumUpdateGroupsAction);
206 
207 private:
208     GridMapType gridMap;
209     ParallelWritersMap parallelWriters;
210     SerialWritersMap serialWriters;
211     std::size_t numUpdateGroups;
212 
213     StepCountMapType stepCountMap;
214     RegionInfoMapType regionInfoMap;
215 
216     std::size_t nextId;
217     std::vector<std::size_t> freeIds;
218 
219     MutexType mutex;
220 
getNextId()221     std::size_t getNextId()
222     {
223         std::size_t id = 0;
224         if (!freeIds.empty()) {
225             id = freeIds.back();
226             freeIds.pop_back();
227         }
228         else {
229             id = nextId++;
230         }
231         return id;
232     }
233 
notifyWriters(GridType const & grid,unsigned step,WriterEvent event)234     void notifyWriters(GridType const & grid, unsigned step, WriterEvent event)
235     {
236         BOOST_FOREACH(typename ParallelWritersMap::value_type& writer,
237                       parallelWriters) {
238             MutexType::scoped_lock l(mutex);
239             typedef typename RegionInfoMapType::iterator RegionInfoIterator;
240 
241             RegionInfoIterator it = regionInfoMap.find(step);
242             BOOST_ASSERT(it != regionInfoMap.end());
243             BOOST_FOREACH(RegionInfo const & regionInfo, it->second) {
244                 writer.second->stepFinished(
245                     grid,
246                     regionInfo.validRegion,
247                     regionInfo.globalDimensions,
248                     step,
249                     regionInfo.event, regionInfo.rank,
250                     regionInfo.lastCall
251                 );
252             }
253         }
254         BOOST_FOREACH(typename SerialWritersMap::value_type& writer,
255                       serialWriters) {
256             writer.second->stepFinished(grid, step, event);
257         }
258     }
259 
260     class RegionInfo
261     {
262     public:
RegionInfo(const RegionType & validRegion,const CoordType & globalDimensions,WriterEvent event,std::size_t rank,bool lastCall)263         RegionInfo(
264             const RegionType& validRegion,
265             const CoordType& globalDimensions,
266             WriterEvent event,
267             std::size_t rank,
268             bool lastCall) :
269             validRegion(validRegion),
270             globalDimensions(globalDimensions),
271             event(event),
272             rank(rank),
273             lastCall(lastCall)
274         {}
275 
276         RegionType validRegion;
277         CoordType globalDimensions;
278         WriterEvent event;
279         std::size_t rank;
280         bool lastCall;
281     };
282 
283 };
284 
285 }
286 
287 #endif
288 #endif
289