1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /***********************************************************************
19 *   $Id: tdriver-bru.cpp 9210 2013-01-21 14:10:42Z rdempsey $
20 *
21 *
22 ***********************************************************************/
23 #include <iostream>
24 #include <sstream>
25 #include <fstream>
26 #include <iomanip>
27 #include <vector>
28 #include <set>
29 
30 #include <pthread.h>
31 #include <time.h>
32 #include <sys/time.h>
33 
34 #include <boost/filesystem/path.hpp>
35 #include <boost/filesystem/operations.hpp>
36 #include <boost/thread/mutex.hpp>
37 
38 #include <cppunit/extensions/HelperMacros.h>
39 #include <cppunit/extensions/TestFactoryRegistry.h>
40 #include <cppunit/ui/text/TestRunner.h>
41 
42 #include "bucketdl.h"
43 #include "elementtype.h"
44 #include "stopwatch.cpp"
45 
46 #include "bucketreuse.h"
47 
48 
49 // #undef CPPUNIT_ASSERT
50 // #define CPPUNIT_ASSERT(x)
51 
52 using namespace std;
53 using namespace boost;
54 using namespace joblist;
55 
56 //Stopwatch timer;
57 
58 //------------------------------------------------------------------------------
59 // TestDriver class derived from CppUnit
60 //------------------------------------------------------------------------------
61 
62 class BucketReUseDriver : public CppUnit::TestFixture
63 {
64 
65     CPPUNIT_TEST_SUITE(BucketReUseDriver);
66     CPPUNIT_TEST(parseConfig);
67     CPPUNIT_TEST(createFiles);
68     CPPUNIT_TEST(reuseFiles);
69     CPPUNIT_TEST(newversion);
70     CPPUNIT_TEST(concurrent);
71     CPPUNIT_TEST(concurrent_newversion);
72     CPPUNIT_TEST(concurrent_race);
73     CPPUNIT_TEST_SUITE_END();
74 
75 private:
76 public:
77     //--------------------------------------------------------------------------
78     // setup method run prior to each unit test, inherited from base
79     //--------------------------------------------------------------------------
setUp()80     void setUp()
81     {
82         clock_gettime(CLOCK_REALTIME, &ts);
83     }
84 
85     //--------------------------------------------------------------------------
86     // validate results from a unit test, inherited from base
87     //--------------------------------------------------------------------------
validateResults()88     void validateResults() {}
89 
90     //--------------------------------------------------------------------------
91     // test functions
92     //--------------------------------------------------------------------------
93     void parseConfig();
94     void createFiles();
95     void reuseFiles();
96     void newversion();
97     void concurrent();
98     void concurrent_newversion();
99     void concurrent_race();
100 
101 private:
102     //--------------------------------------------------------------------------
103     // initialize method
104     // oid : reference to the column OID used for test
105     //--------------------------------------------------------------------------
106     void initControl(execplan::CalpontSystemCatalog::OID& oid);
107 
108     //--------------------------------------------------------------------------
109     // validate results
110     // files: the filenames to be verified if exist
111     // exist: expect if the files are created or deleted
112     //--------------------------------------------------------------------------
113     void validateFileExist(set<string>& files, bool exist);
114 
115     static void* insertThread(void*);
116     static void* readThread(void*);
117 
118     static void* scanThread(void*);
119     static void* reuseThread(void*);
120     static void* raceThread(void*);
121 
122     struct ThreadArg
123     {
124         uint64_t id;                              // thread id
125         uint64_t version;                         // db version
126         uint64_t buckets;                         // max bucket numbers
127         uint64_t elements;                        // max number of elem per bucket
128         uint64_t total;                           // total number of elements
129         execplan::CalpontSystemCatalog::OID oid;  // column OID
130         BucketDataList* dl;                       // datalist
131 
132         // for sync threads
133         bool* flag;
134         pthread_mutex_t* mutex;
135         pthread_cond_t*  cond;
136 
137         // for file status check
138         set<string>*  files;
139 
ThreadArgBucketReUseDriver::ThreadArg140         ThreadArg() : id(0), version(0), buckets(0), elements(0), total(0), dl(NULL),
141             flag(NULL), mutex(NULL), cond(NULL), files(NULL) {}
142     };
143 
144     static const string column;
145 
146 public:
147     static struct timespec ts;
148 };
149 
150 CPPUNIT_TEST_SUITE_REGISTRATION(BucketReUseDriver);
151 
152 const string BucketReUseDriver::column = "tpch.lineitem.l_orderkey";
153 struct timespec BucketReUseDriver::ts;
154 
155 //------------------------------------------------------------------------------
156 // main entry point
157 //------------------------------------------------------------------------------
main(int argc,char ** argv)158 int main(int argc, char** argv)
159 {
160     CppUnit::TextUi::TestRunner runner;
161     CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
162     runner.addTest( registry.makeTest() );
163     bool wasSuccessful = runner.run( "", false );
164     return (wasSuccessful ? 0 : 1);
165 }
166 
167 struct timespec atTime();
168 ostream& operator<<(ostream& os, const struct timespec& t);
169 
parseConfig()170 void BucketReUseDriver::parseConfig()
171 {
172     cout << "ut: parseConfig start...\n" << endl;
173 
174     // before run this test, make sure "tpch.lineitem.l_orderkey" is in the Columnstore.xml
175     BucketReuseManager* control = BucketReuseManager::instance();
176     ResourceManager rm;
177     BucketReuseManager::instance()->startup(rm);
178 
179     // list all the predicates if any listed in the Columnstore.xml file
180     for (BucketReuseMap::iterator it = control->fControlMap.begin();
181             it != control->fControlMap.end(); it++)
182         cout << *(it->second);
183 
184     cout << endl;
185 
186     size_t schemap = column.find_first_of(".");
187     size_t columnp = column.find_last_of(".");
188     CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
189 
190     execplan::CalpontSystemCatalog::TableColName columnName;
191     columnName.schema = column.substr(0, schemap);
192     columnName.table  = column.substr(schemap + 1, columnp - schemap - 1);
193     columnName.column = column.substr(columnp + 1);
194 
195     string filter = "allrows";
196 
197     CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) !=
198                    control->fControlMap.end());
199 
200     cout << "ut: parseConfig done!\n" << endl;
201 }
202 
203 
createFiles()204 void BucketReUseDriver::createFiles()
205 {
206     cout << "ut: createFiles start...\n" << endl;
207 
208     ThreadArg arg;
209     arg.id = 1;
210     arg.version = 1;
211     arg.buckets = 2;
212     arg.elements = 2;
213     arg.total = 1000000;
214 
215     execplan::CalpontSystemCatalog::OID oid;
216     initControl(oid);
217     arg.oid = oid;
218 
219     set<string> files;
220     arg.files = &files;
221 
222     pthread_t t;
223     pthread_create(&t, NULL, scanThread, &arg);
224     pthread_join(t, NULL);
225 
226     validateFileExist(files, true);
227 
228     cout << "ut: createFiles done!\n" << endl;
229 }
230 
231 
reuseFiles()232 void BucketReUseDriver::reuseFiles()
233 {
234     cout << "ut: reuseFiles start...\n" << endl;
235 
236     ThreadArg arg1;
237     arg1.id = 1;
238     arg1.version = 1;
239     arg1.buckets = 2;
240     arg1.elements = 2;
241     arg1.total = 1000000;
242 
243     execplan::CalpontSystemCatalog::OID oid;
244     initControl(oid);
245     arg1.oid = oid;
246 
247     set<string> files1;
248     arg1.files = &files1;
249 
250     // create the files
251     pthread_t t1;
252     pthread_create(&t1, NULL, scanThread, &arg1);
253     pthread_join(t1, NULL);
254 
255     validateFileExist(files1, true);
256 
257     // use new datalist, reuse case
258     ThreadArg arg2 = arg1;
259     arg2.id = 2;
260 
261     set<string> files2;
262     arg2.files = &files2;
263 
264     pthread_t t2;
265     pthread_create(&t2, NULL, reuseThread, &arg2);
266     pthread_join(t2, NULL);
267 
268     validateFileExist(files2, true);
269 
270     cout << "ut: reuseFiles done!\n" << endl;
271 }
272 
273 
newversion()274 void BucketReUseDriver::newversion()
275 {
276     cout << "ut: newversion start...\n" << endl;
277 
278     ThreadArg arg1;
279     arg1.id = 1;
280     arg1.version = 1;
281     arg1.buckets = 2;
282     arg1.elements = 2;
283     arg1.total = 1000000;
284 
285     execplan::CalpontSystemCatalog::OID oid;
286     initControl(oid);
287     arg1.oid = oid;
288 
289     set<string> files1;
290     arg1.files = &files1;
291 
292     // create the files
293     pthread_t t1;
294     pthread_create(&t1, NULL, scanThread, &arg1);
295     pthread_join(t1, NULL);
296 
297     validateFileExist(files1, true);
298 
299     // new version
300     ThreadArg arg2 = arg1;
301     arg2.id = 2;
302     arg2.version = 2;
303 
304     set<string> files2;
305     arg2.files = &files2;
306 
307     pthread_t t2;
308     pthread_create(&t2, NULL, scanThread, &arg2);
309     pthread_join(t2, NULL);
310 
311     pthread_yield();
312 
313     validateFileExist(files1, false);
314     validateFileExist(files2, true);
315 
316     // read from the new files with new datalist
317     ThreadArg arg3 = arg2;
318     arg3.id = 3;
319     arg3.dl = NULL;
320 
321     set<string> files3;
322     arg3.files = &files3;
323 
324     pthread_t t3;
325     pthread_create(&t3, NULL, reuseThread, &arg3);
326     pthread_join(t3, NULL);
327 
328     validateFileExist(files3, true);
329 
330     cout << "ut: newversion done!\n" << endl;
331 }
332 
concurrent()333 void BucketReUseDriver::concurrent()
334 {
335     cout << "ut: concurrent start...\n" << endl;
336 
337     ThreadArg arg1;
338     arg1.id = 1;
339     arg1.version = 1;
340     arg1.buckets = 2;
341     arg1.elements = 2;
342     arg1.total = 1000000;
343 
344     execplan::CalpontSystemCatalog::OID oid;
345     initControl(oid);
346     arg1.oid = oid;
347 
348     set<string> files1;
349     arg1.files = &files1;
350 
351     // create the files
352     pthread_t t1;
353     pthread_create(&t1, NULL, scanThread, &arg1);
354 
355     sleep(1);
356 
357     // reuse case, current thread
358     ThreadArg arg2 = arg1;
359     arg2.id = 2;
360     set<string> files2;
361     arg2.files = &files2;
362 
363     pthread_t t2;
364     pthread_create(&t2, NULL, reuseThread, &arg2);
365 
366     pthread_join(t1, NULL);
367     validateFileExist(files1, true);
368 
369     pthread_join(t2, NULL);
370     validateFileExist(files2, true);
371 
372     cout << "ut: concurrent done!\n" << endl;
373 }
374 
concurrent_newversion()375 void BucketReUseDriver::concurrent_newversion()
376 {
377     cout << "ut: concurrent_newversion start...\n" << endl;
378 
379     bool flag = false;
380     pthread_mutex_t mutex;
381     pthread_mutex_init(&mutex, 0);
382     pthread_cond_t  cond;
383     pthread_cond_init(&cond, 0);
384 
385     ThreadArg arg1;
386     arg1.id = 1;
387     arg1.version = 1;
388     arg1.buckets = 2;
389     arg1.elements = 2;
390     arg1.total = 1000000;
391 
392     execplan::CalpontSystemCatalog::OID oid;
393     initControl(oid);
394     arg1.oid = oid;
395 
396     set<string> files1;
397     arg1.files = &files1;
398 
399     // create the files
400     pthread_t t1;
401     pthread_create(&t1, NULL, scanThread, &arg1);
402 
403     // reuse case, current thread
404     ThreadArg arg2 = arg1;
405     arg2.id = 2;
406     set<string> files2;
407     arg2.files = &files2;
408 
409     sleep(1);
410 
411     pthread_t t2;
412     pthread_create(&t2, NULL, reuseThread, &arg2);
413 
414     // new version
415     ThreadArg arg3 = arg1;
416     arg3.id = 3;
417     arg3.version = 3;
418     arg3.flag = &flag;
419     arg3.mutex = &mutex;
420     arg3.cond = &cond;
421     set<string> files3;
422     arg3.files = &files3;
423 
424     pthread_t t3;
425     pthread_create(&t3, NULL, scanThread, &arg3);
426 
427     sleep(3);
428 
429     pthread_mutex_lock(&mutex);
430     flag = true;
431     pthread_cond_broadcast(&cond);
432     pthread_mutex_unlock(&mutex);
433 
434     pthread_join(t1, NULL);
435     pthread_join(t2, NULL);
436     pthread_join(t3, NULL);
437 
438     // let cleanup thread do its job
439     pthread_yield();
440 
441     validateFileExist(files1, false);
442     validateFileExist(files2, false);
443     validateFileExist(files3, true);
444 
445     pthread_mutex_destroy(&mutex);
446     pthread_cond_destroy(&cond);
447 
448     cout << "ut: concurrent_newversion done!\n" << endl;
449 }
450 
concurrent_race()451 void BucketReUseDriver::concurrent_race()
452 {
453     cout << "ut: concurrent_race start...\n" << endl;
454 
455     ThreadArg arg1;
456     arg1.id = 1;
457     arg1.version = 1;
458     arg1.buckets = 2;
459     arg1.elements = 2;
460     arg1.total = 2000000;
461 
462     execplan::CalpontSystemCatalog::OID oid;
463     initControl(oid);
464     arg1.oid = oid;
465 
466     set<string> files1;
467     arg1.files = &files1;
468 
469     ThreadArg arg2 = arg1;
470     arg2.id = 2;
471     set<string> files2;
472     arg2.files = &files2;
473 
474     // start the version 1 threads
475     pthread_t t1;
476     pthread_t t2;
477     pthread_create(&t1, NULL, raceThread, &arg1);
478     pthread_create(&t2, NULL, raceThread, &arg2);
479 
480     // let the version 1 threads register
481     sleep(1);
482 
483     ThreadArg arg3 = arg1;
484     arg3.id = 3;
485     arg3.version = 4;
486     arg3.total = 1000000;
487     set<string> files3;
488     arg3.files = &files3;
489 
490     ThreadArg arg4 = arg3;
491     arg4.id = 4;
492     set<string> files4;
493     arg4.files = &files4;
494 
495     // start the version 4 threads
496     pthread_t t3;
497     pthread_t t4;
498     pthread_create(&t3, NULL, raceThread, &arg3);
499     pthread_create(&t4, NULL, raceThread, &arg4);
500 
501     pthread_join(t3, NULL);
502     pthread_join(t4, NULL);
503     validateFileExist(files1, true);
504     validateFileExist(files4, true);
505 
506     pthread_join(t1, NULL);
507     pthread_join(t2, NULL);
508     validateFileExist(files1, false);
509     validateFileExist(files4, true);
510 
511     cout << "ut: concurrent_race done!\n" << endl;
512 }
513 
initControl(execplan::CalpontSystemCatalog::OID & oid)514 void BucketReUseDriver::initControl(execplan::CalpontSystemCatalog::OID& oid)
515 {
516     // make sure the column for testing is in the map, "column" is a class variable
517     // to test with another column, only one place to change
518     // -- to validate the config parsing, run parseConfig --
519     config::Config* cf = config::Config::makeConfig();
520     vector<string> columns;
521     cf->getConfig("HashBucketReuse", "Predicate", columns);
522 
523     bool found = true;
524 
525     for (vector<string>::iterator it = columns.begin(); it != columns.end(); ++it)
526     {
527         if (it->compare(0, column.size(), column) == 0)
528         {
529             found = true;
530             break;
531         }
532     }
533 
534     string filter = "allrows";
535     BucketReuseManager* control = BucketReuseManager::instance();
536 
537     if (found == false)
538     {
539         cout << "tpch.lineitem.l_orderkey is not in the Columnstore.xml!)" << endl;
540         cout << "insert it to countinue unit test" << endl;
541 
542         size_t schemap = column.find_first_of(".");
543         size_t columnp = column.find_last_of(".");
544         CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
545 
546         execplan::CalpontSystemCatalog::TableColName tcn;
547         tcn.schema = column.substr(0, schemap);
548         tcn.table  = column.substr(schemap + 1, columnp - schemap - 1);
549         tcn.column = column.substr(columnp + 1);
550 
551         control->fConfigMap.insert(pair<string, BucketFileKey>(column, BucketFileKey(tcn, filter)));
552     }
553 
554     ResourceManager rm;
555     // now  start the BucketReuseManager
556     BucketReuseManager::instance()->startup(rm);
557 
558     // get the oid for registration
559     size_t schemap = column.find_first_of(".");
560     size_t columnp = column.find_last_of(".");
561     CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
562 
563     execplan::CalpontSystemCatalog::TableColName columnName;
564     columnName.schema = column.substr(0, schemap);
565     columnName.table  = column.substr(schemap + 1, columnp - schemap - 1);
566     columnName.column = column.substr(columnp + 1);
567 
568     CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) !=
569                    control->fControlMap.end());
570 }
571 
572 
validateFileExist(set<string> & files,bool exist)573 void BucketReUseDriver::validateFileExist(set<string>& files, bool exist)
574 {
575     cout << "\ncheck if files exist or not:" << endl;
576 
577     for (set<string>::iterator i = files.begin(); i != files.end(); i++)
578     {
579         filesystem::path p(i->c_str());
580         cout << (*i) << "-- ";
581 
582         if (exist)
583         {
584             CPPUNIT_ASSERT(filesystem::exists(p));
585             cout << "OK" << endl;
586         }
587         else
588         {
589             CPPUNIT_ASSERT(!filesystem::exists(p));
590             cout << "GONE" << endl;
591         }
592     }
593 
594     cout << endl;
595 }
596 
597 
insertThread(void * arg)598 void* BucketReUseDriver::insertThread(void* arg)
599 {
600     ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
601     BucketDataList* dl = a->dl;
602     CPPUNIT_ASSERT(dl != NULL);
603 
604     cout << "thread " << a->id << " start at " << atTime() << endl;
605 
606     BucketReuseControlEntry* entry = dl->reuseControl();
607 
608     for (uint64_t i = 0; i < a->buckets; i++)
609     {
610         stringstream ss;
611         ss << entry->baseName() << "." << i;
612         filesystem::path p(ss.str().c_str());
613         a->files->insert(ss.str());
614     }
615 
616     ElementType e;
617 
618     for (uint64_t i = 0; i < a->total; i++)
619     {
620         e.first = i;
621         e.second = i * 10 + a->version;  // include the version in values
622         dl->insert(e);
623     }
624 
625     cout << "thread[" << a->id << "] last element inserted at " << atTime() << endl;
626     dl->endOfInput();
627 
628     cout << "thread " << a->id << " finished at " << atTime() << endl;
629 
630     return NULL;
631 }
632 
633 
readThread(void * arg)634 void* BucketReUseDriver::readThread(void* arg)
635 {
636     ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
637     BucketDataList* dl = a->dl;
638     CPPUNIT_ASSERT(dl != NULL);
639 
640     cout << "thread " << a->id << " start at " << atTime() << endl;
641 
642     BucketReuseControlEntry* entry = dl->reuseControl();
643 
644     for (uint64_t i = 0; i < a->buckets; i++)
645     {
646         stringstream ss;
647         ss << entry->baseName() << "." << i;
648         filesystem::path p(ss.str().c_str());
649         a->files->insert(ss.str());
650     }
651 
652     ElementType e;
653     uint64_t min = 0xffffffff, max = 0, count = 0;
654     uint64_t k[a->buckets];  // count of each bucket
655     bool firstRead = true;
656 
657     for (uint64_t i = 0; i < a->buckets; i++)
658     {
659         k[i] = 0;
660         uint64_t it = dl->getIterator(i);
661 
662         while (dl->next(i, it, &e))
663         {
664             if (firstRead)
665             {
666                 cout << "thread[" << a->id << "] first read at " << atTime() << endl;
667                 firstRead = false;
668             }
669 
670             if (e.second < min) min = e.second;
671 
672             if (e.second > max) max = e.second;
673 
674             // output the first 10 of each bucket or last 10 of the datalist
675             //if (count < 10 || (a->total - count) < 10 || k[i] < 10)
676             if (count < 2 || (a->total - count) < 2 || k[i] < 2)
677                 cout << "thread[" << a->id << "] bucket:" << i
678                      << " e(" << e.first << ", " << e.second << ")" << endl;
679 
680             count++;
681             k[i]++;
682         }
683     }
684 
685     cout << "\nthread[" << a->id << "] element: count = " << count
686          << ", min/max = " << min << "/" << max << ", elements in each bucket: ";
687 
688     for (uint64_t i = 0; i < a->buckets; i++)
689         cout << k[i] << " ";
690 
691     cout << endl;
692 
693     cout << "thread " << a->id << " finished at " << atTime() << endl;
694 
695     return NULL;
696 }
697 
698 
scanThread(void * arg)699 void* BucketReUseDriver::scanThread(void* arg)
700 {
701     ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
702     BucketDataList* dl = a->dl;
703     CPPUNIT_ASSERT(dl == NULL);
704 
705     if (a->cond != NULL)
706     {
707         pthread_mutex_lock(a->mutex);
708 
709         while (*(a->flag) != true)
710             pthread_cond_wait(a->cond, a->mutex);
711 
712         pthread_mutex_unlock(a->mutex);
713     }
714 
715     cout << "thread " << a->id << " start at " << atTime() << endl;
716 
717     string dummy;
718     bool scan = false;
719     boost::shared_ptr<execplan::CalpontSystemCatalog> c =
720         execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
721     execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
722     BucketReuseControlEntry* entry = BucketReuseManager::instance()->userRegister(
723                                          tcn, dummy, a->version, a->buckets, scan);
724     CPPUNIT_ASSERT(scan == true);
725 
726     ResourceManager rm;
727     dl = new BucketDataList(a->buckets, 1, a->elements, rm);
728     dl->setElementMode(1);
729     dl->reuseControl(entry, !scan);
730 
731     ThreadArg arg1 = *a;
732     arg1.id *= 10;
733     arg1.dl = dl;
734 
735     pthread_t t1;
736     pthread_create(&t1, NULL, insertThread, &arg1);
737 
738     ThreadArg arg2 = arg1;
739     arg2.id += 1;
740     pthread_t t2;
741     pthread_create(&t2, NULL, readThread, &arg2);
742 
743     pthread_join(t1, NULL);
744     pthread_join(t2, NULL);
745 
746     delete dl;
747 
748     cout << "thread " << a->id << " finished at " << atTime() << endl;
749 
750     return NULL;
751 }
752 
753 
reuseThread(void * arg)754 void* BucketReUseDriver::reuseThread(void* arg)
755 {
756     ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
757     BucketDataList* dl = a->dl;
758     CPPUNIT_ASSERT(dl == NULL);
759 
760     if (a->cond != NULL)
761     {
762         pthread_mutex_lock(a->mutex);
763 
764         while (*(a->flag) != true)
765             pthread_cond_wait(a->cond, a->mutex);
766 
767         pthread_mutex_unlock(a->mutex);
768     }
769 
770     cout << "thread " << a->id << " start at " << atTime() << endl;
771 
772     string dummy;
773     bool scan = true;
774     boost::shared_ptr<execplan::CalpontSystemCatalog> c =
775         execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
776     execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
777     BucketReuseControlEntry* entry = BucketReuseManager::instance()->userRegister(
778                                          tcn, dummy, a->version, a->buckets, scan);
779     CPPUNIT_ASSERT(scan == false);
780 
781     ResourceManager rm;
782     dl = new BucketDataList(a->buckets, 1, a->elements, rm);
783     dl->setElementMode(1);
784     dl->reuseControl(entry, !scan);
785 
786     ThreadArg arg1 = *a;
787     arg1.id *= 10;
788     arg1.dl = dl;
789 
790     pthread_t t1;
791     pthread_create(&t1, NULL, readThread, &arg1);
792 
793     if (entry->fileStatus() == BucketReuseControlEntry::progress_c)
794     {
795         boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
796         dl->reuseControl()->stateChange().wait(lock);
797     }
798     else
799     {
800         CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) ||
801                        (entry->fileStatus() == BucketReuseControlEntry::ready_c));
802     }
803 
804     // the bucket files are ready
805     dl->restoreBucketInformation();
806     dl->endOfInput();
807 
808     pthread_join(t1, NULL);
809 
810     delete dl;
811 
812     cout << "thread " << a->id << " finished at " << atTime() << endl;
813 
814     return NULL;
815 }
816 
817 
raceThread(void * arg)818 void* BucketReUseDriver::raceThread(void* arg)
819 {
820     ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
821     BucketDataList* dl = a->dl;
822     CPPUNIT_ASSERT(dl == NULL);
823 
824     if (a->cond != NULL)
825     {
826         pthread_mutex_lock(a->mutex);
827 
828         while (*(a->flag) != true)
829             pthread_cond_wait(a->cond, a->mutex);
830 
831         pthread_mutex_unlock(a->mutex);
832     }
833 
834     cout << "thread " << a->id << " start at " << atTime() << endl;
835 
836     string dummy;
837     bool scan = true;
838     ResourceManager rm;
839     BucketReuseControlEntry* entry = NULL;
840     {
841         boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
842         boost::shared_ptr<execplan::CalpontSystemCatalog> c =
843             execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
844         execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
845         entry = BucketReuseManager::instance()->userRegister(
846                     tcn, dummy, a->version, a->buckets, scan);
847 
848         dl = new BucketDataList(a->buckets, 1, a->elements, rm);
849         dl->setElementMode(1);
850         dl->reuseControl(entry, !scan);
851     }
852 
853     if (scan == true)
854     {
855         CPPUNIT_ASSERT(entry->fileStatus() == BucketReuseControlEntry::progress_c);
856 
857         ThreadArg arg1 = *a;
858         arg1.id *= 10;
859         arg1.dl = dl;
860 
861         pthread_t t1;
862         pthread_create(&t1, NULL, insertThread, &arg1);
863 
864         ThreadArg arg2 = arg1;
865         arg2.id += 1;
866         pthread_t t2;
867         pthread_create(&t2, NULL, readThread, &arg2);
868 
869         pthread_join(t1, NULL);
870         pthread_join(t2, NULL);
871     }
872     else
873     {
874         ThreadArg arg1 = *a;
875         arg1.id *= 10;
876         arg1.dl = dl;
877 
878         pthread_t t1;
879         pthread_create(&t1, NULL, readThread, &arg1);
880 
881         if (entry->fileStatus() == BucketReuseControlEntry::progress_c)
882         {
883             boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
884             dl->reuseControl()->stateChange().wait(lock);
885         }
886         else
887         {
888             CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) ||
889                            (entry->fileStatus() == BucketReuseControlEntry::ready_c));
890         }
891 
892         // the bucket files are ready
893         dl->restoreBucketInformation();
894         dl->endOfInput();
895 
896         pthread_join(t1, NULL);
897     }
898 
899     delete dl;
900 
901     cout << "thread " << a->id << " finished at " << atTime() << endl;
902 
903     return NULL;
904 }
905 
906 
atTime()907 timespec atTime()
908 {
909     timespec ts, ts1, ts2;
910     ts1 = BucketReUseDriver::ts;
911     clock_gettime(CLOCK_REALTIME, &ts2);
912 
913     if (ts2.tv_nsec < ts1.tv_nsec)
914     {
915         ts.tv_sec  = ts2.tv_sec - ts1.tv_sec - 1;
916         ts.tv_nsec = ts2.tv_nsec + 1000000000 - ts1.tv_nsec;
917     }
918     else
919     {
920         ts.tv_sec  = ts2.tv_sec - ts1.tv_sec;
921         ts.tv_nsec = ts2.tv_nsec - ts1.tv_nsec;
922     }
923 
924     return ts;
925 }
926 
operator <<(ostream & os,const struct timespec & t)927 ostream& operator<<(ostream& os, const struct timespec& t)
928 {
929     os << t.tv_sec << "." << setw(9) << setfill('0') << t.tv_nsec << "s";
930     return os;
931 }
932 
933