1 #ifndef LIBGEODECOMP_IO_REMOTESTEERER_COMMANDSERVER_H 2 #define LIBGEODECOMP_IO_REMOTESTEERER_COMMANDSERVER_H 3 4 #include <libgeodecomp/config.h> 5 #include <libgeodecomp/io/logger.h> 6 #include <libgeodecomp/io/remotesteerer/action.h> 7 #include <libgeodecomp/io/remotesteerer/getaction.h> 8 #include <libgeodecomp/io/remotesteerer/interactor.h> 9 #include <libgeodecomp/io/remotesteerer/waitaction.h> 10 #include <libgeodecomp/misc/stringops.h> 11 12 #include <boost/asio.hpp> 13 #include <boost/thread.hpp> 14 #include <boost/shared_ptr.hpp> 15 #include <cerrno> 16 #include <iostream> 17 #include <string> 18 #include <stdexcept> 19 #include <map> 20 21 namespace LibGeoDecomp { 22 23 namespace RemoteSteererHelpers { 24 25 using boost::asio::ip::tcp; 26 27 /** 28 * A server which can be reached by TCP (nc, telnet, ...). Its purpose 29 * is to do connection handling and parsing of incoming user commands. 30 * Action objects can be bound to certain commands and will be 31 * invoked. This allows a flexible extension of the CommandServer's 32 * functionality by composition, without having to resort to inheritance. 33 */ 34 template<typename CELL_TYPE> 35 class CommandServer 36 { 37 public: 38 typedef std::map<std::string, boost::shared_ptr<Action<CELL_TYPE> > > ActionMap; 39 40 /** 41 * This helper class lets us and the user safely close the 42 * CommandServer's network service, which is nice as it is using 43 * blocking IO and it's a major PITA to cancel that. 44 */ 45 // fixme: move to dedicated file 46 class QuitAction : public Action<CELL_TYPE> 47 { 48 public: QuitAction(bool * continueFlag)49 explicit QuitAction(bool *continueFlag) : 50 Action<CELL_TYPE>("quit", "Terminates the CommandServer and closes its socket."), 51 continueFlag(continueFlag) 52 {} 53 operator()54 void operator()(const StringVec& parameters, Pipe& pipe) 55 { 56 LOG(INFO, "QuitAction called"); 57 *continueFlag = false; 58 } 59 60 private: 61 bool *continueFlag; 62 }; 63 64 /** 65 * This class is just a NOP, which may be used by the client to 66 * retrieve new steering feedback. This can't happen automatically 67 * as the CommandServer's listener thread blocks for input from 68 * the client. 69 */ 70 // fixme: move to dedicated file 71 class PingAction : public Action<CELL_TYPE> 72 { 73 public: 74 using Action<CELL_TYPE>::key; 75 PingAction()76 PingAction() : 77 Action<CELL_TYPE>("ping", "wake the CommandServer, useful to retrieve a new batch of feedback"), 78 c(0) 79 {} 80 operator()81 void operator()(const StringVec& parameters, Pipe& pipe) 82 { 83 // // Do only reply if there is no feedback already waiting. 84 // // This is useful if the client is using ping to keep us 85 // // alive, but can only savely read back one line in 86 // // return. In that case this stragety avoids a memory leak 87 // // in our write buffer. 88 // if (pipe.copySteeringFeedback().size() == 0) { 89 pipe.addSteeringFeedback("pong " + StringOps::itoa(++c)); 90 // } 91 } 92 93 private: 94 int c; 95 }; 96 CommandServer(int port,boost::shared_ptr<Pipe> pipe)97 CommandServer( 98 int port, 99 boost::shared_ptr<Pipe> pipe) : 100 port(port), 101 pipe(pipe), 102 serverThread(&CommandServer::runServer, this) 103 { 104 addAction(new QuitAction(&continueFlag)); 105 addAction(new PingAction); 106 addAction(new WaitAction<CELL_TYPE>); 107 108 // The thread may take a while to start up. We need to wait 109 // here so we don't try to clean up in the d-tor before the 110 // thread has created anything. 111 boost::unique_lock<boost::mutex> lock(mutex); 112 while(!acceptor) { 113 threadCreationVar.wait(lock); 114 } 115 } 116 ~CommandServer()117 ~CommandServer() 118 { 119 signalClose(); 120 LOG(DBG, "CommandServer waiting for network thread"); 121 serverThread.join(); 122 } 123 124 /** 125 * Sends a message back to the end user. This is the primary way 126 * for (user-defined) Actions to give feedback. 127 */ sendMessage(const std::string & message)128 void sendMessage(const std::string& message) 129 { 130 LOG(DBG, "CommandServer::sendMessage(" << message << ")"); 131 boost::system::error_code errorCode; 132 boost::asio::write( 133 *socket, 134 boost::asio::buffer(message), 135 boost::asio::transfer_all(), 136 errorCode); 137 138 if (errorCode) { 139 LOG(WARN, "CommandServer::sendMessage encountered " << errorCode.message()); 140 } 141 } 142 143 /** 144 * A convenience method to send a string to a CommandServer 145 * listeting on the given host/port combination. 146 */ 147 static void sendCommand(const std::string& command, int port, const std::string& host = "127.0.0.1") 148 { 149 sendCommandWithFeedback(command, 0, port, host); 150 } 151 152 static StringVec sendCommandWithFeedback(const std::string& command, int feedbackLines, int port, const std::string& host = "127.0.0.1") 153 { 154 LOG(DBG, "CommandServer::sendCommandWithFeedback(" << command << ", port = " << port << ", host = " << host << ")"); 155 Interactor interactor(command, feedbackLines, false, port, host); 156 interactor(); 157 return interactor.feedback(); 158 boost::asio::io_service ioService; 159 tcp::resolver resolver(ioService); 160 tcp::resolver::query query(host, StringOps::itoa(port)); 161 tcp::resolver::iterator endpointIterator = resolver.resolve(query); 162 tcp::socket socket(ioService); 163 boost::asio::connect(socket, endpointIterator); 164 boost::system::error_code errorCode; 165 166 boost::asio::write( 167 socket, 168 boost::asio::buffer(command), 169 boost::asio::transfer_all(), 170 errorCode); 171 172 if (errorCode) { 173 LOG(WARN, "error while writing to socket: " << errorCode.message()); 174 } 175 176 StringVec ret; 177 178 for (int i = 0; i < feedbackLines; ++i) { 179 boost::asio::streambuf buf; 180 boost::system::error_code errorCode; 181 182 LOG(DBG, "CommandServer::sendCommandWithFeedback() reading line"); 183 184 std::size_t length = boost::asio::read_until(socket, buf, '\n', errorCode); 185 if (errorCode) { 186 LOG(WARN, "error while writing to socket: " << errorCode.message()); 187 } 188 189 // purge \n at end of line 190 if (length) { 191 length -= 1; 192 } 193 194 std::istream lineBuf(&buf); 195 std::string line(length, 'X'); 196 lineBuf.read(&line[0], length); 197 ret << line; 198 } 199 200 return ret; 201 } 202 203 /** 204 * Register a server-side callback for handling user input. The 205 * CommandServer will assume ownership of the action and free its 206 * memory upon destruction. 207 */ addAction(Action<CELL_TYPE> * action)208 void addAction(Action<CELL_TYPE> *action) 209 { 210 actions[action->key()] = boost::shared_ptr<Action<CELL_TYPE> >(action); 211 } 212 213 private: 214 int port; 215 boost::shared_ptr<Pipe> pipe; 216 boost::asio::io_service ioService; 217 boost::shared_ptr<tcp::acceptor> acceptor; 218 boost::shared_ptr<tcp::socket> socket; 219 boost::thread serverThread; 220 boost::condition_variable threadCreationVar; 221 boost::mutex mutex; 222 ActionMap actions; 223 bool continueFlag; 224 runSession()225 void runSession() 226 { 227 for (;;) { 228 boost::array<char, 1024> buf; 229 boost::system::error_code errorCode; 230 LOG(DBG, "CommandServer::runSession(): reading"); 231 std::size_t length = socket->read_some(boost::asio::buffer(buf), errorCode); 232 LOG(DBG, "CommandServer::runSession(): read " << length << " bytes"); 233 234 if (length > 0) { 235 std::string input(buf.data(), length); 236 handleInput(input); 237 } 238 239 if (errorCode == boost::asio::error::eof) { 240 LOG(INFO, "CommandServer::runSession(): client closed connection"); 241 return; 242 } 243 244 if (errorCode) { 245 LOG(WARN, "CommandServer::runSession encountered " << errorCode.message()); 246 } 247 248 if (!socket->is_open()) { 249 return; 250 } 251 252 StringVec feedback = pipe->retrieveSteeringFeedback(); 253 for (StringVec::iterator i = feedback.begin(); 254 i != feedback.end(); 255 ++i) { 256 LOG(DBG, "CommandServer::runSession sending »" << *i << "«"); 257 sendMessage(*i + "\n"); 258 } 259 } 260 } 261 handleInput(const std::string & input)262 void handleInput(const std::string& input) 263 { 264 LOG(DBG, "CommandServer::handleInput(" << input << ")"); 265 StringVec lines = StringOps::tokenize(input, "\n"); 266 std::string zeroString("x"); 267 zeroString[0] = 0; 268 269 for (StringVec::iterator iter = lines.begin(); 270 iter != lines.end(); 271 ++iter) { 272 StringVec parameters = StringOps::tokenize(*iter, " \n\r"); 273 274 if (*iter == zeroString) { 275 // silently ignore strings containing a single 0 276 continue; 277 } 278 279 if (parameters.size() == 0) { 280 pipe->addSteeringFeedback("no command given"); 281 continue; 282 } 283 284 std::string command = pop_front(parameters); 285 if (actions.count(command) == 0) { 286 std::string message = "command not found: " + command; 287 LOG(WARN, message); 288 pipe->addSteeringFeedback(message); 289 pipe->addSteeringFeedback("try \"help\""); 290 } else { 291 (*actions[command])(parameters, *pipe); 292 } 293 } 294 } 295 runServer()296 int runServer() 297 { 298 try { 299 // Thread-aware initialization, allows c-tor to exit safely. 300 { 301 boost::unique_lock<boost::mutex> lock(mutex); 302 acceptor.reset(new tcp::acceptor(ioService, tcp::endpoint(tcp::v4(), port))); 303 } 304 threadCreationVar.notify_one(); 305 306 boost::system::error_code errorCode; 307 continueFlag = true; 308 309 while (continueFlag) { 310 LOG(DBG, "CommandServer: waiting for new connection"); 311 socket.reset(new tcp::socket(ioService)); 312 acceptor->accept(*socket, errorCode); 313 314 if (errorCode) { 315 LOG(WARN, "CommandServer::runServer() encountered " << errorCode.message()); 316 } else { 317 LOG(INFO, "CommandServer: client connected"); 318 runSession(); 319 LOG(INFO, "CommandServer: client disconnected"); 320 } 321 } 322 } 323 catch (std::exception& e) { 324 LOG(FATAL, "CommandServer::runServer() caught exception " << e.what() << ", exiting"); 325 return 1; 326 } 327 328 return 0; 329 } 330 signalClose()331 void signalClose() 332 { 333 sendCommand("quit", port); 334 } 335 }; 336 337 338 } 339 340 } 341 342 #endif 343