1 /************************************************************************************************* 2 * RPC utilities 3 * Copyright (C) 2009-2012 FAL Labs 4 * This file is part of Kyoto Tycoon. 5 * This program is free software: you can redistribute it and/or modify it under the terms of 6 * the GNU General Public License as published by the Free Software Foundation, either version 7 * 3 of the License, or any later version. 8 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; 9 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 10 * See the GNU General Public License for more details. 11 * You should have received a copy of the GNU General Public License along with this program. 12 * If not, see <http://www.gnu.org/licenses/>. 13 *************************************************************************************************/ 14 15 16 #ifndef _KTRPC_H // duplication check 17 #define _KTRPC_H 18 19 #include <ktcommon.h> 20 #include <ktutil.h> 21 #include <ktsocket.h> 22 #include <ktthserv.h> 23 #include <kthttp.h> 24 25 #define KTRPCPATHPREFIX "/rpc/" ///< prefix of the RPC entry 26 #define KTRPCFORMMTYPE "application/x-www-form-urlencoded" ///< MIME type of form data 27 #define KTRPCTSVMTYPE "text/tab-separated-values" ///< MIME type of TSV 28 #define KTRPCTSVMATTR "colenc" ///< encoding attribute of TSV 29 30 namespace kyototycoon { // common namespace 31 32 33 /** 34 * RPC client. 35 * @note Although all methods of this class are thread-safe, its instance does not have mutual 36 * exclusion mechanism. So, multiple threads must not share the same instance and they must use 37 * their own respective instances. 38 */ 39 class RPCClient { 40 public: 41 /** 42 * Return value. 43 */ 44 enum ReturnValue { 45 RVSUCCESS, ///< success 46 RVENOIMPL, ///< not implemented 47 RVEINVALID, ///< invalid operation 48 RVELOGIC, ///< logical inconsistency 49 RVETIMEOUT, ///< timeout 50 RVEINTERNAL, ///< internal error 51 RVENETWORK, ///< network error 52 RVEMISC = 15 ///< miscellaneous error 53 }; 54 /** 55 * Default constructor. 56 */ RPCClient()57 RPCClient() : ua_(), host_(), port_(0), timeout_(0), open_(false), alive_(false) { 58 _assert_(true); 59 } 60 /** 61 * Destructor. 62 */ ~RPCClient()63 ~RPCClient() { 64 _assert_(true); 65 if (open_) close(); 66 } 67 /** 68 * Open the connection. 69 * @param host the name or the address of the server. If it is an empty string, the local host 70 * is specified. 71 * @param port the port numger of the server. 72 * @param timeout the timeout of each operation in seconds. If it is not more than 0, no 73 * timeout is specified. 74 * @return true on success, or false on failure. 75 */ 76 bool open(const std::string& host = "", int32_t port = DEFPORT, double timeout = -1) { 77 _assert_(true); 78 if (open_ || port < 1) return false; 79 if (!ua_.open(host, port, timeout)) return false; 80 host_ = host; 81 port_ = port; 82 timeout_ = timeout; 83 open_ = true; 84 alive_ = true; 85 return true; 86 } 87 /** 88 * Close the connection. 89 * @param grace true for graceful shutdown, or false for immediate disconnection. 90 * @return true on success, or false on failure. 91 */ 92 bool close(bool grace = true) { 93 _assert_(true); 94 if (!open_) return false; 95 bool err = false; 96 if (alive_ && !ua_.close(grace)) err = true; 97 return !err; 98 } 99 /** 100 * Call a remote procedure. 101 * @param name the name of the procecude. 102 * @param inmap a string map which contains the input of the procedure. If it is NULL, it is 103 * ignored. 104 * @param outmap a string map to contain the output parameters. If it is NULL, it is ignored. 105 * @return the return value of the procedure. 106 */ 107 ReturnValue call(const std::string& name, 108 const std::map<std::string, std::string>* inmap = NULL, 109 std::map<std::string, std::string>* outmap = NULL) { 110 _assert_(true); 111 if (outmap) outmap->clear(); 112 if (!open_) return RVENETWORK; 113 if (!alive_ && !ua_.open(host_, port_, timeout_)) return RVENETWORK; 114 alive_ = true; 115 std::string pathquery = KTRPCPATHPREFIX; 116 char* zstr = kc::urlencode(name.data(), name.size()); 117 pathquery.append(zstr); 118 delete[] zstr; 119 std::map<std::string, std::string> reqheads; 120 std::string reqbody; 121 if (inmap) { 122 std::map<std::string, std::string> tmap; 123 tmap.insert(inmap->begin(), inmap->end()); 124 int32_t enc = checkmapenc(tmap); 125 std::string outtype = KTRPCTSVMTYPE; 126 switch (enc) { 127 case 'B': kc::strprintf(&outtype, "; %s=B", KTRPCTSVMATTR); break; 128 case 'Q': kc::strprintf(&outtype, "; %s=Q", KTRPCTSVMATTR); break; 129 case 'U': kc::strprintf(&outtype, "; %s=U", KTRPCTSVMATTR); break; 130 } 131 reqheads["content-type"] = outtype; 132 if (enc != 0) tsvmapencode(&tmap, enc); 133 maptotsv(tmap, &reqbody); 134 } 135 std::map<std::string, std::string> resheads; 136 std::string resbody; 137 int32_t code = ua_.fetch(pathquery, HTTPClient::MPOST, &resbody, &resheads, 138 &reqbody, &reqheads); 139 if (outmap) { 140 const char* rp = strmapget(resheads, "content-type"); 141 if (rp) { 142 if (kc::strifwm(rp, KTRPCFORMMTYPE)) { 143 wwwformtomap(resbody.c_str(), outmap); 144 } else if (kc::strifwm(rp, KTRPCTSVMTYPE)) { 145 rp += sizeof(KTRPCTSVMTYPE) - 1; 146 int32_t enc = 0; 147 while (*rp != '\0') { 148 while (*rp == ' ' || *rp == ';') { 149 rp++; 150 } 151 if (kc::strifwm(rp, KTRPCTSVMATTR) && rp[sizeof(KTRPCTSVMATTR)-1] == '=') { 152 rp += sizeof(KTRPCTSVMATTR); 153 if (*rp == '"') rp++; 154 switch (*rp) { 155 case 'b': case 'B': enc = 'B'; break; 156 case 'q': case 'Q': enc = 'Q'; break; 157 case 'u': case 'U': enc = 'U'; break; 158 } 159 } 160 while (*rp != '\0' && *rp != ' ' && *rp != ';') { 161 rp++; 162 } 163 } 164 tsvtomap(resbody, outmap); 165 if (enc != 0) tsvmapdecode(outmap, enc); 166 } 167 } 168 } 169 ReturnValue rv; 170 if (code < 1) { 171 rv = RVENETWORK; 172 ua_.close(false); 173 alive_ = false; 174 } else if (code >= 200 && code < 300) { 175 rv = RVSUCCESS; 176 } else if (code >= 400 && code < 500) { 177 if (code >= 450) { 178 rv = RVELOGIC; 179 } else { 180 rv = RVEINVALID; 181 } 182 } else if (code >= 500 && code < 600) { 183 if (code == 501) { 184 rv = RVENOIMPL; 185 } else if (code == 503) { 186 rv = RVETIMEOUT; 187 } else { 188 rv = RVEINTERNAL; 189 } 190 } else { 191 rv = RVEMISC; 192 } 193 return rv; 194 } 195 /** 196 * Get the expression of the socket. 197 * @return the expression of the socket or an empty string on failure. 198 */ expression()199 const std::string expression() { 200 _assert_(true); 201 if (!open_) return ""; 202 std::string expr; 203 kc::strprintf(&expr, "%s:%d", host_.c_str(), port_); 204 return expr; 205 } 206 /** 207 * Reveal the internal HTTP client. 208 * @return the internal HTTP client. 209 */ reveal_core()210 HTTPClient* reveal_core() { 211 _assert_(true); 212 return &ua_; 213 } 214 private: 215 /** The HTTP client. */ 216 HTTPClient ua_; 217 /** The host name of the server. */ 218 std::string host_; 219 /** The port numer of the server. */ 220 uint32_t port_; 221 /** The timeout. */ 222 double timeout_; 223 /** The open flag. */ 224 bool open_; 225 /** The alive flag. */ 226 bool alive_; 227 }; 228 229 230 /** 231 * RPC server. 232 */ 233 class RPCServer { 234 public: 235 class Logger; 236 class Worker; 237 class Session; 238 private: 239 class WorkerAdapter; 240 public: 241 /** 242 * Interface to log internal information and errors. 243 */ 244 class Logger : public HTTPServer::Logger { 245 public: 246 /** 247 * Destructor. 248 */ ~Logger()249 virtual ~Logger() { 250 _assert_(true); 251 } 252 }; 253 /** 254 * Interface to process each request. 255 */ 256 class Worker { 257 public: 258 /** 259 * Destructor. 260 */ ~Worker()261 virtual ~Worker() { 262 _assert_(true); 263 } 264 /** 265 * Process each request of RPC. 266 * @param serv the server. 267 * @param sess the session with the client. 268 * @param name the name of the procecude. 269 * @param inmap a string map which contains the input of the procedure. 270 * @param outmap a string map to contain the input parameters. 271 * @return the return value of the procedure. 272 */ 273 virtual RPCClient::ReturnValue process(RPCServer* serv, Session* sess, 274 const std::string& name, 275 const std::map<std::string, std::string>& inmap, 276 std::map<std::string, std::string>& outmap) = 0; 277 /** 278 * Process each request of the others. 279 * @param serv the server. 280 * @param sess the session with the client. 281 * @param path the path of the requested resource. 282 * @param method the kind of the request methods. 283 * @param reqheads a string map which contains the headers of the request. Header names are 284 * converted into lower cases. The empty key means the request-line. 285 * @param reqbody a string which contains the entity body of the request. 286 * @param resheads a string map to contain the headers of the response. 287 * @param resbody a string to contain the entity body of the response. 288 * @param misc a string map which contains miscellaneous information. "url" means the 289 * absolute URL. "query" means the query string of the URL. 290 * @return the status code of the response. If it is less than 1, internal server error is 291 * sent to the client and the connection is closed. 292 */ process(HTTPServer * serv,HTTPServer::Session * sess,const std::string & path,HTTPClient::Method method,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)293 virtual int32_t process(HTTPServer* serv, HTTPServer::Session* sess, 294 const std::string& path, HTTPClient::Method method, 295 const std::map<std::string, std::string>& reqheads, 296 const std::string& reqbody, 297 std::map<std::string, std::string>& resheads, 298 std::string& resbody, 299 const std::map<std::string, std::string>& misc) { 300 _assert_(serv && sess); 301 return 501; 302 } 303 /** 304 * Process each binary request. 305 * @param serv the server. 306 * @param sess the session with the client. 307 * @return true to reuse the session, or false to close the session. 308 */ process_binary(ThreadedServer * serv,ThreadedServer::Session * sess)309 virtual bool process_binary(ThreadedServer* serv, ThreadedServer::Session* sess) { 310 _assert_(serv && sess); 311 return false; 312 } 313 /** 314 * Process each idle event. 315 * @param serv the server. 316 */ process_idle(RPCServer * serv)317 virtual void process_idle(RPCServer* serv) { 318 _assert_(serv); 319 } 320 /** 321 * Process each timer event. 322 * @param serv the server. 323 */ process_timer(RPCServer * serv)324 virtual void process_timer(RPCServer* serv) { 325 _assert_(serv); 326 } 327 /** 328 * Process the starting event. 329 * @param serv the server. 330 */ process_start(RPCServer * serv)331 virtual void process_start(RPCServer* serv) { 332 _assert_(serv); 333 } 334 /** 335 * Process the finishing event. 336 * @param serv the server. 337 */ process_finish(RPCServer * serv)338 virtual void process_finish(RPCServer* serv) { 339 _assert_(serv); 340 } 341 }; 342 /** 343 * Interface to log internal information and errors. 344 */ 345 class Session { 346 friend class RPCServer; 347 public: 348 /** 349 * Interface of session local data. 350 */ 351 class Data : public HTTPServer::Session::Data { 352 public: 353 /** 354 * Destructor. 355 */ ~Data()356 virtual ~Data() { 357 _assert_(true); 358 } 359 }; 360 /** 361 * Get the ID number of the session. 362 * @return the ID number of the session. 363 */ id()364 uint64_t id() { 365 _assert_(true); 366 return sess_->id(); 367 } 368 /** 369 * Get the ID number of the worker thread. 370 * @return the ID number of the worker thread. It is from 0 to less than the number of 371 * worker threads. 372 */ thread_id()373 uint32_t thread_id() { 374 _assert_(true); 375 return sess_->thread_id(); 376 } 377 /** 378 * Set the session local data. 379 * @param data the session local data. If it is NULL, no data is registered. 380 * @note The registered data is destroyed implicitly when the session object is destroyed or 381 * this method is called again. 382 */ set_data(Data * data)383 void set_data(Data* data) { 384 _assert_(true); 385 sess_->set_data(data); 386 } 387 /** 388 * Get the session local data. 389 * @return the session local data, or NULL if no data is registered. 390 */ data()391 Data* data() { 392 _assert_(true); 393 return (Data*)sess_->data(); 394 } 395 /** 396 * Get the expression of the socket. 397 * @return the expression of the socket or an empty string on failure. 398 */ expression()399 const std::string expression() { 400 _assert_(true); 401 return sess_->expression(); 402 } 403 private: 404 /** 405 * Constructor. 406 */ Session(HTTPServer::Session * sess)407 explicit Session(HTTPServer::Session* sess) : sess_(sess) { 408 _assert_(true); 409 } 410 /** 411 * Destructor. 412 */ ~Session()413 virtual ~Session() { 414 _assert_(true); 415 } 416 private: 417 HTTPServer::Session* sess_; 418 }; 419 /** 420 * Default constructor. 421 */ RPCServer()422 explicit RPCServer() : serv_(), worker_() { 423 _assert_(true); 424 } 425 /** 426 * Destructor. 427 */ ~RPCServer()428 ~RPCServer() { 429 _assert_(true); 430 } 431 /** 432 * Set the network configurations. 433 * @param expr an expression of the address and the port of the server. 434 * @param timeout the timeout of each network operation in seconds. If it is not more than 0, 435 * no timeout is specified. 436 */ 437 void set_network(const std::string& expr, double timeout = -1) { 438 _assert_(true); 439 serv_.set_network(expr, timeout); 440 } 441 /** 442 * Set the logger to process each log message. 443 * @param logger the logger object. 444 * @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging, 445 * Logger::INFO for normal information, Logger::SYSTEM for system information, and 446 * Logger::ERROR for fatal error. 447 */ 448 void set_logger(Logger* logger, uint32_t kinds = Logger::SYSTEM | Logger::ERROR) { 449 _assert_(true); 450 serv_.set_logger(logger, kinds); 451 } 452 /** 453 * Set the worker to process each request. 454 * @param worker the worker object. 455 * @param thnum the number of worker threads. 456 */ 457 void set_worker(Worker* worker, size_t thnum = 1) { 458 _assert_(true); 459 worker_.serv_ = this; 460 worker_.worker_ = worker; 461 serv_.set_worker(&worker_, thnum); 462 } 463 /** 464 * Start the service. 465 * @return true on success, or false on failure. 466 * @note This function blocks until the server stops by the RPCServer::stop method. 467 */ start()468 bool start() { 469 _assert_(true); 470 return serv_.start(); 471 } 472 /** 473 * Stop the service. 474 * @return true on success, or false on failure. 475 */ stop()476 bool stop() { 477 _assert_(true); 478 return serv_.stop(); 479 } 480 /** 481 * Finish the service. 482 * @return true on success, or false on failure. 483 */ finish()484 bool finish() { 485 _assert_(true); 486 return serv_.finish(); 487 } 488 /** 489 * Log a message. 490 * @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal 491 * information, Logger::SYSTEM for system information, and Logger::ERROR for fatal error. 492 * @param format the printf-like format string. The conversion character `%' can be used with 493 * such flag characters as `s', `d', `o', `u', `x', `X', `c', `e', `E', `f', `g', `G', and `%'. 494 * @param ... used according to the format string. 495 */ log(Logger::Kind kind,const char * format,...)496 void log(Logger::Kind kind, const char* format, ...) { 497 _assert_(format); 498 va_list ap; 499 va_start(ap, format); 500 serv_.log_v(kind, format, ap); 501 va_end(ap); 502 } 503 /** 504 * Log a message. 505 * @note Equal to the original Cursor::set_value method except that the last parameters is 506 * va_list. 507 */ log_v(Logger::Kind kind,const char * format,va_list ap)508 void log_v(Logger::Kind kind, const char* format, va_list ap) { 509 _assert_(format); 510 serv_.log_v(kind, format, ap); 511 } 512 /** 513 * Reveal the internal HTTP server. 514 * @return the internal HTTP server. 515 */ reveal_core()516 HTTPServer* reveal_core() { 517 _assert_(true); 518 return &serv_; 519 } 520 private: 521 /** 522 * Adapter for the worker. 523 */ 524 class WorkerAdapter : public HTTPServer::Worker { 525 friend class RPCServer; 526 public: WorkerAdapter()527 WorkerAdapter() : serv_(NULL), worker_(NULL) { 528 _assert_(true); 529 } 530 private: process(HTTPServer * serv,HTTPServer::Session * sess,const std::string & path,HTTPClient::Method method,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)531 int32_t process(HTTPServer* serv, HTTPServer::Session* sess, 532 const std::string& path, HTTPClient::Method method, 533 const std::map<std::string, std::string>& reqheads, 534 const std::string& reqbody, 535 std::map<std::string, std::string>& resheads, 536 std::string& resbody, 537 const std::map<std::string, std::string>& misc) { 538 const char* name = path.c_str(); 539 if (!kc::strfwm(name, KTRPCPATHPREFIX)) 540 return worker_->process(serv, sess, path, method, reqheads, reqbody, 541 resheads, resbody, misc); 542 name += sizeof(KTRPCPATHPREFIX) - 1; 543 size_t zsiz; 544 char* zbuf = kc::urldecode(name, &zsiz); 545 std::string rawname(zbuf, zsiz); 546 delete[] zbuf; 547 std::map<std::string, std::string> inmap; 548 const char* rp = strmapget(misc, "query"); 549 if (rp) wwwformtomap(rp, &inmap); 550 rp = strmapget(reqheads, "content-type"); 551 if (rp) { 552 if (kc::strifwm(rp, KTRPCFORMMTYPE)) { 553 wwwformtomap(reqbody.c_str(), &inmap); 554 } else if (kc::strifwm(rp, KTRPCTSVMTYPE)) { 555 rp += sizeof(KTRPCTSVMTYPE) - 1; 556 int32_t enc = 0; 557 while (*rp != '\0') { 558 while (*rp == ' ' || *rp == ';') { 559 rp++; 560 } 561 if (kc::strifwm(rp, KTRPCTSVMATTR) && rp[sizeof(KTRPCTSVMATTR)-1] == '=') { 562 rp += sizeof(KTRPCTSVMATTR); 563 if (*rp == '"') rp++; 564 switch (*rp) { 565 case 'b': case 'B': enc = 'B'; break; 566 case 'q': case 'Q': enc = 'Q'; break; 567 case 'u': case 'U': enc = 'U'; break; 568 } 569 } 570 while (*rp != '\0' && *rp != ' ' && *rp != ';') { 571 rp++; 572 } 573 } 574 tsvtomap(reqbody, &inmap); 575 if (enc != 0) tsvmapdecode(&inmap, enc); 576 } 577 } 578 std::map<std::string, std::string> outmap; 579 Session mysess(sess); 580 RPCClient::ReturnValue rv = worker_->process(serv_, &mysess, rawname, inmap, outmap); 581 int32_t code = -1; 582 switch (rv) { 583 case RPCClient::RVSUCCESS: code = 200; break; 584 case RPCClient::RVENOIMPL: code = 501; break; 585 case RPCClient::RVEINVALID: code = 400; break; 586 case RPCClient::RVELOGIC: code = 450; break; 587 case RPCClient::RVETIMEOUT: code = 503; break; 588 default: code = 500; break; 589 } 590 int32_t enc = checkmapenc(outmap); 591 std::string outtype = KTRPCTSVMTYPE; 592 switch (enc) { 593 case 'B': kc::strprintf(&outtype, "; %s=B", KTRPCTSVMATTR); break; 594 case 'Q': kc::strprintf(&outtype, "; %s=Q", KTRPCTSVMATTR); break; 595 case 'U': kc::strprintf(&outtype, "; %s=U", KTRPCTSVMATTR); break; 596 } 597 resheads["content-type"] = outtype; 598 if (enc != 0) tsvmapencode(&outmap, enc); 599 maptotsv(outmap, &resbody); 600 return code; 601 } process_binary(ThreadedServer * serv,ThreadedServer::Session * sess)602 bool process_binary(ThreadedServer* serv, ThreadedServer::Session* sess) { 603 return worker_->process_binary(serv, sess); 604 } process_idle(HTTPServer * serv)605 void process_idle(HTTPServer* serv) { 606 worker_->process_idle(serv_); 607 } process_timer(HTTPServer * serv)608 void process_timer(HTTPServer* serv) { 609 worker_->process_timer(serv_); 610 } process_start(HTTPServer * serv)611 void process_start(HTTPServer* serv) { 612 worker_->process_start(serv_); 613 } process_finish(HTTPServer * serv)614 void process_finish(HTTPServer* serv) { 615 worker_->process_finish(serv_); 616 } 617 RPCServer* serv_; 618 RPCServer::Worker* worker_; 619 }; 620 /** Dummy constructor to forbid the use. */ 621 RPCServer(const RPCServer&); 622 /** Dummy Operator to forbid the use. */ 623 RPCServer& operator =(const RPCServer&); 624 /** The internal server. */ 625 HTTPServer serv_; 626 /** The adapter for worker. */ 627 WorkerAdapter worker_; 628 }; 629 630 631 } // common namespace 632 633 #endif // duplication check 634 635 // END OF FILE 636