1 /************************************************************************************************* 2 * Database extension 3 * Copyright (C) 2009-2012 FAL Labs 4 * This file is part of Kyoto Tycoon. 5 * This program is free software: you can redistribute it and/or modify it under the terms of 6 * the GNU General Public License as published by the Free Software Foundation, either version 7 * 3 of the License, or any later version. 8 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; 9 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. stopserver(int signum)10 * See the GNU General Public License for more details. 11 * You should have received a copy of the GNU General Public License along with this program. 12 * If not, see <http://www.gnu.org/licenses/>. 13 *************************************************************************************************/ 14 15 16 #ifndef _KTDBEXT_H // duplication check 17 #define _KTDBEXT_H 18 19 #include <myconf.h> 20 #include <ktcommon.h> 21 #include <ktutil.h> 22 #include <ktulog.h> 23 #include <ktshlib.h> 24 #include <kttimeddb.h> 25 26 namespace kyototycoon { // common namespace 27 28 29 /** 30 * MapReduce framework. 31 * @note Although this framework is not distributed or concurrent, it is useful for aggregate 32 * calculation with less CPU loading and less memory usage. 33 */ 34 class MapReduce { 35 public: 36 class ValueIterator; 37 private: 38 class FlushThread; 39 class ReduceTaskQueue; 40 class MapVisitor; 41 struct MergeLine; 42 /** An alias of vector of loaded values. */ 43 typedef std::vector<std::string> Values; 44 /** The default number of temporary databases. */ 45 static const size_t DEFDBNUM = 8; 46 /** The maxinum number of temporary databases. */ 47 static const size_t MAXDBNUM = 256; 48 /** The default cache limit. */ 49 static const int64_t DEFCLIM = 512LL << 20; 50 /** The default cache bucket numer. */ 51 static const int64_t DEFCBNUM = 1048583LL; 52 /** The bucket number of temprary databases. */ 53 static const int64_t DBBNUM = 512LL << 10; 54 /** The page size of temprary databases. */ 55 static const int32_t DBPSIZ = 32768; 56 /** The mapped size of temprary databases. */ 57 static const int64_t DBMSIZ = 516LL * 4096; 58 /** The page cache capacity of temprary databases. */ 59 static const int64_t DBPCCAP = 16LL << 20; 60 /** The default number of threads in parallel mode. */ 61 static const size_t DEFTHNUM = 8; 62 /** The number of slots of the record lock. */ 63 static const int32_t RLOCKSLOT = 256; 64 public: 65 /** 66 * Value iterator for the reducer. 67 */ 68 class ValueIterator { 69 friend class MapReduce; 70 public: 71 /** 72 * Get the next value. 73 * @param sp the pointer to the variable into which the size of the region of the return 74 * value is assigned. 75 * @return the pointer to the next value region, or NULL if no value remains. 76 */ 77 const char* next(size_t* sp) { 78 _assert_(sp); 79 if (!vptr_) { 80 if (vit_ == vend_) return NULL; 81 vptr_ = vit_->data(); 82 vsiz_ = vit_->size(); 83 vit_++; 84 } 85 uint64_t vsiz; 86 size_t step = kc::readvarnum(vptr_, vsiz_, &vsiz); 87 vptr_ += step; 88 vsiz_ -= step; 89 const char* vbuf = vptr_; 90 *sp = vsiz; 91 vptr_ += vsiz; 92 vsiz_ -= vsiz; 93 if (vsiz_ < 1) vptr_ = NULL; 94 return vbuf; 95 } 96 private: 97 /** 98 * Default constructor. 99 */ 100 explicit ValueIterator(Values::const_iterator vit, Values::const_iterator vend) : 101 vit_(vit), vend_(vend), vptr_(NULL), vsiz_(0) { 102 _assert_(true); 103 } 104 /** 105 * Destructor. 106 */ 107 ~ValueIterator() { 108 _assert_(true); 109 } 110 /** Dummy constructor to forbid the use. */ 111 ValueIterator(const ValueIterator&); 112 /** Dummy Operator to forbid the use. */ 113 ValueIterator& operator =(const ValueIterator&); 114 /** The current iterator of loaded values. */ 115 Values::const_iterator vit_; 116 /** The ending iterator of loaded values. */ 117 Values::const_iterator vend_; 118 /** The pointer of the current value. */ 119 const char* vptr_; 120 /** The size of the current value. */ 121 size_t vsiz_; 122 }; 123 /** 124 * Execution options. 125 */ 126 enum Option { 127 XNOLOCK = 1 << 0, ///< avoid locking against update operations 128 XPARAMAP = 1 << 1, ///< run mappers in parallel 129 XPARARED = 1 << 2, ///< run reducers in parallel 130 XPARAFLS = 1 << 3, ///< run cache flushers in parallel 131 XNOCOMP = 1 << 8 ///< avoid compression of temporary databases 132 }; 133 /** 134 * Default constructor. 135 */ 136 explicit MapReduce() : 137 rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0), 138 mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), flsthnum_(DEFTHNUM), 139 cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), flsths_(NULL), 140 redtasks_(NULL), redaborted_(false), rlocks_(NULL) { 141 _assert_(true); 142 } 143 /** 144 * Destructor. 145 */ 146 virtual ~MapReduce() { 147 _assert_(true); 148 } 149 /** 150 * Map a record data. 151 * @param kbuf the pointer to the key region. 152 * @param ksiz the size of the key region. 153 * @param vbuf the pointer to the value region. 154 * @param vsiz the size of the value region. 155 * @return true on success, or false on failure. 156 * @note This function can call the MapReduce::emit method to emit a record. To avoid 157 * deadlock, any explicit database operation must not be performed in this function. 158 */ 159 virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) = 0; 160 /** 161 * Reduce a record data. 162 * @param kbuf the pointer to the key region. 163 * @param ksiz the size of the key region. 164 * @param iter the iterator to get the values. 165 * @return true on success, or false on failure. 166 * @note To avoid deadlock, any explicit database operation must not be performed in this 167 * function. 168 */ 169 virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0; 170 /** 171 * Preprocess the map operations. 172 * @return true on success, or false on failure. 173 * @note This function can call the MapReduce::emit method to emit a record. To avoid 174 * deadlock, any explicit database operation must not be performed in this function. 175 */ 176 virtual bool preprocess() { 177 _assert_(true); 178 return true; 179 } 180 /** 181 * Mediate between the map and the reduce phases. 182 * @return true on success, or false on failure. 183 * @note This function can call the MapReduce::emit method to emit a record. To avoid 184 * deadlock, any explicit database operation must not be performed in this function. 185 */ 186 virtual bool midprocess() { 187 _assert_(true); 188 return true; 189 } 190 /** 191 * Postprocess the reduce operations. 192 * @return true on success, or false on failure. 193 * @note To avoid deadlock, any explicit database operation must not be performed in this 194 * function. 195 */ 196 virtual bool postprocess() { 197 _assert_(true); 198 return true; 199 } 200 /** 201 * Process a log message. 202 * @param name the name of the event. 203 * @param message a supplement message. 204 * @return true on success, or false on failure. 205 */ 206 virtual bool log(const char* name, const char* message) { 207 _assert_(name && message); 208 return true; 209 } 210 /** 211 * Execute the MapReduce process about a database. 212 * @param db the source database. 213 * @param tmppath the path of a directory for the temporary data storage. If it is an empty 214 * string, temporary data are handled on memory. 215 * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking 216 * against update operations by other threads, MapReduce::XNOCOMP to avoid compression of 217 * temporary databases. 218 * @return true on success, or false on failure. 219 */ 220 bool execute(TimedDB* db, const std::string& tmppath = "", uint32_t opts = 0) { 221 int64_t count = db->count(); 222 if (count < 0) { 223 if (db->error() != kc::BasicDB::Error::NOIMPL) return false; 224 count = 0; 225 } 226 bool err = false; 227 double stime, etime; 228 db_ = db; 229 rcomp_ = kc::LEXICALCOMP; 230 kc::BasicDB* idb = db->reveal_inner_db(); 231 const std::type_info& info = typeid(*idb); 232 if (info == typeid(kc::GrassDB)) { 233 kc::GrassDB* gdb = (kc::GrassDB*)idb; 234 rcomp_ = gdb->rcomp(); 235 } else if (info == typeid(kc::TreeDB)) { 236 kc::TreeDB* tdb = (kc::TreeDB*)idb; 237 rcomp_ = tdb->rcomp(); 238 } else if (info == typeid(kc::ForestDB)) { 239 kc::ForestDB* fdb = (kc::ForestDB*)idb; 240 rcomp_ = fdb->rcomp(); 241 } 242 tmpdbs_ = new kc::BasicDB*[dbnum_]; 243 if (tmppath.empty()) { 244 if (!logf("prepare", "started to open temporary databases on memory")) err = true; 245 stime = kc::time(); 246 for (size_t i = 0; i < dbnum_; i++) { 247 kc::GrassDB* gdb = new kc::GrassDB; 248 int32_t myopts = 0; 249 if (!(opts & XNOCOMP)) myopts |= kc::GrassDB::TCOMPRESS; 250 gdb->tune_options(myopts); 251 gdb->tune_buckets(DBBNUM / 2); 252 gdb->tune_page(DBPSIZ); 253 gdb->tune_page_cache(DBPCCAP); 254 gdb->tune_comparator(rcomp_); 255 gdb->open("%", kc::GrassDB::OWRITER | kc::GrassDB::OCREATE | kc::GrassDB::OTRUNCATE); 256 tmpdbs_[i] = gdb; 257 } 258 etime = kc::time(); 259 if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime)) 260 err = true; 261 if (err) { 262 delete[] tmpdbs_; 263 return false; 264 } 265 } else { 266 kc::File::Status sbuf; 267 if (!kc::File::status(tmppath, &sbuf) || !sbuf.isdir) { 268 db->set_error(kc::BasicDB::Error::NOREPOS, "no such directory"); 269 delete[] tmpdbs_; 270 return false; 271 } 272 if (!logf("prepare", "started to open temporary databases under %s", tmppath.c_str())) 273 err = true; 274 stime = kc::time(); 275 uint32_t pid = getpid() & kc::UINT16MAX; 276 uint32_t tid = kc::Thread::hash() & kc::UINT16MAX; 277 uint32_t ts = kc::time() * 1000; 278 for (size_t i = 0; i < dbnum_; i++) { 279 std::string childpath = 280 kc::strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct", tmppath.c_str(), 281 kc::File::PATHCHR, pid, tid, ts, (int)(i + 1), kc::File::EXTCHR); 282 kc::TreeDB* tdb = new kc::TreeDB; 283 int32_t myopts = kc::TreeDB::TSMALL | kc::TreeDB::TLINEAR; 284 if (!(opts & XNOCOMP)) myopts |= kc::TreeDB::TCOMPRESS; 285 tdb->tune_options(myopts); 286 tdb->tune_buckets(DBBNUM); 287 tdb->tune_page(DBPSIZ); 288 tdb->tune_map(DBMSIZ); 289 tdb->tune_page_cache(DBPCCAP); 290 tdb->tune_comparator(rcomp_); 291 if (!tdb->open(childpath, 292 kc::TreeDB::OWRITER | kc::TreeDB::OCREATE | kc::TreeDB::OTRUNCATE)) { 293 const kc::BasicDB::Error& e = tdb->error(); 294 db->set_error(e.code(), e.message()); 295 err = true; 296 } 297 tmpdbs_[i] = tdb; 298 } 299 etime = kc::time(); 300 if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime)) 301 err = true; 302 if (err) { 303 for (size_t i = 0; i < dbnum_; i++) { 304 delete tmpdbs_[i]; 305 } 306 delete[] tmpdbs_; 307 return false; 308 } 309 } 310 if (opts & XPARARED) redtasks_ = new ReduceTaskQueue; 311 if (opts & XPARAFLS) flsths_ = new std::deque<FlushThread*>; 312 if (opts & XNOLOCK) { 313 MapChecker mapchecker; 314 MapVisitor mapvisitor(this, &mapchecker, count); 315 mapvisitor.visit_before(); 316 if (!err) { 317 TimedDB::Cursor* cur = db->cursor(); 318 if (!cur->jump() && cur->error() != kc::BasicDB::Error::NOREC) err = true; 319 while (!err) { 320 if (!cur->accept(&mapvisitor, false, true)) { 321 if (cur->error() != kc::BasicDB::Error::NOREC) err = true; 322 break; 323 } 324 } 325 delete cur; 326 } 327 if (mapvisitor.error()) err = true; 328 mapvisitor.visit_after(); 329 } else if (opts & XPARAMAP) { 330 MapChecker mapchecker; 331 MapVisitor mapvisitor(this, &mapchecker, count); 332 rlocks_ = new kc::SlottedMutex(RLOCKSLOT); 333 if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) { 334 db_->set_error(kc::BasicDB::Error::LOGIC, "mapper failed"); 335 err = true; 336 } 337 delete rlocks_; 338 rlocks_ = NULL; 339 if (mapvisitor.error()) err = true; 340 } else { 341 MapChecker mapchecker; 342 MapVisitor mapvisitor(this, &mapchecker, count); 343 if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true; 344 if (mapvisitor.error()) err = true; 345 } 346 if (flsths_) { 347 delete flsths_; 348 flsths_ = NULL; 349 } 350 if (redtasks_) { 351 delete redtasks_; 352 redtasks_ = NULL; 353 } 354 if (!logf("clean", "closing the temporary databases")) err = true; 355 stime = kc::time(); 356 for (size_t i = 0; i < dbnum_; i++) { 357 std::string path = tmpdbs_[i]->path(); 358 if (!tmpdbs_[i]->clear()) { 359 const kc::BasicDB::Error& e = tmpdbs_[i]->error(); 360 db->set_error(e.code(), e.message()); 361 err = true; 362 } 363 if (!tmpdbs_[i]->close()) { 364 const kc::BasicDB::Error& e = tmpdbs_[i]->error(); 365 db->set_error(e.code(), e.message()); 366 err = true; 367 } 368 if (!tmppath.empty()) kc::File::remove(path); 369 delete tmpdbs_[i]; 370 } 371 etime = kc::time(); 372 if (!logf("clean", "closing the temporary databases finished: time=%.6f", 373 etime - stime)) err = true; 374 delete[] tmpdbs_; 375 return !err; 376 } 377 /** 378 * Set the storage configurations. 379 * @param dbnum the number of temporary databases. 380 * @param clim the limit size of the internal cache. 381 * @param cbnum the bucket number of the internal cache. 382 */ 383 void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { 384 _assert_(true); 385 dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; 386 if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; 387 clim_ = clim > 0 ? clim : DEFCLIM; 388 cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; 389 if (cbnum_ > kc::INT16MAX) cbnum_ = kc::nearbyprime(cbnum_); 390 } 391 /** 392 * Set the thread configurations. 393 * @param mapthnum the number of threads for the mapper. 394 * @param redthnum the number of threads for the reducer. 395 * @param flsthnum the number of threads for the internal flusher. 396 */ 397 void tune_thread(int32_t mapthnum, int32_t redthnum, int32_t flsthnum) { 398 _assert_(true); 399 mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM; 400 redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM; 401 flsthnum_ = flsthnum > 0 ? flsthnum : DEFTHNUM; 402 } 403 protected: 404 /** 405 * Emit a record from the mapper. 406 * @param kbuf the pointer to the key region. 407 * @param ksiz the size of the key region. 408 * @param vbuf the pointer to the value region. 409 * @param vsiz the size of the value region. 410 * @return true on success, or false on failure. 411 */ 412 bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { 413 _assert_(kbuf && ksiz <= kc::MEMMAXSIZ && vbuf && vsiz <= kc::MEMMAXSIZ); 414 bool err = false; 415 size_t rsiz = kc::sizevarnum(vsiz) + vsiz; 416 char stack[kc::NUMBUFSIZ*4]; 417 char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; 418 char* wp = rbuf; 419 wp += kc::writevarnum(rbuf, vsiz); 420 std::memcpy(wp, vbuf, vsiz); 421 if (rlocks_) { 422 size_t bidx = kc::TinyHashMap::hash_record(kbuf, ksiz) % cbnum_; 423 size_t lidx = bidx % RLOCKSLOT; 424 rlocks_->lock(lidx); 425 cache_->append(kbuf, ksiz, rbuf, rsiz); 426 rlocks_->unlock(lidx); 427 } else { 428 cache_->append(kbuf, ksiz, rbuf, rsiz); 429 } 430 if (rbuf != stack) delete[] rbuf; 431 csiz_ += kc::sizevarnum(ksiz) + ksiz + rsiz; 432 return !err; 433 } 434 private: 435 /** 436 * Cache flusher. 437 */ 438 class FlushThread : public kc::Thread { 439 public: 440 /** constructor */ 441 explicit FlushThread(MapReduce* mr, kc::BasicDB* tmpdb, 442 kc::TinyHashMap* cache, size_t csiz, bool cown) : 443 mr_(mr), tmpdb_(tmpdb), cache_(cache), csiz_(csiz), cown_(cown), err_(false) {} 444 /** perform the concrete process */ 445 void run() { 446 if (!mr_->logf("map", "started to flushing the cache: count=%lld size=%lld", 447 (long long)cache_->count(), (long long)csiz_)) err_ = true; 448 double stime = kc::time(); 449 kc::BasicDB* tmpdb = tmpdb_; 450 kc::TinyHashMap* cache = cache_; 451 bool cown = cown_; 452 kc::TinyHashMap::Sorter sorter(cache); 453 const char* kbuf, *vbuf; 454 size_t ksiz, vsiz; 455 while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { 456 if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) { 457 const kc::BasicDB::Error& e = tmpdb->error(); 458 mr_->db_->set_error(e.code(), e.message()); 459 err_ = true; 460 } 461 sorter.step(); 462 if (cown) cache->remove(kbuf, ksiz); 463 } 464 double etime = kc::time(); 465 if (!mr_->logf("map", "flushing the cache finished: time=%.6f", etime - stime)) 466 err_ = true; 467 if (cown) delete cache; 468 } 469 /** check the error flag. */ 470 bool error() { 471 return err_; 472 } 473 private: 474 MapReduce* mr_; ///< driver 475 kc::BasicDB* tmpdb_; ///< temprary database 476 kc::TinyHashMap* cache_; ///< cache for emitter 477 size_t csiz_; ///< current cache size 478 bool cown_; ///< cache ownership flag 479 bool err_; ///< error flag 480 }; 481 /** 482 * Task queue for parallel reducer. 483 */ 484 class ReduceTaskQueue : public kc::TaskQueue { 485 public: 486 /** 487 * Task for parallel reducer. 488 */ 489 class ReduceTask : public Task { 490 friend class ReduceTaskQueue; 491 public: 492 /** constructor */ 493 explicit ReduceTask(MapReduce* mr, const char* kbuf, size_t ksiz, const Values& values) : 494 mr_(mr), key_(kbuf, ksiz), values_(values) {} 495 private: 496 MapReduce* mr_; ///< driver 497 std::string key_; ///< key 498 Values values_; ///< values 499 }; 500 /** constructor */ 501 explicit ReduceTaskQueue() {} 502 private: 503 /** process a task */ 504 void do_task(Task* task) { 505 ReduceTask* rtask = (ReduceTask*)task; 506 ValueIterator iter(rtask->values_.begin(), rtask->values_.end()); 507 if (!rtask->mr_->reduce(rtask->key_.data(), rtask->key_.size(), &iter)) 508 rtask->mr_->redaborted_ = true; 509 delete rtask; 510 } 511 }; 512 /** 513 * Checker for the map process. 514 */ 515 class MapChecker : public kc::BasicDB::ProgressChecker { 516 public: 517 /** constructor */ 518 explicit MapChecker() : stop_(false) {} 519 /** stop the process */ 520 void stop() { 521 stop_ = true; 522 } 523 /** check whether stopped */ 524 bool stopped() { 525 return stop_; 526 } 527 private: 528 /** check whether stopped */ 529 bool check(const char* name, const char* message, int64_t curcnt, int64_t allcnt) { 530 return !stop_; 531 } 532 bool stop_; ///< flag for stop 533 }; 534 /** 535 * Visitor for the map process. 536 */ 537 class MapVisitor : public TimedDB::Visitor { 538 public: 539 /** constructor */ 540 explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale) : 541 mr_(mr), checker_(checker), scale_(scale), stime_(0), err_(false) {} 542 /** get the error flag */ 543 bool error() { 544 return err_; 545 } 546 /** preprocess the mappter */ 547 void visit_before() { 548 mr_->dbclock_ = 0; 549 mr_->cache_ = new kc::TinyHashMap(mr_->cbnum_); 550 mr_->csiz_ = 0; 551 if (!mr_->preprocess()) err_ = true; 552 if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; 553 if (!mr_->logf("map", "started the map process: scale=%lld", (long long)scale_)) 554 err_ = true; 555 stime_ = kc::time(); 556 } 557 /** postprocess the mappter and call the reducer */ 558 void visit_after() { 559 if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; 560 double etime = kc::time(); 561 if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_)) 562 err_ = true; 563 if (!mr_->midprocess()) err_ = true; 564 if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true; 565 delete mr_->cache_; 566 if (mr_->flsths_ && !mr_->flsths_->empty()) { 567 std::deque<FlushThread*>::iterator flthit = mr_->flsths_->begin(); 568 std::deque<FlushThread*>::iterator flthitend = mr_->flsths_->end(); 569 while (flthit != flthitend) { 570 FlushThread* flth = *flthit; 571 flth->join(); 572 if (flth->error()) err_ = true; 573 delete flth; 574 ++flthit; 575 } 576 } 577 if (!err_ && !mr_->execute_reduce()) err_ = true; 578 if (!mr_->postprocess()) err_ = true; 579 } 580 private: 581 /** visit a record */ 582 const char* visit_full(const char* kbuf, size_t ksiz, 583 const char* vbuf, size_t vsiz, size_t* sp, int64_t* xtp) { 584 if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { 585 checker_->stop(); 586 err_ = true; 587 } 588 if (mr_->rlocks_) { 589 if (mr_->csiz_ >= mr_->clim_) { 590 mr_->rlocks_->lock_all(); 591 if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { 592 checker_->stop(); 593 err_ = true; 594 } 595 mr_->rlocks_->unlock_all(); 596 } 597 } else { 598 if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { 599 checker_->stop(); 600 err_ = true; 601 } 602 } 603 return NOP; 604 } 605 MapReduce* mr_; ///< driver 606 MapChecker* checker_; ///< checker 607 int64_t scale_; ///< number of records 608 double stime_; ///< start time 609 bool err_; ///< error flag 610 }; 611 /** 612 * Front line of a merging list. 613 */ 614 struct MergeLine { 615 kc::BasicDB::Cursor* cur; ///< cursor 616 kc::Comparator* rcomp; ///< record comparator 617 char* kbuf; ///< pointer to the key 618 size_t ksiz; ///< size of the key 619 const char* vbuf; ///< pointer to the value 620 size_t vsiz; ///< size of the value 621 /** comparing operator */ 622 bool operator <(const MergeLine& right) const { 623 return rcomp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0; 624 } 625 }; 626 /** 627 * Process a log message. 628 * @param name the name of the event. 629 * @param format the printf-like format string. 630 * @param ... used according to the format string. 631 * @return true on success, or false on failure. 632 */ 633 bool logf(const char* name, const char* format, ...) { 634 _assert_(name && format); 635 va_list ap; 636 va_start(ap, format); 637 std::string message; 638 kc::vstrprintf(&message, format, ap); 639 va_end(ap); 640 return log(name, message.c_str()); 641 } 642 /** 643 * Flush all cache records. 644 * @return true on success, or false on failure. 645 */ 646 bool flush_cache() { 647 _assert_(true); 648 bool err = false; 649 kc::BasicDB* tmpdb = tmpdbs_[dbclock_]; 650 dbclock_ = (dbclock_ + 1) % dbnum_; 651 if (flsths_) { 652 size_t num = flsths_->size(); 653 if (num >= flsthnum_ || num >= dbnum_) { 654 FlushThread* flth = flsths_->front(); 655 flsths_->pop_front(); 656 flth->join(); 657 if (flth->error()) err = true; 658 delete flth; 659 } 660 FlushThread* flth = new FlushThread(this, tmpdb, cache_, csiz_, true); 661 cache_ = new kc::TinyHashMap(cbnum_); 662 csiz_ = 0; 663 flth->start(); 664 flsths_->push_back(flth); 665 } else { 666 FlushThread flth(this, tmpdb, cache_, csiz_, false); 667 flth.run(); 668 if (flth.error()) err = true; 669 cache_->clear(); 670 csiz_ = 0; 671 } 672 return !err; 673 } 674 /** 675 * Execute the reduce part. 676 * @return true on success, or false on failure. 677 */ 678 bool execute_reduce() { 679 bool err = false; 680 int64_t scale = 0; 681 for (size_t i = 0; i < dbnum_; i++) { 682 scale += tmpdbs_[i]->count(); 683 } 684 if (!logf("reduce", "started the reduce process: scale=%lld", (long long)scale)) err = true; 685 double stime = kc::time(); 686 if (redtasks_) redtasks_->start(redthnum_); 687 std::priority_queue<MergeLine> lines; 688 for (size_t i = 0; i < dbnum_; i++) { 689 MergeLine line; 690 line.cur = tmpdbs_[i]->cursor(); 691 line.rcomp = rcomp_; 692 line.cur->jump(); 693 line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); 694 if (line.kbuf) { 695 lines.push(line); 696 } else { 697 delete line.cur; 698 } 699 } 700 char* lkbuf = NULL; 701 size_t lksiz = 0; 702 Values values; 703 while (!err && !lines.empty()) { 704 MergeLine line = lines.top(); 705 lines.pop(); 706 if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lksiz))) { 707 if (!call_reducer(lkbuf, lksiz, values)) err = true; 708 values.clear(); 709 } 710 values.push_back(std::string(line.vbuf, line.vsiz)); 711 delete[] lkbuf; 712 lkbuf = line.kbuf; 713 lksiz = line.ksiz; 714 line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); 715 if (line.kbuf) { 716 lines.push(line); 717 } else { 718 delete line.cur; 719 } 720 } 721 if (lkbuf) { 722 if (!err && !call_reducer(lkbuf, lksiz, values)) err = true; 723 values.clear(); 724 delete[] lkbuf; 725 } 726 while (!lines.empty()) { 727 MergeLine line = lines.top(); 728 lines.pop(); 729 delete[] line.kbuf; 730 delete line.cur; 731 } 732 if (redtasks_) redtasks_->finish(); 733 double etime = kc::time(); 734 if (!logf("reduce", "the reduce process finished: time=%.6f", etime - stime)) err = true; 735 return !err; 736 } 737 /** 738 * Call the reducer. 739 * @param kbuf the pointer to the key region. 740 * @param ksiz the size of the key region. 741 * @param values a vector of the values. 742 * @return true on success, or false on failure. 743 */ 744 bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { 745 _assert_(kbuf && ksiz <= kc::MEMMAXSIZ); 746 if (redtasks_) { 747 if (redaborted_) return false; 748 ReduceTaskQueue::ReduceTask* task = 749 new ReduceTaskQueue::ReduceTask(this, kbuf, ksiz, values); 750 redtasks_->add_task(task); 751 return true; 752 } 753 bool err = false; 754 ValueIterator iter(values.begin(), values.end()); 755 if (!reduce(kbuf, ksiz, &iter)) err = true; 756 return !err; 757 } 758 /** Dummy constructor to forbid the use. */ 759 MapReduce(const MapReduce&); 760 /** Dummy Operator to forbid the use. */ 761 MapReduce& operator =(const MapReduce&); 762 /** The internal database. */ 763 TimedDB* db_; 764 /** The record comparator. */ 765 kc::Comparator* rcomp_; 766 /** The temporary databases. */ 767 kc::BasicDB** tmpdbs_; 768 /** The number of temporary databases. */ 769 size_t dbnum_; 770 /** The logical clock for temporary databases. */ 771 int64_t dbclock_; 772 /** The number of the mapper threads. */ 773 size_t mapthnum_; 774 /** The number of the reducer threads. */ 775 size_t redthnum_; 776 /** The number of the flusher threads. */ 777 size_t flsthnum_; 778 /** The cache for emitter. */ 779 kc::TinyHashMap* cache_; 780 /** The current size of the cache for emitter. */ 781 int64_t csiz_; 782 /** The limit size of the cache for emitter. */ 783 int64_t clim_; 784 /** The bucket number of the cache for emitter. */ 785 int64_t cbnum_; 786 /** The flush threads. */ 787 std::deque<FlushThread*>* flsths_; 788 /** The task queue for parallel reducer. */ 789 kc::TaskQueue* redtasks_; 790 /** The flag whether aborted. */ 791 bool redaborted_; 792 /** The whole lock. */ 793 kc::SlottedMutex* rlocks_; 794 }; 795 796 797 } // common namespace 798 799 #endif // duplication check 800 801 // END OF FILE 802