1 #ifndef LIBGEODECOMP_IO_REMOTESTEERER_H
2 #define LIBGEODECOMP_IO_REMOTESTEERER_H
3 
4 #include <libgeodecomp/config.h>
5 #ifdef LIBGEODECOMP_WITH_BOOST_ASIO
6 #ifdef LIBGEODECOMP_WITH_THREADS
7 #ifdef LIBGEODECOMP_WITH_MPI
8 
9 #include <libgeodecomp/communication/typemaps.h>
10 #include <libgeodecomp/communication/mpilayer.h>
11 #include <libgeodecomp/io/steerer.h>
12 #include <libgeodecomp/io/remotesteerer/commandserver.h>
13 #include <libgeodecomp/io/remotesteerer/handler.h>
14 #include <libgeodecomp/io/remotesteerer/gethandler.h>
15 #include <libgeodecomp/io/remotesteerer/pipe.h>
16 
17 namespace LibGeoDecomp {
18 
19 using namespace RemoteSteererHelpers;
20 
21 /**
22  * The RemoteSteerer allows the user to control a parallel simulation
23  * from a single network connection (on the "connection node"). It
24  * employs a two-way callback scheme to enable convenient extension of
25  * its functionality:
26  *
27  * - On the connection node Action objects can be invoked via the
28  *   CommandServer to provide the user with a richer terminal
29  *   interface.
30  *
31  * - On the compute nodes Handler objects will then execute the user
32  *   requests on simulation data. Handlers and Actions may communicate
33  *   via a asynchronous message buffer.
34  *
35  * Keep in mind that the connection node will generally double as an
36  * execution node.
37  */
38 template<typename CELL_TYPE>
39 class RemoteSteerer : public Steerer<CELL_TYPE>
40 {
41 public:
42     friend class RemoteSteererTest;
43     typedef typename Steerer<CELL_TYPE>::SteererFeedback SteererFeedback;
44     typedef typename Steerer<CELL_TYPE>::Topology Topology;
45     typedef typename Steerer<CELL_TYPE>::GridType GridType;
46     typedef std::map<std::string, boost::shared_ptr<Handler<CELL_TYPE> > > HandlerMap;
47     static const int DIM = Topology::DIM;
48 
49     RemoteSteerer(
50         unsigned period,
51         int port,
52         int root = 0,
53         MPI_Comm communicator = MPI_COMM_WORLD) :
54         Steerer<CELL_TYPE>(period),
55         port(port),
56         pipe(new Pipe(root, communicator))
57     {
58         if (MPILayer(communicator).rank() == root) {
59             commandServer.reset(new CommandServer<CELL_TYPE>(port, pipe));
60         }
61     }
62 
nextStep(GridType * grid,const Region<DIM> & validRegion,const Coord<DIM> & gridDim,unsigned step,SteererEvent event,std::size_t rank,bool lastCall,SteererFeedback * feedback)63     virtual void nextStep(
64         GridType *grid,
65         const Region<DIM>& validRegion,
66         const Coord<DIM>& gridDim,
67         unsigned step,
68         SteererEvent event,
69         std::size_t rank,
70         bool lastCall,
71         SteererFeedback *feedback)
72     {
73         LOG(DBG, "RemoteSteerer::nextStep(step = " << step << ")");
74         pipe->sync();
75         StringVec steeringRequests = pipe->retrieveSteeringRequests();
76 
77         for (StringVec::iterator i = steeringRequests.begin();
78              i != steeringRequests.end();
79              ++i) {
80             LOG(DBG, "RemoteSteerer::nextStep got" << *i);
81             StringVec parameters = StringOps::tokenize(*i, " ");
82             std::string command = pop_front(parameters);
83 
84             if (handlers.count(command) == 0) {
85                 std::string message = "handler not found: " + command;
86                 LOG(Logger::WARN, message);
87                 pipe->addSteeringFeedback(message);
88                 continue;
89             }
90 
91             int handled = (*handlers[command])(parameters, *pipe, grid, validRegion, step);
92             // we may have to requeue requrests which could not yet be handled
93             if (!handled) {
94                 pipe->addSteeringRequest(*i);
95             }
96         }
97 
98         pipe->sync();
99     }
100 
addAction(Action<CELL_TYPE> * action)101     void addAction(Action<CELL_TYPE> *action)
102     {
103         commandServer->addAction(action);
104     }
105 
addHandler(Handler<CELL_TYPE> * handler)106     void addHandler(Handler<CELL_TYPE> *handler)
107     {
108         handlers[handler->key()] = boost::shared_ptr<Handler<CELL_TYPE> >(handler);
109     }
110 
111     template<typename MEMBER_TYPE>
addDataAccessor(DataAccessor<CELL_TYPE,MEMBER_TYPE> * accessor)112     void addDataAccessor(DataAccessor<CELL_TYPE, MEMBER_TYPE> *accessor)
113     {
114         if (commandServer) {
115             GetAction<CELL_TYPE> *action = new GetAction<CELL_TYPE>(accessor->name());
116             addAction(action);
117         }
118 
119         boost::shared_ptr<DataAccessor<CELL_TYPE, MEMBER_TYPE> > accessorPtr(accessor);
120         handlers["get_" + accessor->name()].reset(new GetHandler<CELL_TYPE, MEMBER_TYPE>(accessorPtr));
121     }
122 
sendCommand(const std::string & command)123     void sendCommand(const std::string& command)
124     {
125         CommandServer<CELL_TYPE>::sendCommand(command, port);
126     }
127 
sendCommandWithFeedback(const std::string & command,int feedbackLines)128     StringVec sendCommandWithFeedback(const std::string& command, int feedbackLines)
129     {
130         return CommandServer<CELL_TYPE>::sendCommandWithFeedback(command, feedbackLines, port);
131     }
132 
133 private:
134     HandlerMap handlers;
135     int port;
136     boost::shared_ptr<Pipe> pipe;
137     boost::shared_ptr<CommandServer<CELL_TYPE> > commandServer;
138 };
139 
140 }
141 
142 #endif
143 #endif
144 #endif
145 
146 #endif
147