1 /*************************************************************************************************
2  * A handy cache/storage server
3  *                                                               Copyright (C) 2009-2012 FAL Labs
4  * This file is part of Kyoto Tycoon.
5  * This program is free software: you can redistribute it and/or modify it under the terms of
6  * the GNU General Public License as published by the Free Software Foundation, either version
7  * 3 of the License, or any later version.
8  * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
9  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10  * See the GNU General Public License for more details.
11  * You should have received a copy of the GNU General Public License along with this program.
12  * If not, see <http://www.gnu.org/licenses/>.
13  *************************************************************************************************/
14 
15 
16 #include "cmdcommon.h"
17 
18 
19 enum {                                   // enumeration for operation counting
20   CNTSET,                                // setting operations
21   CNTSETMISS,                            // misses of setting operations
22   CNTREMOVE,                             // removing operations
23   CNTREMOVEMISS,                         // misses of removing operations
24   CNTGET,                                // getting operations
25   CNTGETMISS,                            // misses of getting operations
26   CNTSCRIPT,                             // scripting operations
27   CNTMISC                                // miscellaneous operations
28 };
29 typedef uint64_t OpCount[CNTMISC+1];     // counters per thread
30 
31 
32 // global variables
33 const char* g_progname;                  // program name
34 int32_t g_procid;                        // process ID number
35 double g_starttime;                      // start time
36 bool g_daemon;                           // daemon flag
37 kt::RPCServer* g_serv;                   // running RPC server
38 bool g_restart;                          // restart flag
39 
40 
41 // function prototypes
42 int main(int argc, char** argv);
43 static void usage();
44 static void killserver(int signum);
45 static int32_t run(int argc, char** argv);
46 static int32_t proc(const std::vector<std::string>& dbpaths,
47                     const char* host, int32_t port, double tout, int32_t thnum,
48                     const char* logpath, uint32_t logkinds,
49                     const char* ulogpath, int64_t ulim, double uasi,
50                     int32_t sid, int32_t omode, double asi, bool ash,
51                     const char* bgspath, double bgsi, kc::Compressor* bgscomp, bool dmn,
52                     const char* pidpath, const char* cmdpath, const char* scrpath,
53                     const char* mhost, int32_t mport, const char* rtspath, double riv,
54                     const char* plsvpath, const char* plsvex, const char* pldbpath);
55 static bool dosnapshot(const char* bgspath, kc::Compressor* bgscomp,
56                        kt::TimedDB* dbs, int32_t dbnum, kt::RPCServer* serv);
57 
58 
59 // logger implementation
60 class Logger : public kt::RPCServer::Logger {
61  public:
62   // constructor
Logger()63   explicit Logger() : strm_(NULL), lock_() {}
64   // destructor
~Logger()65   ~Logger() {
66     if (strm_) close();
67   }
68   // open the stream
open(const char * path)69   bool open(const char* path) {
70     if (strm_) return false;
71     if (path && *path != '\0' && std::strcmp(path, "-")) {
72       std::ofstream* strm = new std::ofstream;
73       strm->open(path, std::ios_base::out | std::ios_base::binary | std::ios_base::app);
74       if (!*strm) {
75         delete strm;
76         return false;
77       }
78       strm_ = strm;
79     } else {
80       strm_ = &std::cout;
81     }
82     return true;
83   }
84   // close the stream
close()85   void close() {
86     if (!strm_) return;
87     if (strm_ != &std::cout) delete strm_;
88     strm_ = NULL;
89   }
90   // process a log message.
log(Kind kind,const char * message)91   void log(Kind kind, const char* message) {
92     if (!strm_) return;
93     char date[48];
94     kt::datestrwww(kc::nan(), kc::INT32MAX, 6, date);
95     const char* kstr = "MISC";
96     switch (kind) {
97       case kt::RPCServer::Logger::DEBUG: kstr = "DEBUG"; break;
98       case kt::RPCServer::Logger::INFO: kstr = "INFO"; break;
99       case kt::RPCServer::Logger::SYSTEM: kstr = "SYSTEM"; break;
100       case kt::RPCServer::Logger::ERROR: kstr = "ERROR"; break;
101     }
102     lock_.lock();
103     *strm_ << date << ": [" << kstr << "]: " << message << "\n";
104     strm_->flush();
105     lock_.unlock();
106   }
107  private:
108   std::ostream* strm_;
109   kc::Mutex lock_;
110 };
111 
112 
113 // database logger implementation
114 class DBLogger : public kc::BasicDB::Logger {
115  public:
116   // constructor
DBLogger(::Logger * logger,uint32_t kinds)117   explicit DBLogger(::Logger* logger, uint32_t kinds) : logger_(logger), kinds_(kinds) {}
118   // process a log message.
log(const char * file,int32_t line,const char * func,kc::BasicDB::Logger::Kind kind,const char * message)119   void log(const char* file, int32_t line, const char* func,
120            kc::BasicDB::Logger::Kind kind, const char* message) {
121     kt::RPCServer::Logger::Kind rkind;
122     switch (kind) {
123       default: rkind = kt::RPCServer::Logger::DEBUG; break;
124       case kc::BasicDB::Logger::INFO: rkind = kt::RPCServer::Logger::INFO; break;
125       case kc::BasicDB::Logger::WARN: rkind = kt::RPCServer::Logger::SYSTEM; break;
126       case kc::BasicDB::Logger::ERROR: rkind = kt::RPCServer::Logger::ERROR; break;
127     }
128     if (!(rkind & kinds_)) return;
129     std::string lmsg;
130     kc::strprintf(&lmsg, "[DB]: %s", message);
131     logger_->log(rkind, lmsg.c_str());
132   }
133  private:
134   ::Logger* logger_;
135   uint32_t kinds_;
136 };
137 
138 
139 // replication slave implemantation
140 class Slave : public kc::Thread {
141   friend class Worker;
142  public:
143   // constructor
Slave(uint16_t sid,const char * rtspath,const char * host,int32_t port,double riv,kt::RPCServer * serv,kt::TimedDB * dbs,int32_t dbnum,kt::UpdateLogger * ulog,DBUpdateLogger * ulogdbs)144   explicit Slave(uint16_t sid, const char* rtspath, const char* host, int32_t port, double riv,
145                  kt::RPCServer* serv, kt::TimedDB* dbs, int32_t dbnum,
146                  kt::UpdateLogger* ulog, DBUpdateLogger* ulogdbs) :
147       lock_(), sid_(sid), rtspath_(rtspath), host_(""), port_(port), riv_(riv),
148       serv_(serv), dbs_(dbs), dbnum_(dbnum), ulog_(ulog), ulogdbs_(ulogdbs),
149       wrts_(kc::UINT64MAX), rts_(0), alive_(true), hup_(false) {
150     if (host) host_ = host;
151   }
152   // stop the slave
stop()153   void stop() {
154     alive_ = false;
155   }
156   // restart the slave
restart()157   void restart() {
158     hup_ = true;
159   }
160   // set the configuration of the master
set_master(const std::string & host,int32_t port,uint64_t ts,double iv)161   void set_master(const std::string& host, int32_t port, uint64_t ts, double iv) {
162     kc::ScopedSpinLock lock(&lock_);
163     host_ = host;
164     port_ = port;
165     wrts_ = ts;
166     if (iv >= 0) riv_ = iv;
167   }
168   // get the host name of the master
host()169   std::string host() {
170     kc::ScopedSpinLock lock(&lock_);
171     return host_;
172   }
173   // get the port number name of the master
port()174   int32_t port() {
175     kc::ScopedSpinLock lock(&lock_);
176     return port_;
177   }
178   // get the replication time stamp
rts()179   uint64_t rts() {
180     return rts_;
181   }
182   // get the replication interval
riv()183   double riv() {
184     return riv_;
185   }
186  private:
187   static const int32_t DUMMYFREQ = 256;
188   static const size_t RTSFILESIZ = 21;
189   // perform replication
run(void)190   void run(void) {
191     if (!rtspath_) return;
192     kc::File rtsfile;
193     if (!rtsfile.open(rtspath_, kc::File::OWRITER | kc::File::OCREATE, kc::NUMBUFSIZ) ||
194         !rtsfile.truncate(RTSFILESIZ)) {
195       serv_->log(Logger::ERROR, "opening the RTS file failed: path=%s", rtspath_);
196       return;
197     }
198     rts_ = read_rts(&rtsfile);
199     write_rts(&rtsfile, rts_);
200     kc::Thread::sleep(0.2);
201     bool deferred = false;
202     while (true) {
203       lock_.lock();
204       std::string host = host_;
205       int32_t port = port_;
206       uint64_t wrts = wrts_;
207       lock_.unlock();
208       if (!host.empty()) {
209         if (wrts != kc::UINT64MAX) {
210           lock_.lock();
211           wrts_ = kc::UINT64MAX;
212           rts_ = wrts;
213           write_rts(&rtsfile, rts_);
214           lock_.unlock();
215         }
216         kt::ReplicationClient rc;
217         if (rc.open(host, port, 60, rts_, sid_)) {
218           serv_->log(Logger::SYSTEM, "replication started: host=%s port=%d rts=%llu",
219                      host.c_str(), port, (unsigned long long)rts_);
220           hup_ = false;
221           double rivsum = 0;
222           while (alive_ && !hup_ && rc.alive()) {
223             size_t msiz;
224             uint64_t mts;
225             char* mbuf = rc.read(&msiz, &mts);
226             if (mbuf) {
227               if (msiz > 0) {
228                 size_t rsiz;
229                 uint16_t rsid, rdbid;
230                 const char* rbuf = DBUpdateLogger::parse(mbuf, msiz, &rsiz, &rsid, &rdbid);
231                 if (rbuf && rsid != sid_ && rdbid < dbnum_) {
232                   kt::TimedDB* db = dbs_ + rdbid;
233                   DBUpdateLogger* ulogdb = ulogdbs_ ? ulogdbs_ + rdbid : NULL;
234                   if (ulogdb) ulogdb->set_rsid(rsid);
235                   if (!db->recover(rbuf, rsiz)) {
236                     const kc::BasicDB::Error& e = db->error();
237                     serv_->log(Logger::ERROR, "recovering a database failed: %s: %s",
238                                e.name(), e.message());
239                   }
240                   if (ulogdb) ulogdb->clear_rsid();
241                 }
242                 rivsum += riv_;
243               } else {
244                 rivsum += riv_ * DUMMYFREQ / 4;
245               }
246               delete[] mbuf;
247               while (rivsum > 100 && alive_ && !hup_ && rc.alive()) {
248                 kc::Thread::sleep(0.1);
249                 rivsum -= 100;
250               }
251             }
252             if (mts > rts_) rts_ = mts;
253           }
254           rc.close();
255           serv_->log(Logger::SYSTEM, "replication finished: host=%s port=%d",
256                      host.c_str(), port);
257           write_rts(&rtsfile, rts_);
258           deferred = false;
259         } else {
260           if (!deferred) serv_->log(Logger::SYSTEM, "replication was deferred: host=%s port=%d",
261                                     host.c_str(), port);
262           deferred = true;
263         }
264       }
265       if (alive_) {
266         kc::Thread::sleep(1);
267       } else {
268         break;
269       }
270     }
271     if (!rtsfile.close()) serv_->log(Logger::ERROR, "closing the RTS file failed");
272   }
273   // read the replication time stamp
read_rts(kc::File * file)274   uint64_t read_rts(kc::File* file) {
275     char buf[RTSFILESIZ];
276     file->read_fast(0, buf, RTSFILESIZ);
277     buf[sizeof(buf)-1] = '\0';
278     return kc::atoi(buf);
279   }
280   // write the replication time stamp
write_rts(kc::File * file,uint64_t rts)281   void write_rts(kc::File* file, uint64_t rts) {
282     char buf[kc::NUMBUFSIZ];
283     std::sprintf(buf, "%020llu\n", (unsigned long long)rts);
284     if (!file->write_fast(0, buf, RTSFILESIZ))
285       serv_->log(Logger::SYSTEM, "writing the time stamp failed");
286   }
287   kc::SpinLock lock_;
288   const uint16_t sid_;
289   const char* const rtspath_;
290   std::string host_;
291   int32_t port_;
292   double riv_;
293   kt::RPCServer* const serv_;
294   kt::TimedDB* const dbs_;
295   const int32_t dbnum_;
296   kt::UpdateLogger* const ulog_;
297   DBUpdateLogger* const ulogdbs_;
298   uint64_t wrts_;
299   uint64_t rts_;
300   bool alive_;
301   bool hup_;
302 };
303 
304 
305 // plug-in server driver
306 class PlugInDriver : public kc::Thread {
307  public:
308   // constructor
PlugInDriver(kt::PluggableServer * serv)309   explicit PlugInDriver(kt::PluggableServer* serv) : serv_(serv), error_(false) {}
310   // get the error flag
error()311   bool error() {
312     return error_;
313   }
314  private:
315   // perform service
run(void)316   void run(void) {
317     kc::Thread::sleep(0.4);
318     if (serv_->start()) {
319       if (!serv_->finish()) error_ = true;
320     } else {
321       error_ = true;
322     }
323   }
324   kt::PluggableServer* serv_;
325   bool error_;
326 };
327 
328 
329 // worker implementation
330 class Worker : public kt::RPCServer::Worker {
331  private:
332   class SLS;
333   typedef kt::RPCClient::ReturnValue RV;
334  public:
335   // constructor
Worker(int32_t thnum,kc::CondMap * condmap,kt::TimedDB * dbs,int32_t dbnum,const std::map<std::string,int32_t> & dbmap,int32_t omode,double asi,bool ash,const char * bgspath,double bgsi,kc::Compressor * bgscomp,kt::UpdateLogger * ulog,DBUpdateLogger * ulogdbs,const char * cmdpath,ScriptProcessor * scrprocs,OpCount * opcounts)336   explicit Worker(int32_t thnum, kc::CondMap* condmap, kt::TimedDB* dbs, int32_t dbnum,
337                   const std::map<std::string, int32_t>& dbmap, int32_t omode,
338                   double asi, bool ash, const char* bgspath, double bgsi,
339                   kc::Compressor* bgscomp, kt::UpdateLogger* ulog, DBUpdateLogger* ulogdbs,
340                   const char* cmdpath, ScriptProcessor* scrprocs, OpCount* opcounts) :
341       thnum_(thnum), condmap_(condmap), dbs_(dbs), dbnum_(dbnum), dbmap_(dbmap),
342       omode_(omode), asi_(asi), ash_(ash), bgspath_(bgspath), bgsi_(bgsi), bgscomp_(bgscomp),
343       ulog_(ulog), ulogdbs_(ulogdbs), cmdpath_(cmdpath), scrprocs_(scrprocs),
344       opcounts_(opcounts), idlecnt_(0), asnext_(0), bgsnext_(0), slave_(NULL) {
345     asnext_ = kc::time() + asi_;
346     bgsnext_ = kc::time() + bgsi_;
347   }
348   // set miscellaneous configuration
set_misc_conf(Slave * slave)349   void set_misc_conf(Slave* slave) {
350     slave_ = slave;
351   }
352  private:
353   // process each request of RPC.
process(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::string & name,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)354   RV process(kt::RPCServer* serv, kt::RPCServer::Session* sess, const std::string& name,
355              const std::map<std::string, std::string>& inmap,
356              std::map<std::string, std::string>& outmap) {
357     size_t rsiz;
358     const char* rp = kt::strmapget(inmap, "WAIT", &rsiz);
359     if (rp) {
360       std::string condname(rp, rsiz);
361       rp = kt::strmapget(inmap, "WAITTIME");
362       double wsec = rp ? kc::atof(rp) : 0.0;
363       if (wsec <= 0) wsec = DEFTOUT;
364       kt::ThreadedServer* thserv = serv->reveal_core()->reveal_core();
365       if (!condmap_->wait(condname, wsec) || thserv->aborted()) {
366         set_message(outmap, "ERROR", "the condition timed out");
367         return kt::RPCClient::RVETIMEOUT;
368       }
369     }
370     int32_t dbidx = 0;
371     rp = kt::strmapget(inmap, "DB");
372     if (rp && *rp != '\0') {
373       dbidx = -1;
374       if (*rp >= '0' && *rp <= '9') {
375         dbidx = kc::atoi(rp);
376       } else {
377         std::map<std::string, int32_t>::const_iterator it = dbmap_.find(rp);
378         if (it != dbmap_.end()) dbidx = it->second;
379       }
380     }
381     kt::TimedDB* db = dbidx >= 0 && dbidx < dbnum_ ? dbs_ + dbidx : NULL;
382     int64_t curid = -1;
383     rp = kt::strmapget(inmap, "CUR");
384     if (rp && *rp >= '0' && *rp <= '9') curid = kc::atoi(rp);
385     kt::TimedDB::Cursor* cur = NULL;
386     if (curid >= 0) {
387       SLS* sls = SLS::create(sess);
388       std::map<int64_t, kt::TimedDB::Cursor*>::iterator it = sls->curs_.find(curid);
389       if (it == sls->curs_.end()) {
390         if (db) {
391           cur = db->cursor();
392           sls->curs_[curid] = cur;
393         }
394       } else {
395         cur = it->second;
396         if (name == "cur_delete") {
397           sls->curs_.erase(curid);
398           delete cur;
399           return kt::RPCClient::RVSUCCESS;
400         }
401       }
402     }
403     RV rv;
404     if (name == "void") {
405       rv = do_void(serv, sess, inmap, outmap);
406     } else if (name == "echo") {
407       rv = do_echo(serv, sess, inmap, outmap);
408     } else if (name == "report") {
409       rv = do_report(serv, sess, inmap, outmap);
410     } else if (name == "play_script") {
411       rv = do_play_script(serv, sess, inmap, outmap);
412     } else if (name == "tune_replication") {
413       rv = do_tune_replication(serv, sess, inmap, outmap);
414     } else if (name == "ulog_list") {
415       rv = do_ulog_list(serv, sess, inmap, outmap);
416     } else if (name == "ulog_remove") {
417       rv = do_ulog_remove(serv, sess, inmap, outmap);
418     } else if (name == "status") {
419       rv = do_status(serv, sess, db, inmap, outmap);
420     } else if (name == "clear") {
421       rv = do_clear(serv, sess, db, inmap, outmap);
422     } else if (name == "synchronize") {
423       rv = do_synchronize(serv, sess, db, inmap, outmap);
424     } else if (name == "set") {
425       rv = do_set(serv, sess, db, inmap, outmap);
426     } else if (name == "add") {
427       rv = do_add(serv, sess, db, inmap, outmap);
428     } else if (name == "replace") {
429       rv = do_replace(serv, sess, db, inmap, outmap);
430     } else if (name == "append") {
431       rv = do_append(serv, sess, db, inmap, outmap);
432     } else if (name == "increment") {
433       rv = do_increment(serv, sess, db, inmap, outmap);
434     } else if (name == "increment_double") {
435       rv = do_increment_double(serv, sess, db, inmap, outmap);
436     } else if (name == "cas") {
437       rv = do_cas(serv, sess, db, inmap, outmap);
438     } else if (name == "remove") {
439       rv = do_remove(serv, sess, db, inmap, outmap);
440     } else if (name == "get") {
441       rv = do_get(serv, sess, db, inmap, outmap);
442     } else if (name == "check") {
443       rv = do_check(serv, sess, db, inmap, outmap);
444     } else if (name == "seize") {
445       rv = do_seize(serv, sess, db, inmap, outmap);
446     } else if (name == "set_bulk") {
447       rv = do_set_bulk(serv, sess, db, inmap, outmap);
448     } else if (name == "remove_bulk") {
449       rv = do_remove_bulk(serv, sess, db, inmap, outmap);
450     } else if (name == "get_bulk") {
451       rv = do_get_bulk(serv, sess, db, inmap, outmap);
452     } else if (name == "vacuum") {
453       rv = do_vacuum(serv, sess, db, inmap, outmap);
454     } else if (name == "match_prefix") {
455       rv = do_match_prefix(serv, sess, db, inmap, outmap);
456     } else if (name == "match_regex") {
457       rv = do_match_regex(serv, sess, db, inmap, outmap);
458     } else if (name == "match_similar") {
459       rv = do_match_similar(serv, sess, db, inmap, outmap);
460     } else if (name == "cur_jump") {
461       rv = do_cur_jump(serv, sess, cur, inmap, outmap);
462     } else if (name == "cur_jump_back") {
463       rv = do_cur_jump_back(serv, sess, cur, inmap, outmap);
464     } else if (name == "cur_step") {
465       rv = do_cur_step(serv, sess, cur, inmap, outmap);
466     } else if (name == "cur_step_back") {
467       rv = do_cur_step_back(serv, sess, cur, inmap, outmap);
468     } else if (name == "cur_set_value") {
469       rv = do_cur_set_value(serv, sess, cur, inmap, outmap);
470     } else if (name == "cur_remove") {
471       rv = do_cur_remove(serv, sess, cur, inmap, outmap);
472     } else if (name == "cur_get_key") {
473       rv = do_cur_get_key(serv, sess, cur, inmap, outmap);
474     } else if (name == "cur_get_value") {
475       rv = do_cur_get_value(serv, sess, cur, inmap, outmap);
476     } else if (name == "cur_get") {
477       rv = do_cur_get(serv, sess, cur, inmap, outmap);
478     } else if (name == "cur_seize") {
479       rv = do_cur_seize(serv, sess, cur, inmap, outmap);
480     } else {
481       set_message(outmap, "ERROR", "not implemented: %s", name.c_str());
482       rv = kt::RPCClient::RVENOIMPL;
483     }
484     rp = kt::strmapget(inmap, "SIGNAL", &rsiz);
485     if (rp) {
486       std::string condname(rp, rsiz);
487       rp = kt::strmapget(inmap, "SIGNALBROAD");
488       bool broad = rp ? true : false;
489       size_t wnum = broad ? condmap_->broadcast(condname) : condmap_->signal(condname);
490       set_message(outmap, "SIGNALED", "%lld", (long long)wnum);
491     }
492     return rv;
493   }
494   // process each request of the others.
process(kt::HTTPServer * serv,kt::HTTPServer::Session * sess,const std::string & path,kt::HTTPClient::Method method,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)495   int32_t process(kt::HTTPServer* serv, kt::HTTPServer::Session* sess,
496                   const std::string& path, kt::HTTPClient::Method method,
497                   const std::map<std::string, std::string>& reqheads,
498                   const std::string& reqbody,
499                   std::map<std::string, std::string>& resheads,
500                   std::string& resbody,
501                   const std::map<std::string, std::string>& misc) {
502     const char* pstr = path.c_str();
503     if (*pstr == '/') pstr++;
504     int32_t dbidx = 0;
505     const char* rp = std::strchr(pstr, '/');
506     if (rp) {
507       std::string dbexpr(pstr, rp - pstr);
508       pstr = rp + 1;
509       if (*pstr == '/') pstr++;
510       size_t desiz;
511       char* destr = kc::urldecode(dbexpr.c_str(), &desiz);
512       if (*destr != '\0') {
513         dbidx = -1;
514         if (*destr >= '0' && *destr <= '9') {
515           dbidx = kc::atoi(destr);
516         } else {
517           std::map<std::string, int32_t>::const_iterator it = dbmap_.find(destr);
518           if (it != dbmap_.end()) dbidx = it->second;
519         }
520       }
521       delete[] destr;
522     }
523     if (dbidx < 0 || dbidx >= dbnum_) {
524       resbody.append("no such database\n");
525       return 400;
526     }
527     kt::TimedDB* db = dbs_ + dbidx;
528     size_t ksiz;
529     char* kbuf = kc::urldecode(pstr, &ksiz);
530     int32_t code;
531     switch (method) {
532       case kt::HTTPClient::MGET: {
533         code = do_rest_get(serv, sess, db, kbuf, ksiz,
534                            reqheads, reqbody, resheads, resbody, misc);
535         break;
536       }
537       case kt::HTTPClient::MHEAD: {
538         code = do_rest_head(serv, sess, db, kbuf, ksiz,
539                             reqheads, reqbody, resheads, resbody, misc);
540         break;
541       }
542       case kt::HTTPClient::MPUT: {
543         code = do_rest_put(serv, sess, db, kbuf, ksiz,
544                            reqheads, reqbody, resheads, resbody, misc);
545         break;
546       }
547       case kt::HTTPClient::MDELETE: {
548         code = do_rest_delete(serv, sess, db, kbuf, ksiz,
549                               reqheads, reqbody, resheads, resbody, misc);
550         break;
551       }
552       default: {
553         code = 501;
554         break;
555       }
556     }
557     delete[] kbuf;
558     return code;
559   }
560   // process each binary request
process_binary(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)561   bool process_binary(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
562     int32_t magic = sess->receive_byte();
563     const char* cmd;
564     bool rv;
565     switch (magic) {
566       case kt::RemoteDB::BMREPLICATION: {
567         cmd = "bin_replication";
568         rv = do_bin_replication(serv, sess);
569         break;
570       }
571       case kt::RemoteDB::BMPLAYSCRIPT: {
572         cmd = "bin_play_script";
573         rv = do_bin_play_script(serv, sess);
574         break;
575       }
576       case kt::RemoteDB::BMSETBULK: {
577         cmd = "bin_set_bulk";
578         rv = do_bin_set_bulk(serv, sess);
579         break;
580       }
581       case kt::RemoteDB::BMREMOVEBULK: {
582         cmd = "bin_remove_bulk";
583         rv = do_bin_remove_bulk(serv, sess);
584         break;
585       }
586       case kt::RemoteDB::BMGETBULK: {
587         cmd = "bin_get_bulk";
588         rv = do_bin_get_bulk(serv, sess);
589         break;
590       }
591       default: {
592         cmd = "bin_unknown";
593         rv = false;
594         break;
595       }
596     }
597     std::string expr = sess->expression();
598     serv->log(kt::ThreadedServer::Logger::INFO, "(%s): %s: %d", expr.c_str(), cmd, rv);
599     return rv;
600   }
601   // process each idle event
process_idle(kt::RPCServer * serv)602   void process_idle(kt::RPCServer* serv) {
603     if (omode_ & kc::BasicDB::OWRITER) {
604       int32_t dbidx = idlecnt_++ % dbnum_;
605       kt::TimedDB* db = dbs_ + dbidx;
606       kt::ThreadedServer* thserv = serv->reveal_core()->reveal_core();
607       for (int32_t i = 0; i < 4; i++) {
608         if (thserv->task_count() > 0) break;
609         if (!db->vacuum(2)) {
610           const kc::BasicDB::Error& e = db->error();
611           log_db_error(serv, e);
612           break;
613         }
614         kc::Thread::yield();
615       }
616     }
617   }
618   // process each timer event
process_timer(kt::RPCServer * serv)619   void process_timer(kt::RPCServer* serv) {
620     if (asi_ > 0 && (omode_ & kc::BasicDB::OWRITER) && kc::time() >= asnext_) {
621       serv->log(Logger::INFO, "synchronizing databases");
622       for (int32_t i = 0; i < dbnum_; i++) {
623         kt::TimedDB* db = dbs_ + i;
624         if (!db->synchronize(ash_)) {
625           const kc::BasicDB::Error& e = db->error();
626           log_db_error(serv, e);
627           break;
628         }
629         kc::Thread::yield();
630       }
631       asnext_ = kc::time() + asi_;
632     }
633     if (bgspath_ && bgsi_ > 0 && kc::time() >= bgsnext_) {
634       serv->log(Logger::INFO, "snapshotting databases");
635       dosnapshot(bgspath_, bgscomp_, dbs_, dbnum_, serv);
636       bgsnext_ = kc::time() + bgsi_;
637     }
638   }
639   // process the starting event
process_start(kt::RPCServer * serv)640   void process_start(kt::RPCServer* serv) {
641     kt::maskthreadsignal();
642   }
643   // set the error message
set_message(std::map<std::string,std::string> & outmap,const char * key,const char * format,...)644   void set_message(std::map<std::string, std::string>& outmap, const char* key,
645                    const char* format, ...) {
646     std::string message;
647     va_list ap;
648     va_start(ap, format);
649     kc::vstrprintf(&message, format, ap);
650     va_end(ap);
651     outmap[key] = message;
652   }
653   // set the database error message
set_db_error(std::map<std::string,std::string> & outmap,const kc::BasicDB::Error & e)654   void set_db_error(std::map<std::string, std::string>& outmap, const kc::BasicDB::Error& e) {
655     set_message(outmap, "ERROR", "DB: %d: %s: %s", e.code(), e.name(), e.message());
656   }
657   // log the database error message
log_db_error(kt::RPCServer * serv,const kc::BasicDB::Error & e)658   void log_db_error(kt::RPCServer* serv, const kc::BasicDB::Error& e) {
659     log_db_error(serv->reveal_core(), e);
660   }
661   // log the database error message
log_db_error(kt::HTTPServer * serv,const kc::BasicDB::Error & e)662   void log_db_error(kt::HTTPServer* serv, const kc::BasicDB::Error& e) {
663     serv->log(Logger::ERROR, "database error: %d: %s: %s", e.code(), e.name(), e.message());
664   }
665   // process the void procedure
do_void(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)666   RV do_void(kt::RPCServer* serv, kt::RPCServer::Session* sess,
667              const std::map<std::string, std::string>& inmap,
668              std::map<std::string, std::string>& outmap) {
669     return kt::RPCClient::RVSUCCESS;
670   }
671   // process the echo procedure
do_echo(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)672   RV do_echo(kt::RPCServer* serv, kt::RPCServer::Session* sess,
673              const std::map<std::string, std::string>& inmap,
674              std::map<std::string, std::string>& outmap) {
675     outmap.insert(inmap.begin(), inmap.end());
676     return kt::RPCClient::RVSUCCESS;
677   }
678   // process the report procedure
do_report(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)679   RV do_report(kt::RPCServer* serv, kt::RPCServer::Session* sess,
680                const std::map<std::string, std::string>& inmap,
681                std::map<std::string, std::string>& outmap) {
682     int64_t totalcount = 0;
683     int64_t totalsize = 0;
684     for (int32_t i = 0; i < dbnum_; i++) {
685       int64_t count = dbs_[i].count();
686       int64_t size = dbs_[i].size();
687       std::string key;
688       kc::strprintf(&key, "db_%d", i);
689       set_message(outmap, key.c_str(), "count=%lld size=%lld path=%s",
690                   (long long)count, (long long)size, dbs_[i].path().c_str());
691       totalcount += count;
692       totalsize += size;
693     }
694     set_message(outmap, "db_total_count", "%lld", (long long)totalcount);
695     set_message(outmap, "db_total_size", "%lld", (long long)totalsize);
696     kt::ThreadedServer* thserv = serv->reveal_core()->reveal_core();
697     set_message(outmap, "serv_conn_count", "%lld", (long long)thserv->connection_count());
698     set_message(outmap, "serv_task_count", "%lld", (long long)thserv->task_count());
699     set_message(outmap, "serv_thread_count", "%lld", (long long)thnum_);
700     double ctime = kc::time();
701     set_message(outmap, "serv_current_time", "%.6f", ctime);
702     set_message(outmap, "serv_running_term", "%.6f", ctime - g_starttime);
703     set_message(outmap, "serv_proc_id", "%d", g_procid);
704     std::map<std::string, std::string> sysinfo;
705     kc::getsysinfo(&sysinfo);
706     std::map<std::string, std::string>::iterator it = sysinfo.begin();
707     std::map<std::string, std::string>::iterator itend = sysinfo.end();
708     while (it != itend) {
709       std::string key;
710       kc::strprintf(&key, "sys_%s", it->first.c_str());
711       set_message(outmap, key.c_str(), it->second.c_str());
712       ++it;
713     }
714     const std::string& mhost = slave_->host();
715     if (!mhost.empty()) {
716       set_message(outmap, "repl_master_host", "%s", mhost.c_str());
717       set_message(outmap, "repl_master_port", "%d", slave_->port());
718       uint64_t rts = slave_->rts();
719       set_message(outmap, "repl_timestamp", "%llu", (unsigned long long)rts);
720       set_message(outmap, "repl_interval", "%.6f", slave_->riv());
721       uint64_t cc = kt::UpdateLogger::clock_pure();
722       uint64_t delay = cc > rts ? cc - rts : 0;
723       set_message(outmap, "repl_delay", "%.6f", delay / 1000000000.0);
724     }
725     OpCount ocsum;
726     for (int32_t i = 0; i <= CNTMISC; i++) {
727       ocsum[i] = 0;
728     }
729     for (int32_t i = 0; i < thnum_; i++) {
730       for (int32_t j = 0; j <= CNTMISC; j++) {
731         ocsum[j] += opcounts_[i][j];
732       }
733     }
734     set_message(outmap, "cnt_set", "%llu", (unsigned long long)ocsum[CNTSET]);
735     set_message(outmap, "cnt_set_misses", "%llu", (unsigned long long)ocsum[CNTSETMISS]);
736     set_message(outmap, "cnt_remove", "%llu", (unsigned long long)ocsum[CNTREMOVE]);
737     set_message(outmap, "cnt_remove_misses", "%llu", (unsigned long long)ocsum[CNTREMOVEMISS]);
738     set_message(outmap, "cnt_get", "%llu", (unsigned long long)ocsum[CNTGET]);
739     set_message(outmap, "cnt_get_misses", "%llu", (unsigned long long)ocsum[CNTGETMISS]);
740     set_message(outmap, "cnt_script", "%llu", (unsigned long long)ocsum[CNTSCRIPT]);
741     set_message(outmap, "cnt_misc", "%llu", (unsigned long long)ocsum[CNTMISC]);
742     set_message(outmap, "conf_kt_version", "%s (%d.%d)", kt::VERSION, kt::LIBVER, kt::LIBREV);
743     set_message(outmap, "conf_kt_features", "%s", kt::FEATURES);
744     set_message(outmap, "conf_kc_version", "%s (%d.%d)", kc::VERSION, kc::LIBVER, kc::LIBREV);
745     set_message(outmap, "conf_kc_features", "%s", kc::FEATURES);
746     set_message(outmap, "conf_os_name", "%s", kc::OSNAME);
747     return kt::RPCClient::RVSUCCESS;
748   }
749   // process the play_script procedure
do_play_script(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)750   RV do_play_script(kt::RPCServer* serv, kt::RPCServer::Session* sess,
751                     const std::map<std::string, std::string>& inmap,
752                     std::map<std::string, std::string>& outmap) {
753     uint32_t thid = sess->thread_id();
754     if (!scrprocs_) {
755       set_message(outmap, "ERROR", "the scripting extention is disabled");
756       return kt::RPCClient::RVENOIMPL;
757     }
758     ScriptProcessor* scrproc = scrprocs_ + thid;
759     const char* nstr = kt::strmapget(inmap, "name");
760     if (!nstr || *nstr == '\0' || !kt::strisalnum(nstr)) {
761       set_message(outmap, "ERROR", "invalid parameters");
762       return kt::RPCClient::RVEINVALID;
763     }
764     std::map<std::string, std::string> scrinmap;
765     std::map<std::string, std::string>::const_iterator it = inmap.begin();
766     std::map<std::string, std::string>::const_iterator itend = inmap.end();
767     while (it != itend) {
768       const char* kbuf = it->first.data();
769       size_t ksiz = it->first.size();
770       if (ksiz > 0 && *kbuf == '_') {
771         std::string key(kbuf + 1, ksiz - 1);
772         scrinmap[key] = it->second;
773       }
774       ++it;
775     }
776     opcounts_[thid][CNTSCRIPT]++;
777     std::map<std::string, std::string> scroutmap;
778     RV rv = scrproc->call(nstr, scrinmap, scroutmap);
779     if (rv == kt::RPCClient::RVSUCCESS) {
780       it = scroutmap.begin();
781       itend = scroutmap.end();
782       while (it != itend) {
783         std::string key = "_";
784         key.append(it->first);
785         outmap[key] = it->second;
786         ++it;
787       }
788     } else if (rv == kt::RPCClient::RVENOIMPL) {
789       set_message(outmap, "ERROR", "no such scripting procedure");
790     } else {
791       set_message(outmap, "ERROR", "the scripting procedure failed");
792     }
793     return rv;
794   }
795   // process the tune_replication procedure
do_tune_replication(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)796   RV do_tune_replication(kt::RPCServer* serv, kt::RPCServer::Session* sess,
797                          const std::map<std::string, std::string>& inmap,
798                          std::map<std::string, std::string>& outmap) {
799     if (!slave_->rtspath_) {
800       set_message(outmap, "ERROR", "the RTS file is not set");
801       return kt::RPCClient::RVENOIMPL;
802     }
803     const char* host = kt::strmapget(inmap, "host");
804     if (!host) host = "";
805     const char* rp = kt::strmapget(inmap, "port");
806     int32_t port = rp ? kc::atoi(rp) : 0;
807     if (port < 1) port = kt::DEFPORT;
808     rp = kt::strmapget(inmap, "ts");
809     uint64_t ts = kc::UINT64MAX;
810     if (rp) {
811       if (!std::strcmp(rp, "now")) {
812         ts = kt::UpdateLogger::clock_pure();
813       } else {
814         ts = kc::atoi(rp);
815       }
816     }
817     rp = kt::strmapget(inmap, "iv");
818     double iv = rp ? kc::atof(rp) : -1;
819     char tsstr[kc::NUMBUFSIZ];
820     if (ts == kc::UINT64MAX) {
821       std::sprintf(tsstr, "*");
822     } else {
823       std::sprintf(tsstr, "%llu", (unsigned long long)ts);
824     }
825     char ivstr[kc::NUMBUFSIZ];
826     if (iv < 0) {
827       std::sprintf(ivstr, "*");
828     } else {
829       std::sprintf(ivstr, "%.6f", iv);
830     }
831     serv->log(Logger::SYSTEM, "replication setting was modified: host=%s port=%d ts=%s iv=%s",
832               host, port, tsstr, ivstr);
833     slave_->set_master(host, port, ts, iv);
834     slave_->restart();
835     return kt::RPCClient::RVSUCCESS;
836   }
837   // process the ulog_list procedure
do_ulog_list(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)838   RV do_ulog_list(kt::RPCServer* serv, kt::RPCServer::Session* sess,
839                   const std::map<std::string, std::string>& inmap,
840                   std::map<std::string, std::string>& outmap) {
841     if (!ulog_) {
842       set_message(outmap, "ERROR", "no update log allows no replication");
843       return kt::RPCClient::RVEINVALID;
844     }
845     std::vector<kt::UpdateLogger::FileStatus> files;
846     ulog_->list_files(&files);
847     std::vector<kt::UpdateLogger::FileStatus>::iterator it = files.begin();
848     std::vector<kt::UpdateLogger::FileStatus>::iterator itend = files.end();
849     while (it != itend) {
850       set_message(outmap, it->path.c_str(), "%llu:%llu",
851                   (unsigned long long)it->size, (unsigned long long)it->ts);
852       ++it;
853     }
854     return kt::RPCClient::RVSUCCESS;
855   }
856   // process the ulog_remove procedure
do_ulog_remove(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)857   RV do_ulog_remove(kt::RPCServer* serv, kt::RPCServer::Session* sess,
858                     const std::map<std::string, std::string>& inmap,
859                     std::map<std::string, std::string>& outmap) {
860     if (!ulog_) {
861       set_message(outmap, "ERROR", "no update log allows no replication");
862       return kt::RPCClient::RVEINVALID;
863     }
864     const char* rp = kt::strmapget(inmap, "ts");
865     uint64_t ts = kc::UINT64MAX;
866     if (rp) {
867       if (!std::strcmp(rp, "now")) {
868         ts = kt::UpdateLogger::clock_pure();
869       } else {
870         ts = kc::atoi(rp);
871       }
872     }
873     bool err = false;
874     std::vector<kt::UpdateLogger::FileStatus> files;
875     ulog_->list_files(&files);
876     std::vector<kt::UpdateLogger::FileStatus>::iterator it = files.begin();
877     std::vector<kt::UpdateLogger::FileStatus>::iterator itend = files.end();
878     if (it != itend) itend--;
879     while (it != itend) {
880       if (it->ts <= ts && !kc::File::remove(it->path)) {
881         set_message(outmap, "ERROR", "removing a file failed: %s", it->path.c_str());
882         serv->log(Logger::ERROR, "removing a file failed: %s", it->path.c_str());
883         err = true;
884       }
885       ++it;
886     }
887     return err ? kt::RPCClient::RVEINTERNAL : kt::RPCClient::RVSUCCESS;
888   }
889   // process the status procedure
do_status(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)890   RV do_status(kt::RPCServer* serv, kt::RPCServer::Session* sess,
891                kt::TimedDB* db,
892                const std::map<std::string, std::string>& inmap,
893                std::map<std::string, std::string>& outmap) {
894     uint32_t thid = sess->thread_id();
895     if (!db) {
896       set_message(outmap, "ERROR", "no such database");
897       return kt::RPCClient::RVEINVALID;
898     }
899     RV rv;
900     opcounts_[thid][CNTMISC]++;
901     std::map<std::string, std::string> status;
902     if (db->status(&status)) {
903       rv = kt::RPCClient::RVSUCCESS;
904       outmap.insert(status.begin(), status.end());
905     } else {
906       const kc::BasicDB::Error& e = db->error();
907       set_db_error(outmap, e);
908       log_db_error(serv, e);
909       rv = kt::RPCClient::RVEINTERNAL;
910     }
911     return rv;
912   }
913   // process the clear procedure
do_clear(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)914   RV do_clear(kt::RPCServer* serv, kt::RPCServer::Session* sess,
915               kt::TimedDB* db,
916               const std::map<std::string, std::string>& inmap,
917               std::map<std::string, std::string>& outmap) {
918     uint32_t thid = sess->thread_id();
919     if (!db) {
920       set_message(outmap, "ERROR", "no such database");
921       return kt::RPCClient::RVEINVALID;
922     }
923     RV rv;
924     opcounts_[thid][CNTMISC]++;
925     if (db->clear()) {
926       rv = kt::RPCClient::RVSUCCESS;
927     } else {
928       const kc::BasicDB::Error& e = db->error();
929       set_db_error(outmap, e);
930       log_db_error(serv, e);
931       rv = kt::RPCClient::RVEINTERNAL;
932     }
933     return rv;
934   }
935   // process the synchronize procedure
do_synchronize(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)936   RV do_synchronize(kt::RPCServer* serv, kt::RPCServer::Session* sess,
937                     kt::TimedDB* db,
938                     const std::map<std::string, std::string>& inmap,
939                     std::map<std::string, std::string>& outmap) {
940     uint32_t thid = sess->thread_id();
941     if (!db) {
942       set_message(outmap, "ERROR", "no such database");
943       return kt::RPCClient::RVEINVALID;
944     }
945     const char* rp = kt::strmapget(inmap, "hard");
946     bool hard = rp ? true : false;
947     rp = kt::strmapget(inmap, "command");
948     std::string command = rp ? rp : "";
949     class Visitor : public kc::BasicDB::FileProcessor {
950      public:
951       Visitor(kt::RPCServer* serv, Worker* worker, const std::string& command) :
952           serv_(serv), worker_(worker), command_(command) {}
953      private:
954       bool process(const std::string& path, int64_t count, int64_t size) {
955         if (command_.size() < 1) return true;
956         const char* cmd = command_.c_str();
957         if (std::strchr(cmd, kc::File::PATHCHR) || !std::strcmp(cmd, kc::File::CDIRSTR) ||
958             !std::strcmp(cmd, kc::File::PDIRSTR)) {
959           serv_->log(Logger::INFO, "invalid command name: %s", cmd);
960           return false;
961         }
962         std::string cmdpath;
963         kc::strprintf(&cmdpath, "%s%c%s", worker_->cmdpath_, kc::File::PATHCHR, cmd);
964         std::vector<std::string> args;
965         args.push_back(cmdpath);
966         args.push_back(path);
967         std::string tsstr;
968         uint64_t cc = worker_->ulog_ ? worker_->ulog_->clock() : kt::UpdateLogger::clock_pure();
969         if (!worker_->slave_->host().empty()) {
970           uint64_t rts = worker_->slave_->rts();
971           if (rts < cc) cc = rts;
972         }
973         kc::strprintf(&tsstr, "%020llu", (unsigned long long)cc);
974         args.push_back(tsstr);
975         serv_->log(Logger::SYSTEM, "executing: %s \"%s\"", cmd, path.c_str());
976         if (kt::executecommand(args) != 0) {
977           serv_->log(Logger::ERROR, "execution failed: %s \"%s\"", cmd, path.c_str());
978           return false;
979         }
980         return true;
981       }
982       kt::RPCServer* serv_;
983       Worker* worker_;
984       std::string command_;
985     };
986     Visitor visitor(serv, this, command);
987     RV rv;
988     opcounts_[thid][CNTMISC]++;
989     if (db->synchronize(hard, &visitor)) {
990       rv = kt::RPCClient::RVSUCCESS;
991     } else {
992       const kc::BasicDB::Error& e = db->error();
993       set_db_error(outmap, e);
994       log_db_error(serv, e);
995       rv = kt::RPCClient::RVEINTERNAL;
996     }
997     return rv;
998   }
999   // process the set procedure
do_set(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1000   RV do_set(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1001             kt::TimedDB* db,
1002             const std::map<std::string, std::string>& inmap,
1003             std::map<std::string, std::string>& outmap) {
1004     uint32_t thid = sess->thread_id();
1005     if (!db) {
1006       set_message(outmap, "ERROR", "no such database");
1007       return kt::RPCClient::RVEINVALID;
1008     }
1009     size_t ksiz;
1010     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1011     size_t vsiz;
1012     const char* vbuf = kt::strmapget(inmap, "value", &vsiz);
1013     if (!kbuf || !vbuf) {
1014       set_message(outmap, "ERROR", "invalid parameters");
1015       return kt::RPCClient::RVEINVALID;
1016     }
1017     const char* rp = kt::strmapget(inmap, "xt");
1018     int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1019     RV rv;
1020     opcounts_[thid][CNTSET]++;
1021     if (db->set(kbuf, ksiz, vbuf, vsiz, xt)) {
1022       rv = kt::RPCClient::RVSUCCESS;
1023     } else {
1024       opcounts_[thid][CNTSETMISS]++;
1025       const kc::BasicDB::Error& e = db->error();
1026       set_db_error(outmap, e);
1027       log_db_error(serv, e);
1028       rv = kt::RPCClient::RVEINTERNAL;
1029     }
1030     return rv;
1031   }
1032   // process the add procedure
do_add(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1033   RV do_add(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1034             kt::TimedDB* db,
1035             const std::map<std::string, std::string>& inmap,
1036             std::map<std::string, std::string>& outmap) {
1037     uint32_t thid = sess->thread_id();
1038     if (!db) {
1039       set_message(outmap, "ERROR", "no such database");
1040       return kt::RPCClient::RVEINVALID;
1041     }
1042     size_t ksiz;
1043     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1044     size_t vsiz;
1045     const char* vbuf = kt::strmapget(inmap, "value", &vsiz);
1046     if (!kbuf || !vbuf) {
1047       set_message(outmap, "ERROR", "invalid parameters");
1048       return kt::RPCClient::RVEINVALID;
1049     }
1050     const char* rp = kt::strmapget(inmap, "xt");
1051     int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1052     RV rv;
1053     opcounts_[thid][CNTSET]++;
1054     if (db->add(kbuf, ksiz, vbuf, vsiz, xt)) {
1055       rv = kt::RPCClient::RVSUCCESS;
1056     } else {
1057       opcounts_[thid][CNTSETMISS]++;
1058       const kc::BasicDB::Error& e = db->error();
1059       set_db_error(outmap, e);
1060       if (e == kc::BasicDB::Error::DUPREC) {
1061         rv = kt::RPCClient::RVELOGIC;
1062       } else {
1063         log_db_error(serv, e);
1064         rv = kt::RPCClient::RVEINTERNAL;
1065       }
1066     }
1067     return rv;
1068   }
1069   // process the replace procedure
do_replace(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1070   RV do_replace(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1071                 kt::TimedDB* db,
1072                 const std::map<std::string, std::string>& inmap,
1073                 std::map<std::string, std::string>& outmap) {
1074     uint32_t thid = sess->thread_id();
1075     if (!db) {
1076       set_message(outmap, "ERROR", "no such database");
1077       return kt::RPCClient::RVEINVALID;
1078     }
1079     size_t ksiz;
1080     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1081     size_t vsiz;
1082     const char* vbuf = kt::strmapget(inmap, "value", &vsiz);
1083     if (!kbuf || !vbuf) {
1084       set_message(outmap, "ERROR", "invalid parameters");
1085       return kt::RPCClient::RVEINVALID;
1086     }
1087     const char* rp = kt::strmapget(inmap, "xt");
1088     int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1089     RV rv;
1090     opcounts_[thid][CNTSET]++;
1091     if (db->replace(kbuf, ksiz, vbuf, vsiz, xt)) {
1092       rv = kt::RPCClient::RVSUCCESS;
1093     } else {
1094       opcounts_[thid][CNTSETMISS]++;
1095       const kc::BasicDB::Error& e = db->error();
1096       set_db_error(outmap, e);
1097       if (e == kc::BasicDB::Error::NOREC) {
1098         rv = kt::RPCClient::RVELOGIC;
1099       } else {
1100         log_db_error(serv, e);
1101         rv = kt::RPCClient::RVEINTERNAL;
1102       }
1103     }
1104     return rv;
1105   }
1106   // process the append procedure
do_append(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1107   RV do_append(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1108                kt::TimedDB* db,
1109                const std::map<std::string, std::string>& inmap,
1110                std::map<std::string, std::string>& outmap) {
1111     uint32_t thid = sess->thread_id();
1112     if (!db) {
1113       set_message(outmap, "ERROR", "no such database");
1114       return kt::RPCClient::RVEINVALID;
1115     }
1116     size_t ksiz;
1117     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1118     size_t vsiz;
1119     const char* vbuf = kt::strmapget(inmap, "value", &vsiz);
1120     if (!kbuf || !vbuf) {
1121       set_message(outmap, "ERROR", "invalid parameters");
1122       return kt::RPCClient::RVEINVALID;
1123     }
1124     const char* rp = kt::strmapget(inmap, "xt");
1125     int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1126     RV rv;
1127     opcounts_[thid][CNTSET]++;
1128     if (db->append(kbuf, ksiz, vbuf, vsiz, xt)) {
1129       rv = kt::RPCClient::RVSUCCESS;
1130     } else {
1131       opcounts_[thid][CNTSETMISS]++;
1132       const kc::BasicDB::Error& e = db->error();
1133       set_db_error(outmap, e);
1134       log_db_error(serv, e);
1135       rv = kt::RPCClient::RVEINTERNAL;
1136     }
1137     return rv;
1138   }
1139   // process the increment procedure
do_increment(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1140   RV do_increment(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1141                   kt::TimedDB* db,
1142                   const std::map<std::string, std::string>& inmap,
1143                   std::map<std::string, std::string>& outmap) {
1144     uint32_t thid = sess->thread_id();
1145     if (!db) {
1146       set_message(outmap, "ERROR", "no such database");
1147       return kt::RPCClient::RVEINVALID;
1148     }
1149     size_t ksiz;
1150     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1151     const char* nstr = kt::strmapget(inmap, "num");
1152     if (!kbuf || !nstr) {
1153       set_message(outmap, "ERROR", "invalid parameters");
1154       return kt::RPCClient::RVEINVALID;
1155     }
1156     int64_t num = kc::atoi(nstr);
1157     const char* rp = kt::strmapget(inmap, "orig");
1158     int64_t orig;
1159     if (rp) {
1160       if (!std::strcmp(rp, "try")) {
1161         orig = kc::INT64MIN;
1162       } else if (!std::strcmp(rp, "set")) {
1163         orig = kc::INT64MAX;
1164       } else {
1165         orig = kc::atoi(rp);
1166       }
1167     } else {
1168       orig = 0;
1169     }
1170     rp = kt::strmapget(inmap, "xt");
1171     int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1172     RV rv;
1173     opcounts_[thid][CNTSET]++;
1174     num = db->increment(kbuf, ksiz, num, orig, xt);
1175     if (num != kc::INT64MIN) {
1176       rv = kt::RPCClient::RVSUCCESS;
1177       set_message(outmap, "num", "%lld", (long long)num);
1178     } else {
1179       opcounts_[thid][CNTSETMISS]++;
1180       const kc::BasicDB::Error& e = db->error();
1181       set_db_error(outmap, e);
1182       if (e == kc::BasicDB::Error::LOGIC) {
1183         rv = kt::RPCClient::RVELOGIC;
1184       } else {
1185         log_db_error(serv, e);
1186         rv = kt::RPCClient::RVEINTERNAL;
1187       }
1188     }
1189     return rv;
1190   }
1191   // process the increment_double procedure
do_increment_double(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1192   RV do_increment_double(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1193                          kt::TimedDB* db,
1194                          const std::map<std::string, std::string>& inmap,
1195                          std::map<std::string, std::string>& outmap) {
1196     uint32_t thid = sess->thread_id();
1197     if (!db) {
1198       set_message(outmap, "ERROR", "no such database");
1199       return kt::RPCClient::RVEINVALID;
1200     }
1201     size_t ksiz;
1202     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1203     const char* nstr = kt::strmapget(inmap, "num");
1204     if (!kbuf || !nstr) {
1205       set_message(outmap, "ERROR", "invalid parameters");
1206       return kt::RPCClient::RVEINVALID;
1207     }
1208     double num = kc::atof(nstr);
1209     const char* rp = kt::strmapget(inmap, "orig");
1210     double orig;
1211     if (rp) {
1212       if (!std::strcmp(rp, "try")) {
1213         orig = -kc::inf();
1214       } else if (!std::strcmp(rp, "set")) {
1215         orig = kc::inf();
1216       } else {
1217         orig = kc::atof(rp);
1218       }
1219     } else {
1220       orig = 0;
1221     }
1222     rp = kt::strmapget(inmap, "xt");
1223     int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1224     RV rv;
1225     opcounts_[thid][CNTSET]++;
1226     num = db->increment_double(kbuf, ksiz, num, orig, xt);
1227     if (!kc::chknan(num)) {
1228       rv = kt::RPCClient::RVSUCCESS;
1229       set_message(outmap, "num", "%f", num);
1230     } else {
1231       opcounts_[thid][CNTSETMISS]++;
1232       const kc::BasicDB::Error& e = db->error();
1233       set_db_error(outmap, e);
1234       if (e == kc::BasicDB::Error::LOGIC) {
1235         rv = kt::RPCClient::RVELOGIC;
1236       } else {
1237         log_db_error(serv, e);
1238         rv = kt::RPCClient::RVEINTERNAL;
1239       }
1240     }
1241     return rv;
1242   }
1243   // process the cas procedure
do_cas(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1244   RV do_cas(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1245             kt::TimedDB* db,
1246             const std::map<std::string, std::string>& inmap,
1247             std::map<std::string, std::string>& outmap) {
1248     uint32_t thid = sess->thread_id();
1249     if (!db) {
1250       set_message(outmap, "ERROR", "no such database");
1251       return kt::RPCClient::RVEINVALID;
1252     }
1253     size_t ksiz;
1254     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1255     if (!kbuf) {
1256       set_message(outmap, "ERROR", "invalid parameters");
1257       return kt::RPCClient::RVEINVALID;
1258     }
1259     size_t ovsiz;
1260     const char* ovbuf = kt::strmapget(inmap, "oval", &ovsiz);
1261     size_t nvsiz;
1262     const char* nvbuf = kt::strmapget(inmap, "nval", &nvsiz);
1263     const char* rp = kt::strmapget(inmap, "xt");
1264     int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1265     RV rv;
1266     opcounts_[thid][CNTSET]++;
1267     if (db->cas(kbuf, ksiz, ovbuf, ovsiz, nvbuf, nvsiz, xt)) {
1268       rv = kt::RPCClient::RVSUCCESS;
1269     } else {
1270       opcounts_[thid][CNTSETMISS]++;
1271       const kc::BasicDB::Error& e = db->error();
1272       set_db_error(outmap, e);
1273       if (e == kc::BasicDB::Error::LOGIC) {
1274         rv = kt::RPCClient::RVELOGIC;
1275       } else {
1276         log_db_error(serv, e);
1277         rv = kt::RPCClient::RVEINTERNAL;
1278       }
1279     }
1280     return rv;
1281   }
1282   // process the remove procedure
do_remove(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1283   RV do_remove(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1284                kt::TimedDB* db,
1285                const std::map<std::string, std::string>& inmap,
1286                std::map<std::string, std::string>& outmap) {
1287     uint32_t thid = sess->thread_id();
1288     if (!db) {
1289       set_message(outmap, "ERROR", "no such database");
1290       return kt::RPCClient::RVEINVALID;
1291     }
1292     size_t ksiz;
1293     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1294     if (!kbuf) {
1295       set_message(outmap, "ERROR", "invalid parameters");
1296       return kt::RPCClient::RVEINVALID;
1297     }
1298     RV rv;
1299     opcounts_[thid][CNTREMOVE]++;
1300     if (db->remove(kbuf, ksiz)) {
1301       rv = kt::RPCClient::RVSUCCESS;
1302     } else {
1303       opcounts_[thid][CNTREMOVEMISS]++;
1304       const kc::BasicDB::Error& e = db->error();
1305       set_db_error(outmap, e);
1306       if (e == kc::BasicDB::Error::NOREC) {
1307         rv = kt::RPCClient::RVELOGIC;
1308       } else {
1309         log_db_error(serv, e);
1310         rv = kt::RPCClient::RVEINTERNAL;
1311       }
1312     }
1313     return rv;
1314   }
1315   // process the get procedure
do_get(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1316   RV do_get(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1317             kt::TimedDB* db,
1318             const std::map<std::string, std::string>& inmap,
1319             std::map<std::string, std::string>& outmap) {
1320     uint32_t thid = sess->thread_id();
1321     if (!db) {
1322       set_message(outmap, "ERROR", "no such database");
1323       return kt::RPCClient::RVEINVALID;
1324     }
1325     size_t ksiz;
1326     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1327     if (!kbuf) {
1328       set_message(outmap, "ERROR", "invalid parameters");
1329       return kt::RPCClient::RVEINVALID;
1330     }
1331     RV rv;
1332     opcounts_[thid][CNTGET]++;
1333     size_t vsiz;
1334     int64_t xt;
1335     const char* vbuf = db->get(kbuf, ksiz, &vsiz, &xt);
1336     if (vbuf) {
1337       outmap["value"] = std::string(vbuf, vsiz);
1338       if (xt < kt::TimedDB::XTMAX) set_message(outmap, "xt", "%lld", (long long)xt);
1339       delete[] vbuf;
1340       rv = kt::RPCClient::RVSUCCESS;
1341     } else {
1342       opcounts_[thid][CNTGETMISS]++;
1343       const kc::BasicDB::Error& e = db->error();
1344       set_db_error(outmap, e);
1345       if (e == kc::BasicDB::Error::NOREC) {
1346         rv = kt::RPCClient::RVELOGIC;
1347       } else {
1348         log_db_error(serv, e);
1349         rv = kt::RPCClient::RVEINTERNAL;
1350       }
1351     }
1352     return rv;
1353   }
1354   // process the check procedure
do_check(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1355   RV do_check(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1356               kt::TimedDB* db,
1357               const std::map<std::string, std::string>& inmap,
1358               std::map<std::string, std::string>& outmap) {
1359     uint32_t thid = sess->thread_id();
1360     if (!db) {
1361       set_message(outmap, "ERROR", "no such database");
1362       return kt::RPCClient::RVEINVALID;
1363     }
1364     size_t ksiz;
1365     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1366     if (!kbuf) {
1367       set_message(outmap, "ERROR", "invalid parameters");
1368       return kt::RPCClient::RVEINVALID;
1369     }
1370     RV rv;
1371     opcounts_[thid][CNTGET]++;
1372     int64_t xt;
1373     int32_t vsiz = db->check(kbuf, ksiz, &xt);
1374     if (vsiz >= 0) {
1375       set_message(outmap, "vsiz", "%lld", (long long)vsiz);
1376       if (xt < kt::TimedDB::XTMAX) set_message(outmap, "xt", "%lld", (long long)xt);
1377       rv = kt::RPCClient::RVSUCCESS;
1378     } else {
1379       opcounts_[thid][CNTGETMISS]++;
1380       const kc::BasicDB::Error& e = db->error();
1381       set_db_error(outmap, e);
1382       if (e == kc::BasicDB::Error::NOREC) {
1383         rv = kt::RPCClient::RVELOGIC;
1384       } else {
1385         log_db_error(serv, e);
1386         rv = kt::RPCClient::RVEINTERNAL;
1387       }
1388     }
1389     return rv;
1390   }
1391   // process the seize procedure
do_seize(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1392   RV do_seize(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1393               kt::TimedDB* db,
1394               const std::map<std::string, std::string>& inmap,
1395               std::map<std::string, std::string>& outmap) {
1396     uint32_t thid = sess->thread_id();
1397     if (!db) {
1398       set_message(outmap, "ERROR", "no such database");
1399       return kt::RPCClient::RVEINVALID;
1400     }
1401     size_t ksiz;
1402     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1403     if (!kbuf) {
1404       set_message(outmap, "ERROR", "invalid parameters");
1405       return kt::RPCClient::RVEINVALID;
1406     }
1407     RV rv;
1408     opcounts_[thid][CNTREMOVE]++;
1409     opcounts_[thid][CNTGET]++;
1410     size_t vsiz;
1411     int64_t xt;
1412     const char* vbuf = db->seize(kbuf, ksiz, &vsiz, &xt);
1413     if (vbuf) {
1414       outmap["value"] = std::string(vbuf, vsiz);
1415       if (xt < kt::TimedDB::XTMAX) set_message(outmap, "xt", "%lld", (long long)xt);
1416       delete[] vbuf;
1417       rv = kt::RPCClient::RVSUCCESS;
1418     } else {
1419       opcounts_[thid][CNTREMOVEMISS]++;
1420       opcounts_[thid][CNTGETMISS]++;
1421       const kc::BasicDB::Error& e = db->error();
1422       set_db_error(outmap, e);
1423       if (e == kc::BasicDB::Error::NOREC) {
1424         rv = kt::RPCClient::RVELOGIC;
1425       } else {
1426         log_db_error(serv, e);
1427         rv = kt::RPCClient::RVEINTERNAL;
1428       }
1429     }
1430     return rv;
1431   }
1432   // process the set_bulk procedure
do_set_bulk(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1433   RV do_set_bulk(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1434                  kt::TimedDB* db,
1435                  const std::map<std::string, std::string>& inmap,
1436                  std::map<std::string, std::string>& outmap) {
1437     uint32_t thid = sess->thread_id();
1438     if (!db) {
1439       set_message(outmap, "ERROR", "no such database");
1440       return kt::RPCClient::RVEINVALID;
1441     }
1442     const char* rp = kt::strmapget(inmap, "xt");
1443     int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1444     rp = kt::strmapget(inmap, "atomic");
1445     bool atomic = rp ? true : false;
1446     std::map<std::string, std::string> recs;
1447     std::map<std::string, std::string>::const_iterator it = inmap.begin();
1448     std::map<std::string, std::string>::const_iterator itend = inmap.end();
1449     while (it != itend) {
1450       const char* kbuf = it->first.data();
1451       size_t ksiz = it->first.size();
1452       if (ksiz > 0 && *kbuf == '_') {
1453         std::string key(kbuf + 1, ksiz - 1);
1454         std::string value(it->second.data(), it->second.size());
1455         recs[key] = value;
1456       }
1457       ++it;
1458     }
1459     RV rv;
1460     opcounts_[thid][CNTSET] += recs.size();
1461     int64_t num = db->set_bulk(recs, xt, atomic);
1462     if (num >= 0) {
1463       opcounts_[thid][CNTSETMISS] += recs.size() - (size_t)num;
1464       rv = kt::RPCClient::RVSUCCESS;
1465       set_message(outmap, "num", "%lld", (long long)num);
1466     } else {
1467       opcounts_[thid][CNTSETMISS] += recs.size();
1468       const kc::BasicDB::Error& e = db->error();
1469       set_db_error(outmap, e);
1470       log_db_error(serv, e);
1471       rv = kt::RPCClient::RVEINTERNAL;
1472     }
1473     return rv;
1474   }
1475   // process the remove_bulk procedure
do_remove_bulk(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1476   RV do_remove_bulk(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1477                     kt::TimedDB* db,
1478                     const std::map<std::string, std::string>& inmap,
1479                     std::map<std::string, std::string>& outmap) {
1480     uint32_t thid = sess->thread_id();
1481     if (!db) {
1482       set_message(outmap, "ERROR", "no such database");
1483       return kt::RPCClient::RVEINVALID;
1484     }
1485     const char* rp = kt::strmapget(inmap, "atomic");
1486     bool atomic = rp ? true : false;
1487     std::vector<std::string> keys;
1488     keys.reserve(inmap.size());
1489     std::map<std::string, std::string>::const_iterator it = inmap.begin();
1490     std::map<std::string, std::string>::const_iterator itend = inmap.end();
1491     while (it != itend) {
1492       const char* kbuf = it->first.data();
1493       size_t ksiz = it->first.size();
1494       if (ksiz > 0 && *kbuf == '_') {
1495         std::string key(kbuf + 1, ksiz - 1);
1496         keys.push_back(key);
1497       }
1498       ++it;
1499     }
1500     RV rv;
1501     opcounts_[thid][CNTREMOVE] += keys.size();
1502     int64_t num = db->remove_bulk(keys, atomic);
1503     if (num >= 0) {
1504       opcounts_[thid][CNTREMOVEMISS] += keys.size() - (size_t)num;
1505       rv = kt::RPCClient::RVSUCCESS;
1506       set_message(outmap, "num", "%lld", (long long)num);
1507     } else {
1508       opcounts_[thid][CNTREMOVEMISS] += keys.size();
1509       const kc::BasicDB::Error& e = db->error();
1510       set_db_error(outmap, e);
1511       log_db_error(serv, e);
1512       rv = kt::RPCClient::RVEINTERNAL;
1513     }
1514     return rv;
1515   }
1516   // process the get_bulk procedure
do_get_bulk(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1517   RV do_get_bulk(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1518                  kt::TimedDB* db,
1519                  const std::map<std::string, std::string>& inmap,
1520                  std::map<std::string, std::string>& outmap) {
1521     uint32_t thid = sess->thread_id();
1522     if (!db) {
1523       set_message(outmap, "ERROR", "no such database");
1524       return kt::RPCClient::RVEINVALID;
1525     }
1526     const char* rp = kt::strmapget(inmap, "atomic");
1527     bool atomic = rp ? true : false;
1528     std::vector<std::string> keys;
1529     keys.reserve(inmap.size());
1530     std::map<std::string, std::string>::const_iterator it = inmap.begin();
1531     std::map<std::string, std::string>::const_iterator itend = inmap.end();
1532     while (it != itend) {
1533       const char* kbuf = it->first.data();
1534       size_t ksiz = it->first.size();
1535       if (ksiz > 0 && *kbuf == '_') {
1536         std::string key(kbuf + 1, ksiz - 1);
1537         keys.push_back(key);
1538       }
1539       ++it;
1540     }
1541     RV rv;
1542     opcounts_[thid][CNTGET] += keys.size();
1543     std::map<std::string, std::string> recs;
1544     int64_t num = db->get_bulk(keys, &recs, atomic);
1545     if (num >= 0) {
1546       opcounts_[thid][CNTGETMISS] += keys.size() - (size_t)num;
1547       rv = kt::RPCClient::RVSUCCESS;
1548       std::map<std::string, std::string>::iterator it = recs.begin();
1549       std::map<std::string, std::string>::iterator itend = recs.end();
1550       while (it != itend) {
1551         std::string key("_");
1552         key.append(it->first);
1553         outmap[key] = it->second;
1554         ++it;
1555       }
1556       set_message(outmap, "num", "%lld", (long long)num);
1557     } else {
1558       opcounts_[thid][CNTGETMISS] += keys.size();
1559       const kc::BasicDB::Error& e = db->error();
1560       set_db_error(outmap, e);
1561       log_db_error(serv, e);
1562       rv = kt::RPCClient::RVEINTERNAL;
1563     }
1564     return rv;
1565   }
1566   // process the vacuum procedure
do_vacuum(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1567   RV do_vacuum(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1568                kt::TimedDB* db,
1569                const std::map<std::string, std::string>& inmap,
1570                std::map<std::string, std::string>& outmap) {
1571     uint32_t thid = sess->thread_id();
1572     if (!db) {
1573       set_message(outmap, "ERROR", "no such database");
1574       return kt::RPCClient::RVEINVALID;
1575     }
1576     const char* rp = kt::strmapget(inmap, "step");
1577     int64_t step = rp ? kc::atoi(rp) : 0;
1578     RV rv;
1579     opcounts_[thid][CNTMISC]++;
1580     if (db->vacuum(step)) {
1581       rv = kt::RPCClient::RVSUCCESS;
1582     } else {
1583       const kc::BasicDB::Error& e = db->error();
1584       set_db_error(outmap, e);
1585       log_db_error(serv, e);
1586       rv = kt::RPCClient::RVEINTERNAL;
1587     }
1588     return rv;
1589   }
1590   // process the match_prefix procedure
do_match_prefix(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1591   RV do_match_prefix(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1592                      kt::TimedDB* db,
1593                      const std::map<std::string, std::string>& inmap,
1594                      std::map<std::string, std::string>& outmap) {
1595     uint32_t thid = sess->thread_id();
1596     if (!db) {
1597       set_message(outmap, "ERROR", "no such database");
1598       return kt::RPCClient::RVEINVALID;
1599     }
1600     size_t psiz;
1601     const char* pbuf = kt::strmapget(inmap, "prefix", &psiz);
1602     if (!pbuf) {
1603       set_message(outmap, "ERROR", "invalid parameters");
1604       return kt::RPCClient::RVEINVALID;
1605     }
1606     const char* rp = kt::strmapget(inmap, "max");
1607     int64_t max = rp ? kc::atoi(rp) : -1;
1608     std::vector<std::string> keys;
1609     RV rv;
1610     opcounts_[thid][CNTMISC]++;
1611     int64_t num = db->match_prefix(std::string(pbuf, psiz), &keys, max);
1612     if (num >= 0) {
1613       std::vector<std::string>::iterator it = keys.begin();
1614       std::vector<std::string>::iterator itend = keys.end();
1615       int64_t cnt = 0;
1616       while (it != itend) {
1617         std::string key = "_";
1618         key.append(*it);
1619         outmap[key] = kc::strprintf("%lld", (long long)cnt);
1620         ++cnt;
1621         ++it;
1622       }
1623       set_message(outmap, "num", "%lld", (long long)num);
1624       rv = kt::RPCClient::RVSUCCESS;
1625     } else {
1626       const kc::BasicDB::Error& e = db->error();
1627       set_db_error(outmap, e);
1628       log_db_error(serv, e);
1629       rv = kt::RPCClient::RVEINTERNAL;
1630     }
1631     return rv;
1632   }
1633   // process the match_regex procedure
do_match_regex(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1634   RV do_match_regex(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1635                     kt::TimedDB* db,
1636                     const std::map<std::string, std::string>& inmap,
1637                     std::map<std::string, std::string>& outmap) {
1638     uint32_t thid = sess->thread_id();
1639     if (!db) {
1640       set_message(outmap, "ERROR", "no such database");
1641       return kt::RPCClient::RVEINVALID;
1642     }
1643     size_t psiz;
1644     const char* pbuf = kt::strmapget(inmap, "regex", &psiz);
1645     if (!pbuf) {
1646       set_message(outmap, "ERROR", "invalid parameters");
1647       return kt::RPCClient::RVEINVALID;
1648     }
1649     const char* rp = kt::strmapget(inmap, "max");
1650     int64_t max = rp ? kc::atoi(rp) : -1;
1651     std::vector<std::string> keys;
1652     RV rv;
1653     opcounts_[thid][CNTMISC]++;
1654     int64_t num = db->match_regex(std::string(pbuf, psiz), &keys, max);
1655     if (num >= 0) {
1656       std::vector<std::string>::iterator it = keys.begin();
1657       std::vector<std::string>::iterator itend = keys.end();
1658       int64_t cnt = 0;
1659       while (it != itend) {
1660         std::string key = "_";
1661         key.append(*it);
1662         outmap[key] = kc::strprintf("%lld", (long long)cnt);
1663         ++cnt;
1664         ++it;
1665       }
1666       set_message(outmap, "num", "%lld", (long long)num);
1667       rv = kt::RPCClient::RVSUCCESS;
1668     } else {
1669       const kc::BasicDB::Error& e = db->error();
1670       set_db_error(outmap, e);
1671       if (e == kc::BasicDB::Error::LOGIC) {
1672         rv = kt::RPCClient::RVELOGIC;
1673       } else {
1674         log_db_error(serv, e);
1675         rv = kt::RPCClient::RVEINTERNAL;
1676       }
1677     }
1678     return rv;
1679   }
1680   // process the match_similar procedure
do_match_similar(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1681   RV do_match_similar(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1682                       kt::TimedDB* db,
1683                       const std::map<std::string, std::string>& inmap,
1684                       std::map<std::string, std::string>& outmap) {
1685     uint32_t thid = sess->thread_id();
1686     if (!db) {
1687       set_message(outmap, "ERROR", "no such database");
1688       return kt::RPCClient::RVEINVALID;
1689     }
1690     size_t osiz;
1691     const char* obuf = kt::strmapget(inmap, "origin", &osiz);
1692     if (!obuf) {
1693       set_message(outmap, "ERROR", "invalid parameters");
1694       return kt::RPCClient::RVEINVALID;
1695     }
1696     const char* rp = kt::strmapget(inmap, "range");
1697     int64_t range = rp ? kc::atoi(rp) : 1;
1698     if (range < 0) range = 1;
1699     rp = kt::strmapget(inmap, "utf");
1700     bool utf = rp ? true : false;
1701     rp = kt::strmapget(inmap, "max");
1702     int64_t max = rp ? kc::atoi(rp) : -1;
1703     std::vector<std::string> keys;
1704     RV rv;
1705     opcounts_[thid][CNTMISC]++;
1706     int64_t num = db->match_similar(std::string(obuf, osiz), range, utf, &keys, max);
1707     if (num >= 0) {
1708       std::vector<std::string>::iterator it = keys.begin();
1709       std::vector<std::string>::iterator itend = keys.end();
1710       int64_t cnt = 0;
1711       while (it != itend) {
1712         std::string key = "_";
1713         key.append(*it);
1714         outmap[key] = kc::strprintf("%lld", (long long)cnt);
1715         ++cnt;
1716         ++it;
1717       }
1718       set_message(outmap, "num", "%lld", (long long)num);
1719       rv = kt::RPCClient::RVSUCCESS;
1720     } else {
1721       const kc::BasicDB::Error& e = db->error();
1722       set_db_error(outmap, e);
1723       if (e == kc::BasicDB::Error::LOGIC) {
1724         rv = kt::RPCClient::RVELOGIC;
1725       } else {
1726         log_db_error(serv, e);
1727         rv = kt::RPCClient::RVEINTERNAL;
1728       }
1729     }
1730     return rv;
1731   }
1732   // process the cur_jump procedure
do_cur_jump(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1733   RV do_cur_jump(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1734                  kt::TimedDB::Cursor* cur,
1735                  const std::map<std::string, std::string>& inmap,
1736                  std::map<std::string, std::string>& outmap) {
1737     uint32_t thid = sess->thread_id();
1738     if (!cur) {
1739       set_message(outmap, "ERROR", "no such cursor");
1740       return kt::RPCClient::RVEINVALID;
1741     }
1742     size_t ksiz;
1743     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1744     RV rv;
1745     opcounts_[thid][CNTMISC]++;
1746     if (kbuf) {
1747       if (cur->jump(kbuf, ksiz)) {
1748         rv = kt::RPCClient::RVSUCCESS;
1749       } else {
1750         const kc::BasicDB::Error& e = cur->error();
1751         set_db_error(outmap, e);
1752         if (e == kc::BasicDB::Error::NOREC) {
1753           rv = kt::RPCClient::RVELOGIC;
1754         } else {
1755           log_db_error(serv, e);
1756           rv = kt::RPCClient::RVEINTERNAL;
1757         }
1758       }
1759     } else {
1760       if (cur->jump()) {
1761         rv = kt::RPCClient::RVSUCCESS;
1762       } else {
1763         const kc::BasicDB::Error& e = cur->error();
1764         set_db_error(outmap, e);
1765         if (e == kc::BasicDB::Error::NOREC) {
1766           rv = kt::RPCClient::RVELOGIC;
1767         } else {
1768           log_db_error(serv, e);
1769           rv = kt::RPCClient::RVEINTERNAL;
1770         }
1771       }
1772     }
1773     return rv;
1774   }
1775   // process the cur_jump_back procedure
do_cur_jump_back(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1776   RV do_cur_jump_back(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1777                       kt::TimedDB::Cursor* cur,
1778                       const std::map<std::string, std::string>& inmap,
1779                       std::map<std::string, std::string>& outmap) {
1780     uint32_t thid = sess->thread_id();
1781     if (!cur) {
1782       set_message(outmap, "ERROR", "no such cursor");
1783       return kt::RPCClient::RVEINVALID;
1784     }
1785     size_t ksiz;
1786     const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1787     RV rv;
1788     opcounts_[thid][CNTMISC]++;
1789     if (kbuf) {
1790       if (cur->jump_back(kbuf, ksiz)) {
1791         rv = kt::RPCClient::RVSUCCESS;
1792       } else {
1793         const kc::BasicDB::Error& e = cur->error();
1794         set_db_error(outmap, e);
1795         switch (e.code()) {
1796           case kc::BasicDB::Error::NOIMPL: {
1797             rv = kt::RPCClient::RVENOIMPL;
1798             break;
1799           }
1800           case kc::BasicDB::Error::NOREC: {
1801             rv = kt::RPCClient::RVELOGIC;
1802             break;
1803           }
1804           default: {
1805             log_db_error(serv, e);
1806             rv = kt::RPCClient::RVEINTERNAL;
1807             break;
1808           }
1809         }
1810       }
1811     } else {
1812       if (cur->jump_back()) {
1813         rv = kt::RPCClient::RVSUCCESS;
1814       } else {
1815         const kc::BasicDB::Error& e = cur->error();
1816         set_db_error(outmap, e);
1817         switch (e.code()) {
1818           case kc::BasicDB::Error::NOIMPL: {
1819             rv = kt::RPCClient::RVENOIMPL;
1820             break;
1821           }
1822           case kc::BasicDB::Error::NOREC: {
1823             rv = kt::RPCClient::RVELOGIC;
1824             break;
1825           }
1826           default: {
1827             log_db_error(serv, e);
1828             rv = kt::RPCClient::RVEINTERNAL;
1829             break;
1830           }
1831         }
1832       }
1833     }
1834     return rv;
1835   }
1836   // process the cur_step procedure
do_cur_step(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1837   RV do_cur_step(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1838                  kt::TimedDB::Cursor* cur,
1839                  const std::map<std::string, std::string>& inmap,
1840                  std::map<std::string, std::string>& outmap) {
1841     uint32_t thid = sess->thread_id();
1842     if (!cur) {
1843       set_message(outmap, "ERROR", "no such cursor");
1844       return kt::RPCClient::RVEINVALID;
1845     }
1846     RV rv;
1847     opcounts_[thid][CNTMISC]++;
1848     if (cur->step()) {
1849       rv = kt::RPCClient::RVSUCCESS;
1850     } else {
1851       const kc::BasicDB::Error& e = cur->error();
1852       set_db_error(outmap, e);
1853       if (e == kc::BasicDB::Error::NOREC) {
1854         rv = kt::RPCClient::RVELOGIC;
1855       } else {
1856         log_db_error(serv, e);
1857         rv = kt::RPCClient::RVEINTERNAL;
1858       }
1859     }
1860     return rv;
1861   }
1862   // process the cur_step_back procedure
do_cur_step_back(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1863   RV do_cur_step_back(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1864                       kt::TimedDB::Cursor* cur,
1865                       const std::map<std::string, std::string>& inmap,
1866                       std::map<std::string, std::string>& outmap) {
1867     uint32_t thid = sess->thread_id();
1868     if (!cur) {
1869       set_message(outmap, "ERROR", "no such cursor");
1870       return kt::RPCClient::RVEINVALID;
1871     }
1872     RV rv;
1873     opcounts_[thid][CNTMISC]++;
1874     if (cur->step_back()) {
1875       rv = kt::RPCClient::RVSUCCESS;
1876     } else {
1877       const kc::BasicDB::Error& e = cur->error();
1878       set_db_error(outmap, e);
1879       switch (e.code()) {
1880         case kc::BasicDB::Error::NOIMPL: {
1881           rv = kt::RPCClient::RVENOIMPL;
1882           break;
1883         }
1884         case kc::BasicDB::Error::NOREC: {
1885           rv = kt::RPCClient::RVELOGIC;
1886           break;
1887         }
1888         default: {
1889           log_db_error(serv, e);
1890           rv = kt::RPCClient::RVEINTERNAL;
1891           break;
1892         }
1893       }
1894     }
1895     return rv;
1896   }
1897   // process the cur_set_value procedure
do_cur_set_value(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1898   RV do_cur_set_value(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1899                       kt::TimedDB::Cursor* cur,
1900                       const std::map<std::string, std::string>& inmap,
1901                       std::map<std::string, std::string>& outmap) {
1902     uint32_t thid = sess->thread_id();
1903     if (!cur) {
1904       set_message(outmap, "ERROR", "no such cursor");
1905       return kt::RPCClient::RVEINVALID;
1906     }
1907     size_t vsiz;
1908     const char* vbuf = kt::strmapget(inmap, "value", &vsiz);
1909     if (!vbuf) {
1910       set_message(outmap, "ERROR", "invalid parameters");
1911       return kt::RPCClient::RVEINVALID;
1912     }
1913     const char* rp = kt::strmapget(inmap, "step");
1914     bool step = rp ? true : false;
1915     rp = kt::strmapget(inmap, "xt");
1916     int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1917     RV rv;
1918     opcounts_[thid][CNTSET]++;
1919     if (cur->set_value(vbuf, vsiz, xt, step)) {
1920       rv = kt::RPCClient::RVSUCCESS;
1921     } else {
1922       opcounts_[thid][CNTSETMISS]++;
1923       const kc::BasicDB::Error& e = cur->error();
1924       set_db_error(outmap, e);
1925       if (e == kc::BasicDB::Error::NOREC) {
1926         rv = kt::RPCClient::RVELOGIC;
1927       } else {
1928         log_db_error(serv, e);
1929         rv = kt::RPCClient::RVEINTERNAL;
1930       }
1931     }
1932     return rv;
1933   }
1934   // process the remove procedure
do_cur_remove(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1935   RV do_cur_remove(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1936                    kt::TimedDB::Cursor* cur,
1937                    const std::map<std::string, std::string>& inmap,
1938                    std::map<std::string, std::string>& outmap) {
1939     uint32_t thid = sess->thread_id();
1940     if (!cur) {
1941       set_message(outmap, "ERROR", "no such cursor");
1942       return kt::RPCClient::RVEINVALID;
1943     }
1944     RV rv;
1945     opcounts_[thid][CNTREMOVE]++;
1946     if (cur->remove()) {
1947       rv = kt::RPCClient::RVSUCCESS;
1948     } else {
1949       opcounts_[thid][CNTREMOVEMISS]++;
1950       const kc::BasicDB::Error& e = cur->error();
1951       set_db_error(outmap, e);
1952       if (e == kc::BasicDB::Error::NOREC) {
1953         rv = kt::RPCClient::RVELOGIC;
1954       } else {
1955         log_db_error(serv, e);
1956         rv = kt::RPCClient::RVEINTERNAL;
1957       }
1958     }
1959     return rv;
1960   }
1961   // process the cur_get_key procedure
do_cur_get_key(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1962   RV do_cur_get_key(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1963                     kt::TimedDB::Cursor* cur,
1964                     const std::map<std::string, std::string>& inmap,
1965                     std::map<std::string, std::string>& outmap) {
1966     uint32_t thid = sess->thread_id();
1967     if (!cur) {
1968       set_message(outmap, "ERROR", "no such cursor");
1969       return kt::RPCClient::RVEINVALID;
1970     }
1971     const char* rp = kt::strmapget(inmap, "step");
1972     bool step = rp ? true : false;
1973     RV rv;
1974     opcounts_[thid][CNTGET]++;
1975     size_t ksiz;
1976     char* kbuf = cur->get_key(&ksiz, step);
1977     if (kbuf) {
1978       outmap["key"] = std::string(kbuf, ksiz);
1979       delete[] kbuf;
1980       rv = kt::RPCClient::RVSUCCESS;
1981     } else {
1982       opcounts_[thid][CNTGETMISS]++;
1983       const kc::BasicDB::Error& e = cur->error();
1984       set_db_error(outmap, e);
1985       if (e == kc::BasicDB::Error::NOREC) {
1986         rv = kt::RPCClient::RVELOGIC;
1987       } else {
1988         log_db_error(serv, e);
1989         rv = kt::RPCClient::RVEINTERNAL;
1990       }
1991     }
1992     return rv;
1993   }
1994   // process the cur_get_value procedure
do_cur_get_value(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1995   RV do_cur_get_value(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1996                       kt::TimedDB::Cursor* cur,
1997                       const std::map<std::string, std::string>& inmap,
1998                       std::map<std::string, std::string>& outmap) {
1999     uint32_t thid = sess->thread_id();
2000     if (!cur) {
2001       set_message(outmap, "ERROR", "no such cursor");
2002       return kt::RPCClient::RVEINVALID;
2003     }
2004     const char* rp = kt::strmapget(inmap, "step");
2005     bool step = rp ? true : false;
2006     RV rv;
2007     opcounts_[thid][CNTGET]++;
2008     size_t vsiz;
2009     char* vbuf = cur->get_value(&vsiz, step);
2010     if (vbuf) {
2011       outmap["value"] = std::string(vbuf, vsiz);
2012       delete[] vbuf;
2013       rv = kt::RPCClient::RVSUCCESS;
2014     } else {
2015       opcounts_[thid][CNTGETMISS]++;
2016       const kc::BasicDB::Error& e = cur->error();
2017       set_db_error(outmap, e);
2018       if (e == kc::BasicDB::Error::NOREC) {
2019         rv = kt::RPCClient::RVELOGIC;
2020       } else {
2021         log_db_error(serv, e);
2022         rv = kt::RPCClient::RVEINTERNAL;
2023       }
2024     }
2025     return rv;
2026   }
2027   // process the cur_get procedure
do_cur_get(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)2028   RV do_cur_get(kt::RPCServer* serv, kt::RPCServer::Session* sess,
2029                 kt::TimedDB::Cursor* cur,
2030                 const std::map<std::string, std::string>& inmap,
2031                 std::map<std::string, std::string>& outmap) {
2032     uint32_t thid = sess->thread_id();
2033     if (!cur) {
2034       set_message(outmap, "ERROR", "no such cursor");
2035       return kt::RPCClient::RVEINVALID;
2036     }
2037     const char* rp = kt::strmapget(inmap, "step");
2038     bool step = rp ? true : false;
2039     RV rv;
2040     opcounts_[thid][CNTGET]++;
2041     size_t ksiz, vsiz;
2042     const char* vbuf;
2043     int64_t xt;
2044     char* kbuf = cur->get(&ksiz, &vbuf, &vsiz, &xt, step);
2045     if (kbuf) {
2046       outmap["key"] = std::string(kbuf, ksiz);
2047       outmap["value"] = std::string(vbuf, vsiz);
2048       if (xt < kt::TimedDB::XTMAX) set_message(outmap, "xt", "%lld", (long long)xt);
2049       delete[] kbuf;
2050       rv = kt::RPCClient::RVSUCCESS;
2051     } else {
2052       opcounts_[thid][CNTGETMISS]++;
2053       const kc::BasicDB::Error& e = cur->error();
2054       set_db_error(outmap, e);
2055       if (e == kc::BasicDB::Error::NOREC) {
2056         rv = kt::RPCClient::RVELOGIC;
2057       } else {
2058         log_db_error(serv, e);
2059         rv = kt::RPCClient::RVEINTERNAL;
2060       }
2061     }
2062     return rv;
2063   }
2064   // process the cur_seize procedure
do_cur_seize(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)2065   RV do_cur_seize(kt::RPCServer* serv, kt::RPCServer::Session* sess,
2066                   kt::TimedDB::Cursor* cur,
2067                   const std::map<std::string, std::string>& inmap,
2068                   std::map<std::string, std::string>& outmap) {
2069     uint32_t thid = sess->thread_id();
2070     if (!cur) {
2071       set_message(outmap, "ERROR", "no such cursor");
2072       return kt::RPCClient::RVEINVALID;
2073     }
2074     RV rv;
2075     opcounts_[thid][CNTGET]++;
2076     size_t ksiz, vsiz;
2077     const char* vbuf;
2078     int64_t xt;
2079     char* kbuf = cur->seize(&ksiz, &vbuf, &vsiz, &xt);
2080     if (kbuf) {
2081       outmap["key"] = std::string(kbuf, ksiz);
2082       outmap["value"] = std::string(vbuf, vsiz);
2083       if (xt < kt::TimedDB::XTMAX) set_message(outmap, "xt", "%lld", (long long)xt);
2084       delete[] kbuf;
2085       rv = kt::RPCClient::RVSUCCESS;
2086     } else {
2087       opcounts_[thid][CNTGETMISS]++;
2088       const kc::BasicDB::Error& e = cur->error();
2089       set_db_error(outmap, e);
2090       if (e == kc::BasicDB::Error::NOREC) {
2091         rv = kt::RPCClient::RVELOGIC;
2092       } else {
2093         log_db_error(serv, e);
2094         rv = kt::RPCClient::RVEINTERNAL;
2095       }
2096     }
2097     return rv;
2098   }
2099   // process the restful get command
do_rest_get(kt::HTTPServer * serv,kt::HTTPServer::Session * sess,kt::TimedDB * db,const char * kbuf,size_t ksiz,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)2100   int32_t do_rest_get(kt::HTTPServer* serv, kt::HTTPServer::Session* sess,
2101                       kt::TimedDB* db, const char* kbuf, size_t ksiz,
2102                       const std::map<std::string, std::string>& reqheads,
2103                       const std::string& reqbody,
2104                       std::map<std::string, std::string>& resheads,
2105                       std::string& resbody,
2106                       const std::map<std::string, std::string>& misc) {
2107     uint32_t thid = sess->thread_id();
2108     int32_t code;
2109     opcounts_[thid][CNTGET]++;
2110     size_t vsiz;
2111     int64_t xt;
2112     const char* vbuf = db->get(kbuf, ksiz, &vsiz, &xt);
2113     if (vbuf) {
2114       resbody.append(vbuf, vsiz);
2115       if (xt < kt::TimedDB::XTMAX) {
2116         char buf[48];
2117         kt::datestrhttp(xt, 0, buf);
2118         resheads["x-kt-xt"] = buf;
2119       }
2120       delete[] vbuf;
2121       code = 200;
2122     } else {
2123       opcounts_[thid][CNTGETMISS]++;
2124       const kc::BasicDB::Error& e = db->error();
2125       kc::strprintf(&resheads["x-kt-error"], "DB: %d: %s: %s", e.code(), e.name(), e.message());
2126       if (e == kc::BasicDB::Error::NOREC) {
2127         code = 404;
2128       } else {
2129         log_db_error(serv, e);
2130         code = 500;
2131       }
2132     }
2133     return code;
2134   }
2135   // process the restful head command
do_rest_head(kt::HTTPServer * serv,kt::HTTPServer::Session * sess,kt::TimedDB * db,const char * kbuf,size_t ksiz,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)2136   int32_t do_rest_head(kt::HTTPServer* serv, kt::HTTPServer::Session* sess,
2137                        kt::TimedDB* db, const char* kbuf, size_t ksiz,
2138                        const std::map<std::string, std::string>& reqheads,
2139                        const std::string& reqbody,
2140                        std::map<std::string, std::string>& resheads,
2141                        std::string& resbody,
2142                        const std::map<std::string, std::string>& misc) {
2143     uint32_t thid = sess->thread_id();
2144     int32_t code;
2145     opcounts_[thid][CNTGET]++;
2146     size_t vsiz;
2147     int64_t xt;
2148     const char* vbuf = db->get(kbuf, ksiz, &vsiz, &xt);
2149     if (vbuf) {
2150       if (xt < kt::TimedDB::XTMAX) {
2151         char buf[48];
2152         kt::datestrhttp(xt, 0, buf);
2153         resheads["x-kt-xt"] = buf;
2154       }
2155       kc::strprintf(&resheads["content-length"], "%lld", (long long)vsiz);
2156       delete[] vbuf;
2157       code = 200;
2158     } else {
2159       opcounts_[thid][CNTGETMISS]++;
2160       const kc::BasicDB::Error& e = db->error();
2161       kc::strprintf(&resheads["x-kt-error"], "DB: %d: %s: %s", e.code(), e.name(), e.message());
2162       resheads["content-length"] = "0";
2163       if (e == kc::BasicDB::Error::NOREC) {
2164         code = 404;
2165       } else {
2166         log_db_error(serv, e);
2167         code = 500;
2168       }
2169     }
2170     return code;
2171   }
2172   // process the restful put command
do_rest_put(kt::HTTPServer * serv,kt::HTTPServer::Session * sess,kt::TimedDB * db,const char * kbuf,size_t ksiz,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)2173   int32_t do_rest_put(kt::HTTPServer* serv, kt::HTTPServer::Session* sess,
2174                       kt::TimedDB* db, const char* kbuf, size_t ksiz,
2175                       const std::map<std::string, std::string>& reqheads,
2176                       const std::string& reqbody,
2177                       std::map<std::string, std::string>& resheads,
2178                       std::string& resbody,
2179                       const std::map<std::string, std::string>& misc) {
2180     uint32_t thid = sess->thread_id();
2181     int32_t mode = 0;
2182     const char* rp = kt::strmapget(reqheads, "x-kt-mode");
2183     if (rp) {
2184       if (!kc::stricmp(rp, "add")) {
2185         mode = 1;
2186       } else if (!kc::stricmp(rp, "replace")) {
2187         mode = 2;
2188       }
2189     }
2190     rp = kt::strmapget(reqheads, "x-kt-xt");
2191     int64_t xt = rp ? kt::strmktime(rp) : -1;
2192     xt = xt > 0 && xt < kt::TimedDB::XTMAX ? -xt : kc::INT64MAX;
2193     int32_t code;
2194     opcounts_[thid][CNTSET]++;
2195 
2196     bool rv;
2197     switch (mode) {
2198       default: {
2199         rv = db->set(kbuf, ksiz, reqbody.data(), reqbody.size(), xt);
2200         break;
2201       }
2202       case 1: {
2203         rv = db->add(kbuf, ksiz, reqbody.data(), reqbody.size(), xt);
2204         break;
2205       }
2206       case 2: {
2207         rv = db->replace(kbuf, ksiz, reqbody.data(), reqbody.size(), xt);
2208         break;
2209       }
2210     }
2211     if (rv) {
2212       const char* url = kt::strmapget(misc, "url");
2213       if (url) resheads["location"] = url;
2214       code = 201;
2215     } else {
2216       opcounts_[thid][CNTSETMISS]++;
2217       const kc::BasicDB::Error& e = db->error();
2218       kc::strprintf(&resheads["x-kt-error"], "DB: %d: %s: %s", e.code(), e.name(), e.message());
2219       if (e == kc::BasicDB::Error::DUPREC || e == kc::BasicDB::Error::NOREC) {
2220         code = 450;
2221       } else {
2222         log_db_error(serv, e);
2223         code = 500;
2224       }
2225     }
2226     return code;
2227   }
2228   // process the restful delete command
do_rest_delete(kt::HTTPServer * serv,kt::HTTPServer::Session * sess,kt::TimedDB * db,const char * kbuf,size_t ksiz,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)2229   int32_t do_rest_delete(kt::HTTPServer* serv, kt::HTTPServer::Session* sess,
2230                          kt::TimedDB* db, const char* kbuf, size_t ksiz,
2231                          const std::map<std::string, std::string>& reqheads,
2232                          const std::string& reqbody,
2233                          std::map<std::string, std::string>& resheads,
2234                          std::string& resbody,
2235                          const std::map<std::string, std::string>& misc) {
2236     uint32_t thid = sess->thread_id();
2237     int32_t code;
2238     opcounts_[thid][CNTREMOVE]++;
2239     if (db->remove(kbuf, ksiz)) {
2240       code = 204;
2241     } else {
2242       opcounts_[thid][CNTREMOVEMISS]++;
2243       const kc::BasicDB::Error& e = db->error();
2244       kc::strprintf(&resheads["x-kt-error"], "DB: %d: %s: %s", e.code(), e.name(), e.message());
2245       if (e == kc::BasicDB::Error::NOREC) {
2246         code = 404;
2247       } else {
2248         log_db_error(serv, e);
2249         code = 500;
2250       }
2251     }
2252     return code;
2253   }
2254   // process the binary replication command
do_bin_replication(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)2255   bool do_bin_replication(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
2256     char tbuf[sizeof(uint32_t)+sizeof(uint64_t)+sizeof(uint16_t)];
2257     if (!sess->receive(tbuf, sizeof(tbuf))) return false;
2258     const char* rp = tbuf;
2259     uint32_t flags = kc::readfixnum(rp, sizeof(flags));
2260     rp += sizeof(flags);
2261     uint64_t ts = kc::readfixnum(rp, sizeof(ts));
2262     rp += sizeof(ts);
2263     uint16_t sid = kc::readfixnum(rp, sizeof(sid));
2264     bool white = flags & kt::ReplicationClient::WHITESID;
2265     bool err = false;
2266     if (ulog_) {
2267       kt::UpdateLogger::Reader ulrd;
2268       if (ulrd.open(ulog_, ts)) {
2269         char c = kt::RemoteDB::BMREPLICATION;
2270         if (sess->send(&c, 1)) {
2271           serv->log(kt::ThreadedServer::Logger::SYSTEM, "a slave was connected: ts=%llu sid=%u",
2272                     (unsigned long long)ts, sid);
2273           char stack[kc::NUMBUFSIZ+RECBUFSIZ*4];
2274           uint64_t rts = 0;
2275           int32_t miss = 0;
2276           while (!err && !serv->aborted()) {
2277             size_t msiz;
2278             uint64_t mts;
2279             char* mbuf = ulrd.read(&msiz, &mts);
2280             if (mbuf) {
2281               size_t rsiz;
2282               uint16_t rsid = 0;
2283               uint16_t rdbid = 0;
2284               const char* rbuf = DBUpdateLogger::parse(mbuf, msiz, &rsiz, &rsid, &rdbid);
2285               if (white) {
2286                 if (rsid != sid) rbuf = NULL;
2287               } else {
2288                 if (rsid == sid) rbuf = NULL;
2289               }
2290               if (rbuf) {
2291                 miss = 0;
2292                 size_t nsiz = 1 + sizeof(uint64_t) + sizeof(uint32_t) + msiz;
2293                 char* nbuf = nsiz > sizeof(stack) ? new char[nsiz] : stack;
2294                 char* wp = nbuf;
2295                 *(wp++) = kt::RemoteDB::BMREPLICATION;
2296                 kc::writefixnum(wp, mts, sizeof(uint64_t));
2297                 wp += sizeof(uint64_t);
2298                 kc::writefixnum(wp, msiz, sizeof(uint32_t));
2299                 wp += sizeof(uint32_t);
2300                 std::memcpy(wp, mbuf, msiz);
2301                 if (!sess->send(nbuf, nsiz)) err = true;
2302                 if (nbuf != stack) delete[] nbuf;
2303               } else {
2304                 miss++;
2305                 if (miss >= Slave::DUMMYFREQ) {
2306                   char hbuf[1+sizeof(uint64_t)+sizeof(uint32_t)];
2307                   char* wp = hbuf;
2308                   *(wp++) = kt::RemoteDB::BMREPLICATION;
2309                   kc::writefixnum(wp, mts, sizeof(uint64_t));
2310                   wp += sizeof(uint64_t);
2311                   kc::writefixnum(wp, 0, sizeof(uint32_t));
2312                   if (!sess->send(hbuf, sizeof(hbuf))) err = true;
2313                   miss = 0;
2314                 }
2315               }
2316               if (mts > rts) rts = mts;
2317               delete[] mbuf;
2318             } else {
2319               uint64_t cc = kt::UpdateLogger::clock_pure();
2320               if (cc > 1000000000) cc -= 1000000000;
2321               if (cc < rts) cc = rts;
2322               char hbuf[1+sizeof(uint64_t)];
2323               char* wp = hbuf;
2324               *(wp++) = kt::RemoteDB::BMNOP;
2325               kc::writefixnum(wp, cc, sizeof(uint64_t));
2326               if (!sess->send(hbuf, sizeof(hbuf)) ||
2327                   sess->receive_byte() != kt::RemoteDB::BMREPLICATION)
2328                 err = true;
2329               kc::Thread::sleep(0.1);
2330             }
2331           }
2332           serv->log(kt::ThreadedServer::Logger::SYSTEM, "a slave was disconnected: sid=%u", sid);
2333           if (!ulrd.close()) {
2334             serv->log(kt::ThreadedServer::Logger::ERROR, "closing an update log reader failed");
2335             err = true;
2336           }
2337         } else {
2338           err = true;
2339         }
2340       } else {
2341         serv->log(kt::ThreadedServer::Logger::ERROR, "opening an update log reader failed");
2342         char c = kt::RemoteDB::BMERROR;
2343         sess->send(&c, 1);
2344         err = true;
2345       }
2346     } else {
2347       char c = kt::RemoteDB::BMERROR;
2348       sess->send(&c, 1);
2349       serv->log(kt::ThreadedServer::Logger::INFO, "no update log allows no replication");
2350       err = true;
2351     }
2352     return !err;
2353   }
2354   // process the binary play_script command
do_bin_play_script(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)2355   bool do_bin_play_script(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
2356     uint32_t thid = sess->thread_id();
2357     char tbuf[sizeof(uint32_t)+sizeof(uint32_t)+sizeof(uint32_t)];
2358     if (!sess->receive(tbuf, sizeof(tbuf))) return false;
2359     const char* rp = tbuf;
2360     uint32_t flags = kc::readfixnum(rp, sizeof(flags));
2361     rp += sizeof(flags);
2362     uint32_t nsiz = kc::readfixnum(rp, sizeof(nsiz));
2363     rp += sizeof(nsiz);
2364     uint32_t rnum = kc::readfixnum(rp, sizeof(rnum));
2365     rp += sizeof(rnum);
2366     if (nsiz > kt::RemoteDB::DATAMAXSIZ) return false;
2367     bool norep = flags & kt::RemoteDB::BONOREPLY;
2368     bool err = false;
2369     char nstack[kc::NUMBUFSIZ+RECBUFSIZ];
2370     char* nbuf = nsiz + 1 > sizeof(nstack) ? new char[nsiz+1] : nstack;
2371     if (sess->receive(nbuf, nsiz)) {
2372       nbuf[nsiz] = '\0';
2373       char stack[kc::NUMBUFSIZ+RECBUFSIZ*4];
2374       std::map<std::string, std::string> scrinmap;
2375       for (uint32_t i = 0; !err && i < rnum; i++) {
2376         char hbuf[sizeof(uint32_t)+sizeof(uint32_t)];
2377         if (sess->receive(hbuf, sizeof(hbuf))) {
2378           rp = hbuf;
2379           uint32_t ksiz = kc::readfixnum(rp, sizeof(ksiz));
2380           rp += sizeof(ksiz);
2381           uint32_t vsiz = kc::readfixnum(rp, sizeof(vsiz));
2382           rp += sizeof(vsiz);
2383           if (ksiz <= kt::RemoteDB::DATAMAXSIZ && vsiz <= kt::RemoteDB::DATAMAXSIZ) {
2384             size_t rsiz = ksiz + vsiz;
2385             char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
2386             if (sess->receive(rbuf, rsiz)) {
2387               std::string key(rbuf, ksiz);
2388               std::string value(rbuf + ksiz, vsiz);
2389               scrinmap[key] = value;
2390             } else {
2391               err = true;
2392             }
2393             if (rbuf != stack) delete[] rbuf;
2394           } else {
2395             err = true;
2396           }
2397         } else {
2398           err = true;
2399         }
2400       }
2401       if (!err) {
2402         if (scrprocs_) {
2403           ScriptProcessor* scrproc = scrprocs_ + thid;
2404           opcounts_[thid][CNTSCRIPT]++;
2405           std::map<std::string, std::string> scroutmap;
2406           RV rv = scrproc->call(nbuf, scrinmap, scroutmap);
2407           if (rv == kt::RPCClient::RVSUCCESS) {
2408             size_t osiz = 1 + sizeof(uint32_t);
2409             std::map<std::string, std::string>::iterator it = scroutmap.begin();
2410             std::map<std::string, std::string>::iterator itend = scroutmap.end();
2411             while (it != itend) {
2412               osiz += sizeof(uint32_t) + sizeof(uint32_t) + it->first.size() + it->second.size();
2413               ++it;
2414             }
2415             char* obuf = new char[osiz];
2416             char* wp = obuf;
2417             *(wp++) = kt::RemoteDB::BMPLAYSCRIPT;
2418             kc::writefixnum(wp, scroutmap.size(), sizeof(uint32_t));
2419             wp += sizeof(uint32_t);
2420             it = scroutmap.begin();
2421             itend = scroutmap.end();
2422             while (it != itend) {
2423               kc::writefixnum(wp, it->first.size(), sizeof(uint32_t));
2424               wp += sizeof(uint32_t);
2425               kc::writefixnum(wp, it->second.size(), sizeof(uint32_t));
2426               wp += sizeof(uint32_t);
2427               std::memcpy(wp, it->first.data(), it->first.size());
2428               wp += it->first.size();
2429               std::memcpy(wp, it->second.data(), it->second.size());
2430               wp += it->second.size();
2431               ++it;
2432             }
2433             if (!norep && !sess->send(obuf, osiz)) err = true;
2434             delete[] obuf;
2435           } else {
2436             char c = kt::RemoteDB::BMERROR;
2437             if (!norep) sess->send(&c, 1);
2438           }
2439         } else {
2440           char c = kt::RemoteDB::BMERROR;
2441           if (!norep) sess->send(&c, 1);
2442         }
2443       }
2444     }
2445     if (nbuf != nstack) delete[] nbuf;
2446     return !err;
2447   }
2448   // process the binary set_bulk command
do_bin_set_bulk(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)2449   bool do_bin_set_bulk(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
2450     uint32_t thid = sess->thread_id();
2451     char tbuf[sizeof(uint32_t)+sizeof(uint32_t)];
2452     if (!sess->receive(tbuf, sizeof(tbuf))) return false;
2453     const char* rp = tbuf;
2454     uint32_t flags = kc::readfixnum(rp, sizeof(flags));
2455     rp += sizeof(flags);
2456     uint32_t rnum = kc::readfixnum(rp, sizeof(rnum));
2457     rp += sizeof(rnum);
2458     bool norep = flags & kt::RemoteDB::BONOREPLY;
2459     bool err = false;
2460     uint32_t hits = 0;
2461     char stack[kc::NUMBUFSIZ+RECBUFSIZ*4];
2462     for (uint32_t i = 0; !err && i < rnum; i++) {
2463       char hbuf[sizeof(uint16_t)+sizeof(uint32_t)+sizeof(uint32_t)+sizeof(int64_t)];
2464       if (sess->receive(hbuf, sizeof(hbuf))) {
2465         rp = hbuf;
2466         uint16_t dbidx = kc::readfixnum(rp, sizeof(dbidx));
2467         rp += sizeof(dbidx);
2468         uint32_t ksiz = kc::readfixnum(rp, sizeof(ksiz));
2469         rp += sizeof(ksiz);
2470         uint32_t vsiz = kc::readfixnum(rp, sizeof(vsiz));
2471         rp += sizeof(vsiz);
2472         int64_t xt = kc::readfixnum(rp, sizeof(xt));
2473         rp += sizeof(xt);
2474         if (ksiz <= kt::RemoteDB::DATAMAXSIZ && vsiz <= kt::RemoteDB::DATAMAXSIZ) {
2475           size_t rsiz = ksiz + vsiz;
2476           char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
2477           if (sess->receive(rbuf, rsiz)) {
2478             if (dbidx < dbnum_) {
2479               kt::TimedDB* db = dbs_ + dbidx;
2480               opcounts_[thid][CNTSET]++;
2481               if (db->set(rbuf, ksiz, rbuf + ksiz, vsiz, xt)) {
2482                 hits++;
2483               } else {
2484                 opcounts_[thid][CNTSETMISS]++;
2485                 err = true;
2486               }
2487             }
2488           } else {
2489             err = true;
2490           }
2491           if (rbuf != stack) delete[] rbuf;
2492         } else {
2493           err = true;
2494         }
2495       } else {
2496         err = true;
2497       }
2498     }
2499     if (err) {
2500       char c = kt::RemoteDB::BMERROR;
2501       if (!norep) sess->send(&c, 1);
2502     } else {
2503       char hbuf[1+sizeof(hits)];
2504       char* wp = hbuf;
2505       *(wp++) = kt::RemoteDB::BMSETBULK;
2506       kc::writefixnum(wp, hits, sizeof(hits));
2507       if (!norep && !sess->send(hbuf, sizeof(hbuf))) err = true;
2508     }
2509     return !err;
2510   }
2511   // process the binary remove_bulk command
do_bin_remove_bulk(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)2512   bool do_bin_remove_bulk(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
2513     uint32_t thid = sess->thread_id();
2514     char tbuf[sizeof(uint32_t)+sizeof(uint32_t)];
2515     if (!sess->receive(tbuf, sizeof(tbuf))) return false;
2516     const char* rp = tbuf;
2517     uint32_t flags = kc::readfixnum(rp, sizeof(flags));
2518     rp += sizeof(flags);
2519     uint32_t rnum = kc::readfixnum(rp, sizeof(rnum));
2520     rp += sizeof(rnum);
2521     bool norep = flags & kt::RemoteDB::BONOREPLY;
2522     bool err = false;
2523     uint32_t hits = 0;
2524     char stack[kc::NUMBUFSIZ+RECBUFSIZ*2];
2525     for (uint32_t i = 0; !err && i < rnum; i++) {
2526       char hbuf[sizeof(uint16_t)+sizeof(uint32_t)];
2527       if (sess->receive(hbuf, sizeof(hbuf))) {
2528         rp = hbuf;
2529         uint16_t dbidx = kc::readfixnum(rp, sizeof(dbidx));
2530         rp += sizeof(dbidx);
2531         uint32_t ksiz = kc::readfixnum(rp, sizeof(ksiz));
2532         rp += sizeof(ksiz);
2533         if (ksiz <= kt::RemoteDB::DATAMAXSIZ) {
2534           char* kbuf = ksiz > sizeof(stack) ? new char[ksiz] : stack;
2535           if (sess->receive(kbuf, ksiz)) {
2536             if (dbidx < dbnum_) {
2537               kt::TimedDB* db = dbs_ + dbidx;
2538               opcounts_[thid][CNTREMOVE]++;
2539               if (db->remove(kbuf, ksiz)) {
2540                 hits++;
2541               } else {
2542                 opcounts_[thid][CNTREMOVEMISS]++;
2543                 if (db->error() != kc::BasicDB::Error::NOREC) err = true;
2544               }
2545             }
2546           } else {
2547             err = true;
2548           }
2549           if (kbuf != stack) delete[] kbuf;
2550         } else {
2551           err = true;
2552         }
2553       } else {
2554         err = true;
2555       }
2556     }
2557     if (err) {
2558       char c = kt::RemoteDB::BMERROR;
2559       if (!norep) sess->send(&c, 1);
2560     } else {
2561       char hbuf[1+sizeof(hits)];
2562       char* wp = hbuf;
2563       *(wp++) = kt::RemoteDB::BMREMOVEBULK;
2564       kc::writefixnum(wp, hits, sizeof(hits));
2565       if (!norep && !sess->send(hbuf, sizeof(hbuf))) err = true;
2566     }
2567     return !err;
2568   }
2569   // process the binary get_bulk command
do_bin_get_bulk(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)2570   bool do_bin_get_bulk(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
2571     uint32_t thid = sess->thread_id();
2572     char tbuf[sizeof(uint32_t)+sizeof(uint32_t)];
2573     if (!sess->receive(tbuf, sizeof(tbuf))) return false;
2574     const char* rp = tbuf;
2575     uint32_t flags = kc::readfixnum(rp, sizeof(flags));
2576     rp += sizeof(flags);
2577     uint32_t rnum = kc::readfixnum(rp, sizeof(rnum));
2578     rp += sizeof(rnum);
2579     bool err = false;
2580     uint32_t hits = 0;
2581     char stack[kc::NUMBUFSIZ+RECBUFSIZ*2];
2582     size_t oasiz = kc::NUMBUFSIZ + RECBUFSIZ * 2;
2583     char* obuf = (char*)kc::xmalloc(oasiz);
2584     size_t osiz = 1 + sizeof(uint32_t);
2585     std::memset(obuf, 0, osiz);
2586     for (uint32_t i = 0; !err && i < rnum; i++) {
2587       char hbuf[sizeof(uint16_t)+sizeof(uint32_t)];
2588       if (sess->receive(hbuf, sizeof(hbuf))) {
2589         rp = hbuf;
2590         uint16_t dbidx = kc::readfixnum(rp, sizeof(dbidx));
2591         rp += sizeof(dbidx);
2592         uint32_t ksiz = kc::readfixnum(rp, sizeof(ksiz));
2593         rp += sizeof(ksiz);
2594         if (ksiz <= kt::RemoteDB::DATAMAXSIZ) {
2595           char* kbuf = ksiz > sizeof(stack) ? new char[ksiz] : stack;
2596           if (sess->receive(kbuf, ksiz)) {
2597             if (dbidx < dbnum_) {
2598               kt::TimedDB* db = dbs_ + dbidx;
2599               opcounts_[thid][CNTGET]++;
2600               size_t vsiz;
2601               int64_t xt;
2602               char* vbuf = db->get(kbuf, ksiz, &vsiz, &xt);
2603               if (vbuf) {
2604                 hits++;
2605                 size_t usiz = sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) +
2606                     sizeof(int64_t) + ksiz + vsiz;
2607                 if (osiz + usiz > oasiz) {
2608                   oasiz = oasiz * 2 + usiz;
2609                   obuf = (char*)kc::xrealloc(obuf, oasiz);
2610                 }
2611                 kc::writefixnum(obuf + osiz, dbidx, sizeof(uint16_t));
2612                 osiz += sizeof(uint16_t);
2613                 kc::writefixnum(obuf + osiz, ksiz, sizeof(uint32_t));
2614                 osiz += sizeof(uint32_t);
2615                 kc::writefixnum(obuf + osiz, vsiz, sizeof(uint32_t));
2616                 osiz += sizeof(uint32_t);
2617                 kc::writefixnum(obuf + osiz, xt, sizeof(int64_t));
2618                 osiz += sizeof(int64_t);
2619                 std::memcpy(obuf + osiz, kbuf, ksiz);
2620                 osiz += ksiz;
2621                 std::memcpy(obuf + osiz, vbuf, vsiz);
2622                 osiz += vsiz;
2623                 delete[] vbuf;
2624               } else {
2625                 opcounts_[thid][CNTGETMISS]++;
2626                 if (db->error() != kc::BasicDB::Error::NOREC) err = true;
2627               }
2628             }
2629           } else {
2630             err = true;
2631           }
2632           if (kbuf != stack) delete[] kbuf;
2633         } else {
2634           err = true;
2635         }
2636       } else {
2637         err = true;
2638       }
2639     }
2640     if (err) {
2641       char c = kt::RemoteDB::BMERROR;
2642       sess->send(&c, 1);
2643     } else {
2644       *obuf = kt::RemoteDB::BMGETBULK;
2645       kc::writefixnum(obuf + 1, hits, sizeof(hits));
2646       if (!sess->send(obuf, osiz)) err = true;
2647     }
2648     kc::xfree(obuf);
2649     return !err;
2650   }
2651   // session local storage
2652   class SLS : public kt::RPCServer::Session::Data {
2653     friend class Worker;
2654    private:
SLS()2655     SLS() : curs_() {}
~SLS()2656     ~SLS() {
2657       std::map<int64_t, kt::TimedDB::Cursor*>::iterator it = curs_.begin();
2658       std::map<int64_t, kt::TimedDB::Cursor*>::iterator itend = curs_.end();
2659       while (it != itend) {
2660         kt::TimedDB::Cursor* cur = it->second;
2661         delete cur;
2662         ++it;
2663       }
2664     }
create(kt::RPCServer::Session * sess)2665     static SLS* create(kt::RPCServer::Session* sess) {
2666       SLS* sls = (SLS*)sess->data();
2667       if (!sls) {
2668         sls = new SLS;
2669         sess->set_data(sls);
2670       }
2671       return sls;
2672     }
2673     std::map<int64_t, kt::TimedDB::Cursor*> curs_;
2674   };
2675   int32_t thnum_;
2676   kc::CondMap* const condmap_;
2677   kt::TimedDB* const dbs_;
2678   const int32_t dbnum_;
2679   const std::map<std::string, int32_t>& dbmap_;
2680   const int32_t omode_;
2681   const double asi_;
2682   const bool ash_;
2683   const char* const bgspath_;
2684   const double bgsi_;
2685   kc::Compressor* const bgscomp_;
2686   kt::UpdateLogger* const ulog_;
2687   DBUpdateLogger* const ulogdbs_;
2688   const char* const cmdpath_;
2689   ScriptProcessor* const scrprocs_;
2690   OpCount* const opcounts_;
2691   uint64_t idlecnt_;
2692   double asnext_;
2693   double bgsnext_;
2694   Slave* slave_;
2695 };
2696 
2697 
2698 // main routine
main(int argc,char ** argv)2699 int main(int argc, char** argv) {
2700   g_progname = argv[0];
2701   g_procid = kc::getpid();
2702   g_starttime = kc::time();
2703   kc::setstdiobin();
2704   kt::setkillsignalhandler(killserver);
2705   if (argc > 1 && !std::strcmp(argv[1], "--version")) {
2706     printversion();
2707     return 0;
2708   }
2709   int32_t rv = run(argc, argv);
2710   return rv;
2711 }
2712 
2713 
2714 // print the usage and exit
usage()2715 static void usage() {
2716   eprintf("%s: Kyoto Tycoon: a handy cache/storage server\n", g_progname);
2717   eprintf("\n");
2718   eprintf("usage:\n");
2719   eprintf("  %s [-host str] [-port num] [-tout num] [-th num] [-log file] [-li|-ls|-le|-lz]"
2720           " [-ulog dir] [-ulim num] [-uasi num] [-sid num] [-ord] [-oat|-oas|-onl|-otl|-onr]"
2721           " [-asi num] [-ash] [-bgs dir] [-bgsi num] [-bgc str]"
2722           " [-dmn] [-pid file] [-cmd dir] [-scr file]"
2723           " [-mhost str] [-mport num] [-rts file] [-riv num]"
2724           " [-plsv file] [-plex str] [-pldb file] [db...]\n", g_progname);
2725   eprintf("\n");
2726   std::exit(1);
2727 }
2728 
2729 
2730 // kill the running server
killserver(int signum)2731 static void killserver(int signum) {
2732   if (g_serv) {
2733     g_serv->stop();
2734     g_serv = NULL;
2735     if (g_daemon && signum == SIGHUP) g_restart = true;
2736     if (signum == SIGUSR1) g_restart = true;
2737   }
2738 }
2739 
2740 
2741 // parse arguments of the command
run(int argc,char ** argv)2742 static int32_t run(int argc, char** argv) {
2743   bool argbrk = false;
2744   std::vector<std::string> dbpaths;
2745   const char* host = NULL;
2746   int32_t port = kt::DEFPORT;
2747   double tout = DEFTOUT;
2748   int32_t thnum = DEFTHNUM;
2749   const char* logpath = NULL;
2750   uint32_t logkinds = kc::UINT32MAX;
2751   const char* ulogpath = NULL;
2752   int64_t ulim = DEFULIM;
2753   double uasi = 0;
2754   int32_t sid = -1;
2755   int32_t omode = kc::BasicDB::OWRITER | kc::BasicDB::OCREATE;
2756   double asi = 0;
2757   bool ash = false;
2758   const char* bgspath = NULL;
2759   double bgsi = DEFBGSI;
2760   kc::Compressor* bgscomp = NULL;
2761   bool dmn = false;
2762   const char* pidpath = NULL;
2763   const char* cmdpath = NULL;
2764   const char* scrpath = NULL;
2765   const char* mhost = NULL;
2766   int32_t mport = kt::DEFPORT;
2767   const char* rtspath = NULL;
2768   double riv = DEFRIV;
2769   const char* plsvpath = NULL;
2770   const char* plsvex = "";
2771   const char* pldbpath = NULL;
2772   for (int32_t i = 1; i < argc; i++) {
2773     if (!argbrk && argv[i][0] == '-') {
2774       if (!std::strcmp(argv[i], "--")) {
2775         argbrk = true;
2776       } else if (!std::strcmp(argv[i], "-host")) {
2777         if (++i >= argc) usage();
2778         host = argv[i];
2779       } else if (!std::strcmp(argv[i], "-port")) {
2780         if (++i >= argc) usage();
2781         port = kc::atoix(argv[i]);
2782       } else if (!std::strcmp(argv[i], "-tout")) {
2783         if (++i >= argc) usage();
2784         tout = kc::atof(argv[i]);
2785       } else if (!std::strcmp(argv[i], "-th")) {
2786         if (++i >= argc) usage();
2787         thnum = kc::atof(argv[i]);
2788       } else if (!std::strcmp(argv[i], "-log")) {
2789         if (++i >= argc) usage();
2790         logpath = argv[i];
2791       } else if (!std::strcmp(argv[i], "-li")) {
2792         logkinds = Logger::INFO | Logger::SYSTEM | Logger::ERROR;
2793       } else if (!std::strcmp(argv[i], "-ls")) {
2794         logkinds = Logger::SYSTEM | Logger::ERROR;
2795       } else if (!std::strcmp(argv[i], "-le")) {
2796         logkinds = Logger::ERROR;
2797       } else if (!std::strcmp(argv[i], "-lz")) {
2798         logkinds = 0;
2799       } else if (!std::strcmp(argv[i], "-ulog")) {
2800         if (++i >= argc) usage();
2801         ulogpath = argv[i];
2802       } else if (!std::strcmp(argv[i], "-ulim")) {
2803         if (++i >= argc) usage();
2804         ulim = kc::atoix(argv[i]);
2805       } else if (!std::strcmp(argv[i], "-uasi")) {
2806         if (++i >= argc) usage();
2807         uasi = kc::atof(argv[i]);
2808       } else if (!std::strcmp(argv[i], "-sid")) {
2809         if (++i >= argc) usage();
2810         sid = kc::atoix(argv[i]);
2811       } else if (!std::strcmp(argv[i], "-ord")) {
2812         omode &= ~kc::BasicDB::OWRITER;
2813         omode |= kc::BasicDB::OREADER;
2814       } else if (!std::strcmp(argv[i], "-oat")) {
2815         omode |= kc::BasicDB::OAUTOTRAN;
2816       } else if (!std::strcmp(argv[i], "-oas")) {
2817         omode |= kc::BasicDB::OAUTOSYNC;
2818       } else if (!std::strcmp(argv[i], "-onl")) {
2819         omode |= kc::BasicDB::ONOLOCK;
2820       } else if (!std::strcmp(argv[i], "-otl")) {
2821         omode |= kc::BasicDB::OTRYLOCK;
2822       } else if (!std::strcmp(argv[i], "-onr")) {
2823         omode |= kc::BasicDB::ONOREPAIR;
2824       } else if (!std::strcmp(argv[i], "-asi")) {
2825         if (++i >= argc) usage();
2826         asi = kc::atof(argv[i]);
2827       } else if (!std::strcmp(argv[i], "-ash")) {
2828         ash = true;
2829       } else if (!std::strcmp(argv[i], "-bgs")) {
2830         if (++i >= argc) usage();
2831         bgspath = argv[i];
2832       } else if (!std::strcmp(argv[i], "-bgsi")) {
2833         if (++i >= argc) usage();
2834         bgsi = kc::atof(argv[i]);
2835       } else if (!std::strcmp(argv[i], "-bgsc")) {
2836         if (++i >= argc) usage();
2837         const char* cn = argv[i];
2838         if (!kc::stricmp(cn, "zlib") || !kc::stricmp(cn, "gz")) {
2839           bgscomp = new kc::ZLIBCompressor<kc::ZLIB::RAW>;
2840         } else if (!kc::stricmp(cn, "lzo") || !kc::stricmp(cn, "oz")) {
2841           bgscomp = new kc::LZOCompressor<kc::LZO::RAW>;
2842         } else if (!kc::stricmp(cn, "lzma") || !kc::stricmp(cn, "xz")) {
2843           bgscomp = new kc::LZMACompressor<kc::LZMA::RAW>;
2844         }
2845       } else if (!std::strcmp(argv[i], "-dmn")) {
2846         dmn = true;
2847       } else if (!std::strcmp(argv[i], "-pid")) {
2848         if (++i >= argc) usage();
2849         pidpath = argv[i];
2850       } else if (!std::strcmp(argv[i], "-cmd")) {
2851         if (++i >= argc) usage();
2852         cmdpath = argv[i];
2853       } else if (!std::strcmp(argv[i], "-scr")) {
2854         if (++i >= argc) usage();
2855         scrpath = argv[i];
2856       } else if (!std::strcmp(argv[i], "-mhost")) {
2857         if (++i >= argc) usage();
2858         mhost = argv[i];
2859       } else if (!std::strcmp(argv[i], "-mport")) {
2860         if (++i >= argc) usage();
2861         mport = kc::atoix(argv[i]);
2862       } else if (!std::strcmp(argv[i], "-rts")) {
2863         if (++i >= argc) usage();
2864         rtspath = argv[i];
2865       } else if (!std::strcmp(argv[i], "-riv")) {
2866         if (++i >= argc) usage();
2867         riv = kc::atof(argv[i]);
2868       } else if (!std::strcmp(argv[i], "-plsv")) {
2869         if (++i >= argc) usage();
2870         plsvpath = argv[i];
2871       } else if (!std::strcmp(argv[i], "-plex")) {
2872         if (++i >= argc) usage();
2873         plsvex = argv[i];
2874       } else if (!std::strcmp(argv[i], "-pldb")) {
2875         if (++i >= argc) usage();
2876         pldbpath = argv[i];
2877       } else {
2878         usage();
2879       }
2880     } else {
2881       argbrk = true;
2882       dbpaths.push_back(argv[i]);
2883     }
2884   }
2885   if (port < 1 || thnum < 1 || mport < 1) usage();
2886   if (thnum > THREADMAX) thnum = THREADMAX;
2887   if (dbpaths.empty()) {
2888     if (pldbpath) usage();
2889     dbpaths.push_back(":");
2890   }
2891   int32_t rv = proc(dbpaths, host, port, tout, thnum, logpath, logkinds,
2892                     ulogpath, ulim, uasi, sid, omode, asi, ash, bgspath, bgsi, bgscomp,
2893                     dmn, pidpath, cmdpath, scrpath, mhost, mport, rtspath, riv,
2894                     plsvpath, plsvex, pldbpath);
2895   delete bgscomp;
2896   return rv;
2897 }
2898 
2899 
2900 // drive the server process
proc(const std::vector<std::string> & dbpaths,const char * host,int32_t port,double tout,int32_t thnum,const char * logpath,uint32_t logkinds,const char * ulogpath,int64_t ulim,double uasi,int32_t sid,int32_t omode,double asi,bool ash,const char * bgspath,double bgsi,kc::Compressor * bgscomp,bool dmn,const char * pidpath,const char * cmdpath,const char * scrpath,const char * mhost,int32_t mport,const char * rtspath,double riv,const char * plsvpath,const char * plsvex,const char * pldbpath)2901 static int32_t proc(const std::vector<std::string>& dbpaths,
2902                     const char* host, int32_t port, double tout, int32_t thnum,
2903                     const char* logpath, uint32_t logkinds,
2904                     const char* ulogpath, int64_t ulim, double uasi,
2905                     int32_t sid, int32_t omode, double asi, bool ash,
2906                     const char* bgspath, double bgsi, kc::Compressor* bgscomp, bool dmn,
2907                     const char* pidpath, const char* cmdpath, const char* scrpath,
2908                     const char* mhost, int32_t mport, const char* rtspath, double riv,
2909                     const char* plsvpath, const char* plsvex, const char* pldbpath) {
2910   g_daemon = false;
2911   if (dmn) {
2912     if (kc::File::PATHCHR == '/') {
2913       if (logpath && *logpath != kc::File::PATHCHR) {
2914         eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, logpath);
2915         return 1;
2916       }
2917       if (ulogpath && *ulogpath != kc::File::PATHCHR) {
2918         eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, ulogpath);
2919         return 1;
2920       }
2921       if (bgspath && *bgspath != kc::File::PATHCHR) {
2922         eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, bgspath);
2923         return 1;
2924       }
2925       if (pidpath && *pidpath != kc::File::PATHCHR) {
2926         eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, pidpath);
2927         return 1;
2928       }
2929       if (cmdpath && *cmdpath != kc::File::PATHCHR) {
2930         eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, cmdpath);
2931         return 1;
2932       }
2933       if (scrpath && *scrpath != kc::File::PATHCHR) {
2934         eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, scrpath);
2935         return 1;
2936       }
2937       if (rtspath && *rtspath != kc::File::PATHCHR) {
2938         eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, rtspath);
2939         return 1;
2940       }
2941       if (plsvpath && *plsvpath != kc::File::PATHCHR) {
2942         eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, plsvpath);
2943         return 1;
2944       }
2945       if (pldbpath && *pldbpath != kc::File::PATHCHR) {
2946         eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, pldbpath);
2947         return 1;
2948       }
2949     }
2950     if (!kt::daemonize()) {
2951       eprintf("%s: switching to a daemon failed\n", g_progname);
2952       return 1;
2953     }
2954     g_procid = kc::getpid();
2955     g_daemon = true;
2956   }
2957   if (ulogpath && sid < 0) {
2958     eprintf("%s: update log requires the server ID\n", g_progname);
2959     return 1;
2960   }
2961   if (!cmdpath) cmdpath = kc::File::CDIRSTR;
2962   if (mhost) {
2963     if (sid < 0) {
2964       eprintf("%s: replication requires the server ID\n", g_progname);
2965       return 1;
2966     }
2967     if (!rtspath) {
2968       eprintf("%s: replication requires the replication time stamp file\n", g_progname);
2969       return 1;
2970     }
2971   }
2972   if (sid < 0) sid = 0;
2973   kc::File::Status sbuf;
2974   if (bgspath && !kc::File::status(bgspath, &sbuf) && !kc::File::make_directory(bgspath)) {
2975     eprintf("%s: %s: could not open the directory\n", g_progname, bgspath);
2976     return 1;
2977   }
2978   if (!kc::File::status(cmdpath, &sbuf) || !sbuf.isdir) {
2979     eprintf("%s: %s: no such directory\n", g_progname, cmdpath);
2980     return 1;
2981   }
2982   if (scrpath && !kc::File::status(scrpath)) {
2983     eprintf("%s: %s: no such file\n", g_progname, scrpath);
2984     return 1;
2985   }
2986   if (dbpaths.size() > (size_t)OPENDBMAX) {
2987     eprintf("%s: too much databases\n", g_progname);
2988     return 1;
2989   }
2990   kt::RPCServer serv;
2991   Logger logger;
2992   if (!logger.open(logpath)) {
2993     eprintf("%s: %s: could not open the log file\n", g_progname, logpath ? logpath : "-");
2994     return 1;
2995   }
2996   serv.set_logger(&logger, logkinds);
2997   serv.log(Logger::SYSTEM, "================ [START]: pid=%d", g_procid);
2998   std::string addr = "";
2999   if (host) {
3000     addr = kt::Socket::get_host_address(host);
3001     if (addr.empty()) {
3002       serv.log(Logger::ERROR, "unknown host: %s", host);
3003       return 1;
3004     }
3005   }
3006   kt::SharedLibrary pldblib;
3007   kt::KTDBINIT pldbinit = NULL;
3008   if (pldbpath) {
3009     serv.log(Logger::SYSTEM, "loading a plug-in database file: path=%s", pldbpath);
3010     if (!pldblib.open(pldbpath)) {
3011       serv.log(Logger::ERROR, "could not load a plug-in database file: %s", pldbpath);
3012       return 1;
3013     }
3014     pldbinit = (kt::KTDBINIT)pldblib.symbol(kt::KTDBINITNAME);
3015     if (!pldbinit) {
3016       serv.log(Logger::ERROR, "could not find the initializer: %s: %s",
3017                pldbpath, kt::KTDBINITNAME);
3018       return 1;
3019     }
3020   }
3021   std::string expr = kc::strprintf("%s:%d", addr.c_str(), port);
3022   serv.set_network(expr, tout);
3023   int32_t dbnum = dbpaths.size();
3024   kt::UpdateLogger* ulog = NULL;
3025   DBUpdateLogger* ulogdbs = NULL;
3026   if (ulogpath) {
3027     ulog = new kt::UpdateLogger;
3028     serv.log(Logger::SYSTEM, "opening the update log: path=%s sid=%u", ulogpath, sid);
3029     if (!ulog->open(ulogpath, ulim, uasi)) {
3030       serv.log(Logger::ERROR, "could not open the update log: %s", ulogpath);
3031       delete ulog;
3032       return 1;
3033     }
3034     ulogdbs = new DBUpdateLogger[dbnum];
3035   }
3036   kt::TimedDB* dbs = new kt::TimedDB[dbnum];
3037   DBLogger dblogger(&logger, logkinds);
3038   std::map<std::string, int32_t> dbmap;
3039   for (int32_t i = 0; i < dbnum; i++) {
3040     const std::string& dbpath = dbpaths[i];
3041     serv.log(Logger::SYSTEM, "opening a database: path=%s", dbpath.c_str());
3042     if (logkinds != 0)
3043       dbs[i].tune_logger(&dblogger, kc::BasicDB::Logger::WARN | kc::BasicDB::Logger::ERROR);
3044     if (ulog) {
3045       ulogdbs[i].initialize(ulog, sid, i);
3046       dbs[i].tune_update_trigger(ulogdbs + i);
3047     }
3048     if (pldbinit) dbs[i].set_internal_db(pldbinit());
3049     if (!dbs[i].open(dbpath, omode)) {
3050       const kc::BasicDB::Error& e = dbs[i].error();
3051       serv.log(Logger::ERROR, "could not open a database file: %s: %s: %s",
3052                dbpath.c_str(), e.name(), e.message());
3053       delete[] dbs;
3054       delete[] ulogdbs;
3055       delete ulog;
3056       return 1;
3057     }
3058     std::string path = dbs[i].path();
3059     const char* rp = path.c_str();
3060     const char* pv = std::strrchr(rp, kc::File::PATHCHR);
3061     if (pv) rp = pv + 1;
3062     dbmap[rp] = i;
3063   }
3064   if (bgspath) {
3065     kc::DirStream dir;
3066     if (dir.open(bgspath)) {
3067       std::string name;
3068       while (dir.read(&name)) {
3069         const char* nstr = name.c_str();
3070         const char* pv = std::strrchr(nstr, kc::File::EXTCHR);
3071         int32_t idx = kc::atoi(nstr);
3072         if (*nstr >= '0' && *nstr <= '9' && pv && !kc::stricmp(pv + 1, BGSPATHEXT) &&
3073             idx >= 0 && idx < dbnum) {
3074           std::string path;
3075           kc::strprintf(&path, "%s%c%s", bgspath, kc::File::PATHCHR, nstr);
3076           uint64_t ssts;
3077           int64_t sscount, sssize;
3078           if (kt::TimedDB::status_snapshot_atomic(path, &ssts, &sscount, &sssize)) {
3079             serv.log(Logger::SYSTEM,
3080                      "applying a snapshot file: db=%d ts=%llu count=%lld size=%lld",
3081                      idx, (unsigned long long)ssts, (long long)sscount, (long long)sssize);
3082             if (!dbs[idx].load_snapshot_atomic(path, bgscomp)) {
3083               const kc::BasicDB::Error& e = dbs[idx].error();
3084               serv.log(Logger::ERROR, "could not apply a snapshot: %s: %s",
3085                        e.name(), e.message());
3086             }
3087           }
3088         }
3089       }
3090       dir.close();
3091     }
3092   }
3093   ScriptProcessor* scrprocs = NULL;
3094   if (scrpath) {
3095     serv.log(Logger::SYSTEM, "loading a script file: path=%s", scrpath);
3096     scrprocs = new ScriptProcessor[thnum];
3097     for (int32_t i = 0; i < thnum; i++) {
3098       if (!scrprocs[i].set_resources(i, &serv, dbs, dbnum, &dbmap)) {
3099         serv.log(Logger::ERROR, "could not initialize the scripting processor");
3100         delete[] scrprocs;
3101         delete[] dbs;
3102         delete[] ulogdbs;
3103         delete ulog;
3104         return 1;
3105       }
3106       if (!scrprocs[i].load(scrpath))
3107         serv.log(Logger::ERROR, "could not load a script file: %s", scrpath);
3108     }
3109   }
3110   kt::SharedLibrary plsvlib;
3111   kt::PluggableServer* plsv = NULL;
3112   if (plsvpath) {
3113     serv.log(Logger::SYSTEM, "loading a plug-in server file: path=%s", plsvpath);
3114     if (!plsvlib.open(plsvpath)) {
3115       serv.log(Logger::ERROR, "could not load a plug-in server file: %s", plsvpath);
3116       delete[] scrprocs;
3117       delete[] dbs;
3118       delete[] ulogdbs;
3119       delete ulog;
3120       return 1;
3121     }
3122     kt::KTSERVINIT init = (kt::KTSERVINIT)plsvlib.symbol(kt::KTSERVINITNAME);
3123     if (!init) {
3124       serv.log(Logger::ERROR, "could not find the initializer: %s: %s",
3125                plsvpath, kt::KTSERVINITNAME);
3126       delete[] scrprocs;
3127       delete[] dbs;
3128       delete[] ulogdbs;
3129       delete ulog;
3130       return 1;
3131     }
3132     plsv = init();
3133     plsv->configure(dbs, dbnum, &logger, logkinds, plsvex);
3134   }
3135   OpCount* opcounts = new OpCount[thnum];
3136   for (int32_t i = 0; i < thnum; i++) {
3137     for (int32_t j = 0; j <= CNTMISC; j++) {
3138       opcounts[i][j] = 0;
3139     }
3140   }
3141   kc::CondMap condmap;
3142   Worker worker(thnum, &condmap, dbs, dbnum, dbmap, omode, asi, ash, bgspath, bgsi, bgscomp,
3143                 ulog, ulogdbs, cmdpath, scrprocs, opcounts);
3144   serv.set_worker(&worker, thnum);
3145   if (pidpath) {
3146     char numbuf[kc::NUMBUFSIZ];
3147     size_t nsiz = std::sprintf(numbuf, "%d\n", g_procid);
3148     kc::File::write_file(pidpath, numbuf, nsiz);
3149   }
3150   bool err = false;
3151   while (true) {
3152     g_restart = false;
3153     g_serv = &serv;
3154     Slave slave(sid, rtspath, mhost, mport, riv, &serv, dbs, dbnum, ulog, ulogdbs);
3155     slave.start();
3156     worker.set_misc_conf(&slave);
3157     PlugInDriver pldriver(plsv);
3158     if (plsv) pldriver.start();
3159     if (serv.start()) {
3160       condmap.broadcast_all();
3161       if (!serv.finish()) err = true;
3162     } else {
3163       err = true;
3164     }
3165     kc::Thread::sleep(0.5);
3166     if (plsv) {
3167       plsv->stop();
3168       pldriver.join();
3169       if (pldriver.error()) err = true;
3170       kc::Thread::sleep(0.1);
3171     }
3172     slave.stop();
3173     slave.join();
3174     if (!g_restart || err) break;
3175     logger.close();
3176     if (!logger.open(logpath)) {
3177       eprintf("%s: %s: could not open the log file\n", g_progname, logpath ? logpath : "-");
3178       err = true;
3179       break;
3180     }
3181     if (scrprocs) {
3182       serv.log(Logger::SYSTEM, "reloading a script file: path=%s", scrpath);
3183       for (int32_t i = 0; i < thnum; i++) {
3184         scrprocs[i].clear();
3185         if (!scrprocs[i].set_resources(i, &serv, dbs, dbnum, &dbmap)) {
3186           serv.log(Logger::ERROR, "could not initialize the scripting processor");
3187           err = true;
3188           break;
3189         }
3190         if (!scrprocs[i].load(scrpath))
3191           serv.log(Logger::ERROR, "could not load a script file: %s", scrpath);
3192       }
3193     }
3194     if (err) break;
3195   }
3196   if (pidpath) kc::File::remove(pidpath);
3197   delete[] opcounts;
3198   if (plsv) {
3199     delete plsv;
3200     if (!plsvlib.close()) {
3201       eprintf("%s: closing a shared library failed\n", g_progname);
3202       err = true;
3203     }
3204   }
3205   if (bgspath) {
3206     serv.log(Logger::SYSTEM, "snapshotting databases");
3207     if (!dosnapshot(bgspath, bgscomp, dbs, dbnum, &serv)) err = true;
3208   }
3209   delete[] scrprocs;
3210   for (int32_t i = 0; i < dbnum; i++) {
3211     const std::string& dbpath = dbpaths[i];
3212     serv.log(Logger::SYSTEM, "closing a database: path=%s", dbpath.c_str());
3213     if (!dbs[i].close()) {
3214       const kc::BasicDB::Error& e = dbs[i].error();
3215       serv.log(Logger::ERROR, "could not close a database file: %s: %s: %s",
3216                dbpath.c_str(), e.name(), e.message());
3217       err = true;
3218     }
3219   }
3220   delete[] dbs;
3221   if (ulog) {
3222     delete[] ulogdbs;
3223     if (!ulog->close()) {
3224       eprintf("%s: closing the update log faild\n", g_progname);
3225       err = true;
3226     }
3227     delete ulog;
3228   }
3229   if (pldbinit && !pldblib.close()) {
3230     eprintf("%s: closing a shared library failed\n", g_progname);
3231     err = true;
3232   }
3233   serv.log(Logger::SYSTEM, "================ [FINISH]: pid=%d", g_procid);
3234   return err ? 1 : 0;
3235 }
3236 
3237 
3238 // snapshot all databases
dosnapshot(const char * bgspath,kc::Compressor * bgscomp,kt::TimedDB * dbs,int32_t dbnum,kt::RPCServer * serv)3239 static bool dosnapshot(const char* bgspath, kc::Compressor* bgscomp,
3240                        kt::TimedDB* dbs, int32_t dbnum, kt::RPCServer* serv) {
3241   bool err = false;
3242   for (int32_t i = 0; i < dbnum; i++) {
3243     kt::TimedDB* db = dbs + i;
3244     std::string destpath;
3245     kc::strprintf(&destpath, "%s%c%08d%c%s",
3246                   bgspath, kc::File::PATHCHR, i, kc::File::EXTCHR, BGSPATHEXT);
3247     std::string tmppath;
3248     kc::strprintf(&tmppath, "%s%ctmp", destpath.c_str(), kc::File::EXTCHR);
3249     int32_t cnt = 0;
3250     while (true) {
3251       if (db->dump_snapshot_atomic(tmppath, bgscomp)) {
3252         if (!kc::File::rename(tmppath, destpath)) {
3253           serv->log(Logger::ERROR, "renaming a file failed: %s: %s",
3254                     tmppath.c_str(), destpath.c_str());
3255         }
3256         kc::File::remove(tmppath);
3257         break;
3258       }
3259       kc::File::remove(tmppath);
3260       const kc::BasicDB::Error& e = db->error();
3261       if (e != kc::BasicDB::Error::LOGIC) {
3262         serv->log(Logger::ERROR, "database error: %d: %s: %s", e.code(), e.name(), e.message());
3263         break;
3264       }
3265       if (++cnt >= 3) {
3266         serv->log(Logger::SYSTEM, "snapshotting was abandoned");
3267         err = true;
3268         break;
3269       }
3270       serv->log(Logger::INFO, "retrying snapshot: %d", cnt);
3271     }
3272     kc::Thread::yield();
3273   }
3274   return !err;
3275 }
3276 
3277 
3278 
3279 // END OF FILE
3280