1 #ifndef LIBGEODECOMP_IO_REMOTESTEERER_COMMANDSERVER_H
2 #define LIBGEODECOMP_IO_REMOTESTEERER_COMMANDSERVER_H
3 
4 #include <libgeodecomp/config.h>
5 #include <libgeodecomp/io/logger.h>
6 #include <libgeodecomp/io/remotesteerer/action.h>
7 #include <libgeodecomp/io/remotesteerer/getaction.h>
8 #include <libgeodecomp/io/remotesteerer/interactor.h>
9 #include <libgeodecomp/io/remotesteerer/waitaction.h>
10 #include <libgeodecomp/misc/stringops.h>
11 
12 #include <boost/asio.hpp>
13 #include <boost/thread.hpp>
14 #include <boost/shared_ptr.hpp>
15 #include <cerrno>
16 #include <iostream>
17 #include <string>
18 #include <stdexcept>
19 #include <map>
20 
21 namespace LibGeoDecomp {
22 
23 namespace RemoteSteererHelpers {
24 
25 using boost::asio::ip::tcp;
26 
27 /**
28  * A server which can be reached by TCP (nc, telnet, ...). Its purpose
29  * is to do connection handling and parsing of incoming user commands.
30  * Action objects can be bound to certain commands and will be
31  * invoked. This allows a flexible extension of the CommandServer's
32  * functionality by composition, without having to resort to inheritance.
33  */
34 template<typename CELL_TYPE>
35 class CommandServer
36 {
37 public:
38     typedef std::map<std::string, boost::shared_ptr<Action<CELL_TYPE> > > ActionMap;
39 
40     /**
41      * This helper class lets us and the user safely close the
42      * CommandServer's network service, which is nice as it is using
43      * blocking IO and it's a major PITA to cancel that.
44      */
45     // fixme: move to dedicated file
46     class QuitAction : public Action<CELL_TYPE>
47     {
48     public:
QuitAction(bool * continueFlag)49         explicit QuitAction(bool *continueFlag) :
50             Action<CELL_TYPE>("quit", "Terminates the CommandServer and closes its socket."),
51             continueFlag(continueFlag)
52         {}
53 
operator()54         void operator()(const StringVec& parameters, Pipe& pipe)
55         {
56             LOG(INFO, "QuitAction called");
57             *continueFlag = false;
58         }
59 
60     private:
61         bool *continueFlag;
62     };
63 
64     /**
65      * This class is just a NOP, which may be used by the client to
66      * retrieve new steering feedback. This can't happen automatically
67      * as the CommandServer's listener thread blocks for input from
68      * the client.
69      */
70     // fixme: move to dedicated file
71     class PingAction : public Action<CELL_TYPE>
72     {
73     public:
74         using Action<CELL_TYPE>::key;
75 
PingAction()76         PingAction() :
77             Action<CELL_TYPE>("ping", "wake the CommandServer, useful to retrieve a new batch of feedback"),
78             c(0)
79         {}
80 
operator()81         void operator()(const StringVec& parameters, Pipe& pipe)
82         {
83             // // Do only reply if there is no feedback already waiting.
84             // // This is useful if the client is using ping to keep us
85             // // alive, but can only savely read back one line in
86             // // return. In that case this stragety avoids a memory leak
87             // // in our write buffer.
88             // if (pipe.copySteeringFeedback().size() == 0) {
89                 pipe.addSteeringFeedback("pong " + StringOps::itoa(++c));
90             // }
91         }
92 
93     private:
94         int c;
95     };
96 
CommandServer(int port,boost::shared_ptr<Pipe> pipe)97     CommandServer(
98         int port,
99         boost::shared_ptr<Pipe> pipe) :
100         port(port),
101         pipe(pipe),
102         serverThread(&CommandServer::runServer, this)
103     {
104         addAction(new QuitAction(&continueFlag));
105         addAction(new PingAction);
106         addAction(new WaitAction<CELL_TYPE>);
107 
108         // The thread may take a while to start up. We need to wait
109         // here so we don't try to clean up in the d-tor before the
110         // thread has created anything.
111         boost::unique_lock<boost::mutex> lock(mutex);
112         while(!acceptor) {
113             threadCreationVar.wait(lock);
114         }
115     }
116 
~CommandServer()117     ~CommandServer()
118     {
119         signalClose();
120         LOG(DBG, "CommandServer waiting for network thread");
121         serverThread.join();
122     }
123 
124     /**
125      * Sends a message back to the end user. This is the primary way
126      * for (user-defined) Actions to give feedback.
127      */
sendMessage(const std::string & message)128     void sendMessage(const std::string& message)
129     {
130         LOG(DBG, "CommandServer::sendMessage(" << message << ")");
131         boost::system::error_code errorCode;
132         boost::asio::write(
133             *socket,
134             boost::asio::buffer(message),
135             boost::asio::transfer_all(),
136             errorCode);
137 
138         if (errorCode) {
139             LOG(WARN, "CommandServer::sendMessage encountered " << errorCode.message());
140         }
141     }
142 
143     /**
144      * A convenience method to send a string to a CommandServer
145      * listeting on the given host/port combination.
146      */
147     static void sendCommand(const std::string& command, int port, const std::string& host = "127.0.0.1")
148     {
149         sendCommandWithFeedback(command, 0, port, host);
150     }
151 
152     static StringVec sendCommandWithFeedback(const std::string& command, int feedbackLines, int port, const std::string& host = "127.0.0.1")
153     {
154         LOG(DBG, "CommandServer::sendCommandWithFeedback(" << command << ", port = " << port << ", host = " << host << ")");
155         Interactor interactor(command, feedbackLines, false, port, host);
156         interactor();
157         return interactor.feedback();
158         boost::asio::io_service ioService;
159         tcp::resolver resolver(ioService);
160         tcp::resolver::query query(host, StringOps::itoa(port));
161         tcp::resolver::iterator endpointIterator = resolver.resolve(query);
162         tcp::socket socket(ioService);
163         boost::asio::connect(socket, endpointIterator);
164         boost::system::error_code errorCode;
165 
166         boost::asio::write(
167             socket,
168             boost::asio::buffer(command),
169             boost::asio::transfer_all(),
170             errorCode);
171 
172         if (errorCode) {
173             LOG(WARN, "error while writing to socket: " << errorCode.message());
174         }
175 
176         StringVec ret;
177 
178         for (int i = 0; i < feedbackLines; ++i) {
179             boost::asio::streambuf buf;
180             boost::system::error_code errorCode;
181 
182             LOG(DBG, "CommandServer::sendCommandWithFeedback() reading line");
183 
184             std::size_t length = boost::asio::read_until(socket, buf, '\n', errorCode);
185             if (errorCode) {
186                 LOG(WARN, "error while writing to socket: " << errorCode.message());
187             }
188 
189             // purge \n at end of line
190             if (length) {
191                 length -= 1;
192             }
193 
194             std::istream lineBuf(&buf);
195             std::string line(length, 'X');
196             lineBuf.read(&line[0], length);
197             ret << line;
198         }
199 
200         return ret;
201     }
202 
203     /**
204      * Register a server-side callback for handling user input. The
205      * CommandServer will assume ownership of the action and free its
206      * memory upon destruction.
207      */
addAction(Action<CELL_TYPE> * action)208     void addAction(Action<CELL_TYPE> *action)
209     {
210         actions[action->key()] = boost::shared_ptr<Action<CELL_TYPE> >(action);
211     }
212 
213 private:
214     int port;
215     boost::shared_ptr<Pipe> pipe;
216     boost::asio::io_service ioService;
217     boost::shared_ptr<tcp::acceptor> acceptor;
218     boost::shared_ptr<tcp::socket> socket;
219     boost::thread serverThread;
220     boost::condition_variable threadCreationVar;
221     boost::mutex mutex;
222     ActionMap actions;
223     bool continueFlag;
224 
runSession()225     void runSession()
226     {
227         for (;;) {
228             boost::array<char, 1024> buf;
229             boost::system::error_code errorCode;
230             LOG(DBG, "CommandServer::runSession(): reading");
231             std::size_t length = socket->read_some(boost::asio::buffer(buf), errorCode);
232             LOG(DBG, "CommandServer::runSession(): read " << length << " bytes");
233 
234             if (length > 0) {
235                 std::string input(buf.data(), length);
236                 handleInput(input);
237             }
238 
239             if (errorCode == boost::asio::error::eof) {
240                 LOG(INFO, "CommandServer::runSession(): client closed connection");
241                 return;
242             }
243 
244             if (errorCode) {
245                 LOG(WARN, "CommandServer::runSession encountered " << errorCode.message());
246             }
247 
248             if (!socket->is_open()) {
249                 return;
250             }
251 
252             StringVec feedback = pipe->retrieveSteeringFeedback();
253             for (StringVec::iterator i = feedback.begin();
254                  i != feedback.end();
255                  ++i) {
256                 LOG(DBG, "CommandServer::runSession sending »" << *i << "«");
257                 sendMessage(*i + "\n");
258             }
259         }
260     }
261 
handleInput(const std::string & input)262     void handleInput(const std::string& input)
263     {
264         LOG(DBG, "CommandServer::handleInput(" << input << ")");
265         StringVec lines = StringOps::tokenize(input, "\n");
266         std::string zeroString("x");
267         zeroString[0] = 0;
268 
269         for (StringVec::iterator iter = lines.begin();
270              iter != lines.end();
271              ++iter) {
272             StringVec parameters = StringOps::tokenize(*iter, " \n\r");
273 
274             if (*iter == zeroString) {
275                 // silently ignore strings containing a single 0
276                 continue;
277             }
278 
279             if (parameters.size() == 0) {
280                 pipe->addSteeringFeedback("no command given");
281                 continue;
282             }
283 
284             std::string command = pop_front(parameters);
285             if (actions.count(command) == 0) {
286                 std::string message = "command not found: " + command;
287                 LOG(WARN, message);
288                 pipe->addSteeringFeedback(message);
289                 pipe->addSteeringFeedback("try \"help\"");
290             } else {
291                 (*actions[command])(parameters, *pipe);
292             }
293         }
294     }
295 
runServer()296     int runServer()
297     {
298         try {
299             // Thread-aware initialization, allows c-tor to exit safely.
300             {
301                 boost::unique_lock<boost::mutex> lock(mutex);
302                 acceptor.reset(new tcp::acceptor(ioService, tcp::endpoint(tcp::v4(), port)));
303             }
304             threadCreationVar.notify_one();
305 
306             boost::system::error_code errorCode;
307             continueFlag = true;
308 
309             while (continueFlag) {
310                 LOG(DBG, "CommandServer: waiting for new connection");
311                 socket.reset(new tcp::socket(ioService));
312                 acceptor->accept(*socket, errorCode);
313 
314                 if (errorCode) {
315                     LOG(WARN, "CommandServer::runServer() encountered " << errorCode.message());
316                 } else {
317                     LOG(INFO, "CommandServer: client connected");
318                     runSession();
319                     LOG(INFO, "CommandServer: client disconnected");
320                 }
321             }
322         }
323         catch (std::exception& e) {
324             LOG(FATAL, "CommandServer::runServer() caught exception " << e.what() << ", exiting");
325             return 1;
326         }
327 
328         return 0;
329     }
330 
signalClose()331     void signalClose()
332     {
333         sendCommand("quit", port);
334     }
335 };
336 
337 
338 }
339 
340 }
341 
342 #endif
343