1 #ifndef LIBGEODECOMP_IO_HPXWRITERSINKSERVER_H 2 #define LIBGEODECOMP_IO_HPXWRITERSINKSERVER_H 3 4 #include <libgeodecomp/config.h> 5 #ifdef LIBGEODECOMP_WITH_HPX 6 7 #include <libgeodecomp/io/parallelwriter.h> 8 #include <libgeodecomp/io/writer.h> 9 #include <libgeodecomp/storage/gridvecconv.h> 10 11 #include <hpx/hpx_fwd.hpp> 12 #include <hpx/include/components.hpp> 13 #include <hpx/lcos/local/spinlock.hpp> 14 15 namespace LibGeoDecomp { 16 17 template<typename CELL_TYPE> 18 class DistributedSimulator; 19 20 // fixme: can we kill this? 21 template <typename CELL_TYPE> 22 class IdentityConverter 23 { 24 public: 25 typedef CELL_TYPE CellType; 26 typedef typename APITraits::SelectTopology<CELL_TYPE>::Value Topology; 27 operator()28 const CellType& operator()( 29 const CellType& cell, 30 const Coord<Topology::DIM>& globalDimensions, 31 unsigned step, 32 std::size_t rank) 33 { 34 return cell; 35 } 36 }; 37 38 template<typename CELL_TYPE, typename CONVERTER = IdentityConverter<CELL_TYPE> > 39 class HpxWriterSinkServer 40 : public hpx::components::managed_component_base< 41 HpxWriterSinkServer<CELL_TYPE, CONVERTER> 42 > 43 { 44 45 class RegionInfo; 46 47 public: 48 typedef typename CONVERTER::CellType CellType; 49 typedef typename APITraits::SelectTopology<CellType>::Value Topology; 50 static const int DIM = Topology::DIM; 51 typedef DisplacedGrid<CellType, Topology> GridType; 52 typedef Region<Topology::DIM> RegionType; 53 typedef Coord<Topology::DIM> CoordType; 54 typedef std::vector<CellType> BufferType; 55 typedef std::map<unsigned, std::vector<RegionInfo> > RegionInfoMapType; 56 typedef std::map<unsigned, std::size_t> StepCountMapType; 57 typedef std::map<unsigned, GridType> GridMapType; 58 typedef 59 std::map<std::size_t, boost::shared_ptr<ParallelWriter<CellType> > > 60 ParallelWritersMap; 61 typedef 62 std::map<std::size_t, boost::shared_ptr<Writer<CellType> > > 63 SerialWritersMap; 64 65 typedef hpx::lcos::local::spinlock MutexType; 66 HpxWriterSinkServer()67 HpxWriterSinkServer() 68 {} 69 HpxWriterSinkServer(std::size_t numUpdateGroups)70 HpxWriterSinkServer( 71 std::size_t numUpdateGroups) : 72 numUpdateGroups(numUpdateGroups), 73 nextId(0) 74 { 75 } 76 HpxWriterSinkServer(boost::shared_ptr<ParallelWriter<CellType>> parallelWriter,std::size_t numUpdateGroups)77 HpxWriterSinkServer( 78 boost::shared_ptr<ParallelWriter<CellType> > parallelWriter, 79 std::size_t numUpdateGroups) : 80 numUpdateGroups(numUpdateGroups), 81 nextId(0) 82 { 83 connectParallelWriter(parallelWriter); 84 } 85 HpxWriterSinkServer(boost::shared_ptr<Writer<CellType>> serialWriter,std::size_t numUpdateGroups)86 HpxWriterSinkServer( 87 boost::shared_ptr<Writer<CellType> > serialWriter, 88 std::size_t numUpdateGroups) : 89 numUpdateGroups(numUpdateGroups), 90 nextId(0) 91 { 92 connectSerialWriter(serialWriter); 93 } 94 stepFinished(boost::shared_ptr<BufferType> buffer,const RegionType & validRegion,const CoordType & globalDimensions,unsigned step,WriterEvent event,std::size_t rank,bool lastCall)95 void stepFinished( 96 boost::shared_ptr<BufferType> buffer, 97 const RegionType& validRegion, 98 const CoordType& globalDimensions, 99 unsigned step, 100 WriterEvent event, 101 std::size_t rank, 102 bool lastCall) 103 { 104 typedef typename RegionInfoMapType::iterator RegionMapIterator; 105 typedef typename StepCountMapType::iterator StepCountMapIterator; 106 typedef typename GridMapType::iterator GridIterator; 107 108 MutexType::scoped_lock l(mutex); 109 GridIterator kt = gridMap.find(step); 110 if (kt == gridMap.end()) 111 { 112 CoordBox<DIM> coordBox(CoordType(), globalDimensions); 113 kt = gridMap.insert( 114 kt, 115 std::make_pair( 116 step, 117 GridType(coordBox))); 118 } 119 120 GridVecConv::vectorToGrid(*buffer, &kt->second, validRegion); 121 122 RegionMapIterator it = regionInfoMap.find(step); 123 if (it == regionInfoMap.end()) { 124 it = regionInfoMap.insert( 125 it, 126 std::make_pair(step, std::vector<RegionInfo>()) 127 ); 128 } 129 130 it->second.push_back( 131 RegionInfo(validRegion, globalDimensions, event, rank, lastCall) 132 ); 133 134 if (lastCall) { 135 StepCountMapIterator jt = stepCountMap.find(step); 136 if (jt == stepCountMap.end()) 137 { 138 jt = stepCountMap.insert(jt, std::make_pair(step, 1)); 139 } 140 else 141 { 142 ++jt->second; 143 } 144 145 if (jt->second == numUpdateGroups) 146 { 147 { 148 hpx::util::scoped_unlock<MutexType::scoped_lock> ull(l); 149 notifyWriters(kt->second, step, event); 150 } 151 regionInfoMap.erase(it); 152 stepCountMap.erase(jt); 153 gridMap.erase(kt); 154 } 155 } 156 } 157 HPX_DEFINE_COMPONENT_ACTION_TPL(HpxWriterSinkServer, stepFinished, StepFinishedAction); 158 connectParallelWriter(boost::shared_ptr<ParallelWriter<CellType>> parallelWriter)159 std::size_t connectParallelWriter( 160 boost::shared_ptr<ParallelWriter<CellType> > parallelWriter) 161 { 162 MutexType::scoped_lock l(mutex); 163 std::size_t id = getNextId(); 164 parallelWriters.insert(std::make_pair(id, parallelWriter)); 165 return id; 166 } 167 HPX_DEFINE_COMPONENT_ACTION_TPL(HpxWriterSinkServer, connectParallelWriter, ConnectParallelWriterAction); 168 connectSerialWriter(boost::shared_ptr<Writer<CellType>> serialWriter)169 std::size_t connectSerialWriter( 170 boost::shared_ptr<Writer<CellType> > serialWriter) 171 { 172 MutexType::scoped_lock l(mutex); 173 std::size_t id = getNextId(); 174 serialWriters.insert(std::make_pair(id, serialWriter)); 175 return id; 176 } 177 HPX_DEFINE_COMPONENT_ACTION_TPL(HpxWriterSinkServer, connectSerialWriter, ConnectSerialWriterAction); 178 disconnectWriter(std::size_t id)179 void disconnectWriter(std::size_t id) 180 { 181 MutexType::scoped_lock l(mutex); 182 { 183 typename ParallelWritersMap::iterator it = parallelWriters.find(id); 184 if (it != parallelWriters.end()) { 185 parallelWriters.erase(it); 186 freeIds.push_back(id); 187 return; 188 } 189 } 190 { 191 typename SerialWritersMap::iterator it = serialWriters.find(id); 192 if (it != serialWriters.end()) { 193 serialWriters.erase(it); 194 freeIds.push_back(id); 195 return; 196 } 197 } 198 } 199 HPX_DEFINE_COMPONENT_ACTION_TPL(HpxWriterSinkServer, disconnectWriter, DisconnectWriterAction); 200 getNumUpdateGroups()201 std::size_t getNumUpdateGroups() 202 { 203 return numUpdateGroups; 204 } 205 HPX_DEFINE_COMPONENT_ACTION_TPL(HpxWriterSinkServer, getNumUpdateGroups, NumUpdateGroupsAction); 206 207 private: 208 GridMapType gridMap; 209 ParallelWritersMap parallelWriters; 210 SerialWritersMap serialWriters; 211 std::size_t numUpdateGroups; 212 213 StepCountMapType stepCountMap; 214 RegionInfoMapType regionInfoMap; 215 216 std::size_t nextId; 217 std::vector<std::size_t> freeIds; 218 219 MutexType mutex; 220 getNextId()221 std::size_t getNextId() 222 { 223 std::size_t id = 0; 224 if (!freeIds.empty()) { 225 id = freeIds.back(); 226 freeIds.pop_back(); 227 } 228 else { 229 id = nextId++; 230 } 231 return id; 232 } 233 notifyWriters(GridType const & grid,unsigned step,WriterEvent event)234 void notifyWriters(GridType const & grid, unsigned step, WriterEvent event) 235 { 236 BOOST_FOREACH(typename ParallelWritersMap::value_type& writer, 237 parallelWriters) { 238 MutexType::scoped_lock l(mutex); 239 typedef typename RegionInfoMapType::iterator RegionInfoIterator; 240 241 RegionInfoIterator it = regionInfoMap.find(step); 242 BOOST_ASSERT(it != regionInfoMap.end()); 243 BOOST_FOREACH(RegionInfo const & regionInfo, it->second) { 244 writer.second->stepFinished( 245 grid, 246 regionInfo.validRegion, 247 regionInfo.globalDimensions, 248 step, 249 regionInfo.event, regionInfo.rank, 250 regionInfo.lastCall 251 ); 252 } 253 } 254 BOOST_FOREACH(typename SerialWritersMap::value_type& writer, 255 serialWriters) { 256 writer.second->stepFinished(grid, step, event); 257 } 258 } 259 260 class RegionInfo 261 { 262 public: RegionInfo(const RegionType & validRegion,const CoordType & globalDimensions,WriterEvent event,std::size_t rank,bool lastCall)263 RegionInfo( 264 const RegionType& validRegion, 265 const CoordType& globalDimensions, 266 WriterEvent event, 267 std::size_t rank, 268 bool lastCall) : 269 validRegion(validRegion), 270 globalDimensions(globalDimensions), 271 event(event), 272 rank(rank), 273 lastCall(lastCall) 274 {} 275 276 RegionType validRegion; 277 CoordType globalDimensions; 278 WriterEvent event; 279 std::size_t rank; 280 bool lastCall; 281 }; 282 283 }; 284 285 } 286 287 #endif 288 #endif 289