1 /*************************************************************************************************
2 * A handy cache/storage server
3 * Copyright (C) 2009-2012 FAL Labs
4 * This file is part of Kyoto Tycoon.
5 * This program is free software: you can redistribute it and/or modify it under the terms of
6 * the GNU General Public License as published by the Free Software Foundation, either version
7 * 3 of the License, or any later version.
8 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
9 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10 * See the GNU General Public License for more details.
11 * You should have received a copy of the GNU General Public License along with this program.
12 * If not, see <http://www.gnu.org/licenses/>.
13 *************************************************************************************************/
14
15
16 #include "cmdcommon.h"
17
18
// Slots of the operation counters.  The values are used directly as array indices into an
// OpCount, so they run contiguously from zero and CNTMISC is the last slot.
enum {
  CNTSET = 0,                            // setting operations
  CNTSETMISS = 1,                        // misses of setting operations
  CNTREMOVE = 2,                         // removing operations
  CNTREMOVEMISS = 3,                     // misses of removing operations
  CNTGET = 4,                            // getting operations
  CNTGETMISS = 5,                        // misses of getting operations
  CNTSCRIPT = 6,                         // scripting operations
  CNTMISC = 7                            // miscellaneous operations
};
// One 64-bit counter per slot above (CNTSET through CNTMISC inclusive); one OpCount per thread.
typedef uint64_t OpCount[CNTMISC + 1];
30
31
32 // global variables
// Process-wide state.  None of these has an initializer, so each starts out zero-initialized
// (objects with static storage duration).
const char* g_progname;                  // program name
int32_t g_procid;                        // process ID number; reported as "serv_proc_id" by Worker::do_report
double g_starttime;                      // start time; Worker::do_report subtracts it from kc::time() for "serv_running_term"
bool g_daemon;                           // daemon flag
kt::RPCServer* g_serv;                   // running RPC server
bool g_restart;                          // restart flag
39
40
41 // function prototypes
// NOTE(review): the definitions of these functions lie outside the section under review, so the
// remarks below are limited to what the declarations and the visible call sites show.
int main(int argc, char** argv);
static void usage();
// takes a signal number -- presumably installed as a signal handler; TODO confirm
static void killserver(int signum);
static int32_t run(int argc, char** argv);
// presumably the server main routine receiving the parsed command line settings -- TODO confirm
static int32_t proc(const std::vector<std::string>& dbpaths,
                    const char* host, int32_t port, double tout, int32_t thnum,
                    const char* logpath, uint32_t logkinds,
                    const char* ulogpath, int64_t ulim, double uasi,
                    int32_t sid, int32_t omode, double asi, bool ash,
                    const char* bgspath, double bgsi, kc::Compressor* bgscomp, bool dmn,
                    const char* pidpath, const char* cmdpath, const char* scrpath,
                    const char* mhost, int32_t mport, const char* rtspath, double riv,
                    const char* plsvpath, const char* plsvex, const char* pldbpath);
// Called by Worker::process_timer with the snapshot path, the snapshot compressor, the database
// array and its length, and the server, once the snapshot interval has elapsed.
static bool dosnapshot(const char* bgspath, kc::Compressor* bgscomp,
                       kt::TimedDB* dbs, int32_t dbnum, kt::RPCServer* serv);
