1 #ifndef LIBGEODECOMP_IO_REMOTESTEERER_H 2 #define LIBGEODECOMP_IO_REMOTESTEERER_H 3 4 #include <libgeodecomp/config.h> 5 #ifdef LIBGEODECOMP_WITH_BOOST_ASIO 6 #ifdef LIBGEODECOMP_WITH_THREADS 7 #ifdef LIBGEODECOMP_WITH_MPI 8 9 #include <libgeodecomp/communication/typemaps.h> 10 #include <libgeodecomp/communication/mpilayer.h> 11 #include <libgeodecomp/io/steerer.h> 12 #include <libgeodecomp/io/remotesteerer/commandserver.h> 13 #include <libgeodecomp/io/remotesteerer/handler.h> 14 #include <libgeodecomp/io/remotesteerer/gethandler.h> 15 #include <libgeodecomp/io/remotesteerer/pipe.h> 16 17 namespace LibGeoDecomp { 18 19 using namespace RemoteSteererHelpers; 20 21 /** 22 * The RemoteSteerer allows the user to control a parallel simulation 23 * from a single network connection (on the "connection node"). It 24 * employs a two-way callback scheme to enable convenient extension of 25 * its functionality: 26 * 27 * - On the connection node Action objects can be invoked via the 28 * CommandServer to provide the user with a richer terminal 29 * interface. 30 * 31 * - On the compute nodes Handler objects will then execute the user 32 * requests on simulation data. Handlers and Actions may communicate 33 * via a asynchronous message buffer. 34 * 35 * Keep in mind that the connection node will generally double as an 36 * execution node. 37 */ 38 template<typename CELL_TYPE> 39 class RemoteSteerer : public Steerer<CELL_TYPE> 40 { 41 public: 42 friend class RemoteSteererTest; 43 typedef typename Steerer<CELL_TYPE>::SteererFeedback SteererFeedback; 44 typedef typename Steerer<CELL_TYPE>::Topology Topology; 45 typedef typename Steerer<CELL_TYPE>::GridType GridType; 46 typedef std::map<std::string, boost::shared_ptr<Handler<CELL_TYPE> > > HandlerMap; 47 static const int DIM = Topology::DIM; 48 49 RemoteSteerer( 50 unsigned period, 51 int port, 52 int root = 0, 53 MPI_Comm communicator = MPI_COMM_WORLD) : 54 Steerer<CELL_TYPE>(period), 55 port(port), 56 pipe(new Pipe(root, communicator)) 57 { 58 if (MPILayer(communicator).rank() == root) { 59 commandServer.reset(new CommandServer<CELL_TYPE>(port, pipe)); 60 } 61 } 62 nextStep(GridType * grid,const Region<DIM> & validRegion,const Coord<DIM> & gridDim,unsigned step,SteererEvent event,std::size_t rank,bool lastCall,SteererFeedback * feedback)63 virtual void nextStep( 64 GridType *grid, 65 const Region<DIM>& validRegion, 66 const Coord<DIM>& gridDim, 67 unsigned step, 68 SteererEvent event, 69 std::size_t rank, 70 bool lastCall, 71 SteererFeedback *feedback) 72 { 73 LOG(DBG, "RemoteSteerer::nextStep(step = " << step << ")"); 74 pipe->sync(); 75 StringVec steeringRequests = pipe->retrieveSteeringRequests(); 76 77 for (StringVec::iterator i = steeringRequests.begin(); 78 i != steeringRequests.end(); 79 ++i) { 80 LOG(DBG, "RemoteSteerer::nextStep got" << *i); 81 StringVec parameters = StringOps::tokenize(*i, " "); 82 std::string command = pop_front(parameters); 83 84 if (handlers.count(command) == 0) { 85 std::string message = "handler not found: " + command; 86 LOG(Logger::WARN, message); 87 pipe->addSteeringFeedback(message); 88 continue; 89 } 90 91 int handled = (*handlers[command])(parameters, *pipe, grid, validRegion, step); 92 // we may have to requeue requrests which could not yet be handled 93 if (!handled) { 94 pipe->addSteeringRequest(*i); 95 } 96 } 97 98 pipe->sync(); 99 } 100 addAction(Action<CELL_TYPE> * action)101 void addAction(Action<CELL_TYPE> *action) 102 { 103 commandServer->addAction(action); 104 } 105 addHandler(Handler<CELL_TYPE> * handler)106 void addHandler(Handler<CELL_TYPE> *handler) 107 { 108 handlers[handler->key()] = boost::shared_ptr<Handler<CELL_TYPE> >(handler); 109 } 110 111 template<typename MEMBER_TYPE> addDataAccessor(DataAccessor<CELL_TYPE,MEMBER_TYPE> * accessor)112 void addDataAccessor(DataAccessor<CELL_TYPE, MEMBER_TYPE> *accessor) 113 { 114 if (commandServer) { 115 GetAction<CELL_TYPE> *action = new GetAction<CELL_TYPE>(accessor->name()); 116 addAction(action); 117 } 118 119 boost::shared_ptr<DataAccessor<CELL_TYPE, MEMBER_TYPE> > accessorPtr(accessor); 120 handlers["get_" + accessor->name()].reset(new GetHandler<CELL_TYPE, MEMBER_TYPE>(accessorPtr)); 121 } 122 sendCommand(const std::string & command)123 void sendCommand(const std::string& command) 124 { 125 CommandServer<CELL_TYPE>::sendCommand(command, port); 126 } 127 sendCommandWithFeedback(const std::string & command,int feedbackLines)128 StringVec sendCommandWithFeedback(const std::string& command, int feedbackLines) 129 { 130 return CommandServer<CELL_TYPE>::sendCommandWithFeedback(command, feedbackLines, port); 131 } 132 133 private: 134 HandlerMap handlers; 135 int port; 136 boost::shared_ptr<Pipe> pipe; 137 boost::shared_ptr<CommandServer<CELL_TYPE> > commandServer; 138 }; 139 140 } 141 142 #endif 143 #endif 144 #endif 145 146 #endif 147