/*************************************************************************************************
 * Database extension
 *                                                               Copyright (C) 2009-2012 FAL Labs
 * This file is part of Kyoto Tycoon.
 * This program is free software: you can redistribute it and/or modify it under the terms of
 * the GNU General Public License as published by the Free Software Foundation, either version
 * 3 of the License, or any later version.
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
 *************************************************************************************************/


#ifndef _KTDBEXT_H                       // duplication check
#define _KTDBEXT_H

#include <myconf.h>
#include <ktcommon.h>
#include <ktutil.h>
#include <ktulog.h>
#include <ktshlib.h>
#include <kttimeddb.h>

namespace kyototycoon {                  // common namespace


/**
 * MapReduce framework.
 * @note Although this framework is not distributed or concurrent, it is useful for aggregate
 * calculation with low CPU load and low memory usage.  An illustrative usage sketch is given
 * after the class definition below.
 */
class MapReduce {
 public:
  class ValueIterator;
 private:
  class FlushThread;
  class ReduceTaskQueue;
  class MapVisitor;
  struct MergeLine;
  /** An alias of vector of loaded values. */
  typedef std::vector<std::string> Values;
  /** The default number of temporary databases. */
  static const size_t DEFDBNUM = 8;
  /** The maximum number of temporary databases. */
  static const size_t MAXDBNUM = 256;
  /** The default cache limit. */
  static const int64_t DEFCLIM = 512LL << 20;
  /** The default cache bucket number. */
  static const int64_t DEFCBNUM = 1048583LL;
  /** The bucket number of temporary databases. */
  static const int64_t DBBNUM = 512LL << 10;
  /** The page size of temporary databases. */
  static const int32_t DBPSIZ = 32768;
  /** The mapped size of temporary databases. */
  static const int64_t DBMSIZ = 516LL * 4096;
  /** The page cache capacity of temporary databases. */
  static const int64_t DBPCCAP = 16LL << 20;
  /** The default number of threads in parallel mode. */
  static const size_t DEFTHNUM = 8;
  /** The number of slots of the record lock. */
  static const int32_t RLOCKSLOT = 256;
 public:
  /**
   * Value iterator for the reducer.
   */
  class ValueIterator {
    friend class MapReduce;
   public:
    /**
     * Get the next value.
     * @param sp the pointer to the variable into which the size of the region of the return
     * value is assigned.
     * @return the pointer to the next value region, or NULL if no value remains.
     */
    const char* next(size_t* sp) {
      _assert_(sp);
      if (!vptr_) {
        if (vit_ == vend_) return NULL;
        vptr_ = vit_->data();
        vsiz_ = vit_->size();
        vit_++;
      }
      uint64_t vsiz;
      size_t step = kc::readvarnum(vptr_, vsiz_, &vsiz);
      vptr_ += step;
      vsiz_ -= step;
      const char* vbuf = vptr_;
      *sp = vsiz;
      vptr_ += vsiz;
      vsiz_ -= vsiz;
      if (vsiz_ < 1) vptr_ = NULL;
      return vbuf;
    }
   private:
    /**
     * Default constructor.
     */
    explicit ValueIterator(Values::const_iterator vit, Values::const_iterator vend) :
        vit_(vit), vend_(vend), vptr_(NULL), vsiz_(0) {
      _assert_(true);
    }
    /**
     * Destructor.
     */
    ~ValueIterator() {
      _assert_(true);
    }
    /** Dummy constructor to forbid the use. */
    ValueIterator(const ValueIterator&);
    /** Dummy Operator to forbid the use. */
    ValueIterator& operator =(const ValueIterator&);
    /** The current iterator of loaded values. */
    Values::const_iterator vit_;
    /** The ending iterator of loaded values. */
    Values::const_iterator vend_;
    /** The pointer of the current value. */
    const char* vptr_;
    /** The size of the current value. */
    size_t vsiz_;
  };
  /**
   * Execution options.
   */
  enum Option {
    XNOLOCK = 1 << 0,                    ///< avoid locking against update operations
    XPARAMAP = 1 << 1,                   ///< run mappers in parallel
    XPARARED = 1 << 2,                   ///< run reducers in parallel
    XPARAFLS = 1 << 3,                   ///< run cache flushers in parallel
    XNOCOMP = 1 << 8                     ///< avoid compression of temporary databases
  };
  /**
   * Default constructor.
   */
  explicit MapReduce() :
      rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0),
      mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), flsthnum_(DEFTHNUM),
      cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), flsths_(NULL),
      redtasks_(NULL), redaborted_(false), rlocks_(NULL) {
    _assert_(true);
  }
  /**
   * Destructor.
   */
  virtual ~MapReduce() {
    _assert_(true);
  }
  /**
   * Map a record of the source database.
   * @param kbuf the pointer to the key region.
   * @param ksiz the size of the key region.
   * @param vbuf the pointer to the value region.
   * @param vsiz the size of the value region.
   * @return true on success, or false on failure.
   * @note This function can call the MapReduce::emit method to emit a record.  To avoid
   * deadlock, any explicit database operation must not be performed in this function.
   */
  virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) = 0;
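  // A minimal sketch of a mapper override (illustrative only; not part of the original
  // header): emit every source record unchanged so that records sharing the same key are
  // grouped for the reducer.
  //
  //   bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
  //     return emit(kbuf, ksiz, vbuf, vsiz);
  //   }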
  /**
   * Reduce the values of a record.
   * @param kbuf the pointer to the key region.
   * @param ksiz the size of the key region.
   * @param iter the iterator to get the values.
   * @return true on success, or false on failure.
   * @note To avoid deadlock, any explicit database operation must not be performed in this
   * function.
   */
  virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0;
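  // A minimal sketch of a reducer override (illustrative only): drain the value iterator
  // with ValueIterator::next until it returns NULL, here merely counting the values that
  // were emitted for the key.
  //
  //   bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) {
  //     int64_t num = 0;
  //     size_t vsiz;
  //     while (iter->next(&vsiz) != NULL) num++;
  //     return true;
  //   }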
  /**
   * Preprocess the map operations.
   * @return true on success, or false on failure.
   * @note This function can call the MapReduce::emit method to emit a record.  To avoid
   * deadlock, any explicit database operation must not be performed in this function.
   */
  virtual bool preprocess() {
    _assert_(true);
    return true;
  }
  /**
   * Mediate between the map and the reduce phases.
   * @return true on success, or false on failure.
   * @note This function can call the MapReduce::emit method to emit a record.  To avoid
   * deadlock, any explicit database operation must not be performed in this function.
   */
  virtual bool midprocess() {
    _assert_(true);
    return true;
  }
  /**
   * Postprocess the reduce operations.
   * @return true on success, or false on failure.
   * @note To avoid deadlock, any explicit database operation must not be performed in this
   * function.
   */
  virtual bool postprocess() {
    _assert_(true);
    return true;
  }
  /**
   * Process a log message.
   * @param name the name of the event.
   * @param message a supplementary message.
   * @return true on success, or false on failure.
   */
  virtual bool log(const char* name, const char* message) {
    _assert_(name && message);
    return true;
  }
  /**
   * Execute the MapReduce process over a database.
   * @param db the source database.
   * @param tmppath the path of a directory for the temporary data storage.  If it is an empty
   * string, temporary data are handled in memory.
   * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking
   * against update operations by other threads, MapReduce::XPARAMAP to run mappers in
   * parallel, MapReduce::XPARARED to run reducers in parallel, MapReduce::XPARAFLS to run
   * cache flushers in parallel, MapReduce::XNOCOMP to avoid compression of temporary
   * databases.
   * @return true on success, or false on failure.
   */
  bool execute(TimedDB* db, const std::string& tmppath = "", uint32_t opts = 0) {
    int64_t count = db->count();
    if (count < 0) {
      if (db->error() != kc::BasicDB::Error::NOIMPL) return false;
      count = 0;
    }
    bool err = false;
    double stime, etime;
    db_ = db;
    // adopt the record comparator of the inner database, if it exposes one
    rcomp_ = kc::LEXICALCOMP;
    kc::BasicDB* idb = db->reveal_inner_db();
    const std::type_info& info = typeid(*idb);
    if (info == typeid(kc::GrassDB)) {
      kc::GrassDB* gdb = (kc::GrassDB*)idb;
      rcomp_ = gdb->rcomp();
    } else if (info == typeid(kc::TreeDB)) {
      kc::TreeDB* tdb = (kc::TreeDB*)idb;
      rcomp_ = tdb->rcomp();
    } else if (info == typeid(kc::ForestDB)) {
      kc::ForestDB* fdb = (kc::ForestDB*)idb;
      rcomp_ = fdb->rcomp();
    }
    // open the temporary databases: in memory if tmppath is empty, on disk otherwise
    tmpdbs_ = new kc::BasicDB*[dbnum_];
    if (tmppath.empty()) {
      if (!logf("prepare", "started to open temporary databases in memory")) err = true;
      stime = kc::time();
      for (size_t i = 0; i < dbnum_; i++) {
        kc::GrassDB* gdb = new kc::GrassDB;
        int32_t myopts = 0;
        if (!(opts & XNOCOMP)) myopts |= kc::GrassDB::TCOMPRESS;
        gdb->tune_options(myopts);
        gdb->tune_buckets(DBBNUM / 2);
        gdb->tune_page(DBPSIZ);
        gdb->tune_page_cache(DBPCCAP);
        gdb->tune_comparator(rcomp_);
        gdb->open("%", kc::GrassDB::OWRITER | kc::GrassDB::OCREATE | kc::GrassDB::OTRUNCATE);
        tmpdbs_[i] = gdb;
      }
      etime = kc::time();
      if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime))
        err = true;
      if (err) {
        delete[] tmpdbs_;
        return false;
      }
    } else {
      kc::File::Status sbuf;
      if (!kc::File::status(tmppath, &sbuf) || !sbuf.isdir) {
        db->set_error(kc::BasicDB::Error::NOREPOS, "no such directory");
        delete[] tmpdbs_;
        return false;
      }
      if (!logf("prepare", "started to open temporary databases under %s", tmppath.c_str()))
        err = true;
      stime = kc::time();
      uint32_t pid = getpid() & kc::UINT16MAX;
      uint32_t tid = kc::Thread::hash() & kc::UINT16MAX;
      uint32_t ts = kc::time() * 1000;
      for (size_t i = 0; i < dbnum_; i++) {
        std::string childpath =
            kc::strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct", tmppath.c_str(),
                          kc::File::PATHCHR, pid, tid, ts, (int)(i + 1), kc::File::EXTCHR);
        kc::TreeDB* tdb = new kc::TreeDB;
        int32_t myopts = kc::TreeDB::TSMALL | kc::TreeDB::TLINEAR;
        if (!(opts & XNOCOMP)) myopts |= kc::TreeDB::TCOMPRESS;
        tdb->tune_options(myopts);
        tdb->tune_buckets(DBBNUM);
        tdb->tune_page(DBPSIZ);
        tdb->tune_map(DBMSIZ);
        tdb->tune_page_cache(DBPCCAP);
        tdb->tune_comparator(rcomp_);
        if (!tdb->open(childpath,
                       kc::TreeDB::OWRITER | kc::TreeDB::OCREATE | kc::TreeDB::OTRUNCATE)) {
          const kc::BasicDB::Error& e = tdb->error();
          db->set_error(e.code(), e.message());
          err = true;
        }
        tmpdbs_[i] = tdb;
      }
      etime = kc::time();
      if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime))
        err = true;
      if (err) {
        for (size_t i = 0; i < dbnum_; i++) {
          delete tmpdbs_[i];
        }
        delete[] tmpdbs_;
        return false;
      }
    }
    if (opts & XPARARED) redtasks_ = new ReduceTaskQueue;
    if (opts & XPARAFLS) flsths_ = new std::deque<FlushThread*>;
    // scan the source database with the mapper: via a cursor without locking, via a
    // parallel scan, or via a plain iteration
    if (opts & XNOLOCK) {
      MapChecker mapchecker;
      MapVisitor mapvisitor(this, &mapchecker, count);
      mapvisitor.visit_before();
      if (!err) {
        TimedDB::Cursor* cur = db->cursor();
        if (!cur->jump() && cur->error() != kc::BasicDB::Error::NOREC) err = true;
        while (!err) {
          if (!cur->accept(&mapvisitor, false, true)) {
            if (cur->error() != kc::BasicDB::Error::NOREC) err = true;
            break;
          }
        }
        delete cur;
      }
      if (mapvisitor.error()) err = true;
      mapvisitor.visit_after();
    } else if (opts & XPARAMAP) {
      MapChecker mapchecker;
      MapVisitor mapvisitor(this, &mapchecker, count);
      rlocks_ = new kc::SlottedMutex(RLOCKSLOT);
      if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) {
        db_->set_error(kc::BasicDB::Error::LOGIC, "mapper failed");
        err = true;
      }
      delete rlocks_;
      rlocks_ = NULL;
      if (mapvisitor.error()) err = true;
    } else {
      MapChecker mapchecker;
      MapVisitor mapvisitor(this, &mapchecker, count);
      if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true;
      if (mapvisitor.error()) err = true;
    }
    if (flsths_) {
      delete flsths_;
      flsths_ = NULL;
    }
    if (redtasks_) {
      delete redtasks_;
      redtasks_ = NULL;
    }
    // clean up the temporary databases and remove their files, if any
    if (!logf("clean", "closing the temporary databases")) err = true;
    stime = kc::time();
    for (size_t i = 0; i < dbnum_; i++) {
      std::string path = tmpdbs_[i]->path();
      if (!tmpdbs_[i]->clear()) {
        const kc::BasicDB::Error& e = tmpdbs_[i]->error();
        db->set_error(e.code(), e.message());
        err = true;
      }
      if (!tmpdbs_[i]->close()) {
        const kc::BasicDB::Error& e = tmpdbs_[i]->error();
        db->set_error(e.code(), e.message());
        err = true;
      }
      if (!tmppath.empty()) kc::File::remove(path);
      delete tmpdbs_[i];
    }
    etime = kc::time();
    if (!logf("clean", "closing the temporary databases finished: time=%.6f",
              etime - stime)) err = true;
    delete[] tmpdbs_;
    return !err;
  }
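  // An invocation sketch (illustrative only; the subclass name and paths are hypothetical).
  // Given a MapReduce subclass, such as the one sketched after the class definition below,
  // and an open TimedDB, the whole process is driven by execute(), optionally tuned
  // beforehand with tune_storage and tune_thread.
  //
  //   ValueCounter counter;                 // a user-defined subclass of MapReduce
  //   counter.tune_thread(4, 4, 4);
  //   if (!counter.execute(&db, "/tmp", MapReduce::XPARAMAP | MapReduce::XPARAFLS)) {
  //     // handle the error reported via db.error()
  //   }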
  /**
   * Set the storage configurations.
   * @param dbnum the number of temporary databases.
   * @param clim the limit size of the internal cache.
   * @param cbnum the bucket number of the internal cache.
   */
  void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) {
    _assert_(true);
    dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM;
    if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM;
    clim_ = clim > 0 ? clim : DEFCLIM;
    cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM;
    if (cbnum_ > kc::INT16MAX) cbnum_ = kc::nearbyprime(cbnum_);
  }
  /**
   * Set the thread configurations.
   * @param mapthnum the number of threads for the mapper.
   * @param redthnum the number of threads for the reducer.
   * @param flsthnum the number of threads for the internal flusher.
   */
  void tune_thread(int32_t mapthnum, int32_t redthnum, int32_t flsthnum) {
    _assert_(true);
    mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM;
    redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM;
    flsthnum_ = flsthnum > 0 ? flsthnum : DEFTHNUM;
  }
 protected:
  /**
   * Emit a record from the mapper.
   * @param kbuf the pointer to the key region.
   * @param ksiz the size of the key region.
   * @param vbuf the pointer to the value region.
   * @param vsiz the size of the value region.
   * @return true on success, or false on failure.
   */
  bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
    _assert_(kbuf && ksiz <= kc::MEMMAXSIZ && vbuf && vsiz <= kc::MEMMAXSIZ);
    bool err = false;
    size_t rsiz = kc::sizevarnum(vsiz) + vsiz;
    char stack[kc::NUMBUFSIZ*4];
    char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
    char* wp = rbuf;
    wp += kc::writevarnum(rbuf, vsiz);
    std::memcpy(wp, vbuf, vsiz);
    if (rlocks_) {
      size_t bidx = kc::TinyHashMap::hash_record(kbuf, ksiz) % cbnum_;
      size_t lidx = bidx % RLOCKSLOT;
      rlocks_->lock(lidx);
      cache_->append(kbuf, ksiz, rbuf, rsiz);
      rlocks_->unlock(lidx);
    } else {
      cache_->append(kbuf, ksiz, rbuf, rsiz);
    }
    if (rbuf != stack) delete[] rbuf;
    csiz_ += kc::sizevarnum(ksiz) + ksiz + rsiz;
    return !err;
  }
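  // Layout note (informal): each emitted value is stored in the cache as a variable-length
  // size prefix followed by the value bytes, and values emitted for the same key are simply
  // concatenated by TinyHashMap::append.  For example, emitting "ab" and then "xyz" for one
  // key yields the cached record
  //   [varnum 2]["ab"][varnum 3]["xyz"]
  // which ValueIterator::next later decodes one value at a time.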
 private:
  /**
   * Cache flusher.
   */
  class FlushThread : public kc::Thread {
   public:
    /** constructor */
    explicit FlushThread(MapReduce* mr, kc::BasicDB* tmpdb,
                         kc::TinyHashMap* cache, size_t csiz, bool cown) :
        mr_(mr), tmpdb_(tmpdb), cache_(cache), csiz_(csiz), cown_(cown), err_(false) {}
    /** perform the concrete process */
    void run() {
      if (!mr_->logf("map", "started flushing the cache: count=%lld size=%lld",
                     (long long)cache_->count(), (long long)csiz_)) err_ = true;
      double stime = kc::time();
      kc::BasicDB* tmpdb = tmpdb_;
      kc::TinyHashMap* cache = cache_;
      bool cown = cown_;
      kc::TinyHashMap::Sorter sorter(cache);
      const char* kbuf, *vbuf;
      size_t ksiz, vsiz;
      while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) {
        if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) {
          const kc::BasicDB::Error& e = tmpdb->error();
          mr_->db_->set_error(e.code(), e.message());
          err_ = true;
        }
        sorter.step();
        if (cown) cache->remove(kbuf, ksiz);
      }
      double etime = kc::time();
      if (!mr_->logf("map", "flushing the cache finished: time=%.6f", etime - stime))
        err_ = true;
      if (cown) delete cache;
    }
    /** check the error flag. */
    bool error() {
      return err_;
    }
   private:
    MapReduce* mr_;                          ///< driver
    kc::BasicDB* tmpdb_;                     ///< temporary database
    kc::TinyHashMap* cache_;                 ///< cache for emitter
    size_t csiz_;                            ///< current cache size
    bool cown_;                              ///< cache ownership flag
    bool err_;                               ///< error flag
  };
  /**
   * Task queue for parallel reducer.
   */
  class ReduceTaskQueue : public kc::TaskQueue {
   public:
    /**
     * Task for parallel reducer.
     */
    class ReduceTask : public Task {
      friend class ReduceTaskQueue;
     public:
      /** constructor */
      explicit ReduceTask(MapReduce* mr, const char* kbuf, size_t ksiz, const Values& values) :
          mr_(mr), key_(kbuf, ksiz), values_(values) {}
     private:
      MapReduce* mr_;                    ///< driver
      std::string key_;                  ///< key
      Values values_;                    ///< values
    };
    /** constructor */
    explicit ReduceTaskQueue() {}
   private:
    /** process a task */
    void do_task(Task* task) {
      ReduceTask* rtask = (ReduceTask*)task;
      ValueIterator iter(rtask->values_.begin(), rtask->values_.end());
      if (!rtask->mr_->reduce(rtask->key_.data(), rtask->key_.size(), &iter))
        rtask->mr_->redaborted_ = true;
      delete rtask;
    }
  };
  /**
   * Checker for the map process.
   */
  class MapChecker : public kc::BasicDB::ProgressChecker {
   public:
    /** constructor */
    explicit MapChecker() : stop_(false) {}
    /** stop the process */
    void stop() {
      stop_ = true;
    }
    /** check whether stopped */
    bool stopped() {
      return stop_;
    }
   private:
    /** check whether stopped */
    bool check(const char* name, const char* message, int64_t curcnt, int64_t allcnt) {
      return !stop_;
    }
    bool stop_;                          ///< flag for stop
  };
  /**
   * Visitor for the map process.
   */
  class MapVisitor : public TimedDB::Visitor {
   public:
    /** constructor */
    explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale) :
        mr_(mr), checker_(checker), scale_(scale), stime_(0), err_(false) {}
    /** get the error flag */
    bool error() {
      return err_;
    }
    /** preprocess the mapper */
    void visit_before() {
      mr_->dbclock_ = 0;
      mr_->cache_ = new kc::TinyHashMap(mr_->cbnum_);
      mr_->csiz_ = 0;
      if (!mr_->preprocess()) err_ = true;
      if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true;
      if (!mr_->logf("map", "started the map process: scale=%lld", (long long)scale_))
        err_ = true;
      stime_ = kc::time();
    }
    /** postprocess the mapper and call the reducer */
    void visit_after() {
      if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true;
      double etime = kc::time();
      if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_))
        err_ = true;
      if (!mr_->midprocess()) err_ = true;
      if (mr_->cache_->count() > 0 && !mr_->flush_cache()) err_ = true;
      delete mr_->cache_;
      if (mr_->flsths_ && !mr_->flsths_->empty()) {
        std::deque<FlushThread*>::iterator flthit = mr_->flsths_->begin();
        std::deque<FlushThread*>::iterator flthitend = mr_->flsths_->end();
        while (flthit != flthitend) {
          FlushThread* flth = *flthit;
          flth->join();
          if (flth->error()) err_ = true;
          delete flth;
          ++flthit;
        }
      }
      if (!err_ && !mr_->execute_reduce()) err_ = true;
      if (!mr_->postprocess()) err_ = true;
    }
   private:
    /** visit a record */
    const char* visit_full(const char* kbuf, size_t ksiz,
                           const char* vbuf, size_t vsiz, size_t* sp, int64_t* xtp) {
      if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) {
        checker_->stop();
        err_ = true;
      }
      if (mr_->rlocks_) {
        if (mr_->csiz_ >= mr_->clim_) {
          mr_->rlocks_->lock_all();
          if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
            checker_->stop();
            err_ = true;
          }
          mr_->rlocks_->unlock_all();
        }
      } else {
        if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
          checker_->stop();
          err_ = true;
        }
      }
      return NOP;
    }
    MapReduce* mr_;                      ///< driver
    MapChecker* checker_;                ///< checker
    int64_t scale_;                      ///< number of records
    double stime_;                       ///< start time
    bool err_;                           ///< error flag
  };
  /**
   * Front line of a merging list.
   */
  struct MergeLine {
    kc::BasicDB::Cursor* cur;            ///< cursor
    kc::Comparator* rcomp;               ///< record comparator
    char* kbuf;                          ///< pointer to the key
    size_t ksiz;                         ///< size of the key
    const char* vbuf;                    ///< pointer to the value
    size_t vsiz;                         ///< size of the value
    /** comparing operator */
    bool operator <(const MergeLine& right) const {
      return rcomp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0;
    }
  };
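  // Note: operator< above intentionally inverts the comparator result so that
  // std::priority_queue, which is a max-heap, pops the line with the smallest key first.
  // execute_reduce() relies on this to perform an ascending k-way merge of the sorted
  // temporary databases.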
  /**
   * Process a log message.
   * @param name the name of the event.
   * @param format the printf-like format string.
   * @param ... used according to the format string.
   * @return true on success, or false on failure.
   */
  bool logf(const char* name, const char* format, ...) {
    _assert_(name && format);
    va_list ap;
    va_start(ap, format);
    std::string message;
    kc::vstrprintf(&message, format, ap);
    va_end(ap);
    return log(name, message.c_str());
  }
  /**
   * Flush all cache records.
   * @return true on success, or false on failure.
   */
  bool flush_cache() {
    _assert_(true);
    bool err = false;
    kc::BasicDB* tmpdb = tmpdbs_[dbclock_];
    dbclock_ = (dbclock_ + 1) % dbnum_;
    if (flsths_) {
      size_t num = flsths_->size();
      if (num >= flsthnum_ || num >= dbnum_) {
        FlushThread* flth = flsths_->front();
        flsths_->pop_front();
        flth->join();
        if (flth->error()) err = true;
        delete flth;
      }
      FlushThread* flth = new FlushThread(this, tmpdb, cache_, csiz_, true);
      cache_ = new kc::TinyHashMap(cbnum_);
      csiz_ = 0;
      flth->start();
      flsths_->push_back(flth);
    } else {
      FlushThread flth(this, tmpdb, cache_, csiz_, false);
      flth.run();
      if (flth.error()) err = true;
      cache_->clear();
      csiz_ = 0;
    }
    return !err;
  }
  /**
   * Execute the reduce part.
   * @return true on success, or false on failure.
   */
  bool execute_reduce() {
    bool err = false;
    int64_t scale = 0;
    for (size_t i = 0; i < dbnum_; i++) {
      scale += tmpdbs_[i]->count();
    }
    if (!logf("reduce", "started the reduce process: scale=%lld", (long long)scale)) err = true;
    double stime = kc::time();
    if (redtasks_) redtasks_->start(redthnum_);
    std::priority_queue<MergeLine> lines;
    for (size_t i = 0; i < dbnum_; i++) {
      MergeLine line;
      line.cur = tmpdbs_[i]->cursor();
      line.rcomp = rcomp_;
      line.cur->jump();
      line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
      if (line.kbuf) {
        lines.push(line);
      } else {
        delete line.cur;
      }
    }
    char* lkbuf = NULL;
    size_t lksiz = 0;
    Values values;
    while (!err && !lines.empty()) {
      MergeLine line = lines.top();
      lines.pop();
      if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lksiz))) {
        if (!call_reducer(lkbuf, lksiz, values)) err = true;
        values.clear();
      }
      values.push_back(std::string(line.vbuf, line.vsiz));
      delete[] lkbuf;
      lkbuf = line.kbuf;
      lksiz = line.ksiz;
      line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
      if (line.kbuf) {
        lines.push(line);
      } else {
        delete line.cur;
      }
    }
    if (lkbuf) {
      if (!err && !call_reducer(lkbuf, lksiz, values)) err = true;
      values.clear();
      delete[] lkbuf;
    }
    while (!lines.empty()) {
      MergeLine line = lines.top();
      lines.pop();
      delete[] line.kbuf;
      delete line.cur;
    }
    if (redtasks_) redtasks_->finish();
    double etime = kc::time();
    if (!logf("reduce", "the reduce process finished: time=%.6f", etime - stime)) err = true;
    return !err;
  }
  /**
   * Call the reducer.
   * @param kbuf the pointer to the key region.
   * @param ksiz the size of the key region.
   * @param values a vector of the values.
   * @return true on success, or false on failure.
   */
  bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) {
    _assert_(kbuf && ksiz <= kc::MEMMAXSIZ);
    if (redtasks_) {
      if (redaborted_) return false;
      ReduceTaskQueue::ReduceTask* task =
          new ReduceTaskQueue::ReduceTask(this, kbuf, ksiz, values);
      redtasks_->add_task(task);
      return true;
    }
    bool err = false;
    ValueIterator iter(values.begin(), values.end());
    if (!reduce(kbuf, ksiz, &iter)) err = true;
    return !err;
  }
  /** Dummy constructor to forbid the use. */
  MapReduce(const MapReduce&);
  /** Dummy Operator to forbid the use. */
  MapReduce& operator =(const MapReduce&);
  /** The source database. */
  TimedDB* db_;
  /** The record comparator. */
  kc::Comparator* rcomp_;
  /** The temporary databases. */
  kc::BasicDB** tmpdbs_;
  /** The number of temporary databases. */
  size_t dbnum_;
  /** The logical clock for temporary databases. */
  int64_t dbclock_;
  /** The number of the mapper threads. */
  size_t mapthnum_;
  /** The number of the reducer threads. */
  size_t redthnum_;
  /** The number of the flusher threads. */
  size_t flsthnum_;
  /** The cache for emitter. */
  kc::TinyHashMap* cache_;
  /** The current size of the cache for emitter. */
  int64_t csiz_;
  /** The limit size of the cache for emitter. */
  int64_t clim_;
  /** The bucket number of the cache for emitter. */
  int64_t cbnum_;
  /** The flush threads. */
  std::deque<FlushThread*>* flsths_;
  /** The task queue for parallel reducer. */
  kc::TaskQueue* redtasks_;
  /** The flag indicating whether the reducer was aborted. */
  bool redaborted_;
  /** The record locks. */
  kc::SlottedMutex* rlocks_;
};
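/*
 * Usage sketch (illustrative only, not part of the original header; the class name, member
 * names, database path, and open flags below are hypothetical).  A subclass implements map()
 * and reduce(); execute() then scans the source database, groups the emitted records by key
 * through the temporary databases, and feeds each key's values to reduce().
 *
 *   class ValueCounter : public kyototycoon::MapReduce {
 *    public:
 *     std::map<std::string, int64_t> counts;   // value -> number of occurrences
 *     bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
 *       // use the record value as the intermediate key, with an empty payload
 *       return emit(vbuf, vsiz, "", 0);
 *     }
 *     bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) {
 *       int64_t num = 0;
 *       size_t vsiz;
 *       while (iter->next(&vsiz) != NULL) num++;
 *       counts[std::string(kbuf, ksiz)] = num;
 *       return true;
 *     }
 *   };
 *
 *   // driver: count how often each value occurs in "casket.kct"
 *   kyototycoon::TimedDB db;
 *   db.open("casket.kct", kc::BasicDB::OWRITER);
 *   ValueCounter counter;
 *   counter.execute(&db, "", kyototycoon::MapReduce::XPARAFLS);
 *   db.close();
 */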


}                                        // common namespace

#endif                                   // duplication check

// END OF FILE