57
58
59 // logger implementation
60 class Logger : public kt::RPCServer::Logger {
61 public:
62 // constructor
Logger()63 explicit Logger() : strm_(NULL), lock_() {}
64 // destructor
~Logger()65 ~Logger() {
66 if (strm_) close();
67 }
68 // open the stream
open(const char * path)69 bool open(const char* path) {
70 if (strm_) return false;
71 if (path && *path != '\0' && std::strcmp(path, "-")) {
72 std::ofstream* strm = new std::ofstream;
73 strm->open(path, std::ios_base::out | std::ios_base::binary | std::ios_base::app);
74 if (!*strm) {
75 delete strm;
76 return false;
77 }
78 strm_ = strm;
79 } else {
80 strm_ = &std::cout;
81 }
82 return true;
83 }
84 // close the stream
close()85 void close() {
86 if (!strm_) return;
87 if (strm_ != &std::cout) delete strm_;
88 strm_ = NULL;
89 }
90 // process a log message.
log(Kind kind,const char * message)91 void log(Kind kind, const char* message) {
92 if (!strm_) return;
93 char date[48];
94 kt::datestrwww(kc::nan(), kc::INT32MAX, 6, date);
95 const char* kstr = "MISC";
96 switch (kind) {
97 case kt::RPCServer::Logger::DEBUG: kstr = "DEBUG"; break;
98 case kt::RPCServer::Logger::INFO: kstr = "INFO"; break;
99 case kt::RPCServer::Logger::SYSTEM: kstr = "SYSTEM"; break;
100 case kt::RPCServer::Logger::ERROR: kstr = "ERROR"; break;
101 }
102 lock_.lock();
103 *strm_ << date << ": [" << kstr << "]: " << message << "\n";
104 strm_->flush();
105 lock_.unlock();
106 }
107 private:
108 std::ostream* strm_;
109 kc::Mutex lock_;
110 };
111
112
113 // database logger implementation
114 class DBLogger : public kc::BasicDB::Logger {
115 public:
116 // constructor
DBLogger(::Logger * logger,uint32_t kinds)117 explicit DBLogger(::Logger* logger, uint32_t kinds) : logger_(logger), kinds_(kinds) {}
118 // process a log message.
log(const char * file,int32_t line,const char * func,kc::BasicDB::Logger::Kind kind,const char * message)119 void log(const char* file, int32_t line, const char* func,
120 kc::BasicDB::Logger::Kind kind, const char* message) {
121 kt::RPCServer::Logger::Kind rkind;
122 switch (kind) {
123 default: rkind = kt::RPCServer::Logger::DEBUG; break;
124 case kc::BasicDB::Logger::INFO: rkind = kt::RPCServer::Logger::INFO; break;
125 case kc::BasicDB::Logger::WARN: rkind = kt::RPCServer::Logger::SYSTEM; break;
126 case kc::BasicDB::Logger::ERROR: rkind = kt::RPCServer::Logger::ERROR; break;
127 }
128 if (!(rkind & kinds_)) return;
129 std::string lmsg;
130 kc::strprintf(&lmsg, "[DB]: %s", message);
131 logger_->log(rkind, lmsg.c_str());
132 }
133 private:
134 ::Logger* logger_;
135 uint32_t kinds_;
136 };
137
138
// replication slave implementation
140 class Slave : public kc::Thread {
141 friend class Worker;
142 public:
143 // constructor
Slave(uint16_t sid,const char * rtspath,const char * host,int32_t port,double riv,kt::RPCServer * serv,kt::TimedDB * dbs,int32_t dbnum,kt::UpdateLogger * ulog,DBUpdateLogger * ulogdbs)144 explicit Slave(uint16_t sid, const char* rtspath, const char* host, int32_t port, double riv,
145 kt::RPCServer* serv, kt::TimedDB* dbs, int32_t dbnum,
146 kt::UpdateLogger* ulog, DBUpdateLogger* ulogdbs) :
147 lock_(), sid_(sid), rtspath_(rtspath), host_(""), port_(port), riv_(riv),
148 serv_(serv), dbs_(dbs), dbnum_(dbnum), ulog_(ulog), ulogdbs_(ulogdbs),
149 wrts_(kc::UINT64MAX), rts_(0), alive_(true), hup_(false) {
150 if (host) host_ = host;
151 }
152 // stop the slave
stop()153 void stop() {
154 alive_ = false;
155 }
156 // restart the slave
restart()157 void restart() {
158 hup_ = true;
159 }
160 // set the configuration of the master
set_master(const std::string & host,int32_t port,uint64_t ts,double iv)161 void set_master(const std::string& host, int32_t port, uint64_t ts, double iv) {
162 kc::ScopedSpinLock lock(&lock_);
163 host_ = host;
164 port_ = port;
165 wrts_ = ts;
166 if (iv >= 0) riv_ = iv;
167 }
168 // get the host name of the master
host()169 std::string host() {
170 kc::ScopedSpinLock lock(&lock_);
171 return host_;
172 }
173 // get the port number name of the master
port()174 int32_t port() {
175 kc::ScopedSpinLock lock(&lock_);
176 return port_;
177 }
178 // get the replication time stamp
rts()179 uint64_t rts() {
180 return rts_;
181 }
182 // get the replication interval
riv()183 double riv() {
184 return riv_;
185 }
186 private:
187 static const int32_t DUMMYFREQ = 256;
188 static const size_t RTSFILESIZ = 21;
189 // perform replication
run(void)190 void run(void) {
191 if (!rtspath_) return;
192 kc::File rtsfile;
193 if (!rtsfile.open(rtspath_, kc::File::OWRITER | kc::File::OCREATE, kc::NUMBUFSIZ) ||
194 !rtsfile.truncate(RTSFILESIZ)) {
195 serv_->log(Logger::ERROR, "opening the RTS file failed: path=%s", rtspath_);
196 return;
197 }
198 rts_ = read_rts(&rtsfile);
199 write_rts(&rtsfile, rts_);
200 kc::Thread::sleep(0.2);
201 bool deferred = false;
202 while (true) {
203 lock_.lock();
204 std::string host = host_;
205 int32_t port = port_;
206 uint64_t wrts = wrts_;
207 lock_.unlock();
208 if (!host.empty()) {
209 if (wrts != kc::UINT64MAX) {
210 lock_.lock();
211 wrts_ = kc::UINT64MAX;
212 rts_ = wrts;
213 write_rts(&rtsfile, rts_);
214 lock_.unlock();
215 }
216 kt::ReplicationClient rc;
217 if (rc.open(host, port, 60, rts_, sid_)) {
218 serv_->log(Logger::SYSTEM, "replication started: host=%s port=%d rts=%llu",
219 host.c_str(), port, (unsigned long long)rts_);
220 hup_ = false;
221 double rivsum = 0;
222 while (alive_ && !hup_ && rc.alive()) {
223 size_t msiz;
224 uint64_t mts;
225 char* mbuf = rc.read(&msiz, &mts);
226 if (mbuf) {
227 if (msiz > 0) {
228 size_t rsiz;
229 uint16_t rsid, rdbid;
230 const char* rbuf = DBUpdateLogger::parse(mbuf, msiz, &rsiz, &rsid, &rdbid);
231 if (rbuf && rsid != sid_ && rdbid < dbnum_) {
232 kt::TimedDB* db = dbs_ + rdbid;
233 DBUpdateLogger* ulogdb = ulogdbs_ ? ulogdbs_ + rdbid : NULL;
234 if (ulogdb) ulogdb->set_rsid(rsid);
235 if (!db->recover(rbuf, rsiz)) {
236 const kc::BasicDB::Error& e = db->error();
237 serv_->log(Logger::ERROR, "recovering a database failed: %s: %s",
238 e.name(), e.message());
239 }
240 if (ulogdb) ulogdb->clear_rsid();
241 }
242 rivsum += riv_;
243 } else {
244 rivsum += riv_ * DUMMYFREQ / 4;
245 }
246 delete[] mbuf;
247 while (rivsum > 100 && alive_ && !hup_ && rc.alive()) {
248 kc::Thread::sleep(0.1);
249 rivsum -= 100;
250 }
251 }
252 if (mts > rts_) rts_ = mts;
253 }
254 rc.close();
255 serv_->log(Logger::SYSTEM, "replication finished: host=%s port=%d",
256 host.c_str(), port);
257 write_rts(&rtsfile, rts_);
258 deferred = false;
259 } else {
260 if (!deferred) serv_->log(Logger::SYSTEM, "replication was deferred: host=%s port=%d",
261 host.c_str(), port);
262 deferred = true;
263 }
264 }
265 if (alive_) {
266 kc::Thread::sleep(1);
267 } else {
268 break;
269 }
270 }
271 if (!rtsfile.close()) serv_->log(Logger::ERROR, "closing the RTS file failed");
272 }
273 // read the replication time stamp
read_rts(kc::File * file)274 uint64_t read_rts(kc::File* file) {
275 char buf[RTSFILESIZ];
276 file->read_fast(0, buf, RTSFILESIZ);
277 buf[sizeof(buf)-1] = '\0';
278 return kc::atoi(buf);
279 }
280 // write the replication time stamp
write_rts(kc::File * file,uint64_t rts)281 void write_rts(kc::File* file, uint64_t rts) {
282 char buf[kc::NUMBUFSIZ];
283 std::sprintf(buf, "%020llu\n", (unsigned long long)rts);
284 if (!file->write_fast(0, buf, RTSFILESIZ))
285 serv_->log(Logger::SYSTEM, "writing the time stamp failed");
286 }
287 kc::SpinLock lock_;
288 const uint16_t sid_;
289 const char* const rtspath_;
290 std::string host_;
291 int32_t port_;
292 double riv_;
293 kt::RPCServer* const serv_;
294 kt::TimedDB* const dbs_;
295 const int32_t dbnum_;
296 kt::UpdateLogger* const ulog_;
297 DBUpdateLogger* const ulogdbs_;
298 uint64_t wrts_;
299 uint64_t rts_;
300 bool alive_;
301 bool hup_;
302 };
303
304
305 // plug-in server driver
306 class PlugInDriver : public kc::Thread {
307 public:
308 // constructor
PlugInDriver(kt::PluggableServer * serv)309 explicit PlugInDriver(kt::PluggableServer* serv) : serv_(serv), error_(false) {}
310 // get the error flag
error()311 bool error() {
312 return error_;
313 }
314 private:
315 // perform service
run(void)316 void run(void) {
317 kc::Thread::sleep(0.4);
318 if (serv_->start()) {
319 if (!serv_->finish()) error_ = true;
320 } else {
321 error_ = true;
322 }
323 }
324 kt::PluggableServer* serv_;
325 bool error_;
326 };
327
328
329 // worker implementation
330 class Worker : public kt::RPCServer::Worker {
331 private:
332 class SLS;
333 typedef kt::RPCClient::ReturnValue RV;
334 public:
335 // constructor
Worker(int32_t thnum,kc::CondMap * condmap,kt::TimedDB * dbs,int32_t dbnum,const std::map<std::string,int32_t> & dbmap,int32_t omode,double asi,bool ash,const char * bgspath,double bgsi,kc::Compressor * bgscomp,kt::UpdateLogger * ulog,DBUpdateLogger * ulogdbs,const char * cmdpath,ScriptProcessor * scrprocs,OpCount * opcounts)336 explicit Worker(int32_t thnum, kc::CondMap* condmap, kt::TimedDB* dbs, int32_t dbnum,
337 const std::map<std::string, int32_t>& dbmap, int32_t omode,
338 double asi, bool ash, const char* bgspath, double bgsi,
339 kc::Compressor* bgscomp, kt::UpdateLogger* ulog, DBUpdateLogger* ulogdbs,
340 const char* cmdpath, ScriptProcessor* scrprocs, OpCount* opcounts) :
341 thnum_(thnum), condmap_(condmap), dbs_(dbs), dbnum_(dbnum), dbmap_(dbmap),
342 omode_(omode), asi_(asi), ash_(ash), bgspath_(bgspath), bgsi_(bgsi), bgscomp_(bgscomp),
343 ulog_(ulog), ulogdbs_(ulogdbs), cmdpath_(cmdpath), scrprocs_(scrprocs),
344 opcounts_(opcounts), idlecnt_(0), asnext_(0), bgsnext_(0), slave_(NULL) {
345 asnext_ = kc::time() + asi_;
346 bgsnext_ = kc::time() + bgsi_;
347 }
348 // set miscellaneous configuration
set_misc_conf(Slave * slave)349 void set_misc_conf(Slave* slave) {
350 slave_ = slave;
351 }
352 private:
353 // process each request of RPC.
process(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::string & name,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)354 RV process(kt::RPCServer* serv, kt::RPCServer::Session* sess, const std::string& name,
355 const std::map<std::string, std::string>& inmap,
356 std::map<std::string, std::string>& outmap) {
357 size_t rsiz;
358 const char* rp = kt::strmapget(inmap, "WAIT", &rsiz);
359 if (rp) {
360 std::string condname(rp, rsiz);
361 rp = kt::strmapget(inmap, "WAITTIME");
362 double wsec = rp ? kc::atof(rp) : 0.0;
363 if (wsec <= 0) wsec = DEFTOUT;
364 kt::ThreadedServer* thserv = serv->reveal_core()->reveal_core();
365 if (!condmap_->wait(condname, wsec) || thserv->aborted()) {
366 set_message(outmap, "ERROR", "the condition timed out");
367 return kt::RPCClient::RVETIMEOUT;
368 }
369 }
370 int32_t dbidx = 0;
371 rp = kt::strmapget(inmap, "DB");
372 if (rp && *rp != '\0') {
373 dbidx = -1;
374 if (*rp >= '0' && *rp <= '9') {
375 dbidx = kc::atoi(rp);
376 } else {
377 std::map<std::string, int32_t>::const_iterator it = dbmap_.find(rp);
378 if (it != dbmap_.end()) dbidx = it->second;
379 }
380 }
381 kt::TimedDB* db = dbidx >= 0 && dbidx < dbnum_ ? dbs_ + dbidx : NULL;
382 int64_t curid = -1;
383 rp = kt::strmapget(inmap, "CUR");
384 if (rp && *rp >= '0' && *rp <= '9') curid = kc::atoi(rp);
385 kt::TimedDB::Cursor* cur = NULL;
386 if (curid >= 0) {
387 SLS* sls = SLS::create(sess);
388 std::map<int64_t, kt::TimedDB::Cursor*>::iterator it = sls->curs_.find(curid);
389 if (it == sls->curs_.end()) {
390 if (db) {
391 cur = db->cursor();
392 sls->curs_[curid] = cur;
393 }
394 } else {
395 cur = it->second;
396 if (name == "cur_delete") {
397 sls->curs_.erase(curid);
398 delete cur;
399 return kt::RPCClient::RVSUCCESS;
400 }
401 }
402 }
403 RV rv;
404 if (name == "void") {
405 rv = do_void(serv, sess, inmap, outmap);
406 } else if (name == "echo") {
407 rv = do_echo(serv, sess, inmap, outmap);
408 } else if (name == "report") {
409 rv = do_report(serv, sess, inmap, outmap);
410 } else if (name == "play_script") {
411 rv = do_play_script(serv, sess, inmap, outmap);
412 } else if (name == "tune_replication") {
413 rv = do_tune_replication(serv, sess, inmap, outmap);
414 } else if (name == "ulog_list") {
415 rv = do_ulog_list(serv, sess, inmap, outmap);
416 } else if (name == "ulog_remove") {
417 rv = do_ulog_remove(serv, sess, inmap, outmap);
418 } else if (name == "status") {
419 rv = do_status(serv, sess, db, inmap, outmap);
420 } else if (name == "clear") {
421 rv = do_clear(serv, sess, db, inmap, outmap);
422 } else if (name == "synchronize") {
423 rv = do_synchronize(serv, sess, db, inmap, outmap);
424 } else if (name == "set") {
425 rv = do_set(serv, sess, db, inmap, outmap);
426 } else if (name == "add") {
427 rv = do_add(serv, sess, db, inmap, outmap);
428 } else if (name == "replace") {
429 rv = do_replace(serv, sess, db, inmap, outmap);
430 } else if (name == "append") {
431 rv = do_append(serv, sess, db, inmap, outmap);
432 } else if (name == "increment") {
433 rv = do_increment(serv, sess, db, inmap, outmap);
434 } else if (name == "increment_double") {
435 rv = do_increment_double(serv, sess, db, inmap, outmap);
436 } else if (name == "cas") {
437 rv = do_cas(serv, sess, db, inmap, outmap);
438 } else if (name == "remove") {
439 rv = do_remove(serv, sess, db, inmap, outmap);
440 } else if (name == "get") {
441 rv = do_get(serv, sess, db, inmap, outmap);
442 } else if (name == "check") {
443 rv = do_check(serv, sess, db, inmap, outmap);
444 } else if (name == "seize") {
445 rv = do_seize(serv, sess, db, inmap, outmap);
446 } else if (name == "set_bulk") {
447 rv = do_set_bulk(serv, sess, db, inmap, outmap);
448 } else if (name == "remove_bulk") {
449 rv = do_remove_bulk(serv, sess, db, inmap, outmap);
450 } else if (name == "get_bulk") {
451 rv = do_get_bulk(serv, sess, db, inmap, outmap);
452 } else if (name == "vacuum") {
453 rv = do_vacuum(serv, sess, db, inmap, outmap);
454 } else if (name == "match_prefix") {
455 rv = do_match_prefix(serv, sess, db, inmap, outmap);
456 } else if (name == "match_regex") {
457 rv = do_match_regex(serv, sess, db, inmap, outmap);
458 } else if (name == "match_similar") {
459 rv = do_match_similar(serv, sess, db, inmap, outmap);
460 } else if (name == "cur_jump") {
461 rv = do_cur_jump(serv, sess, cur, inmap, outmap);
462 } else if (name == "cur_jump_back") {
463 rv = do_cur_jump_back(serv, sess, cur, inmap, outmap);
464 } else if (name == "cur_step") {
465 rv = do_cur_step(serv, sess, cur, inmap, outmap);
466 } else if (name == "cur_step_back") {
467 rv = do_cur_step_back(serv, sess, cur, inmap, outmap);
468 } else if (name == "cur_set_value") {
469 rv = do_cur_set_value(serv, sess, cur, inmap, outmap);
470 } else if (name == "cur_remove") {
471 rv = do_cur_remove(serv, sess, cur, inmap, outmap);
472 } else if (name == "cur_get_key") {
473 rv = do_cur_get_key(serv, sess, cur, inmap, outmap);
474 } else if (name == "cur_get_value") {
475 rv = do_cur_get_value(serv, sess, cur, inmap, outmap);
476 } else if (name == "cur_get") {
477 rv = do_cur_get(serv, sess, cur, inmap, outmap);
478 } else if (name == "cur_seize") {
479 rv = do_cur_seize(serv, sess, cur, inmap, outmap);
480 } else {
481 set_message(outmap, "ERROR", "not implemented: %s", name.c_str());
482 rv = kt::RPCClient::RVENOIMPL;
483 }
484 rp = kt::strmapget(inmap, "SIGNAL", &rsiz);
485 if (rp) {
486 std::string condname(rp, rsiz);
487 rp = kt::strmapget(inmap, "SIGNALBROAD");
488 bool broad = rp ? true : false;
489 size_t wnum = broad ? condmap_->broadcast(condname) : condmap_->signal(condname);
490 set_message(outmap, "SIGNALED", "%lld", (long long)wnum);
491 }
492 return rv;
493 }
494 // process each request of the others.
process(kt::HTTPServer * serv,kt::HTTPServer::Session * sess,const std::string & path,kt::HTTPClient::Method method,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)495 int32_t process(kt::HTTPServer* serv, kt::HTTPServer::Session* sess,
496 const std::string& path, kt::HTTPClient::Method method,
497 const std::map<std::string, std::string>& reqheads,
498 const std::string& reqbody,
499 std::map<std::string, std::string>& resheads,
500 std::string& resbody,
501 const std::map<std::string, std::string>& misc) {
502 const char* pstr = path.c_str();
503 if (*pstr == '/') pstr++;
504 int32_t dbidx = 0;
505 const char* rp = std::strchr(pstr, '/');
506 if (rp) {
507 std::string dbexpr(pstr, rp - pstr);
508 pstr = rp + 1;
509 if (*pstr == '/') pstr++;
510 size_t desiz;
511 char* destr = kc::urldecode(dbexpr.c_str(), &desiz);
512 if (*destr != '\0') {
513 dbidx = -1;
514 if (*destr >= '0' && *destr <= '9') {
515 dbidx = kc::atoi(destr);
516 } else {
517 std::map<std::string, int32_t>::const_iterator it = dbmap_.find(destr);
518 if (it != dbmap_.end()) dbidx = it->second;
519 }
520 }
521 delete[] destr;
522 }
523 if (dbidx < 0 || dbidx >= dbnum_) {
524 resbody.append("no such database\n");
525 return 400;
526 }
527 kt::TimedDB* db = dbs_ + dbidx;
528 size_t ksiz;
529 char* kbuf = kc::urldecode(pstr, &ksiz);
530 int32_t code;
531 switch (method) {
532 case kt::HTTPClient::MGET: {
533 code = do_rest_get(serv, sess, db, kbuf, ksiz,
534 reqheads, reqbody, resheads, resbody, misc);
535 break;
536 }
537 case kt::HTTPClient::MHEAD: {
538 code = do_rest_head(serv, sess, db, kbuf, ksiz,
539 reqheads, reqbody, resheads, resbody, misc);
540 break;
541 }
542 case kt::HTTPClient::MPUT: {
543 code = do_rest_put(serv, sess, db, kbuf, ksiz,
544 reqheads, reqbody, resheads, resbody, misc);
545 break;
546 }
547 case kt::HTTPClient::MDELETE: {
548 code = do_rest_delete(serv, sess, db, kbuf, ksiz,
549 reqheads, reqbody, resheads, resbody, misc);
550 break;
551 }
552 default: {
553 code = 501;
554 break;
555 }
556 }
557 delete[] kbuf;
558 return code;
559 }
560 // process each binary request
process_binary(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)561 bool process_binary(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
562 int32_t magic = sess->receive_byte();
563 const char* cmd;
564 bool rv;
565 switch (magic) {
566 case kt::RemoteDB::BMREPLICATION: {
567 cmd = "bin_replication";
568 rv = do_bin_replication(serv, sess);
569 break;
570 }
571 case kt::RemoteDB::BMPLAYSCRIPT: {
572 cmd = "bin_play_script";
573 rv = do_bin_play_script(serv, sess);
574 break;
575 }
576 case kt::RemoteDB::BMSETBULK: {
577 cmd = "bin_set_bulk";
578 rv = do_bin_set_bulk(serv, sess);
579 break;
580 }
581 case kt::RemoteDB::BMREMOVEBULK: {
582 cmd = "bin_remove_bulk";
583 rv = do_bin_remove_bulk(serv, sess);
584 break;
585 }
586 case kt::RemoteDB::BMGETBULK: {
587 cmd = "bin_get_bulk";
588 rv = do_bin_get_bulk(serv, sess);
589 break;
590 }
591 default: {
592 cmd = "bin_unknown";
593 rv = false;
594 break;
595 }
596 }
597 std::string expr = sess->expression();
598 serv->log(kt::ThreadedServer::Logger::INFO, "(%s): %s: %d", expr.c_str(), cmd, rv);
599 return rv;
600 }
601 // process each idle event
process_idle(kt::RPCServer * serv)602 void process_idle(kt::RPCServer* serv) {
603 if (omode_ & kc::BasicDB::OWRITER) {
604 int32_t dbidx = idlecnt_++ % dbnum_;
605 kt::TimedDB* db = dbs_ + dbidx;
606 kt::ThreadedServer* thserv = serv->reveal_core()->reveal_core();
607 for (int32_t i = 0; i < 4; i++) {
608 if (thserv->task_count() > 0) break;
609 if (!db->vacuum(2)) {
610 const kc::BasicDB::Error& e = db->error();
611 log_db_error(serv, e);
612 break;
613 }
614 kc::Thread::yield();
615 }
616 }
617 }
618 // process each timer event
process_timer(kt::RPCServer * serv)619 void process_timer(kt::RPCServer* serv) {
620 if (asi_ > 0 && (omode_ & kc::BasicDB::OWRITER) && kc::time() >= asnext_) {
621 serv->log(Logger::INFO, "synchronizing databases");
622 for (int32_t i = 0; i < dbnum_; i++) {
623 kt::TimedDB* db = dbs_ + i;
624 if (!db->synchronize(ash_)) {
625 const kc::BasicDB::Error& e = db->error();
626 log_db_error(serv, e);
627 break;
628 }
629 kc::Thread::yield();
630 }
631 asnext_ = kc::time() + asi_;
632 }
633 if (bgspath_ && bgsi_ > 0 && kc::time() >= bgsnext_) {
634 serv->log(Logger::INFO, "snapshotting databases");
635 dosnapshot(bgspath_, bgscomp_, dbs_, dbnum_, serv);
636 bgsnext_ = kc::time() + bgsi_;
637 }
638 }
639 // process the starting event
process_start(kt::RPCServer * serv)640 void process_start(kt::RPCServer* serv) {
641 kt::maskthreadsignal();
642 }
643 // set the error message
set_message(std::map<std::string,std::string> & outmap,const char * key,const char * format,...)644 void set_message(std::map<std::string, std::string>& outmap, const char* key,
645 const char* format, ...) {
646 std::string message;
647 va_list ap;
648 va_start(ap, format);
649 kc::vstrprintf(&message, format, ap);
650 va_end(ap);
651 outmap[key] = message;
652 }
653 // set the database error message
set_db_error(std::map<std::string,std::string> & outmap,const kc::BasicDB::Error & e)654 void set_db_error(std::map<std::string, std::string>& outmap, const kc::BasicDB::Error& e) {
655 set_message(outmap, "ERROR", "DB: %d: %s: %s", e.code(), e.name(), e.message());
656 }
657 // log the database error message
log_db_error(kt::RPCServer * serv,const kc::BasicDB::Error & e)658 void log_db_error(kt::RPCServer* serv, const kc::BasicDB::Error& e) {
659 log_db_error(serv->reveal_core(), e);
660 }
661 // log the database error message
log_db_error(kt::HTTPServer * serv,const kc::BasicDB::Error & e)662 void log_db_error(kt::HTTPServer* serv, const kc::BasicDB::Error& e) {
663 serv->log(Logger::ERROR, "database error: %d: %s: %s", e.code(), e.name(), e.message());
664 }
665 // process the void procedure
do_void(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)666 RV do_void(kt::RPCServer* serv, kt::RPCServer::Session* sess,
667 const std::map<std::string, std::string>& inmap,
668 std::map<std::string, std::string>& outmap) {
669 return kt::RPCClient::RVSUCCESS;
670 }
671 // process the echo procedure
do_echo(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)672 RV do_echo(kt::RPCServer* serv, kt::RPCServer::Session* sess,
673 const std::map<std::string, std::string>& inmap,
674 std::map<std::string, std::string>& outmap) {
675 outmap.insert(inmap.begin(), inmap.end());
676 return kt::RPCClient::RVSUCCESS;
677 }
678 // process the report procedure
do_report(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)679 RV do_report(kt::RPCServer* serv, kt::RPCServer::Session* sess,
680 const std::map<std::string, std::string>& inmap,
681 std::map<std::string, std::string>& outmap) {
682 int64_t totalcount = 0;
683 int64_t totalsize = 0;
684 for (int32_t i = 0; i < dbnum_; i++) {
685 int64_t count = dbs_[i].count();
686 int64_t size = dbs_[i].size();
687 std::string key;
688 kc::strprintf(&key, "db_%d", i);
689 set_message(outmap, key.c_str(), "count=%lld size=%lld path=%s",
690 (long long)count, (long long)size, dbs_[i].path().c_str());
691 totalcount += count;
692 totalsize += size;
693 }
694 set_message(outmap, "db_total_count", "%lld", (long long)totalcount);
695 set_message(outmap, "db_total_size", "%lld", (long long)totalsize);
696 kt::ThreadedServer* thserv = serv->reveal_core()->reveal_core();
697 set_message(outmap, "serv_conn_count", "%lld", (long long)thserv->connection_count());
698 set_message(outmap, "serv_task_count", "%lld", (long long)thserv->task_count());
699 set_message(outmap, "serv_thread_count", "%lld", (long long)thnum_);
700 double ctime = kc::time();
701 set_message(outmap, "serv_current_time", "%.6f", ctime);
702 set_message(outmap, "serv_running_term", "%.6f", ctime - g_starttime);
703 set_message(outmap, "serv_proc_id", "%d", g_procid);
704 std::map<std::string, std::string> sysinfo;
705 kc::getsysinfo(&sysinfo);
706 std::map<std::string, std::string>::iterator it = sysinfo.begin();
707 std::map<std::string, std::string>::iterator itend = sysinfo.end();
708 while (it != itend) {
709 std::string key;
710 kc::strprintf(&key, "sys_%s", it->first.c_str());
711 set_message(outmap, key.c_str(), it->second.c_str());
712 ++it;
713 }
714 const std::string& mhost = slave_->host();
715 if (!mhost.empty()) {
716 set_message(outmap, "repl_master_host", "%s", mhost.c_str());
717 set_message(outmap, "repl_master_port", "%d", slave_->port());
718 uint64_t rts = slave_->rts();
719 set_message(outmap, "repl_timestamp", "%llu", (unsigned long long)rts);
720 set_message(outmap, "repl_interval", "%.6f", slave_->riv());
721 uint64_t cc = kt::UpdateLogger::clock_pure();
722 uint64_t delay = cc > rts ? cc - rts : 0;
723 set_message(outmap, "repl_delay", "%.6f", delay / 1000000000.0);
724 }
725 OpCount ocsum;
726 for (int32_t i = 0; i <= CNTMISC; i++) {
727 ocsum[i] = 0;
728 }
729 for (int32_t i = 0; i < thnum_; i++) {
730 for (int32_t j = 0; j <= CNTMISC; j++) {
731 ocsum[j] += opcounts_[i][j];
732 }
733 }
734 set_message(outmap, "cnt_set", "%llu", (unsigned long long)ocsum[CNTSET]);
735 set_message(outmap, "cnt_set_misses", "%llu", (unsigned long long)ocsum[CNTSETMISS]);
736 set_message(outmap, "cnt_remove", "%llu", (unsigned long long)ocsum[CNTREMOVE]);
737 set_message(outmap, "cnt_remove_misses", "%llu", (unsigned long long)ocsum[CNTREMOVEMISS]);
738 set_message(outmap, "cnt_get", "%llu", (unsigned long long)ocsum[CNTGET]);
739 set_message(outmap, "cnt_get_misses", "%llu", (unsigned long long)ocsum[CNTGETMISS]);
740 set_message(outmap, "cnt_script", "%llu", (unsigned long long)ocsum[CNTSCRIPT]);
741 set_message(outmap, "cnt_misc", "%llu", (unsigned long long)ocsum[CNTMISC]);
742 set_message(outmap, "conf_kt_version", "%s (%d.%d)", kt::VERSION, kt::LIBVER, kt::LIBREV);
743 set_message(outmap, "conf_kt_features", "%s", kt::FEATURES);
744 set_message(outmap, "conf_kc_version", "%s (%d.%d)", kc::VERSION, kc::LIBVER, kc::LIBREV);
745 set_message(outmap, "conf_kc_features", "%s", kc::FEATURES);
746 set_message(outmap, "conf_os_name", "%s", kc::OSNAME);
747 return kt::RPCClient::RVSUCCESS;
748 }
749 // process the play_script procedure
do_play_script(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)750 RV do_play_script(kt::RPCServer* serv, kt::RPCServer::Session* sess,
751 const std::map<std::string, std::string>& inmap,
752 std::map<std::string, std::string>& outmap) {
753 uint32_t thid = sess->thread_id();
754 if (!scrprocs_) {
755 set_message(outmap, "ERROR", "the scripting extention is disabled");
756 return kt::RPCClient::RVENOIMPL;
757 }
758 ScriptProcessor* scrproc = scrprocs_ + thid;
759 const char* nstr = kt::strmapget(inmap, "name");
760 if (!nstr || *nstr == '\0' || !kt::strisalnum(nstr)) {
761 set_message(outmap, "ERROR", "invalid parameters");
762 return kt::RPCClient::RVEINVALID;
763 }
764 std::map<std::string, std::string> scrinmap;
765 std::map<std::string, std::string>::const_iterator it = inmap.begin();
766 std::map<std::string, std::string>::const_iterator itend = inmap.end();
767 while (it != itend) {
768 const char* kbuf = it->first.data();
769 size_t ksiz = it->first.size();
770 if (ksiz > 0 && *kbuf == '_') {
771 std::string key(kbuf + 1, ksiz - 1);
772 scrinmap[key] = it->second;
773 }
774 ++it;
775 }
776 opcounts_[thid][CNTSCRIPT]++;
777 std::map<std::string, std::string> scroutmap;
778 RV rv = scrproc->call(nstr, scrinmap, scroutmap);
779 if (rv == kt::RPCClient::RVSUCCESS) {
780 it = scroutmap.begin();
781 itend = scroutmap.end();
782 while (it != itend) {
783 std::string key = "_";
784 key.append(it->first);
785 outmap[key] = it->second;
786 ++it;
787 }
788 } else if (rv == kt::RPCClient::RVENOIMPL) {
789 set_message(outmap, "ERROR", "no such scripting procedure");
790 } else {
791 set_message(outmap, "ERROR", "the scripting procedure failed");
792 }
793 return rv;
794 }
795 // process the tune_replication procedure
do_tune_replication(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)796 RV do_tune_replication(kt::RPCServer* serv, kt::RPCServer::Session* sess,
797 const std::map<std::string, std::string>& inmap,
798 std::map<std::string, std::string>& outmap) {
799 if (!slave_->rtspath_) {
800 set_message(outmap, "ERROR", "the RTS file is not set");
801 return kt::RPCClient::RVENOIMPL;
802 }
803 const char* host = kt::strmapget(inmap, "host");
804 if (!host) host = "";
805 const char* rp = kt::strmapget(inmap, "port");
806 int32_t port = rp ? kc::atoi(rp) : 0;
807 if (port < 1) port = kt::DEFPORT;
808 rp = kt::strmapget(inmap, "ts");
809 uint64_t ts = kc::UINT64MAX;
810 if (rp) {
811 if (!std::strcmp(rp, "now")) {
812 ts = kt::UpdateLogger::clock_pure();
813 } else {
814 ts = kc::atoi(rp);
815 }
816 }
817 rp = kt::strmapget(inmap, "iv");
818 double iv = rp ? kc::atof(rp) : -1;
819 char tsstr[kc::NUMBUFSIZ];
820 if (ts == kc::UINT64MAX) {
821 std::sprintf(tsstr, "*");
822 } else {
823 std::sprintf(tsstr, "%llu", (unsigned long long)ts);
824 }
825 char ivstr[kc::NUMBUFSIZ];
826 if (iv < 0) {
827 std::sprintf(ivstr, "*");
828 } else {
829 std::sprintf(ivstr, "%.6f", iv);
830 }
831 serv->log(Logger::SYSTEM, "replication setting was modified: host=%s port=%d ts=%s iv=%s",
832 host, port, tsstr, ivstr);
833 slave_->set_master(host, port, ts, iv);
834 slave_->restart();
835 return kt::RPCClient::RVSUCCESS;
836 }
837 // process the ulog_list procedure
do_ulog_list(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)838 RV do_ulog_list(kt::RPCServer* serv, kt::RPCServer::Session* sess,
839 const std::map<std::string, std::string>& inmap,
840 std::map<std::string, std::string>& outmap) {
841 if (!ulog_) {
842 set_message(outmap, "ERROR", "no update log allows no replication");
843 return kt::RPCClient::RVEINVALID;
844 }
845 std::vector<kt::UpdateLogger::FileStatus> files;
846 ulog_->list_files(&files);
847 std::vector<kt::UpdateLogger::FileStatus>::iterator it = files.begin();
848 std::vector<kt::UpdateLogger::FileStatus>::iterator itend = files.end();
849 while (it != itend) {
850 set_message(outmap, it->path.c_str(), "%llu:%llu",
851 (unsigned long long)it->size, (unsigned long long)it->ts);
852 ++it;
853 }
854 return kt::RPCClient::RVSUCCESS;
855 }
856 // process the ulog_remove procedure
do_ulog_remove(kt::RPCServer * serv,kt::RPCServer::Session * sess,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)857 RV do_ulog_remove(kt::RPCServer* serv, kt::RPCServer::Session* sess,
858 const std::map<std::string, std::string>& inmap,
859 std::map<std::string, std::string>& outmap) {
860 if (!ulog_) {
861 set_message(outmap, "ERROR", "no update log allows no replication");
862 return kt::RPCClient::RVEINVALID;
863 }
864 const char* rp = kt::strmapget(inmap, "ts");
865 uint64_t ts = kc::UINT64MAX;
866 if (rp) {
867 if (!std::strcmp(rp, "now")) {
868 ts = kt::UpdateLogger::clock_pure();
869 } else {
870 ts = kc::atoi(rp);
871 }
872 }
873 bool err = false;
874 std::vector<kt::UpdateLogger::FileStatus> files;
875 ulog_->list_files(&files);
876 std::vector<kt::UpdateLogger::FileStatus>::iterator it = files.begin();
877 std::vector<kt::UpdateLogger::FileStatus>::iterator itend = files.end();
878 if (it != itend) itend--;
879 while (it != itend) {
880 if (it->ts <= ts && !kc::File::remove(it->path)) {
881 set_message(outmap, "ERROR", "removing a file failed: %s", it->path.c_str());
882 serv->log(Logger::ERROR, "removing a file failed: %s", it->path.c_str());
883 err = true;
884 }
885 ++it;
886 }
887 return err ? kt::RPCClient::RVEINTERNAL : kt::RPCClient::RVSUCCESS;
888 }
889 // process the status procedure
do_status(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)890 RV do_status(kt::RPCServer* serv, kt::RPCServer::Session* sess,
891 kt::TimedDB* db,
892 const std::map<std::string, std::string>& inmap,
893 std::map<std::string, std::string>& outmap) {
894 uint32_t thid = sess->thread_id();
895 if (!db) {
896 set_message(outmap, "ERROR", "no such database");
897 return kt::RPCClient::RVEINVALID;
898 }
899 RV rv;
900 opcounts_[thid][CNTMISC]++;
901 std::map<std::string, std::string> status;
902 if (db->status(&status)) {
903 rv = kt::RPCClient::RVSUCCESS;
904 outmap.insert(status.begin(), status.end());
905 } else {
906 const kc::BasicDB::Error& e = db->error();
907 set_db_error(outmap, e);
908 log_db_error(serv, e);
909 rv = kt::RPCClient::RVEINTERNAL;
910 }
911 return rv;
912 }
913 // process the clear procedure
do_clear(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)914 RV do_clear(kt::RPCServer* serv, kt::RPCServer::Session* sess,
915 kt::TimedDB* db,
916 const std::map<std::string, std::string>& inmap,
917 std::map<std::string, std::string>& outmap) {
918 uint32_t thid = sess->thread_id();
919 if (!db) {
920 set_message(outmap, "ERROR", "no such database");
921 return kt::RPCClient::RVEINVALID;
922 }
923 RV rv;
924 opcounts_[thid][CNTMISC]++;
925 if (db->clear()) {
926 rv = kt::RPCClient::RVSUCCESS;
927 } else {
928 const kc::BasicDB::Error& e = db->error();
929 set_db_error(outmap, e);
930 log_db_error(serv, e);
931 rv = kt::RPCClient::RVEINTERNAL;
932 }
933 return rv;
934 }
935 // process the synchronize procedure
do_synchronize(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)936 RV do_synchronize(kt::RPCServer* serv, kt::RPCServer::Session* sess,
937 kt::TimedDB* db,
938 const std::map<std::string, std::string>& inmap,
939 std::map<std::string, std::string>& outmap) {
940 uint32_t thid = sess->thread_id();
941 if (!db) {
942 set_message(outmap, "ERROR", "no such database");
943 return kt::RPCClient::RVEINVALID;
944 }
945 const char* rp = kt::strmapget(inmap, "hard");
946 bool hard = rp ? true : false;
947 rp = kt::strmapget(inmap, "command");
948 std::string command = rp ? rp : "";
949 class Visitor : public kc::BasicDB::FileProcessor {
950 public:
951 Visitor(kt::RPCServer* serv, Worker* worker, const std::string& command) :
952 serv_(serv), worker_(worker), command_(command) {}
953 private:
954 bool process(const std::string& path, int64_t count, int64_t size) {
955 if (command_.size() < 1) return true;
956 const char* cmd = command_.c_str();
957 if (std::strchr(cmd, kc::File::PATHCHR) || !std::strcmp(cmd, kc::File::CDIRSTR) ||
958 !std::strcmp(cmd, kc::File::PDIRSTR)) {
959 serv_->log(Logger::INFO, "invalid command name: %s", cmd);
960 return false;
961 }
962 std::string cmdpath;
963 kc::strprintf(&cmdpath, "%s%c%s", worker_->cmdpath_, kc::File::PATHCHR, cmd);
964 std::vector<std::string> args;
965 args.push_back(cmdpath);
966 args.push_back(path);
967 std::string tsstr;
968 uint64_t cc = worker_->ulog_ ? worker_->ulog_->clock() : kt::UpdateLogger::clock_pure();
969 if (!worker_->slave_->host().empty()) {
970 uint64_t rts = worker_->slave_->rts();
971 if (rts < cc) cc = rts;
972 }
973 kc::strprintf(&tsstr, "%020llu", (unsigned long long)cc);
974 args.push_back(tsstr);
975 serv_->log(Logger::SYSTEM, "executing: %s \"%s\"", cmd, path.c_str());
976 if (kt::executecommand(args) != 0) {
977 serv_->log(Logger::ERROR, "execution failed: %s \"%s\"", cmd, path.c_str());
978 return false;
979 }
980 return true;
981 }
982 kt::RPCServer* serv_;
983 Worker* worker_;
984 std::string command_;
985 };
986 Visitor visitor(serv, this, command);
987 RV rv;
988 opcounts_[thid][CNTMISC]++;
989 if (db->synchronize(hard, &visitor)) {
990 rv = kt::RPCClient::RVSUCCESS;
991 } else {
992 const kc::BasicDB::Error& e = db->error();
993 set_db_error(outmap, e);
994 log_db_error(serv, e);
995 rv = kt::RPCClient::RVEINTERNAL;
996 }
997 return rv;
998 }
999 // process the set procedure
do_set(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1000 RV do_set(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1001 kt::TimedDB* db,
1002 const std::map<std::string, std::string>& inmap,
1003 std::map<std::string, std::string>& outmap) {
1004 uint32_t thid = sess->thread_id();
1005 if (!db) {
1006 set_message(outmap, "ERROR", "no such database");
1007 return kt::RPCClient::RVEINVALID;
1008 }
1009 size_t ksiz;
1010 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1011 size_t vsiz;
1012 const char* vbuf = kt::strmapget(inmap, "value", &vsiz);
1013 if (!kbuf || !vbuf) {
1014 set_message(outmap, "ERROR", "invalid parameters");
1015 return kt::RPCClient::RVEINVALID;
1016 }
1017 const char* rp = kt::strmapget(inmap, "xt");
1018 int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1019 RV rv;
1020 opcounts_[thid][CNTSET]++;
1021 if (db->set(kbuf, ksiz, vbuf, vsiz, xt)) {
1022 rv = kt::RPCClient::RVSUCCESS;
1023 } else {
1024 opcounts_[thid][CNTSETMISS]++;
1025 const kc::BasicDB::Error& e = db->error();
1026 set_db_error(outmap, e);
1027 log_db_error(serv, e);
1028 rv = kt::RPCClient::RVEINTERNAL;
1029 }
1030 return rv;
1031 }
1032 // process the add procedure
do_add(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1033 RV do_add(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1034 kt::TimedDB* db,
1035 const std::map<std::string, std::string>& inmap,
1036 std::map<std::string, std::string>& outmap) {
1037 uint32_t thid = sess->thread_id();
1038 if (!db) {
1039 set_message(outmap, "ERROR", "no such database");
1040 return kt::RPCClient::RVEINVALID;
1041 }
1042 size_t ksiz;
1043 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1044 size_t vsiz;
1045 const char* vbuf = kt::strmapget(inmap, "value", &vsiz);
1046 if (!kbuf || !vbuf) {
1047 set_message(outmap, "ERROR", "invalid parameters");
1048 return kt::RPCClient::RVEINVALID;
1049 }
1050 const char* rp = kt::strmapget(inmap, "xt");
1051 int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1052 RV rv;
1053 opcounts_[thid][CNTSET]++;
1054 if (db->add(kbuf, ksiz, vbuf, vsiz, xt)) {
1055 rv = kt::RPCClient::RVSUCCESS;
1056 } else {
1057 opcounts_[thid][CNTSETMISS]++;
1058 const kc::BasicDB::Error& e = db->error();
1059 set_db_error(outmap, e);
1060 if (e == kc::BasicDB::Error::DUPREC) {
1061 rv = kt::RPCClient::RVELOGIC;
1062 } else {
1063 log_db_error(serv, e);
1064 rv = kt::RPCClient::RVEINTERNAL;
1065 }
1066 }
1067 return rv;
1068 }
1069 // process the replace procedure
do_replace(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1070 RV do_replace(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1071 kt::TimedDB* db,
1072 const std::map<std::string, std::string>& inmap,
1073 std::map<std::string, std::string>& outmap) {
1074 uint32_t thid = sess->thread_id();
1075 if (!db) {
1076 set_message(outmap, "ERROR", "no such database");
1077 return kt::RPCClient::RVEINVALID;
1078 }
1079 size_t ksiz;
1080 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1081 size_t vsiz;
1082 const char* vbuf = kt::strmapget(inmap, "value", &vsiz);
1083 if (!kbuf || !vbuf) {
1084 set_message(outmap, "ERROR", "invalid parameters");
1085 return kt::RPCClient::RVEINVALID;
1086 }
1087 const char* rp = kt::strmapget(inmap, "xt");
1088 int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1089 RV rv;
1090 opcounts_[thid][CNTSET]++;
1091 if (db->replace(kbuf, ksiz, vbuf, vsiz, xt)) {
1092 rv = kt::RPCClient::RVSUCCESS;
1093 } else {
1094 opcounts_[thid][CNTSETMISS]++;
1095 const kc::BasicDB::Error& e = db->error();
1096 set_db_error(outmap, e);
1097 if (e == kc::BasicDB::Error::NOREC) {
1098 rv = kt::RPCClient::RVELOGIC;
1099 } else {
1100 log_db_error(serv, e);
1101 rv = kt::RPCClient::RVEINTERNAL;
1102 }
1103 }
1104 return rv;
1105 }
1106 // process the append procedure
do_append(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1107 RV do_append(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1108 kt::TimedDB* db,
1109 const std::map<std::string, std::string>& inmap,
1110 std::map<std::string, std::string>& outmap) {
1111 uint32_t thid = sess->thread_id();
1112 if (!db) {
1113 set_message(outmap, "ERROR", "no such database");
1114 return kt::RPCClient::RVEINVALID;
1115 }
1116 size_t ksiz;
1117 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1118 size_t vsiz;
1119 const char* vbuf = kt::strmapget(inmap, "value", &vsiz);
1120 if (!kbuf || !vbuf) {
1121 set_message(outmap, "ERROR", "invalid parameters");
1122 return kt::RPCClient::RVEINVALID;
1123 }
1124 const char* rp = kt::strmapget(inmap, "xt");
1125 int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1126 RV rv;
1127 opcounts_[thid][CNTSET]++;
1128 if (db->append(kbuf, ksiz, vbuf, vsiz, xt)) {
1129 rv = kt::RPCClient::RVSUCCESS;
1130 } else {
1131 opcounts_[thid][CNTSETMISS]++;
1132 const kc::BasicDB::Error& e = db->error();
1133 set_db_error(outmap, e);
1134 log_db_error(serv, e);
1135 rv = kt::RPCClient::RVEINTERNAL;
1136 }
1137 return rv;
1138 }
1139 // process the increment procedure
do_increment(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1140 RV do_increment(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1141 kt::TimedDB* db,
1142 const std::map<std::string, std::string>& inmap,
1143 std::map<std::string, std::string>& outmap) {
1144 uint32_t thid = sess->thread_id();
1145 if (!db) {
1146 set_message(outmap, "ERROR", "no such database");
1147 return kt::RPCClient::RVEINVALID;
1148 }
1149 size_t ksiz;
1150 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1151 const char* nstr = kt::strmapget(inmap, "num");
1152 if (!kbuf || !nstr) {
1153 set_message(outmap, "ERROR", "invalid parameters");
1154 return kt::RPCClient::RVEINVALID;
1155 }
1156 int64_t num = kc::atoi(nstr);
1157 const char* rp = kt::strmapget(inmap, "orig");
1158 int64_t orig;
1159 if (rp) {
1160 if (!std::strcmp(rp, "try")) {
1161 orig = kc::INT64MIN;
1162 } else if (!std::strcmp(rp, "set")) {
1163 orig = kc::INT64MAX;
1164 } else {
1165 orig = kc::atoi(rp);
1166 }
1167 } else {
1168 orig = 0;
1169 }
1170 rp = kt::strmapget(inmap, "xt");
1171 int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1172 RV rv;
1173 opcounts_[thid][CNTSET]++;
1174 num = db->increment(kbuf, ksiz, num, orig, xt);
1175 if (num != kc::INT64MIN) {
1176 rv = kt::RPCClient::RVSUCCESS;
1177 set_message(outmap, "num", "%lld", (long long)num);
1178 } else {
1179 opcounts_[thid][CNTSETMISS]++;
1180 const kc::BasicDB::Error& e = db->error();
1181 set_db_error(outmap, e);
1182 if (e == kc::BasicDB::Error::LOGIC) {
1183 rv = kt::RPCClient::RVELOGIC;
1184 } else {
1185 log_db_error(serv, e);
1186 rv = kt::RPCClient::RVEINTERNAL;
1187 }
1188 }
1189 return rv;
1190 }
1191 // process the increment_double procedure
do_increment_double(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1192 RV do_increment_double(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1193 kt::TimedDB* db,
1194 const std::map<std::string, std::string>& inmap,
1195 std::map<std::string, std::string>& outmap) {
1196 uint32_t thid = sess->thread_id();
1197 if (!db) {
1198 set_message(outmap, "ERROR", "no such database");
1199 return kt::RPCClient::RVEINVALID;
1200 }
1201 size_t ksiz;
1202 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1203 const char* nstr = kt::strmapget(inmap, "num");
1204 if (!kbuf || !nstr) {
1205 set_message(outmap, "ERROR", "invalid parameters");
1206 return kt::RPCClient::RVEINVALID;
1207 }
1208 double num = kc::atof(nstr);
1209 const char* rp = kt::strmapget(inmap, "orig");
1210 double orig;
1211 if (rp) {
1212 if (!std::strcmp(rp, "try")) {
1213 orig = -kc::inf();
1214 } else if (!std::strcmp(rp, "set")) {
1215 orig = kc::inf();
1216 } else {
1217 orig = kc::atof(rp);
1218 }
1219 } else {
1220 orig = 0;
1221 }
1222 rp = kt::strmapget(inmap, "xt");
1223 int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1224 RV rv;
1225 opcounts_[thid][CNTSET]++;
1226 num = db->increment_double(kbuf, ksiz, num, orig, xt);
1227 if (!kc::chknan(num)) {
1228 rv = kt::RPCClient::RVSUCCESS;
1229 set_message(outmap, "num", "%f", num);
1230 } else {
1231 opcounts_[thid][CNTSETMISS]++;
1232 const kc::BasicDB::Error& e = db->error();
1233 set_db_error(outmap, e);
1234 if (e == kc::BasicDB::Error::LOGIC) {
1235 rv = kt::RPCClient::RVELOGIC;
1236 } else {
1237 log_db_error(serv, e);
1238 rv = kt::RPCClient::RVEINTERNAL;
1239 }
1240 }
1241 return rv;
1242 }
1243 // process the cas procedure
do_cas(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1244 RV do_cas(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1245 kt::TimedDB* db,
1246 const std::map<std::string, std::string>& inmap,
1247 std::map<std::string, std::string>& outmap) {
1248 uint32_t thid = sess->thread_id();
1249 if (!db) {
1250 set_message(outmap, "ERROR", "no such database");
1251 return kt::RPCClient::RVEINVALID;
1252 }
1253 size_t ksiz;
1254 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1255 if (!kbuf) {
1256 set_message(outmap, "ERROR", "invalid parameters");
1257 return kt::RPCClient::RVEINVALID;
1258 }
1259 size_t ovsiz;
1260 const char* ovbuf = kt::strmapget(inmap, "oval", &ovsiz);
1261 size_t nvsiz;
1262 const char* nvbuf = kt::strmapget(inmap, "nval", &nvsiz);
1263 const char* rp = kt::strmapget(inmap, "xt");
1264 int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1265 RV rv;
1266 opcounts_[thid][CNTSET]++;
1267 if (db->cas(kbuf, ksiz, ovbuf, ovsiz, nvbuf, nvsiz, xt)) {
1268 rv = kt::RPCClient::RVSUCCESS;
1269 } else {
1270 opcounts_[thid][CNTSETMISS]++;
1271 const kc::BasicDB::Error& e = db->error();
1272 set_db_error(outmap, e);
1273 if (e == kc::BasicDB::Error::LOGIC) {
1274 rv = kt::RPCClient::RVELOGIC;
1275 } else {
1276 log_db_error(serv, e);
1277 rv = kt::RPCClient::RVEINTERNAL;
1278 }
1279 }
1280 return rv;
1281 }
1282 // process the remove procedure
do_remove(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1283 RV do_remove(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1284 kt::TimedDB* db,
1285 const std::map<std::string, std::string>& inmap,
1286 std::map<std::string, std::string>& outmap) {
1287 uint32_t thid = sess->thread_id();
1288 if (!db) {
1289 set_message(outmap, "ERROR", "no such database");
1290 return kt::RPCClient::RVEINVALID;
1291 }
1292 size_t ksiz;
1293 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1294 if (!kbuf) {
1295 set_message(outmap, "ERROR", "invalid parameters");
1296 return kt::RPCClient::RVEINVALID;
1297 }
1298 RV rv;
1299 opcounts_[thid][CNTREMOVE]++;
1300 if (db->remove(kbuf, ksiz)) {
1301 rv = kt::RPCClient::RVSUCCESS;
1302 } else {
1303 opcounts_[thid][CNTREMOVEMISS]++;
1304 const kc::BasicDB::Error& e = db->error();
1305 set_db_error(outmap, e);
1306 if (e == kc::BasicDB::Error::NOREC) {
1307 rv = kt::RPCClient::RVELOGIC;
1308 } else {
1309 log_db_error(serv, e);
1310 rv = kt::RPCClient::RVEINTERNAL;
1311 }
1312 }
1313 return rv;
1314 }
1315 // process the get procedure
do_get(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1316 RV do_get(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1317 kt::TimedDB* db,
1318 const std::map<std::string, std::string>& inmap,
1319 std::map<std::string, std::string>& outmap) {
1320 uint32_t thid = sess->thread_id();
1321 if (!db) {
1322 set_message(outmap, "ERROR", "no such database");
1323 return kt::RPCClient::RVEINVALID;
1324 }
1325 size_t ksiz;
1326 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1327 if (!kbuf) {
1328 set_message(outmap, "ERROR", "invalid parameters");
1329 return kt::RPCClient::RVEINVALID;
1330 }
1331 RV rv;
1332 opcounts_[thid][CNTGET]++;
1333 size_t vsiz;
1334 int64_t xt;
1335 const char* vbuf = db->get(kbuf, ksiz, &vsiz, &xt);
1336 if (vbuf) {
1337 outmap["value"] = std::string(vbuf, vsiz);
1338 if (xt < kt::TimedDB::XTMAX) set_message(outmap, "xt", "%lld", (long long)xt);
1339 delete[] vbuf;
1340 rv = kt::RPCClient::RVSUCCESS;
1341 } else {
1342 opcounts_[thid][CNTGETMISS]++;
1343 const kc::BasicDB::Error& e = db->error();
1344 set_db_error(outmap, e);
1345 if (e == kc::BasicDB::Error::NOREC) {
1346 rv = kt::RPCClient::RVELOGIC;
1347 } else {
1348 log_db_error(serv, e);
1349 rv = kt::RPCClient::RVEINTERNAL;
1350 }
1351 }
1352 return rv;
1353 }
1354 // process the check procedure
do_check(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1355 RV do_check(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1356 kt::TimedDB* db,
1357 const std::map<std::string, std::string>& inmap,
1358 std::map<std::string, std::string>& outmap) {
1359 uint32_t thid = sess->thread_id();
1360 if (!db) {
1361 set_message(outmap, "ERROR", "no such database");
1362 return kt::RPCClient::RVEINVALID;
1363 }
1364 size_t ksiz;
1365 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1366 if (!kbuf) {
1367 set_message(outmap, "ERROR", "invalid parameters");
1368 return kt::RPCClient::RVEINVALID;
1369 }
1370 RV rv;
1371 opcounts_[thid][CNTGET]++;
1372 int64_t xt;
1373 int32_t vsiz = db->check(kbuf, ksiz, &xt);
1374 if (vsiz >= 0) {
1375 set_message(outmap, "vsiz", "%lld", (long long)vsiz);
1376 if (xt < kt::TimedDB::XTMAX) set_message(outmap, "xt", "%lld", (long long)xt);
1377 rv = kt::RPCClient::RVSUCCESS;
1378 } else {
1379 opcounts_[thid][CNTGETMISS]++;
1380 const kc::BasicDB::Error& e = db->error();
1381 set_db_error(outmap, e);
1382 if (e == kc::BasicDB::Error::NOREC) {
1383 rv = kt::RPCClient::RVELOGIC;
1384 } else {
1385 log_db_error(serv, e);
1386 rv = kt::RPCClient::RVEINTERNAL;
1387 }
1388 }
1389 return rv;
1390 }
1391 // process the seize procedure
do_seize(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1392 RV do_seize(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1393 kt::TimedDB* db,
1394 const std::map<std::string, std::string>& inmap,
1395 std::map<std::string, std::string>& outmap) {
1396 uint32_t thid = sess->thread_id();
1397 if (!db) {
1398 set_message(outmap, "ERROR", "no such database");
1399 return kt::RPCClient::RVEINVALID;
1400 }
1401 size_t ksiz;
1402 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1403 if (!kbuf) {
1404 set_message(outmap, "ERROR", "invalid parameters");
1405 return kt::RPCClient::RVEINVALID;
1406 }
1407 RV rv;
1408 opcounts_[thid][CNTREMOVE]++;
1409 opcounts_[thid][CNTGET]++;
1410 size_t vsiz;
1411 int64_t xt;
1412 const char* vbuf = db->seize(kbuf, ksiz, &vsiz, &xt);
1413 if (vbuf) {
1414 outmap["value"] = std::string(vbuf, vsiz);
1415 if (xt < kt::TimedDB::XTMAX) set_message(outmap, "xt", "%lld", (long long)xt);
1416 delete[] vbuf;
1417 rv = kt::RPCClient::RVSUCCESS;
1418 } else {
1419 opcounts_[thid][CNTREMOVEMISS]++;
1420 opcounts_[thid][CNTGETMISS]++;
1421 const kc::BasicDB::Error& e = db->error();
1422 set_db_error(outmap, e);
1423 if (e == kc::BasicDB::Error::NOREC) {
1424 rv = kt::RPCClient::RVELOGIC;
1425 } else {
1426 log_db_error(serv, e);
1427 rv = kt::RPCClient::RVEINTERNAL;
1428 }
1429 }
1430 return rv;
1431 }
1432 // process the set_bulk procedure
do_set_bulk(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1433 RV do_set_bulk(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1434 kt::TimedDB* db,
1435 const std::map<std::string, std::string>& inmap,
1436 std::map<std::string, std::string>& outmap) {
1437 uint32_t thid = sess->thread_id();
1438 if (!db) {
1439 set_message(outmap, "ERROR", "no such database");
1440 return kt::RPCClient::RVEINVALID;
1441 }
1442 const char* rp = kt::strmapget(inmap, "xt");
1443 int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1444 rp = kt::strmapget(inmap, "atomic");
1445 bool atomic = rp ? true : false;
1446 std::map<std::string, std::string> recs;
1447 std::map<std::string, std::string>::const_iterator it = inmap.begin();
1448 std::map<std::string, std::string>::const_iterator itend = inmap.end();
1449 while (it != itend) {
1450 const char* kbuf = it->first.data();
1451 size_t ksiz = it->first.size();
1452 if (ksiz > 0 && *kbuf == '_') {
1453 std::string key(kbuf + 1, ksiz - 1);
1454 std::string value(it->second.data(), it->second.size());
1455 recs[key] = value;
1456 }
1457 ++it;
1458 }
1459 RV rv;
1460 opcounts_[thid][CNTSET] += recs.size();
1461 int64_t num = db->set_bulk(recs, xt, atomic);
1462 if (num >= 0) {
1463 opcounts_[thid][CNTSETMISS] += recs.size() - (size_t)num;
1464 rv = kt::RPCClient::RVSUCCESS;
1465 set_message(outmap, "num", "%lld", (long long)num);
1466 } else {
1467 opcounts_[thid][CNTSETMISS] += recs.size();
1468 const kc::BasicDB::Error& e = db->error();
1469 set_db_error(outmap, e);
1470 log_db_error(serv, e);
1471 rv = kt::RPCClient::RVEINTERNAL;
1472 }
1473 return rv;
1474 }
1475 // process the remove_bulk procedure
do_remove_bulk(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1476 RV do_remove_bulk(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1477 kt::TimedDB* db,
1478 const std::map<std::string, std::string>& inmap,
1479 std::map<std::string, std::string>& outmap) {
1480 uint32_t thid = sess->thread_id();
1481 if (!db) {
1482 set_message(outmap, "ERROR", "no such database");
1483 return kt::RPCClient::RVEINVALID;
1484 }
1485 const char* rp = kt::strmapget(inmap, "atomic");
1486 bool atomic = rp ? true : false;
1487 std::vector<std::string> keys;
1488 keys.reserve(inmap.size());
1489 std::map<std::string, std::string>::const_iterator it = inmap.begin();
1490 std::map<std::string, std::string>::const_iterator itend = inmap.end();
1491 while (it != itend) {
1492 const char* kbuf = it->first.data();
1493 size_t ksiz = it->first.size();
1494 if (ksiz > 0 && *kbuf == '_') {
1495 std::string key(kbuf + 1, ksiz - 1);
1496 keys.push_back(key);
1497 }
1498 ++it;
1499 }
1500 RV rv;
1501 opcounts_[thid][CNTREMOVE] += keys.size();
1502 int64_t num = db->remove_bulk(keys, atomic);
1503 if (num >= 0) {
1504 opcounts_[thid][CNTREMOVEMISS] += keys.size() - (size_t)num;
1505 rv = kt::RPCClient::RVSUCCESS;
1506 set_message(outmap, "num", "%lld", (long long)num);
1507 } else {
1508 opcounts_[thid][CNTREMOVEMISS] += keys.size();
1509 const kc::BasicDB::Error& e = db->error();
1510 set_db_error(outmap, e);
1511 log_db_error(serv, e);
1512 rv = kt::RPCClient::RVEINTERNAL;
1513 }
1514 return rv;
1515 }
1516 // process the get_bulk procedure
do_get_bulk(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1517 RV do_get_bulk(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1518 kt::TimedDB* db,
1519 const std::map<std::string, std::string>& inmap,
1520 std::map<std::string, std::string>& outmap) {
1521 uint32_t thid = sess->thread_id();
1522 if (!db) {
1523 set_message(outmap, "ERROR", "no such database");
1524 return kt::RPCClient::RVEINVALID;
1525 }
1526 const char* rp = kt::strmapget(inmap, "atomic");
1527 bool atomic = rp ? true : false;
1528 std::vector<std::string> keys;
1529 keys.reserve(inmap.size());
1530 std::map<std::string, std::string>::const_iterator it = inmap.begin();
1531 std::map<std::string, std::string>::const_iterator itend = inmap.end();
1532 while (it != itend) {
1533 const char* kbuf = it->first.data();
1534 size_t ksiz = it->first.size();
1535 if (ksiz > 0 && *kbuf == '_') {
1536 std::string key(kbuf + 1, ksiz - 1);
1537 keys.push_back(key);
1538 }
1539 ++it;
1540 }
1541 RV rv;
1542 opcounts_[thid][CNTGET] += keys.size();
1543 std::map<std::string, std::string> recs;
1544 int64_t num = db->get_bulk(keys, &recs, atomic);
1545 if (num >= 0) {
1546 opcounts_[thid][CNTGETMISS] += keys.size() - (size_t)num;
1547 rv = kt::RPCClient::RVSUCCESS;
1548 std::map<std::string, std::string>::iterator it = recs.begin();
1549 std::map<std::string, std::string>::iterator itend = recs.end();
1550 while (it != itend) {
1551 std::string key("_");
1552 key.append(it->first);
1553 outmap[key] = it->second;
1554 ++it;
1555 }
1556 set_message(outmap, "num", "%lld", (long long)num);
1557 } else {
1558 opcounts_[thid][CNTGETMISS] += keys.size();
1559 const kc::BasicDB::Error& e = db->error();
1560 set_db_error(outmap, e);
1561 log_db_error(serv, e);
1562 rv = kt::RPCClient::RVEINTERNAL;
1563 }
1564 return rv;
1565 }
1566 // process the vacuum procedure
do_vacuum(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1567 RV do_vacuum(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1568 kt::TimedDB* db,
1569 const std::map<std::string, std::string>& inmap,
1570 std::map<std::string, std::string>& outmap) {
1571 uint32_t thid = sess->thread_id();
1572 if (!db) {
1573 set_message(outmap, "ERROR", "no such database");
1574 return kt::RPCClient::RVEINVALID;
1575 }
1576 const char* rp = kt::strmapget(inmap, "step");
1577 int64_t step = rp ? kc::atoi(rp) : 0;
1578 RV rv;
1579 opcounts_[thid][CNTMISC]++;
1580 if (db->vacuum(step)) {
1581 rv = kt::RPCClient::RVSUCCESS;
1582 } else {
1583 const kc::BasicDB::Error& e = db->error();
1584 set_db_error(outmap, e);
1585 log_db_error(serv, e);
1586 rv = kt::RPCClient::RVEINTERNAL;
1587 }
1588 return rv;
1589 }
1590 // process the match_prefix procedure
do_match_prefix(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1591 RV do_match_prefix(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1592 kt::TimedDB* db,
1593 const std::map<std::string, std::string>& inmap,
1594 std::map<std::string, std::string>& outmap) {
1595 uint32_t thid = sess->thread_id();
1596 if (!db) {
1597 set_message(outmap, "ERROR", "no such database");
1598 return kt::RPCClient::RVEINVALID;
1599 }
1600 size_t psiz;
1601 const char* pbuf = kt::strmapget(inmap, "prefix", &psiz);
1602 if (!pbuf) {
1603 set_message(outmap, "ERROR", "invalid parameters");
1604 return kt::RPCClient::RVEINVALID;
1605 }
1606 const char* rp = kt::strmapget(inmap, "max");
1607 int64_t max = rp ? kc::atoi(rp) : -1;
1608 std::vector<std::string> keys;
1609 RV rv;
1610 opcounts_[thid][CNTMISC]++;
1611 int64_t num = db->match_prefix(std::string(pbuf, psiz), &keys, max);
1612 if (num >= 0) {
1613 std::vector<std::string>::iterator it = keys.begin();
1614 std::vector<std::string>::iterator itend = keys.end();
1615 int64_t cnt = 0;
1616 while (it != itend) {
1617 std::string key = "_";
1618 key.append(*it);
1619 outmap[key] = kc::strprintf("%lld", (long long)cnt);
1620 ++cnt;
1621 ++it;
1622 }
1623 set_message(outmap, "num", "%lld", (long long)num);
1624 rv = kt::RPCClient::RVSUCCESS;
1625 } else {
1626 const kc::BasicDB::Error& e = db->error();
1627 set_db_error(outmap, e);
1628 log_db_error(serv, e);
1629 rv = kt::RPCClient::RVEINTERNAL;
1630 }
1631 return rv;
1632 }
1633 // process the match_regex procedure
do_match_regex(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1634 RV do_match_regex(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1635 kt::TimedDB* db,
1636 const std::map<std::string, std::string>& inmap,
1637 std::map<std::string, std::string>& outmap) {
1638 uint32_t thid = sess->thread_id();
1639 if (!db) {
1640 set_message(outmap, "ERROR", "no such database");
1641 return kt::RPCClient::RVEINVALID;
1642 }
1643 size_t psiz;
1644 const char* pbuf = kt::strmapget(inmap, "regex", &psiz);
1645 if (!pbuf) {
1646 set_message(outmap, "ERROR", "invalid parameters");
1647 return kt::RPCClient::RVEINVALID;
1648 }
1649 const char* rp = kt::strmapget(inmap, "max");
1650 int64_t max = rp ? kc::atoi(rp) : -1;
1651 std::vector<std::string> keys;
1652 RV rv;
1653 opcounts_[thid][CNTMISC]++;
1654 int64_t num = db->match_regex(std::string(pbuf, psiz), &keys, max);
1655 if (num >= 0) {
1656 std::vector<std::string>::iterator it = keys.begin();
1657 std::vector<std::string>::iterator itend = keys.end();
1658 int64_t cnt = 0;
1659 while (it != itend) {
1660 std::string key = "_";
1661 key.append(*it);
1662 outmap[key] = kc::strprintf("%lld", (long long)cnt);
1663 ++cnt;
1664 ++it;
1665 }
1666 set_message(outmap, "num", "%lld", (long long)num);
1667 rv = kt::RPCClient::RVSUCCESS;
1668 } else {
1669 const kc::BasicDB::Error& e = db->error();
1670 set_db_error(outmap, e);
1671 if (e == kc::BasicDB::Error::LOGIC) {
1672 rv = kt::RPCClient::RVELOGIC;
1673 } else {
1674 log_db_error(serv, e);
1675 rv = kt::RPCClient::RVEINTERNAL;
1676 }
1677 }
1678 return rv;
1679 }
1680 // process the match_similar procedure
do_match_similar(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB * db,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1681 RV do_match_similar(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1682 kt::TimedDB* db,
1683 const std::map<std::string, std::string>& inmap,
1684 std::map<std::string, std::string>& outmap) {
1685 uint32_t thid = sess->thread_id();
1686 if (!db) {
1687 set_message(outmap, "ERROR", "no such database");
1688 return kt::RPCClient::RVEINVALID;
1689 }
1690 size_t osiz;
1691 const char* obuf = kt::strmapget(inmap, "origin", &osiz);
1692 if (!obuf) {
1693 set_message(outmap, "ERROR", "invalid parameters");
1694 return kt::RPCClient::RVEINVALID;
1695 }
1696 const char* rp = kt::strmapget(inmap, "range");
1697 int64_t range = rp ? kc::atoi(rp) : 1;
1698 if (range < 0) range = 1;
1699 rp = kt::strmapget(inmap, "utf");
1700 bool utf = rp ? true : false;
1701 rp = kt::strmapget(inmap, "max");
1702 int64_t max = rp ? kc::atoi(rp) : -1;
1703 std::vector<std::string> keys;
1704 RV rv;
1705 opcounts_[thid][CNTMISC]++;
1706 int64_t num = db->match_similar(std::string(obuf, osiz), range, utf, &keys, max);
1707 if (num >= 0) {
1708 std::vector<std::string>::iterator it = keys.begin();
1709 std::vector<std::string>::iterator itend = keys.end();
1710 int64_t cnt = 0;
1711 while (it != itend) {
1712 std::string key = "_";
1713 key.append(*it);
1714 outmap[key] = kc::strprintf("%lld", (long long)cnt);
1715 ++cnt;
1716 ++it;
1717 }
1718 set_message(outmap, "num", "%lld", (long long)num);
1719 rv = kt::RPCClient::RVSUCCESS;
1720 } else {
1721 const kc::BasicDB::Error& e = db->error();
1722 set_db_error(outmap, e);
1723 if (e == kc::BasicDB::Error::LOGIC) {
1724 rv = kt::RPCClient::RVELOGIC;
1725 } else {
1726 log_db_error(serv, e);
1727 rv = kt::RPCClient::RVEINTERNAL;
1728 }
1729 }
1730 return rv;
1731 }
1732 // process the cur_jump procedure
do_cur_jump(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1733 RV do_cur_jump(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1734 kt::TimedDB::Cursor* cur,
1735 const std::map<std::string, std::string>& inmap,
1736 std::map<std::string, std::string>& outmap) {
1737 uint32_t thid = sess->thread_id();
1738 if (!cur) {
1739 set_message(outmap, "ERROR", "no such cursor");
1740 return kt::RPCClient::RVEINVALID;
1741 }
1742 size_t ksiz;
1743 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1744 RV rv;
1745 opcounts_[thid][CNTMISC]++;
1746 if (kbuf) {
1747 if (cur->jump(kbuf, ksiz)) {
1748 rv = kt::RPCClient::RVSUCCESS;
1749 } else {
1750 const kc::BasicDB::Error& e = cur->error();
1751 set_db_error(outmap, e);
1752 if (e == kc::BasicDB::Error::NOREC) {
1753 rv = kt::RPCClient::RVELOGIC;
1754 } else {
1755 log_db_error(serv, e);
1756 rv = kt::RPCClient::RVEINTERNAL;
1757 }
1758 }
1759 } else {
1760 if (cur->jump()) {
1761 rv = kt::RPCClient::RVSUCCESS;
1762 } else {
1763 const kc::BasicDB::Error& e = cur->error();
1764 set_db_error(outmap, e);
1765 if (e == kc::BasicDB::Error::NOREC) {
1766 rv = kt::RPCClient::RVELOGIC;
1767 } else {
1768 log_db_error(serv, e);
1769 rv = kt::RPCClient::RVEINTERNAL;
1770 }
1771 }
1772 }
1773 return rv;
1774 }
1775 // process the cur_jump_back procedure
do_cur_jump_back(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1776 RV do_cur_jump_back(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1777 kt::TimedDB::Cursor* cur,
1778 const std::map<std::string, std::string>& inmap,
1779 std::map<std::string, std::string>& outmap) {
1780 uint32_t thid = sess->thread_id();
1781 if (!cur) {
1782 set_message(outmap, "ERROR", "no such cursor");
1783 return kt::RPCClient::RVEINVALID;
1784 }
1785 size_t ksiz;
1786 const char* kbuf = kt::strmapget(inmap, "key", &ksiz);
1787 RV rv;
1788 opcounts_[thid][CNTMISC]++;
1789 if (kbuf) {
1790 if (cur->jump_back(kbuf, ksiz)) {
1791 rv = kt::RPCClient::RVSUCCESS;
1792 } else {
1793 const kc::BasicDB::Error& e = cur->error();
1794 set_db_error(outmap, e);
1795 switch (e.code()) {
1796 case kc::BasicDB::Error::NOIMPL: {
1797 rv = kt::RPCClient::RVENOIMPL;
1798 break;
1799 }
1800 case kc::BasicDB::Error::NOREC: {
1801 rv = kt::RPCClient::RVELOGIC;
1802 break;
1803 }
1804 default: {
1805 log_db_error(serv, e);
1806 rv = kt::RPCClient::RVEINTERNAL;
1807 break;
1808 }
1809 }
1810 }
1811 } else {
1812 if (cur->jump_back()) {
1813 rv = kt::RPCClient::RVSUCCESS;
1814 } else {
1815 const kc::BasicDB::Error& e = cur->error();
1816 set_db_error(outmap, e);
1817 switch (e.code()) {
1818 case kc::BasicDB::Error::NOIMPL: {
1819 rv = kt::RPCClient::RVENOIMPL;
1820 break;
1821 }
1822 case kc::BasicDB::Error::NOREC: {
1823 rv = kt::RPCClient::RVELOGIC;
1824 break;
1825 }
1826 default: {
1827 log_db_error(serv, e);
1828 rv = kt::RPCClient::RVEINTERNAL;
1829 break;
1830 }
1831 }
1832 }
1833 }
1834 return rv;
1835 }
1836 // process the cur_step procedure
do_cur_step(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1837 RV do_cur_step(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1838 kt::TimedDB::Cursor* cur,
1839 const std::map<std::string, std::string>& inmap,
1840 std::map<std::string, std::string>& outmap) {
1841 uint32_t thid = sess->thread_id();
1842 if (!cur) {
1843 set_message(outmap, "ERROR", "no such cursor");
1844 return kt::RPCClient::RVEINVALID;
1845 }
1846 RV rv;
1847 opcounts_[thid][CNTMISC]++;
1848 if (cur->step()) {
1849 rv = kt::RPCClient::RVSUCCESS;
1850 } else {
1851 const kc::BasicDB::Error& e = cur->error();
1852 set_db_error(outmap, e);
1853 if (e == kc::BasicDB::Error::NOREC) {
1854 rv = kt::RPCClient::RVELOGIC;
1855 } else {
1856 log_db_error(serv, e);
1857 rv = kt::RPCClient::RVEINTERNAL;
1858 }
1859 }
1860 return rv;
1861 }
1862 // process the cur_step_back procedure
do_cur_step_back(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1863 RV do_cur_step_back(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1864 kt::TimedDB::Cursor* cur,
1865 const std::map<std::string, std::string>& inmap,
1866 std::map<std::string, std::string>& outmap) {
1867 uint32_t thid = sess->thread_id();
1868 if (!cur) {
1869 set_message(outmap, "ERROR", "no such cursor");
1870 return kt::RPCClient::RVEINVALID;
1871 }
1872 RV rv;
1873 opcounts_[thid][CNTMISC]++;
1874 if (cur->step_back()) {
1875 rv = kt::RPCClient::RVSUCCESS;
1876 } else {
1877 const kc::BasicDB::Error& e = cur->error();
1878 set_db_error(outmap, e);
1879 switch (e.code()) {
1880 case kc::BasicDB::Error::NOIMPL: {
1881 rv = kt::RPCClient::RVENOIMPL;
1882 break;
1883 }
1884 case kc::BasicDB::Error::NOREC: {
1885 rv = kt::RPCClient::RVELOGIC;
1886 break;
1887 }
1888 default: {
1889 log_db_error(serv, e);
1890 rv = kt::RPCClient::RVEINTERNAL;
1891 break;
1892 }
1893 }
1894 }
1895 return rv;
1896 }
1897 // process the cur_set_value procedure
do_cur_set_value(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1898 RV do_cur_set_value(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1899 kt::TimedDB::Cursor* cur,
1900 const std::map<std::string, std::string>& inmap,
1901 std::map<std::string, std::string>& outmap) {
1902 uint32_t thid = sess->thread_id();
1903 if (!cur) {
1904 set_message(outmap, "ERROR", "no such cursor");
1905 return kt::RPCClient::RVEINVALID;
1906 }
1907 size_t vsiz;
1908 const char* vbuf = kt::strmapget(inmap, "value", &vsiz);
1909 if (!vbuf) {
1910 set_message(outmap, "ERROR", "invalid parameters");
1911 return kt::RPCClient::RVEINVALID;
1912 }
1913 const char* rp = kt::strmapget(inmap, "step");
1914 bool step = rp ? true : false;
1915 rp = kt::strmapget(inmap, "xt");
1916 int64_t xt = rp ? kc::atoi(rp) : kc::INT64MAX;
1917 RV rv;
1918 opcounts_[thid][CNTSET]++;
1919 if (cur->set_value(vbuf, vsiz, xt, step)) {
1920 rv = kt::RPCClient::RVSUCCESS;
1921 } else {
1922 opcounts_[thid][CNTSETMISS]++;
1923 const kc::BasicDB::Error& e = cur->error();
1924 set_db_error(outmap, e);
1925 if (e == kc::BasicDB::Error::NOREC) {
1926 rv = kt::RPCClient::RVELOGIC;
1927 } else {
1928 log_db_error(serv, e);
1929 rv = kt::RPCClient::RVEINTERNAL;
1930 }
1931 }
1932 return rv;
1933 }
// process the cur_remove procedure
do_cur_remove(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1935 RV do_cur_remove(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1936 kt::TimedDB::Cursor* cur,
1937 const std::map<std::string, std::string>& inmap,
1938 std::map<std::string, std::string>& outmap) {
1939 uint32_t thid = sess->thread_id();
1940 if (!cur) {
1941 set_message(outmap, "ERROR", "no such cursor");
1942 return kt::RPCClient::RVEINVALID;
1943 }
1944 RV rv;
1945 opcounts_[thid][CNTREMOVE]++;
1946 if (cur->remove()) {
1947 rv = kt::RPCClient::RVSUCCESS;
1948 } else {
1949 opcounts_[thid][CNTREMOVEMISS]++;
1950 const kc::BasicDB::Error& e = cur->error();
1951 set_db_error(outmap, e);
1952 if (e == kc::BasicDB::Error::NOREC) {
1953 rv = kt::RPCClient::RVELOGIC;
1954 } else {
1955 log_db_error(serv, e);
1956 rv = kt::RPCClient::RVEINTERNAL;
1957 }
1958 }
1959 return rv;
1960 }
1961 // process the cur_get_key procedure
do_cur_get_key(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1962 RV do_cur_get_key(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1963 kt::TimedDB::Cursor* cur,
1964 const std::map<std::string, std::string>& inmap,
1965 std::map<std::string, std::string>& outmap) {
1966 uint32_t thid = sess->thread_id();
1967 if (!cur) {
1968 set_message(outmap, "ERROR", "no such cursor");
1969 return kt::RPCClient::RVEINVALID;
1970 }
1971 const char* rp = kt::strmapget(inmap, "step");
1972 bool step = rp ? true : false;
1973 RV rv;
1974 opcounts_[thid][CNTGET]++;
1975 size_t ksiz;
1976 char* kbuf = cur->get_key(&ksiz, step);
1977 if (kbuf) {
1978 outmap["key"] = std::string(kbuf, ksiz);
1979 delete[] kbuf;
1980 rv = kt::RPCClient::RVSUCCESS;
1981 } else {
1982 opcounts_[thid][CNTGETMISS]++;
1983 const kc::BasicDB::Error& e = cur->error();
1984 set_db_error(outmap, e);
1985 if (e == kc::BasicDB::Error::NOREC) {
1986 rv = kt::RPCClient::RVELOGIC;
1987 } else {
1988 log_db_error(serv, e);
1989 rv = kt::RPCClient::RVEINTERNAL;
1990 }
1991 }
1992 return rv;
1993 }
1994 // process the cur_get_value procedure
do_cur_get_value(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)1995 RV do_cur_get_value(kt::RPCServer* serv, kt::RPCServer::Session* sess,
1996 kt::TimedDB::Cursor* cur,
1997 const std::map<std::string, std::string>& inmap,
1998 std::map<std::string, std::string>& outmap) {
1999 uint32_t thid = sess->thread_id();
2000 if (!cur) {
2001 set_message(outmap, "ERROR", "no such cursor");
2002 return kt::RPCClient::RVEINVALID;
2003 }
2004 const char* rp = kt::strmapget(inmap, "step");
2005 bool step = rp ? true : false;
2006 RV rv;
2007 opcounts_[thid][CNTGET]++;
2008 size_t vsiz;
2009 char* vbuf = cur->get_value(&vsiz, step);
2010 if (vbuf) {
2011 outmap["value"] = std::string(vbuf, vsiz);
2012 delete[] vbuf;
2013 rv = kt::RPCClient::RVSUCCESS;
2014 } else {
2015 opcounts_[thid][CNTGETMISS]++;
2016 const kc::BasicDB::Error& e = cur->error();
2017 set_db_error(outmap, e);
2018 if (e == kc::BasicDB::Error::NOREC) {
2019 rv = kt::RPCClient::RVELOGIC;
2020 } else {
2021 log_db_error(serv, e);
2022 rv = kt::RPCClient::RVEINTERNAL;
2023 }
2024 }
2025 return rv;
2026 }
2027 // process the cur_get procedure
do_cur_get(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)2028 RV do_cur_get(kt::RPCServer* serv, kt::RPCServer::Session* sess,
2029 kt::TimedDB::Cursor* cur,
2030 const std::map<std::string, std::string>& inmap,
2031 std::map<std::string, std::string>& outmap) {
2032 uint32_t thid = sess->thread_id();
2033 if (!cur) {
2034 set_message(outmap, "ERROR", "no such cursor");
2035 return kt::RPCClient::RVEINVALID;
2036 }
2037 const char* rp = kt::strmapget(inmap, "step");
2038 bool step = rp ? true : false;
2039 RV rv;
2040 opcounts_[thid][CNTGET]++;
2041 size_t ksiz, vsiz;
2042 const char* vbuf;
2043 int64_t xt;
2044 char* kbuf = cur->get(&ksiz, &vbuf, &vsiz, &xt, step);
2045 if (kbuf) {
2046 outmap["key"] = std::string(kbuf, ksiz);
2047 outmap["value"] = std::string(vbuf, vsiz);
2048 if (xt < kt::TimedDB::XTMAX) set_message(outmap, "xt", "%lld", (long long)xt);
2049 delete[] kbuf;
2050 rv = kt::RPCClient::RVSUCCESS;
2051 } else {
2052 opcounts_[thid][CNTGETMISS]++;
2053 const kc::BasicDB::Error& e = cur->error();
2054 set_db_error(outmap, e);
2055 if (e == kc::BasicDB::Error::NOREC) {
2056 rv = kt::RPCClient::RVELOGIC;
2057 } else {
2058 log_db_error(serv, e);
2059 rv = kt::RPCClient::RVEINTERNAL;
2060 }
2061 }
2062 return rv;
2063 }
2064 // process the cur_seize procedure
do_cur_seize(kt::RPCServer * serv,kt::RPCServer::Session * sess,kt::TimedDB::Cursor * cur,const std::map<std::string,std::string> & inmap,std::map<std::string,std::string> & outmap)2065 RV do_cur_seize(kt::RPCServer* serv, kt::RPCServer::Session* sess,
2066 kt::TimedDB::Cursor* cur,
2067 const std::map<std::string, std::string>& inmap,
2068 std::map<std::string, std::string>& outmap) {
2069 uint32_t thid = sess->thread_id();
2070 if (!cur) {
2071 set_message(outmap, "ERROR", "no such cursor");
2072 return kt::RPCClient::RVEINVALID;
2073 }
2074 RV rv;
2075 opcounts_[thid][CNTGET]++;
2076 size_t ksiz, vsiz;
2077 const char* vbuf;
2078 int64_t xt;
2079 char* kbuf = cur->seize(&ksiz, &vbuf, &vsiz, &xt);
2080 if (kbuf) {
2081 outmap["key"] = std::string(kbuf, ksiz);
2082 outmap["value"] = std::string(vbuf, vsiz);
2083 if (xt < kt::TimedDB::XTMAX) set_message(outmap, "xt", "%lld", (long long)xt);
2084 delete[] kbuf;
2085 rv = kt::RPCClient::RVSUCCESS;
2086 } else {
2087 opcounts_[thid][CNTGETMISS]++;
2088 const kc::BasicDB::Error& e = cur->error();
2089 set_db_error(outmap, e);
2090 if (e == kc::BasicDB::Error::NOREC) {
2091 rv = kt::RPCClient::RVELOGIC;
2092 } else {
2093 log_db_error(serv, e);
2094 rv = kt::RPCClient::RVEINTERNAL;
2095 }
2096 }
2097 return rv;
2098 }
2099 // process the restful get command
do_rest_get(kt::HTTPServer * serv,kt::HTTPServer::Session * sess,kt::TimedDB * db,const char * kbuf,size_t ksiz,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)2100 int32_t do_rest_get(kt::HTTPServer* serv, kt::HTTPServer::Session* sess,
2101 kt::TimedDB* db, const char* kbuf, size_t ksiz,
2102 const std::map<std::string, std::string>& reqheads,
2103 const std::string& reqbody,
2104 std::map<std::string, std::string>& resheads,
2105 std::string& resbody,
2106 const std::map<std::string, std::string>& misc) {
2107 uint32_t thid = sess->thread_id();
2108 int32_t code;
2109 opcounts_[thid][CNTGET]++;
2110 size_t vsiz;
2111 int64_t xt;
2112 const char* vbuf = db->get(kbuf, ksiz, &vsiz, &xt);
2113 if (vbuf) {
2114 resbody.append(vbuf, vsiz);
2115 if (xt < kt::TimedDB::XTMAX) {
2116 char buf[48];
2117 kt::datestrhttp(xt, 0, buf);
2118 resheads["x-kt-xt"] = buf;
2119 }
2120 delete[] vbuf;
2121 code = 200;
2122 } else {
2123 opcounts_[thid][CNTGETMISS]++;
2124 const kc::BasicDB::Error& e = db->error();
2125 kc::strprintf(&resheads["x-kt-error"], "DB: %d: %s: %s", e.code(), e.name(), e.message());
2126 if (e == kc::BasicDB::Error::NOREC) {
2127 code = 404;
2128 } else {
2129 log_db_error(serv, e);
2130 code = 500;
2131 }
2132 }
2133 return code;
2134 }
2135 // process the restful head command
do_rest_head(kt::HTTPServer * serv,kt::HTTPServer::Session * sess,kt::TimedDB * db,const char * kbuf,size_t ksiz,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)2136 int32_t do_rest_head(kt::HTTPServer* serv, kt::HTTPServer::Session* sess,
2137 kt::TimedDB* db, const char* kbuf, size_t ksiz,
2138 const std::map<std::string, std::string>& reqheads,
2139 const std::string& reqbody,
2140 std::map<std::string, std::string>& resheads,
2141 std::string& resbody,
2142 const std::map<std::string, std::string>& misc) {
2143 uint32_t thid = sess->thread_id();
2144 int32_t code;
2145 opcounts_[thid][CNTGET]++;
2146 size_t vsiz;
2147 int64_t xt;
2148 const char* vbuf = db->get(kbuf, ksiz, &vsiz, &xt);
2149 if (vbuf) {
2150 if (xt < kt::TimedDB::XTMAX) {
2151 char buf[48];
2152 kt::datestrhttp(xt, 0, buf);
2153 resheads["x-kt-xt"] = buf;
2154 }
2155 kc::strprintf(&resheads["content-length"], "%lld", (long long)vsiz);
2156 delete[] vbuf;
2157 code = 200;
2158 } else {
2159 opcounts_[thid][CNTGETMISS]++;
2160 const kc::BasicDB::Error& e = db->error();
2161 kc::strprintf(&resheads["x-kt-error"], "DB: %d: %s: %s", e.code(), e.name(), e.message());
2162 resheads["content-length"] = "0";
2163 if (e == kc::BasicDB::Error::NOREC) {
2164 code = 404;
2165 } else {
2166 log_db_error(serv, e);
2167 code = 500;
2168 }
2169 }
2170 return code;
2171 }
2172 // process the restful put command
do_rest_put(kt::HTTPServer * serv,kt::HTTPServer::Session * sess,kt::TimedDB * db,const char * kbuf,size_t ksiz,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)2173 int32_t do_rest_put(kt::HTTPServer* serv, kt::HTTPServer::Session* sess,
2174 kt::TimedDB* db, const char* kbuf, size_t ksiz,
2175 const std::map<std::string, std::string>& reqheads,
2176 const std::string& reqbody,
2177 std::map<std::string, std::string>& resheads,
2178 std::string& resbody,
2179 const std::map<std::string, std::string>& misc) {
2180 uint32_t thid = sess->thread_id();
2181 int32_t mode = 0;
2182 const char* rp = kt::strmapget(reqheads, "x-kt-mode");
2183 if (rp) {
2184 if (!kc::stricmp(rp, "add")) {
2185 mode = 1;
2186 } else if (!kc::stricmp(rp, "replace")) {
2187 mode = 2;
2188 }
2189 }
2190 rp = kt::strmapget(reqheads, "x-kt-xt");
2191 int64_t xt = rp ? kt::strmktime(rp) : -1;
2192 xt = xt > 0 && xt < kt::TimedDB::XTMAX ? -xt : kc::INT64MAX;
2193 int32_t code;
2194 opcounts_[thid][CNTSET]++;
2195
2196 bool rv;
2197 switch (mode) {
2198 default: {
2199 rv = db->set(kbuf, ksiz, reqbody.data(), reqbody.size(), xt);
2200 break;
2201 }
2202 case 1: {
2203 rv = db->add(kbuf, ksiz, reqbody.data(), reqbody.size(), xt);
2204 break;
2205 }
2206 case 2: {
2207 rv = db->replace(kbuf, ksiz, reqbody.data(), reqbody.size(), xt);
2208 break;
2209 }
2210 }
2211 if (rv) {
2212 const char* url = kt::strmapget(misc, "url");
2213 if (url) resheads["location"] = url;
2214 code = 201;
2215 } else {
2216 opcounts_[thid][CNTSETMISS]++;
2217 const kc::BasicDB::Error& e = db->error();
2218 kc::strprintf(&resheads["x-kt-error"], "DB: %d: %s: %s", e.code(), e.name(), e.message());
2219 if (e == kc::BasicDB::Error::DUPREC || e == kc::BasicDB::Error::NOREC) {
2220 code = 450;
2221 } else {
2222 log_db_error(serv, e);
2223 code = 500;
2224 }
2225 }
2226 return code;
2227 }
2228 // process the restful delete command
do_rest_delete(kt::HTTPServer * serv,kt::HTTPServer::Session * sess,kt::TimedDB * db,const char * kbuf,size_t ksiz,const std::map<std::string,std::string> & reqheads,const std::string & reqbody,std::map<std::string,std::string> & resheads,std::string & resbody,const std::map<std::string,std::string> & misc)2229 int32_t do_rest_delete(kt::HTTPServer* serv, kt::HTTPServer::Session* sess,
2230 kt::TimedDB* db, const char* kbuf, size_t ksiz,
2231 const std::map<std::string, std::string>& reqheads,
2232 const std::string& reqbody,
2233 std::map<std::string, std::string>& resheads,
2234 std::string& resbody,
2235 const std::map<std::string, std::string>& misc) {
2236 uint32_t thid = sess->thread_id();
2237 int32_t code;
2238 opcounts_[thid][CNTREMOVE]++;
2239 if (db->remove(kbuf, ksiz)) {
2240 code = 204;
2241 } else {
2242 opcounts_[thid][CNTREMOVEMISS]++;
2243 const kc::BasicDB::Error& e = db->error();
2244 kc::strprintf(&resheads["x-kt-error"], "DB: %d: %s: %s", e.code(), e.name(), e.message());
2245 if (e == kc::BasicDB::Error::NOREC) {
2246 code = 404;
2247 } else {
2248 log_db_error(serv, e);
2249 code = 500;
2250 }
2251 }
2252 return code;
2253 }
2254 // process the binary replication command
do_bin_replication(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)2255 bool do_bin_replication(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
2256 char tbuf[sizeof(uint32_t)+sizeof(uint64_t)+sizeof(uint16_t)];
2257 if (!sess->receive(tbuf, sizeof(tbuf))) return false;
2258 const char* rp = tbuf;
2259 uint32_t flags = kc::readfixnum(rp, sizeof(flags));
2260 rp += sizeof(flags);
2261 uint64_t ts = kc::readfixnum(rp, sizeof(ts));
2262 rp += sizeof(ts);
2263 uint16_t sid = kc::readfixnum(rp, sizeof(sid));
2264 bool white = flags & kt::ReplicationClient::WHITESID;
2265 bool err = false;
2266 if (ulog_) {
2267 kt::UpdateLogger::Reader ulrd;
2268 if (ulrd.open(ulog_, ts)) {
2269 char c = kt::RemoteDB::BMREPLICATION;
2270 if (sess->send(&c, 1)) {
2271 serv->log(kt::ThreadedServer::Logger::SYSTEM, "a slave was connected: ts=%llu sid=%u",
2272 (unsigned long long)ts, sid);
2273 char stack[kc::NUMBUFSIZ+RECBUFSIZ*4];
2274 uint64_t rts = 0;
2275 int32_t miss = 0;
2276 while (!err && !serv->aborted()) {
2277 size_t msiz;
2278 uint64_t mts;
2279 char* mbuf = ulrd.read(&msiz, &mts);
2280 if (mbuf) {
2281 size_t rsiz;
2282 uint16_t rsid = 0;
2283 uint16_t rdbid = 0;
2284 const char* rbuf = DBUpdateLogger::parse(mbuf, msiz, &rsiz, &rsid, &rdbid);
2285 if (white) {
2286 if (rsid != sid) rbuf = NULL;
2287 } else {
2288 if (rsid == sid) rbuf = NULL;
2289 }
2290 if (rbuf) {
2291 miss = 0;
2292 size_t nsiz = 1 + sizeof(uint64_t) + sizeof(uint32_t) + msiz;
2293 char* nbuf = nsiz > sizeof(stack) ? new char[nsiz] : stack;
2294 char* wp = nbuf;
2295 *(wp++) = kt::RemoteDB::BMREPLICATION;
2296 kc::writefixnum(wp, mts, sizeof(uint64_t));
2297 wp += sizeof(uint64_t);
2298 kc::writefixnum(wp, msiz, sizeof(uint32_t));
2299 wp += sizeof(uint32_t);
2300 std::memcpy(wp, mbuf, msiz);
2301 if (!sess->send(nbuf, nsiz)) err = true;
2302 if (nbuf != stack) delete[] nbuf;
2303 } else {
2304 miss++;
2305 if (miss >= Slave::DUMMYFREQ) {
2306 char hbuf[1+sizeof(uint64_t)+sizeof(uint32_t)];
2307 char* wp = hbuf;
2308 *(wp++) = kt::RemoteDB::BMREPLICATION;
2309 kc::writefixnum(wp, mts, sizeof(uint64_t));
2310 wp += sizeof(uint64_t);
2311 kc::writefixnum(wp, 0, sizeof(uint32_t));
2312 if (!sess->send(hbuf, sizeof(hbuf))) err = true;
2313 miss = 0;
2314 }
2315 }
2316 if (mts > rts) rts = mts;
2317 delete[] mbuf;
2318 } else {
2319 uint64_t cc = kt::UpdateLogger::clock_pure();
2320 if (cc > 1000000000) cc -= 1000000000;
2321 if (cc < rts) cc = rts;
2322 char hbuf[1+sizeof(uint64_t)];
2323 char* wp = hbuf;
2324 *(wp++) = kt::RemoteDB::BMNOP;
2325 kc::writefixnum(wp, cc, sizeof(uint64_t));
2326 if (!sess->send(hbuf, sizeof(hbuf)) ||
2327 sess->receive_byte() != kt::RemoteDB::BMREPLICATION)
2328 err = true;
2329 kc::Thread::sleep(0.1);
2330 }
2331 }
2332 serv->log(kt::ThreadedServer::Logger::SYSTEM, "a slave was disconnected: sid=%u", sid);
2333 if (!ulrd.close()) {
2334 serv->log(kt::ThreadedServer::Logger::ERROR, "closing an update log reader failed");
2335 err = true;
2336 }
2337 } else {
2338 err = true;
2339 }
2340 } else {
2341 serv->log(kt::ThreadedServer::Logger::ERROR, "opening an update log reader failed");
2342 char c = kt::RemoteDB::BMERROR;
2343 sess->send(&c, 1);
2344 err = true;
2345 }
2346 } else {
2347 char c = kt::RemoteDB::BMERROR;
2348 sess->send(&c, 1);
2349 serv->log(kt::ThreadedServer::Logger::INFO, "no update log allows no replication");
2350 err = true;
2351 }
2352 return !err;
2353 }
2354 // process the binary play_script command
do_bin_play_script(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)2355 bool do_bin_play_script(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
2356 uint32_t thid = sess->thread_id();
2357 char tbuf[sizeof(uint32_t)+sizeof(uint32_t)+sizeof(uint32_t)];
2358 if (!sess->receive(tbuf, sizeof(tbuf))) return false;
2359 const char* rp = tbuf;
2360 uint32_t flags = kc::readfixnum(rp, sizeof(flags));
2361 rp += sizeof(flags);
2362 uint32_t nsiz = kc::readfixnum(rp, sizeof(nsiz));
2363 rp += sizeof(nsiz);
2364 uint32_t rnum = kc::readfixnum(rp, sizeof(rnum));
2365 rp += sizeof(rnum);
2366 if (nsiz > kt::RemoteDB::DATAMAXSIZ) return false;
2367 bool norep = flags & kt::RemoteDB::BONOREPLY;
2368 bool err = false;
2369 char nstack[kc::NUMBUFSIZ+RECBUFSIZ];
2370 char* nbuf = nsiz + 1 > sizeof(nstack) ? new char[nsiz+1] : nstack;
2371 if (sess->receive(nbuf, nsiz)) {
2372 nbuf[nsiz] = '\0';
2373 char stack[kc::NUMBUFSIZ+RECBUFSIZ*4];
2374 std::map<std::string, std::string> scrinmap;
2375 for (uint32_t i = 0; !err && i < rnum; i++) {
2376 char hbuf[sizeof(uint32_t)+sizeof(uint32_t)];
2377 if (sess->receive(hbuf, sizeof(hbuf))) {
2378 rp = hbuf;
2379 uint32_t ksiz = kc::readfixnum(rp, sizeof(ksiz));
2380 rp += sizeof(ksiz);
2381 uint32_t vsiz = kc::readfixnum(rp, sizeof(vsiz));
2382 rp += sizeof(vsiz);
2383 if (ksiz <= kt::RemoteDB::DATAMAXSIZ && vsiz <= kt::RemoteDB::DATAMAXSIZ) {
2384 size_t rsiz = ksiz + vsiz;
2385 char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
2386 if (sess->receive(rbuf, rsiz)) {
2387 std::string key(rbuf, ksiz);
2388 std::string value(rbuf + ksiz, vsiz);
2389 scrinmap[key] = value;
2390 } else {
2391 err = true;
2392 }
2393 if (rbuf != stack) delete[] rbuf;
2394 } else {
2395 err = true;
2396 }
2397 } else {
2398 err = true;
2399 }
2400 }
2401 if (!err) {
2402 if (scrprocs_) {
2403 ScriptProcessor* scrproc = scrprocs_ + thid;
2404 opcounts_[thid][CNTSCRIPT]++;
2405 std::map<std::string, std::string> scroutmap;
2406 RV rv = scrproc->call(nbuf, scrinmap, scroutmap);
2407 if (rv == kt::RPCClient::RVSUCCESS) {
2408 size_t osiz = 1 + sizeof(uint32_t);
2409 std::map<std::string, std::string>::iterator it = scroutmap.begin();
2410 std::map<std::string, std::string>::iterator itend = scroutmap.end();
2411 while (it != itend) {
2412 osiz += sizeof(uint32_t) + sizeof(uint32_t) + it->first.size() + it->second.size();
2413 ++it;
2414 }
2415 char* obuf = new char[osiz];
2416 char* wp = obuf;
2417 *(wp++) = kt::RemoteDB::BMPLAYSCRIPT;
2418 kc::writefixnum(wp, scroutmap.size(), sizeof(uint32_t));
2419 wp += sizeof(uint32_t);
2420 it = scroutmap.begin();
2421 itend = scroutmap.end();
2422 while (it != itend) {
2423 kc::writefixnum(wp, it->first.size(), sizeof(uint32_t));
2424 wp += sizeof(uint32_t);
2425 kc::writefixnum(wp, it->second.size(), sizeof(uint32_t));
2426 wp += sizeof(uint32_t);
2427 std::memcpy(wp, it->first.data(), it->first.size());
2428 wp += it->first.size();
2429 std::memcpy(wp, it->second.data(), it->second.size());
2430 wp += it->second.size();
2431 ++it;
2432 }
2433 if (!norep && !sess->send(obuf, osiz)) err = true;
2434 delete[] obuf;
2435 } else {
2436 char c = kt::RemoteDB::BMERROR;
2437 if (!norep) sess->send(&c, 1);
2438 }
2439 } else {
2440 char c = kt::RemoteDB::BMERROR;
2441 if (!norep) sess->send(&c, 1);
2442 }
2443 }
2444 }
2445 if (nbuf != nstack) delete[] nbuf;
2446 return !err;
2447 }
2448 // process the binary set_bulk command
do_bin_set_bulk(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)2449 bool do_bin_set_bulk(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
2450 uint32_t thid = sess->thread_id();
2451 char tbuf[sizeof(uint32_t)+sizeof(uint32_t)];
2452 if (!sess->receive(tbuf, sizeof(tbuf))) return false;
2453 const char* rp = tbuf;
2454 uint32_t flags = kc::readfixnum(rp, sizeof(flags));
2455 rp += sizeof(flags);
2456 uint32_t rnum = kc::readfixnum(rp, sizeof(rnum));
2457 rp += sizeof(rnum);
2458 bool norep = flags & kt::RemoteDB::BONOREPLY;
2459 bool err = false;
2460 uint32_t hits = 0;
2461 char stack[kc::NUMBUFSIZ+RECBUFSIZ*4];
2462 for (uint32_t i = 0; !err && i < rnum; i++) {
2463 char hbuf[sizeof(uint16_t)+sizeof(uint32_t)+sizeof(uint32_t)+sizeof(int64_t)];
2464 if (sess->receive(hbuf, sizeof(hbuf))) {
2465 rp = hbuf;
2466 uint16_t dbidx = kc::readfixnum(rp, sizeof(dbidx));
2467 rp += sizeof(dbidx);
2468 uint32_t ksiz = kc::readfixnum(rp, sizeof(ksiz));
2469 rp += sizeof(ksiz);
2470 uint32_t vsiz = kc::readfixnum(rp, sizeof(vsiz));
2471 rp += sizeof(vsiz);
2472 int64_t xt = kc::readfixnum(rp, sizeof(xt));
2473 rp += sizeof(xt);
2474 if (ksiz <= kt::RemoteDB::DATAMAXSIZ && vsiz <= kt::RemoteDB::DATAMAXSIZ) {
2475 size_t rsiz = ksiz + vsiz;
2476 char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
2477 if (sess->receive(rbuf, rsiz)) {
2478 if (dbidx < dbnum_) {
2479 kt::TimedDB* db = dbs_ + dbidx;
2480 opcounts_[thid][CNTSET]++;
2481 if (db->set(rbuf, ksiz, rbuf + ksiz, vsiz, xt)) {
2482 hits++;
2483 } else {
2484 opcounts_[thid][CNTSETMISS]++;
2485 err = true;
2486 }
2487 }
2488 } else {
2489 err = true;
2490 }
2491 if (rbuf != stack) delete[] rbuf;
2492 } else {
2493 err = true;
2494 }
2495 } else {
2496 err = true;
2497 }
2498 }
2499 if (err) {
2500 char c = kt::RemoteDB::BMERROR;
2501 if (!norep) sess->send(&c, 1);
2502 } else {
2503 char hbuf[1+sizeof(hits)];
2504 char* wp = hbuf;
2505 *(wp++) = kt::RemoteDB::BMSETBULK;
2506 kc::writefixnum(wp, hits, sizeof(hits));
2507 if (!norep && !sess->send(hbuf, sizeof(hbuf))) err = true;
2508 }
2509 return !err;
2510 }
2511 // process the binary remove_bulk command
do_bin_remove_bulk(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)2512 bool do_bin_remove_bulk(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
2513 uint32_t thid = sess->thread_id();
2514 char tbuf[sizeof(uint32_t)+sizeof(uint32_t)];
2515 if (!sess->receive(tbuf, sizeof(tbuf))) return false;
2516 const char* rp = tbuf;
2517 uint32_t flags = kc::readfixnum(rp, sizeof(flags));
2518 rp += sizeof(flags);
2519 uint32_t rnum = kc::readfixnum(rp, sizeof(rnum));
2520 rp += sizeof(rnum);
2521 bool norep = flags & kt::RemoteDB::BONOREPLY;
2522 bool err = false;
2523 uint32_t hits = 0;
2524 char stack[kc::NUMBUFSIZ+RECBUFSIZ*2];
2525 for (uint32_t i = 0; !err && i < rnum; i++) {
2526 char hbuf[sizeof(uint16_t)+sizeof(uint32_t)];
2527 if (sess->receive(hbuf, sizeof(hbuf))) {
2528 rp = hbuf;
2529 uint16_t dbidx = kc::readfixnum(rp, sizeof(dbidx));
2530 rp += sizeof(dbidx);
2531 uint32_t ksiz = kc::readfixnum(rp, sizeof(ksiz));
2532 rp += sizeof(ksiz);
2533 if (ksiz <= kt::RemoteDB::DATAMAXSIZ) {
2534 char* kbuf = ksiz > sizeof(stack) ? new char[ksiz] : stack;
2535 if (sess->receive(kbuf, ksiz)) {
2536 if (dbidx < dbnum_) {
2537 kt::TimedDB* db = dbs_ + dbidx;
2538 opcounts_[thid][CNTREMOVE]++;
2539 if (db->remove(kbuf, ksiz)) {
2540 hits++;
2541 } else {
2542 opcounts_[thid][CNTREMOVEMISS]++;
2543 if (db->error() != kc::BasicDB::Error::NOREC) err = true;
2544 }
2545 }
2546 } else {
2547 err = true;
2548 }
2549 if (kbuf != stack) delete[] kbuf;
2550 } else {
2551 err = true;
2552 }
2553 } else {
2554 err = true;
2555 }
2556 }
2557 if (err) {
2558 char c = kt::RemoteDB::BMERROR;
2559 if (!norep) sess->send(&c, 1);
2560 } else {
2561 char hbuf[1+sizeof(hits)];
2562 char* wp = hbuf;
2563 *(wp++) = kt::RemoteDB::BMREMOVEBULK;
2564 kc::writefixnum(wp, hits, sizeof(hits));
2565 if (!norep && !sess->send(hbuf, sizeof(hbuf))) err = true;
2566 }
2567 return !err;
2568 }
2569 // process the binary get_bulk command
do_bin_get_bulk(kt::ThreadedServer * serv,kt::ThreadedServer::Session * sess)2570 bool do_bin_get_bulk(kt::ThreadedServer* serv, kt::ThreadedServer::Session* sess) {
2571 uint32_t thid = sess->thread_id();
2572 char tbuf[sizeof(uint32_t)+sizeof(uint32_t)];
2573 if (!sess->receive(tbuf, sizeof(tbuf))) return false;
2574 const char* rp = tbuf;
2575 uint32_t flags = kc::readfixnum(rp, sizeof(flags));
2576 rp += sizeof(flags);
2577 uint32_t rnum = kc::readfixnum(rp, sizeof(rnum));
2578 rp += sizeof(rnum);
2579 bool err = false;
2580 uint32_t hits = 0;
2581 char stack[kc::NUMBUFSIZ+RECBUFSIZ*2];
2582 size_t oasiz = kc::NUMBUFSIZ + RECBUFSIZ * 2;
2583 char* obuf = (char*)kc::xmalloc(oasiz);
2584 size_t osiz = 1 + sizeof(uint32_t);
2585 std::memset(obuf, 0, osiz);
2586 for (uint32_t i = 0; !err && i < rnum; i++) {
2587 char hbuf[sizeof(uint16_t)+sizeof(uint32_t)];
2588 if (sess->receive(hbuf, sizeof(hbuf))) {
2589 rp = hbuf;
2590 uint16_t dbidx = kc::readfixnum(rp, sizeof(dbidx));
2591 rp += sizeof(dbidx);
2592 uint32_t ksiz = kc::readfixnum(rp, sizeof(ksiz));
2593 rp += sizeof(ksiz);
2594 if (ksiz <= kt::RemoteDB::DATAMAXSIZ) {
2595 char* kbuf = ksiz > sizeof(stack) ? new char[ksiz] : stack;
2596 if (sess->receive(kbuf, ksiz)) {
2597 if (dbidx < dbnum_) {
2598 kt::TimedDB* db = dbs_ + dbidx;
2599 opcounts_[thid][CNTGET]++;
2600 size_t vsiz;
2601 int64_t xt;
2602 char* vbuf = db->get(kbuf, ksiz, &vsiz, &xt);
2603 if (vbuf) {
2604 hits++;
2605 size_t usiz = sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) +
2606 sizeof(int64_t) + ksiz + vsiz;
2607 if (osiz + usiz > oasiz) {
2608 oasiz = oasiz * 2 + usiz;
2609 obuf = (char*)kc::xrealloc(obuf, oasiz);
2610 }
2611 kc::writefixnum(obuf + osiz, dbidx, sizeof(uint16_t));
2612 osiz += sizeof(uint16_t);
2613 kc::writefixnum(obuf + osiz, ksiz, sizeof(uint32_t));
2614 osiz += sizeof(uint32_t);
2615 kc::writefixnum(obuf + osiz, vsiz, sizeof(uint32_t));
2616 osiz += sizeof(uint32_t);
2617 kc::writefixnum(obuf + osiz, xt, sizeof(int64_t));
2618 osiz += sizeof(int64_t);
2619 std::memcpy(obuf + osiz, kbuf, ksiz);
2620 osiz += ksiz;
2621 std::memcpy(obuf + osiz, vbuf, vsiz);
2622 osiz += vsiz;
2623 delete[] vbuf;
2624 } else {
2625 opcounts_[thid][CNTGETMISS]++;
2626 if (db->error() != kc::BasicDB::Error::NOREC) err = true;
2627 }
2628 }
2629 } else {
2630 err = true;
2631 }
2632 if (kbuf != stack) delete[] kbuf;
2633 } else {
2634 err = true;
2635 }
2636 } else {
2637 err = true;
2638 }
2639 }
2640 if (err) {
2641 char c = kt::RemoteDB::BMERROR;
2642 sess->send(&c, 1);
2643 } else {
2644 *obuf = kt::RemoteDB::BMGETBULK;
2645 kc::writefixnum(obuf + 1, hits, sizeof(hits));
2646 if (!sess->send(obuf, osiz)) err = true;
2647 }
2648 kc::xfree(obuf);
2649 return !err;
2650 }
2651 // session local storage
2652 class SLS : public kt::RPCServer::Session::Data {
2653 friend class Worker;
2654 private:
SLS()2655 SLS() : curs_() {}
~SLS()2656 ~SLS() {
2657 std::map<int64_t, kt::TimedDB::Cursor*>::iterator it = curs_.begin();
2658 std::map<int64_t, kt::TimedDB::Cursor*>::iterator itend = curs_.end();
2659 while (it != itend) {
2660 kt::TimedDB::Cursor* cur = it->second;
2661 delete cur;
2662 ++it;
2663 }
2664 }
create(kt::RPCServer::Session * sess)2665 static SLS* create(kt::RPCServer::Session* sess) {
2666 SLS* sls = (SLS*)sess->data();
2667 if (!sls) {
2668 sls = new SLS;
2669 sess->set_data(sls);
2670 }
2671 return sls;
2672 }
2673 std::map<int64_t, kt::TimedDB::Cursor*> curs_;
2674 };
// NOTE(review): unless stated otherwise, the meanings below are inferred from the names of
// the arguments that proc() passes to the Worker constructor -- confirm against the
// constructor itself.
int32_t thnum_;  // number of worker threads
kc::CondMap* const condmap_;  // condition map shared with proc()
kt::TimedDB* const dbs_;  // array of databases, indexed by database index
const int32_t dbnum_;  // number of elements of dbs_; indices are checked against it
const std::map<std::string, int32_t>& dbmap_;  // map from database name to database index
const int32_t omode_;  // open mode of the databases
const double asi_;  // presumably the value of the "-asi" option
const bool ash_;  // presumably the flag of the "-ash" option
const char* const bgspath_;  // directory of background snapshots, or NULL
const double bgsi_;  // presumably the value of the "-bgsi" option
kc::Compressor* const bgscomp_;  // compressor of snapshots, or NULL
kt::UpdateLogger* const ulog_;  // update logger, or NULL
DBUpdateLogger* const ulogdbs_;  // per-database update loggers, or NULL
const char* const cmdpath_;  // directory of outer commands
ScriptProcessor* const scrprocs_;  // per-thread script processors, or NULL
OpCount* const opcounts_;  // operation counters, indexed by thread ID then by counter kind
uint64_t idlecnt_;  // NOTE(review): looks like a counter of idle events -- confirm
double asnext_;  // NOTE(review): looks like the time of the next "asi" action -- confirm
double bgsnext_;  // NOTE(review): looks like the time of the next snapshot -- confirm
Slave* slave_;  // replication slave; proc() hands one over through set_misc_conf()
2695 };
2696
2697
2698 // main routine
main(int argc,char ** argv)2699 int main(int argc, char** argv) {
2700 g_progname = argv[0];
2701 g_procid = kc::getpid();
2702 g_starttime = kc::time();
2703 kc::setstdiobin();
2704 kt::setkillsignalhandler(killserver);
2705 if (argc > 1 && !std::strcmp(argv[1], "--version")) {
2706 printversion();
2707 return 0;
2708 }
2709 int32_t rv = run(argc, argv);
2710 return rv;
2711 }
2712
2713
2714 // print the usage and exit
usage()2715 static void usage() {
2716 eprintf("%s: Kyoto Tycoon: a handy cache/storage server\n", g_progname);
2717 eprintf("\n");
2718 eprintf("usage:\n");
2719 eprintf(" %s [-host str] [-port num] [-tout num] [-th num] [-log file] [-li|-ls|-le|-lz]"
2720 " [-ulog dir] [-ulim num] [-uasi num] [-sid num] [-ord] [-oat|-oas|-onl|-otl|-onr]"
2721 " [-asi num] [-ash] [-bgs dir] [-bgsi num] [-bgc str]"
2722 " [-dmn] [-pid file] [-cmd dir] [-scr file]"
2723 " [-mhost str] [-mport num] [-rts file] [-riv num]"
2724 " [-plsv file] [-plex str] [-pldb file] [db...]\n", g_progname);
2725 eprintf("\n");
2726 std::exit(1);
2727 }
2728
2729
2730 // kill the running server
killserver(int signum)2731 static void killserver(int signum) {
2732 if (g_serv) {
2733 g_serv->stop();
2734 g_serv = NULL;
2735 if (g_daemon && signum == SIGHUP) g_restart = true;
2736 if (signum == SIGUSR1) g_restart = true;
2737 }
2738 }
2739
2740
2741 // parse arguments of the command
run(int argc,char ** argv)2742 static int32_t run(int argc, char** argv) {
2743 bool argbrk = false;
2744 std::vector<std::string> dbpaths;
2745 const char* host = NULL;
2746 int32_t port = kt::DEFPORT;
2747 double tout = DEFTOUT;
2748 int32_t thnum = DEFTHNUM;
2749 const char* logpath = NULL;
2750 uint32_t logkinds = kc::UINT32MAX;
2751 const char* ulogpath = NULL;
2752 int64_t ulim = DEFULIM;
2753 double uasi = 0;
2754 int32_t sid = -1;
2755 int32_t omode = kc::BasicDB::OWRITER | kc::BasicDB::OCREATE;
2756 double asi = 0;
2757 bool ash = false;
2758 const char* bgspath = NULL;
2759 double bgsi = DEFBGSI;
2760 kc::Compressor* bgscomp = NULL;
2761 bool dmn = false;
2762 const char* pidpath = NULL;
2763 const char* cmdpath = NULL;
2764 const char* scrpath = NULL;
2765 const char* mhost = NULL;
2766 int32_t mport = kt::DEFPORT;
2767 const char* rtspath = NULL;
2768 double riv = DEFRIV;
2769 const char* plsvpath = NULL;
2770 const char* plsvex = "";
2771 const char* pldbpath = NULL;
2772 for (int32_t i = 1; i < argc; i++) {
2773 if (!argbrk && argv[i][0] == '-') {
2774 if (!std::strcmp(argv[i], "--")) {
2775 argbrk = true;
2776 } else if (!std::strcmp(argv[i], "-host")) {
2777 if (++i >= argc) usage();
2778 host = argv[i];
2779 } else if (!std::strcmp(argv[i], "-port")) {
2780 if (++i >= argc) usage();
2781 port = kc::atoix(argv[i]);
2782 } else if (!std::strcmp(argv[i], "-tout")) {
2783 if (++i >= argc) usage();
2784 tout = kc::atof(argv[i]);
2785 } else if (!std::strcmp(argv[i], "-th")) {
2786 if (++i >= argc) usage();
2787 thnum = kc::atof(argv[i]);
2788 } else if (!std::strcmp(argv[i], "-log")) {
2789 if (++i >= argc) usage();
2790 logpath = argv[i];
2791 } else if (!std::strcmp(argv[i], "-li")) {
2792 logkinds = Logger::INFO | Logger::SYSTEM | Logger::ERROR;
2793 } else if (!std::strcmp(argv[i], "-ls")) {
2794 logkinds = Logger::SYSTEM | Logger::ERROR;
2795 } else if (!std::strcmp(argv[i], "-le")) {
2796 logkinds = Logger::ERROR;
2797 } else if (!std::strcmp(argv[i], "-lz")) {
2798 logkinds = 0;
2799 } else if (!std::strcmp(argv[i], "-ulog")) {
2800 if (++i >= argc) usage();
2801 ulogpath = argv[i];
2802 } else if (!std::strcmp(argv[i], "-ulim")) {
2803 if (++i >= argc) usage();
2804 ulim = kc::atoix(argv[i]);
2805 } else if (!std::strcmp(argv[i], "-uasi")) {
2806 if (++i >= argc) usage();
2807 uasi = kc::atof(argv[i]);
2808 } else if (!std::strcmp(argv[i], "-sid")) {
2809 if (++i >= argc) usage();
2810 sid = kc::atoix(argv[i]);
2811 } else if (!std::strcmp(argv[i], "-ord")) {
2812 omode &= ~kc::BasicDB::OWRITER;
2813 omode |= kc::BasicDB::OREADER;
2814 } else if (!std::strcmp(argv[i], "-oat")) {
2815 omode |= kc::BasicDB::OAUTOTRAN;
2816 } else if (!std::strcmp(argv[i], "-oas")) {
2817 omode |= kc::BasicDB::OAUTOSYNC;
2818 } else if (!std::strcmp(argv[i], "-onl")) {
2819 omode |= kc::BasicDB::ONOLOCK;
2820 } else if (!std::strcmp(argv[i], "-otl")) {
2821 omode |= kc::BasicDB::OTRYLOCK;
2822 } else if (!std::strcmp(argv[i], "-onr")) {
2823 omode |= kc::BasicDB::ONOREPAIR;
2824 } else if (!std::strcmp(argv[i], "-asi")) {
2825 if (++i >= argc) usage();
2826 asi = kc::atof(argv[i]);
2827 } else if (!std::strcmp(argv[i], "-ash")) {
2828 ash = true;
2829 } else if (!std::strcmp(argv[i], "-bgs")) {
2830 if (++i >= argc) usage();
2831 bgspath = argv[i];
2832 } else if (!std::strcmp(argv[i], "-bgsi")) {
2833 if (++i >= argc) usage();
2834 bgsi = kc::atof(argv[i]);
2835 } else if (!std::strcmp(argv[i], "-bgsc")) {
2836 if (++i >= argc) usage();
2837 const char* cn = argv[i];
2838 if (!kc::stricmp(cn, "zlib") || !kc::stricmp(cn, "gz")) {
2839 bgscomp = new kc::ZLIBCompressor<kc::ZLIB::RAW>;
2840 } else if (!kc::stricmp(cn, "lzo") || !kc::stricmp(cn, "oz")) {
2841 bgscomp = new kc::LZOCompressor<kc::LZO::RAW>;
2842 } else if (!kc::stricmp(cn, "lzma") || !kc::stricmp(cn, "xz")) {
2843 bgscomp = new kc::LZMACompressor<kc::LZMA::RAW>;
2844 }
2845 } else if (!std::strcmp(argv[i], "-dmn")) {
2846 dmn = true;
2847 } else if (!std::strcmp(argv[i], "-pid")) {
2848 if (++i >= argc) usage();
2849 pidpath = argv[i];
2850 } else if (!std::strcmp(argv[i], "-cmd")) {
2851 if (++i >= argc) usage();
2852 cmdpath = argv[i];
2853 } else if (!std::strcmp(argv[i], "-scr")) {
2854 if (++i >= argc) usage();
2855 scrpath = argv[i];
2856 } else if (!std::strcmp(argv[i], "-mhost")) {
2857 if (++i >= argc) usage();
2858 mhost = argv[i];
2859 } else if (!std::strcmp(argv[i], "-mport")) {
2860 if (++i >= argc) usage();
2861 mport = kc::atoix(argv[i]);
2862 } else if (!std::strcmp(argv[i], "-rts")) {
2863 if (++i >= argc) usage();
2864 rtspath = argv[i];
2865 } else if (!std::strcmp(argv[i], "-riv")) {
2866 if (++i >= argc) usage();
2867 riv = kc::atof(argv[i]);
2868 } else if (!std::strcmp(argv[i], "-plsv")) {
2869 if (++i >= argc) usage();
2870 plsvpath = argv[i];
2871 } else if (!std::strcmp(argv[i], "-plex")) {
2872 if (++i >= argc) usage();
2873 plsvex = argv[i];
2874 } else if (!std::strcmp(argv[i], "-pldb")) {
2875 if (++i >= argc) usage();
2876 pldbpath = argv[i];
2877 } else {
2878 usage();
2879 }
2880 } else {
2881 argbrk = true;
2882 dbpaths.push_back(argv[i]);
2883 }
2884 }
2885 if (port < 1 || thnum < 1 || mport < 1) usage();
2886 if (thnum > THREADMAX) thnum = THREADMAX;
2887 if (dbpaths.empty()) {
2888 if (pldbpath) usage();
2889 dbpaths.push_back(":");
2890 }
2891 int32_t rv = proc(dbpaths, host, port, tout, thnum, logpath, logkinds,
2892 ulogpath, ulim, uasi, sid, omode, asi, ash, bgspath, bgsi, bgscomp,
2893 dmn, pidpath, cmdpath, scrpath, mhost, mport, rtspath, riv,
2894 plsvpath, plsvex, pldbpath);
2895 delete bgscomp;
2896 return rv;
2897 }
2898
2899
2900 // drive the server process
proc(const std::vector<std::string> & dbpaths,const char * host,int32_t port,double tout,int32_t thnum,const char * logpath,uint32_t logkinds,const char * ulogpath,int64_t ulim,double uasi,int32_t sid,int32_t omode,double asi,bool ash,const char * bgspath,double bgsi,kc::Compressor * bgscomp,bool dmn,const char * pidpath,const char * cmdpath,const char * scrpath,const char * mhost,int32_t mport,const char * rtspath,double riv,const char * plsvpath,const char * plsvex,const char * pldbpath)2901 static int32_t proc(const std::vector<std::string>& dbpaths,
2902 const char* host, int32_t port, double tout, int32_t thnum,
2903 const char* logpath, uint32_t logkinds,
2904 const char* ulogpath, int64_t ulim, double uasi,
2905 int32_t sid, int32_t omode, double asi, bool ash,
2906 const char* bgspath, double bgsi, kc::Compressor* bgscomp, bool dmn,
2907 const char* pidpath, const char* cmdpath, const char* scrpath,
2908 const char* mhost, int32_t mport, const char* rtspath, double riv,
2909 const char* plsvpath, const char* plsvex, const char* pldbpath) {
2910 g_daemon = false;
2911 if (dmn) {
2912 if (kc::File::PATHCHR == '/') {
2913 if (logpath && *logpath != kc::File::PATHCHR) {
2914 eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, logpath);
2915 return 1;
2916 }
2917 if (ulogpath && *ulogpath != kc::File::PATHCHR) {
2918 eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, ulogpath);
2919 return 1;
2920 }
2921 if (bgspath && *bgspath != kc::File::PATHCHR) {
2922 eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, bgspath);
2923 return 1;
2924 }
2925 if (pidpath && *pidpath != kc::File::PATHCHR) {
2926 eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, pidpath);
2927 return 1;
2928 }
2929 if (cmdpath && *cmdpath != kc::File::PATHCHR) {
2930 eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, cmdpath);
2931 return 1;
2932 }
2933 if (scrpath && *scrpath != kc::File::PATHCHR) {
2934 eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, scrpath);
2935 return 1;
2936 }
2937 if (rtspath && *rtspath != kc::File::PATHCHR) {
2938 eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, rtspath);
2939 return 1;
2940 }
2941 if (plsvpath && *plsvpath != kc::File::PATHCHR) {
2942 eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, plsvpath);
2943 return 1;
2944 }
2945 if (pldbpath && *pldbpath != kc::File::PATHCHR) {
2946 eprintf("%s: %s: a daemon can accept absolute path only\n", g_progname, pldbpath);
2947 return 1;
2948 }
2949 }
2950 if (!kt::daemonize()) {
2951 eprintf("%s: switching to a daemon failed\n", g_progname);
2952 return 1;
2953 }
2954 g_procid = kc::getpid();
2955 g_daemon = true;
2956 }
2957 if (ulogpath && sid < 0) {
2958 eprintf("%s: update log requires the server ID\n", g_progname);
2959 return 1;
2960 }
2961 if (!cmdpath) cmdpath = kc::File::CDIRSTR;
2962 if (mhost) {
2963 if (sid < 0) {
2964 eprintf("%s: replication requires the server ID\n", g_progname);
2965 return 1;
2966 }
2967 if (!rtspath) {
2968 eprintf("%s: replication requires the replication time stamp file\n", g_progname);
2969 return 1;
2970 }
2971 }
2972 if (sid < 0) sid = 0;
2973 kc::File::Status sbuf;
2974 if (bgspath && !kc::File::status(bgspath, &sbuf) && !kc::File::make_directory(bgspath)) {
2975 eprintf("%s: %s: could not open the directory\n", g_progname, bgspath);
2976 return 1;
2977 }
2978 if (!kc::File::status(cmdpath, &sbuf) || !sbuf.isdir) {
2979 eprintf("%s: %s: no such directory\n", g_progname, cmdpath);
2980 return 1;
2981 }
2982 if (scrpath && !kc::File::status(scrpath)) {
2983 eprintf("%s: %s: no such file\n", g_progname, scrpath);
2984 return 1;
2985 }
2986 if (dbpaths.size() > (size_t)OPENDBMAX) {
2987 eprintf("%s: too much databases\n", g_progname);
2988 return 1;
2989 }
2990 kt::RPCServer serv;
2991 Logger logger;
2992 if (!logger.open(logpath)) {
2993 eprintf("%s: %s: could not open the log file\n", g_progname, logpath ? logpath : "-");
2994 return 1;
2995 }
2996 serv.set_logger(&logger, logkinds);
2997 serv.log(Logger::SYSTEM, "================ [START]: pid=%d", g_procid);
2998 std::string addr = "";
2999 if (host) {
3000 addr = kt::Socket::get_host_address(host);
3001 if (addr.empty()) {
3002 serv.log(Logger::ERROR, "unknown host: %s", host);
3003 return 1;
3004 }
3005 }
3006 kt::SharedLibrary pldblib;
3007 kt::KTDBINIT pldbinit = NULL;
3008 if (pldbpath) {
3009 serv.log(Logger::SYSTEM, "loading a plug-in database file: path=%s", pldbpath);
3010 if (!pldblib.open(pldbpath)) {
3011 serv.log(Logger::ERROR, "could not load a plug-in database file: %s", pldbpath);
3012 return 1;
3013 }
3014 pldbinit = (kt::KTDBINIT)pldblib.symbol(kt::KTDBINITNAME);
3015 if (!pldbinit) {
3016 serv.log(Logger::ERROR, "could not find the initializer: %s: %s",
3017 pldbpath, kt::KTDBINITNAME);
3018 return 1;
3019 }
3020 }
3021 std::string expr = kc::strprintf("%s:%d", addr.c_str(), port);
3022 serv.set_network(expr, tout);
3023 int32_t dbnum = dbpaths.size();
3024 kt::UpdateLogger* ulog = NULL;
3025 DBUpdateLogger* ulogdbs = NULL;
3026 if (ulogpath) {
3027 ulog = new kt::UpdateLogger;
3028 serv.log(Logger::SYSTEM, "opening the update log: path=%s sid=%u", ulogpath, sid);
3029 if (!ulog->open(ulogpath, ulim, uasi)) {
3030 serv.log(Logger::ERROR, "could not open the update log: %s", ulogpath);
3031 delete ulog;
3032 return 1;
3033 }
3034 ulogdbs = new DBUpdateLogger[dbnum];
3035 }
3036 kt::TimedDB* dbs = new kt::TimedDB[dbnum];
3037 DBLogger dblogger(&logger, logkinds);
3038 std::map<std::string, int32_t> dbmap;
3039 for (int32_t i = 0; i < dbnum; i++) {
3040 const std::string& dbpath = dbpaths[i];
3041 serv.log(Logger::SYSTEM, "opening a database: path=%s", dbpath.c_str());
3042 if (logkinds != 0)
3043 dbs[i].tune_logger(&dblogger, kc::BasicDB::Logger::WARN | kc::BasicDB::Logger::ERROR);
3044 if (ulog) {
3045 ulogdbs[i].initialize(ulog, sid, i);
3046 dbs[i].tune_update_trigger(ulogdbs + i);
3047 }
3048 if (pldbinit) dbs[i].set_internal_db(pldbinit());
3049 if (!dbs[i].open(dbpath, omode)) {
3050 const kc::BasicDB::Error& e = dbs[i].error();
3051 serv.log(Logger::ERROR, "could not open a database file: %s: %s: %s",
3052 dbpath.c_str(), e.name(), e.message());
3053 delete[] dbs;
3054 delete[] ulogdbs;
3055 delete ulog;
3056 return 1;
3057 }
3058 std::string path = dbs[i].path();
3059 const char* rp = path.c_str();
3060 const char* pv = std::strrchr(rp, kc::File::PATHCHR);
3061 if (pv) rp = pv + 1;
3062 dbmap[rp] = i;
3063 }
3064 if (bgspath) {
3065 kc::DirStream dir;
3066 if (dir.open(bgspath)) {
3067 std::string name;
3068 while (dir.read(&name)) {
3069 const char* nstr = name.c_str();
3070 const char* pv = std::strrchr(nstr, kc::File::EXTCHR);
3071 int32_t idx = kc::atoi(nstr);
3072 if (*nstr >= '0' && *nstr <= '9' && pv && !kc::stricmp(pv + 1, BGSPATHEXT) &&
3073 idx >= 0 && idx < dbnum) {
3074 std::string path;
3075 kc::strprintf(&path, "%s%c%s", bgspath, kc::File::PATHCHR, nstr);
3076 uint64_t ssts;
3077 int64_t sscount, sssize;
3078 if (kt::TimedDB::status_snapshot_atomic(path, &ssts, &sscount, &sssize)) {
3079 serv.log(Logger::SYSTEM,
3080 "applying a snapshot file: db=%d ts=%llu count=%lld size=%lld",
3081 idx, (unsigned long long)ssts, (long long)sscount, (long long)sssize);
3082 if (!dbs[idx].load_snapshot_atomic(path, bgscomp)) {
3083 const kc::BasicDB::Error& e = dbs[idx].error();
3084 serv.log(Logger::ERROR, "could not apply a snapshot: %s: %s",
3085 e.name(), e.message());
3086 }
3087 }
3088 }
3089 }
3090 dir.close();
3091 }
3092 }
3093 ScriptProcessor* scrprocs = NULL;
3094 if (scrpath) {
3095 serv.log(Logger::SYSTEM, "loading a script file: path=%s", scrpath);
3096 scrprocs = new ScriptProcessor[thnum];
3097 for (int32_t i = 0; i < thnum; i++) {
3098 if (!scrprocs[i].set_resources(i, &serv, dbs, dbnum, &dbmap)) {
3099 serv.log(Logger::ERROR, "could not initialize the scripting processor");
3100 delete[] scrprocs;
3101 delete[] dbs;
3102 delete[] ulogdbs;
3103 delete ulog;
3104 return 1;
3105 }
3106 if (!scrprocs[i].load(scrpath))
3107 serv.log(Logger::ERROR, "could not load a script file: %s", scrpath);
3108 }
3109 }
3110 kt::SharedLibrary plsvlib;
3111 kt::PluggableServer* plsv = NULL;
3112 if (plsvpath) {
3113 serv.log(Logger::SYSTEM, "loading a plug-in server file: path=%s", plsvpath);
3114 if (!plsvlib.open(plsvpath)) {
3115 serv.log(Logger::ERROR, "could not load a plug-in server file: %s", plsvpath);
3116 delete[] scrprocs;
3117 delete[] dbs;
3118 delete[] ulogdbs;
3119 delete ulog;
3120 return 1;
3121 }
3122 kt::KTSERVINIT init = (kt::KTSERVINIT)plsvlib.symbol(kt::KTSERVINITNAME);
3123 if (!init) {
3124 serv.log(Logger::ERROR, "could not find the initializer: %s: %s",
3125 plsvpath, kt::KTSERVINITNAME);
3126 delete[] scrprocs;
3127 delete[] dbs;
3128 delete[] ulogdbs;
3129 delete ulog;
3130 return 1;
3131 }
3132 plsv = init();
3133 plsv->configure(dbs, dbnum, &logger, logkinds, plsvex);
3134 }
3135 OpCount* opcounts = new OpCount[thnum];
3136 for (int32_t i = 0; i < thnum; i++) {
3137 for (int32_t j = 0; j <= CNTMISC; j++) {
3138 opcounts[i][j] = 0;
3139 }
3140 }
3141 kc::CondMap condmap;
3142 Worker worker(thnum, &condmap, dbs, dbnum, dbmap, omode, asi, ash, bgspath, bgsi, bgscomp,
3143 ulog, ulogdbs, cmdpath, scrprocs, opcounts);
3144 serv.set_worker(&worker, thnum);
3145 if (pidpath) {
3146 char numbuf[kc::NUMBUFSIZ];
3147 size_t nsiz = std::sprintf(numbuf, "%d\n", g_procid);
3148 kc::File::write_file(pidpath, numbuf, nsiz);
3149 }
3150 bool err = false;
3151 while (true) {
3152 g_restart = false;
3153 g_serv = &serv;
3154 Slave slave(sid, rtspath, mhost, mport, riv, &serv, dbs, dbnum, ulog, ulogdbs);
3155 slave.start();
3156 worker.set_misc_conf(&slave);
3157 PlugInDriver pldriver(plsv);
3158 if (plsv) pldriver.start();
3159 if (serv.start()) {
3160 condmap.broadcast_all();
3161 if (!serv.finish()) err = true;
3162 } else {
3163 err = true;
3164 }
3165 kc::Thread::sleep(0.5);
3166 if (plsv) {
3167 plsv->stop();
3168 pldriver.join();
3169 if (pldriver.error()) err = true;
3170 kc::Thread::sleep(0.1);
3171 }
3172 slave.stop();
3173 slave.join();
3174 if (!g_restart || err) break;
3175 logger.close();
3176 if (!logger.open(logpath)) {
3177 eprintf("%s: %s: could not open the log file\n", g_progname, logpath ? logpath : "-");
3178 err = true;
3179 break;
3180 }
3181 if (scrprocs) {
3182 serv.log(Logger::SYSTEM, "reloading a script file: path=%s", scrpath);
3183 for (int32_t i = 0; i < thnum; i++) {
3184 scrprocs[i].clear();
3185 if (!scrprocs[i].set_resources(i, &serv, dbs, dbnum, &dbmap)) {
3186 serv.log(Logger::ERROR, "could not initialize the scripting processor");
3187 err = true;
3188 break;
3189 }
3190 if (!scrprocs[i].load(scrpath))
3191 serv.log(Logger::ERROR, "could not load a script file: %s", scrpath);
3192 }
3193 }
3194 if (err) break;
3195 }
3196 if (pidpath) kc::File::remove(pidpath);
3197 delete[] opcounts;
3198 if (plsv) {
3199 delete plsv;
3200 if (!plsvlib.close()) {
3201 eprintf("%s: closing a shared library failed\n", g_progname);
3202 err = true;
3203 }
3204 }
3205 if (bgspath) {
3206 serv.log(Logger::SYSTEM, "snapshotting databases");
3207 if (!dosnapshot(bgspath, bgscomp, dbs, dbnum, &serv)) err = true;
3208 }
3209 delete[] scrprocs;
3210 for (int32_t i = 0; i < dbnum; i++) {
3211 const std::string& dbpath = dbpaths[i];
3212 serv.log(Logger::SYSTEM, "closing a database: path=%s", dbpath.c_str());
3213 if (!dbs[i].close()) {
3214 const kc::BasicDB::Error& e = dbs[i].error();
3215 serv.log(Logger::ERROR, "could not close a database file: %s: %s: %s",
3216 dbpath.c_str(), e.name(), e.message());
3217 err = true;
3218 }
3219 }
3220 delete[] dbs;
3221 if (ulog) {
3222 delete[] ulogdbs;
3223 if (!ulog->close()) {
3224 eprintf("%s: closing the update log faild\n", g_progname);
3225 err = true;
3226 }
3227 delete ulog;
3228 }
3229 if (pldbinit && !pldblib.close()) {
3230 eprintf("%s: closing a shared library failed\n", g_progname);
3231 err = true;
3232 }
3233 serv.log(Logger::SYSTEM, "================ [FINISH]: pid=%d", g_procid);
3234 return err ? 1 : 0;
3235 }
3236
3237
3238 // snapshot all databases
dosnapshot(const char * bgspath,kc::Compressor * bgscomp,kt::TimedDB * dbs,int32_t dbnum,kt::RPCServer * serv)3239 static bool dosnapshot(const char* bgspath, kc::Compressor* bgscomp,
3240 kt::TimedDB* dbs, int32_t dbnum, kt::RPCServer* serv) {
3241 bool err = false;
3242 for (int32_t i = 0; i < dbnum; i++) {
3243 kt::TimedDB* db = dbs + i;
3244 std::string destpath;
3245 kc::strprintf(&destpath, "%s%c%08d%c%s",
3246 bgspath, kc::File::PATHCHR, i, kc::File::EXTCHR, BGSPATHEXT);
3247 std::string tmppath;
3248 kc::strprintf(&tmppath, "%s%ctmp", destpath.c_str(), kc::File::EXTCHR);
3249 int32_t cnt = 0;
3250 while (true) {
3251 if (db->dump_snapshot_atomic(tmppath, bgscomp)) {
3252 if (!kc::File::rename(tmppath, destpath)) {
3253 serv->log(Logger::ERROR, "renaming a file failed: %s: %s",
3254 tmppath.c_str(), destpath.c_str());
3255 }
3256 kc::File::remove(tmppath);
3257 break;
3258 }
3259 kc::File::remove(tmppath);
3260 const kc::BasicDB::Error& e = db->error();
3261 if (e != kc::BasicDB::Error::LOGIC) {
3262 serv->log(Logger::ERROR, "database error: %d: %s: %s", e.code(), e.name(), e.message());
3263 break;
3264 }
3265 if (++cnt >= 3) {
3266 serv->log(Logger::SYSTEM, "snapshotting was abandoned");
3267 err = true;
3268 break;
3269 }
3270 serv->log(Logger::INFO, "retrying snapshot: %d", cnt);
3271 }
3272 kc::Thread::yield();
3273 }
3274 return !err;
3275 }
3276
3277
3278
3279 // END OF FILE
3280