1 /*************************************************************************************************
2  * RPC utilities
3  *                                                               Copyright (C) 2009-2012 FAL Labs
4  * This file is part of Kyoto Tycoon.
5  * This program is free software: you can redistribute it and/or modify it under the terms of
6  * the GNU General Public License as published by the Free Software Foundation, either version
7  * 3 of the License, or any later version.
8  * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
9  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10  * See the GNU General Public License for more details.
11  * You should have received a copy of the GNU General Public License along with this program.
12  * If not, see <http://www.gnu.org/licenses/>.
13  *************************************************************************************************/
14 
15 
16 #ifndef _KTRPC_H                         // duplication check
17 #define _KTRPC_H
18 
19 #include <ktcommon.h>
20 #include <ktutil.h>
21 #include <ktsocket.h>
22 #include <ktthserv.h>
23 #include <kthttp.h>
24 
// NOTE: these must remain string-literal macros; the RPC code below takes sizeof() of them to
// obtain their lengths when skipping over a matched prefix.
#define KTRPCPATHPREFIX  "/rpc/"           ///< URL path prefix of every RPC entry
#define KTRPCFORMMTYPE  "application/x-www-form-urlencoded"  ///< MIME type of form data
#define KTRPCTSVMTYPE  "text/tab-separated-values"  ///< MIME type of TSV
#define KTRPCTSVMATTR  "colenc"            ///< Content-Type attribute naming the TSV encoding
29 
30 namespace kyototycoon {                  // common namespace
31 
32 
33 /**
34  * RPC client.
35  * @note Although all methods of this class are thread-safe, its instance does not have mutual
36  * exclusion mechanism.  So, multiple threads must not share the same instance and they must use
37  * their own respective instances.
38  */
39 class RPCClient {
40  public:
41   /**
42    * Return value.
43    */
44   enum ReturnValue {
45     RVSUCCESS,                           ///< success
46     RVENOIMPL,                           ///< not implemented
47     RVEINVALID,                          ///< invalid operation
48     RVELOGIC,                            ///< logical inconsistency
49     RVETIMEOUT,                          ///< timeout
50     RVEINTERNAL,                         ///< internal error
51     RVENETWORK,                          ///< network error
52     RVEMISC = 15                         ///< miscellaneous error
53   };
54   /**
55    * Default constructor.
56    */
RPCClient()57   RPCClient() : ua_(), host_(), port_(0), timeout_(0), open_(false), alive_(false) {
58     _assert_(true);
59   }
60   /**
61    * Destructor.
62    */
~RPCClient()63   ~RPCClient() {
64     _assert_(true);
65     if (open_) close();
66   }
67   /**
68    * Open the connection.
69    * @param host the name or the address of the server.  If it is an empty string, the local host
70    * is specified.
71    * @param port the port numger of the server.
72    * @param timeout the timeout of each operation in seconds.  If it is not more than 0, no
73    * timeout is specified.
74    * @return true on success, or false on failure.
75    */
76   bool open(const std::string& host = "", int32_t port = DEFPORT, double timeout = -1) {
77     _assert_(true);
78     if (open_ || port < 1) return false;
79     if (!ua_.open(host, port, timeout)) return false;
80     host_ = host;
81     port_ = port;
82     timeout_ = timeout;
83     open_ = true;
84     alive_ = true;
85     return true;
86   }
87   /**
88    * Close the connection.
89    * @param grace true for graceful shutdown, or false for immediate disconnection.
90    * @return true on success, or false on failure.
91    */
92   bool close(bool grace = true) {
93     _assert_(true);
94     if (!open_) return false;
95     bool err = false;
96     if (alive_ && !ua_.close(grace)) err = true;
97     return !err;
98   }
99   /**
100    * Call a remote procedure.
101    * @param name the name of the procecude.
102    * @param inmap a string map which contains the input of the procedure.  If it is NULL, it is
103    * ignored.
104    * @param outmap a string map to contain the output parameters.  If it is NULL, it is ignored.
105    * @return the return value of the procedure.
106    */
107   ReturnValue call(const std::string& name,
108                    const std::map<std::string, std::string>* inmap = NULL,
109                    std::map<std::string, std::string>* outmap = NULL) {
110     _assert_(true);
111     if (outmap) outmap->clear();
112     if (!open_) return RVENETWORK;
113     if (!alive_ && !ua_.open(host_, port_, timeout_)) return RVENETWORK;
114     alive_ = true;
115     std::string pathquery = KTRPCPATHPREFIX;
116     char* zstr = kc::urlencode(name.data(), name.size());
117     pathquery.append(zstr);
118     delete[] zstr;
119     std::map<std::string, std::string> reqheads;
120     std::string reqbody;
121     if (inmap) {
122       std::map<std::string, std::string> tmap;
123       tmap.insert(inmap->begin(), inmap->end());
124       int32_t enc = checkmapenc(tmap);
125       std::string outtype = KTRPCTSVMTYPE;
126       switch (enc) {
127         case 'B': kc::strprintf(&outtype, "; %s=B", KTRPCTSVMATTR); break;
128         case 'Q': kc::strprintf(&outtype, "; %s=Q", KTRPCTSVMATTR); break;
129         case 'U': kc::strprintf(&outtype, "; %s=U", KTRPCTSVMATTR); break;
130       }
131       reqheads["content-type"] = outtype;
132       if (enc != 0) tsvmapencode(&tmap, enc);
133       maptotsv(tmap, &reqbody);
134     }
135     std::map<std::string, std::string> resheads;
136     std::string resbody;
137     int32_t code = ua_.fetch(pathquery, HTTPClient::MPOST, &resbody, &resheads,
138                              &reqbody, &reqheads);
139     if (outmap) {
140       const char* rp = strmapget(resheads, "content-type");
141       if (rp) {
142         if (kc::strifwm(rp, KTRPCFORMMTYPE)) {
143           wwwformtomap(resbody.c_str(), outmap);
144         } else if (kc::strifwm(rp, KTRPCTSVMTYPE)) {
145           rp += sizeof(KTRPCTSVMTYPE) - 1;
146           int32_t enc = 0;
147           while (*rp != '\0') {
148             while (*rp == ' ' || *rp == ';') {
149               rp++;
150             }
151             if (kc::strifwm(rp, KTRPCTSVMATTR) && rp[sizeof(KTRPCTSVMATTR)-1] == '=') {
152               rp += sizeof(KTRPCTSVMATTR);
153               if (*rp == '"') rp++;
154               switch (*rp) {
155                 case 'b': case 'B': enc = 'B'; break;
156                 case 'q': case 'Q': enc = 'Q'; break;
157                 case 'u': case 'U': enc = 'U'; break;
158               }
159             }
160             while (*rp != '\0' && *rp != ' ' && *rp != ';') {
161               rp++;
162             }
163           }
164           tsvtomap(resbody, outmap);
165           if (enc != 0) tsvmapdecode(outmap, enc);
166         }
167       }
168     }
169     ReturnValue rv;
170     if (code < 1) {
171       rv = RVENETWORK;
172       ua_.close(false);
173       alive_ = false;
174     } else if (code >= 200 && code < 300) {
175       rv = RVSUCCESS;
176     } else if (code >= 400 && code < 500) {
177       if (code >= 450) {
178         rv = RVELOGIC;
179       } else {
180         rv = RVEINVALID;
181       }
182     } else if (code >= 500 && code < 600) {
183       if (code == 501) {
184         rv = RVENOIMPL;
185       } else if (code == 503) {
186         rv = RVETIMEOUT;
187       } else {
188         rv = RVEINTERNAL;
189       }
190     } else {
191       rv = RVEMISC;
192     }
193     return rv;
194   }
195   /**
196    * Get the expression of the socket.
197    * @return the expression of the socket or an empty string on failure.
198    */
expression()199   const std::string expression() {
200     _assert_(true);
201     if (!open_) return "";
202     std::string expr;
203     kc::strprintf(&expr, "%s:%d", host_.c_str(), port_);
204     return expr;
205   }
206   /**
207    * Reveal the internal HTTP client.
208    * @return the internal HTTP client.
209    */
reveal_core()210   HTTPClient* reveal_core() {
211     _assert_(true);
212     return &ua_;
213   }
214  private:
215   /** The HTTP client. */
216   HTTPClient ua_;
217   /** The host name of the server. */
218   std::string host_;
219   /** The port numer of the server. */
220   uint32_t port_;
221   /** The timeout. */
222   double timeout_;
223   /** The open flag. */
224   bool open_;
225   /** The alive flag. */
226   bool alive_;
227 };
228 
229 
230 /**
231  * RPC server.
232  */
233 class RPCServer {
234  public:
235   class Logger;
236   class Worker;
237   class Session;
238  private:
239   class WorkerAdapter;
240  public:
241   /**
242    * Interface to log internal information and errors.
243    */
244   class Logger : public HTTPServer::Logger {
245    public:
246     /**
247      * Destructor.
248      */
~Logger()249     virtual ~Logger() {
250       _assert_(true);
251     }
252   };
253   /**
254    * Interface to process each request.
255    */
256   class Worker {
257    public:
258     /**
259      * Destructor.
260      */
~Worker()261     virtual ~Worker() {
262       _assert_(true);
263     }
264     /**
265      * Process each request of RPC.
266      * @param serv the server.
267      * @param sess the session with the client.
268      * @param name the name of the procecude.
269      * @param inmap a string map which contains the input of the procedure.
270      * @param outmap a string map to contain the input parameters.
271      * @return the return value of the procedure.
272      */
273     virtual RPCClient::ReturnValue process(RPCServer* serv, Session* sess,
274                                            const std::string& name,
275                                            const std::map<std::string, std::string>& inmap,
276                                            std::map<std::string, std::string>& outmap) = 0;
277     /**
278      * Process each request of the others.
279      * @param serv the server.
280      * @param sess the session with the client.
281      * @param path the path of the requested resource.
282      * @param method the kind of the request methods.
283      * @param reqheads a string map which contains the headers of the request.  Header names are
284      * converted into lower cases.  The empty key means the request-line.
285      * @param reqbody a string which contains the entity body of the request.
286      * @param resheads a string map to contain the headers of the response.
287      * @param resbody a string to contain the entity body of the response.
288      * @param misc a string map which contains miscellaneous information.  "url" means the
289      * absolute URL.  "query" means the query string of the URL.
290      * @return the status code of the response.  If it is less than 1, internal server error is
291      * sent to the client and the connection is closed.
292      */
process(HTTPServer * serv,HTTPServer::Session * sess,const std::string & path,HTTPClient::Method method,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)293     virtual int32_t process(HTTPServer* serv, HTTPServer::Session* sess,
294                             const std::string& path, HTTPClient::Method method,
295                             const std::map<std::string, std::string>& reqheads,
296                             const std::string& reqbody,
297                             std::map<std::string, std::string>& resheads,
298                             std::string& resbody,
299                             const std::map<std::string, std::string>& misc) {
300       _assert_(serv && sess);
301       return 501;
302     }
303     /**
304      * Process each binary request.
305      * @param serv the server.
306      * @param sess the session with the client.
307      * @return true to reuse the session, or false to close the session.
308      */
process_binary(ThreadedServer * serv,ThreadedServer::Session * sess)309     virtual bool process_binary(ThreadedServer* serv, ThreadedServer::Session* sess) {
310       _assert_(serv && sess);
311       return false;
312     }
313     /**
314      * Process each idle event.
315      * @param serv the server.
316      */
process_idle(RPCServer * serv)317     virtual void process_idle(RPCServer* serv) {
318       _assert_(serv);
319     }
320     /**
321      * Process each timer event.
322      * @param serv the server.
323      */
process_timer(RPCServer * serv)324     virtual void process_timer(RPCServer* serv) {
325       _assert_(serv);
326     }
327     /**
328      * Process the starting event.
329      * @param serv the server.
330      */
process_start(RPCServer * serv)331     virtual void process_start(RPCServer* serv) {
332       _assert_(serv);
333     }
334     /**
335      * Process the finishing event.
336      * @param serv the server.
337      */
process_finish(RPCServer * serv)338     virtual void process_finish(RPCServer* serv) {
339       _assert_(serv);
340     }
341   };
342   /**
343    * Interface to log internal information and errors.
344    */
345   class Session {
346     friend class RPCServer;
347    public:
348     /**
349      * Interface of session local data.
350      */
351     class Data : public HTTPServer::Session::Data {
352      public:
353       /**
354        * Destructor.
355        */
~Data()356       virtual ~Data() {
357         _assert_(true);
358       }
359     };
360     /**
361      * Get the ID number of the session.
362      * @return the ID number of the session.
363      */
id()364     uint64_t id() {
365       _assert_(true);
366       return sess_->id();
367     }
368     /**
369      * Get the ID number of the worker thread.
370      * @return the ID number of the worker thread.  It is from 0 to less than the number of
371      * worker threads.
372      */
thread_id()373     uint32_t thread_id() {
374       _assert_(true);
375       return sess_->thread_id();
376     }
377     /**
378      * Set the session local data.
379      * @param data the session local data.  If it is NULL, no data is registered.
380      * @note The registered data is destroyed implicitly when the session object is destroyed or
381      * this method is called again.
382      */
set_data(Data * data)383     void set_data(Data* data) {
384       _assert_(true);
385       sess_->set_data(data);
386     }
387     /**
388      * Get the session local data.
389      * @return the session local data, or NULL if no data is registered.
390      */
data()391     Data* data() {
392       _assert_(true);
393       return (Data*)sess_->data();
394     }
395     /**
396      * Get the expression of the socket.
397      * @return the expression of the socket or an empty string on failure.
398      */
expression()399     const std::string expression() {
400       _assert_(true);
401       return sess_->expression();
402     }
403    private:
404     /**
405      * Constructor.
406      */
Session(HTTPServer::Session * sess)407     explicit Session(HTTPServer::Session* sess) : sess_(sess) {
408       _assert_(true);
409     }
410     /**
411      * Destructor.
412      */
~Session()413     virtual ~Session() {
414       _assert_(true);
415     }
416    private:
417     HTTPServer::Session* sess_;
418   };
419   /**
420    * Default constructor.
421    */
RPCServer()422   explicit RPCServer() : serv_(), worker_() {
423     _assert_(true);
424   }
425   /**
426    * Destructor.
427    */
~RPCServer()428   ~RPCServer() {
429     _assert_(true);
430   }
431   /**
432    * Set the network configurations.
433    * @param expr an expression of the address and the port of the server.
434    * @param timeout the timeout of each network operation in seconds.  If it is not more than 0,
435    * no timeout is specified.
436    */
437   void set_network(const std::string& expr, double timeout = -1) {
438     _assert_(true);
439     serv_.set_network(expr, timeout);
440   }
441   /**
442    * Set the logger to process each log message.
443    * @param logger the logger object.
444    * @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging,
445    * Logger::INFO for normal information, Logger::SYSTEM for system information, and
446    * Logger::ERROR for fatal error.
447    */
448   void set_logger(Logger* logger, uint32_t kinds = Logger::SYSTEM | Logger::ERROR) {
449     _assert_(true);
450     serv_.set_logger(logger, kinds);
451   }
452   /**
453    * Set the worker to process each request.
454    * @param worker the worker object.
455    * @param thnum the number of worker threads.
456    */
457   void set_worker(Worker* worker, size_t thnum = 1) {
458     _assert_(true);
459     worker_.serv_ = this;
460     worker_.worker_ = worker;
461     serv_.set_worker(&worker_, thnum);
462   }
463   /**
464    * Start the service.
465    * @return true on success, or false on failure.
466    * @note This function blocks until the server stops by the RPCServer::stop method.
467    */
start()468   bool start() {
469     _assert_(true);
470     return serv_.start();
471   }
472   /**
473    * Stop the service.
474    * @return true on success, or false on failure.
475    */
stop()476   bool stop() {
477     _assert_(true);
478     return serv_.stop();
479   }
480   /**
481    * Finish the service.
482    * @return true on success, or false on failure.
483    */
finish()484   bool finish() {
485     _assert_(true);
486     return serv_.finish();
487   }
488   /**
489    * Log a message.
490    * @param kind the kind of the event.  Logger::DEBUG for debugging, Logger::INFO for normal
491    * information, Logger::SYSTEM for system information, and Logger::ERROR for fatal error.
492    * @param format the printf-like format string.  The conversion character `%' can be used with
493    * such flag characters as `s', `d', `o', `u', `x', `X', `c', `e', `E', `f', `g', `G', and `%'.
494    * @param ... used according to the format string.
495    */
log(Logger::Kind kind,const char * format,...)496   void log(Logger::Kind kind, const char* format, ...) {
497     _assert_(format);
498     va_list ap;
499     va_start(ap, format);
500     serv_.log_v(kind, format, ap);
501     va_end(ap);
502   }
503   /**
504    * Log a message.
505    * @note Equal to the original Cursor::set_value method except that the last parameters is
506    * va_list.
507    */
log_v(Logger::Kind kind,const char * format,va_list ap)508   void log_v(Logger::Kind kind, const char* format, va_list ap) {
509     _assert_(format);
510     serv_.log_v(kind, format, ap);
511   }
512   /**
513    * Reveal the internal HTTP server.
514    * @return the internal HTTP server.
515    */
reveal_core()516   HTTPServer* reveal_core() {
517     _assert_(true);
518     return &serv_;
519   }
520  private:
521   /**
522    * Adapter for the worker.
523    */
524   class WorkerAdapter : public HTTPServer::Worker {
525     friend class RPCServer;
526    public:
WorkerAdapter()527     WorkerAdapter() : serv_(NULL), worker_(NULL) {
528       _assert_(true);
529     }
530    private:
process(HTTPServer * serv,HTTPServer::Session * sess,const std::string & path,HTTPClient::Method method,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)531     int32_t process(HTTPServer* serv, HTTPServer::Session* sess,
532                     const std::string& path, HTTPClient::Method method,
533                     const std::map<std::string, std::string>& reqheads,
534                     const std::string& reqbody,
535                     std::map<std::string, std::string>& resheads,
536                     std::string& resbody,
537                     const std::map<std::string, std::string>& misc) {
538       const char* name = path.c_str();
539       if (!kc::strfwm(name, KTRPCPATHPREFIX))
540         return worker_->process(serv, sess, path, method, reqheads, reqbody,
541                                 resheads, resbody, misc);
542       name += sizeof(KTRPCPATHPREFIX) - 1;
543       size_t zsiz;
544       char* zbuf = kc::urldecode(name, &zsiz);
545       std::string rawname(zbuf, zsiz);
546       delete[] zbuf;
547       std::map<std::string, std::string> inmap;
548       const char* rp = strmapget(misc, "query");
549       if (rp) wwwformtomap(rp, &inmap);
550       rp = strmapget(reqheads, "content-type");
551       if (rp) {
552         if (kc::strifwm(rp, KTRPCFORMMTYPE)) {
553           wwwformtomap(reqbody.c_str(), &inmap);
554         } else if (kc::strifwm(rp, KTRPCTSVMTYPE)) {
555           rp += sizeof(KTRPCTSVMTYPE) - 1;
556           int32_t enc = 0;
557           while (*rp != '\0') {
558             while (*rp == ' ' || *rp == ';') {
559               rp++;
560             }
561             if (kc::strifwm(rp, KTRPCTSVMATTR) && rp[sizeof(KTRPCTSVMATTR)-1] == '=') {
562               rp += sizeof(KTRPCTSVMATTR);
563               if (*rp == '"') rp++;
564               switch (*rp) {
565                 case 'b': case 'B': enc = 'B'; break;
566                 case 'q': case 'Q': enc = 'Q'; break;
567                 case 'u': case 'U': enc = 'U'; break;
568               }
569             }
570             while (*rp != '\0' && *rp != ' ' && *rp != ';') {
571               rp++;
572             }
573           }
574           tsvtomap(reqbody, &inmap);
575           if (enc != 0) tsvmapdecode(&inmap, enc);
576         }
577       }
578       std::map<std::string, std::string> outmap;
579       Session mysess(sess);
580       RPCClient::ReturnValue rv = worker_->process(serv_, &mysess, rawname, inmap, outmap);
581       int32_t code = -1;
582       switch (rv) {
583         case RPCClient::RVSUCCESS: code = 200; break;
584         case RPCClient::RVENOIMPL: code = 501; break;
585         case RPCClient::RVEINVALID: code = 400; break;
586         case RPCClient::RVELOGIC: code = 450; break;
587         case RPCClient::RVETIMEOUT: code = 503; break;
588         default: code = 500; break;
589       }
590       int32_t enc = checkmapenc(outmap);
591       std::string outtype = KTRPCTSVMTYPE;
592       switch (enc) {
593         case 'B': kc::strprintf(&outtype, "; %s=B", KTRPCTSVMATTR); break;
594         case 'Q': kc::strprintf(&outtype, "; %s=Q", KTRPCTSVMATTR); break;
595         case 'U': kc::strprintf(&outtype, "; %s=U", KTRPCTSVMATTR); break;
596       }
597       resheads["content-type"] = outtype;
598       if (enc != 0) tsvmapencode(&outmap, enc);
599       maptotsv(outmap, &resbody);
600       return code;
601     }
process_binary(ThreadedServer * serv,ThreadedServer::Session * sess)602     bool process_binary(ThreadedServer* serv, ThreadedServer::Session* sess) {
603       return worker_->process_binary(serv, sess);
604     }
process_idle(HTTPServer * serv)605     void process_idle(HTTPServer* serv) {
606       worker_->process_idle(serv_);
607     }
process_timer(HTTPServer * serv)608     void process_timer(HTTPServer* serv) {
609       worker_->process_timer(serv_);
610     }
process_start(HTTPServer * serv)611     void process_start(HTTPServer* serv) {
612       worker_->process_start(serv_);
613     }
process_finish(HTTPServer * serv)614     void process_finish(HTTPServer* serv) {
615       worker_->process_finish(serv_);
616     }
617     RPCServer* serv_;
618     RPCServer::Worker* worker_;
619   };
620   /** Dummy constructor to forbid the use. */
621   RPCServer(const RPCServer&);
622   /** Dummy Operator to forbid the use. */
623   RPCServer& operator =(const RPCServer&);
624   /** The internal server. */
625   HTTPServer serv_;
626   /** The adapter for worker. */
627   WorkerAdapter worker_;
628 };
629 
630 
631 }                                        // common namespace
632 
633 #endif                                   // duplication check
634 
635 // END OF FILE
636