1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /***********************************************************************
19 * $Id: tdriver-bru.cpp 9210 2013-01-21 14:10:42Z rdempsey $
20 *
21 *
22 ***********************************************************************/
23 #include <iostream>
24 #include <sstream>
25 #include <fstream>
26 #include <iomanip>
27 #include <vector>
28 #include <set>
29
30 #include <pthread.h>
31 #include <time.h>
32 #include <sys/time.h>
33
34 #include <boost/filesystem/path.hpp>
35 #include <boost/filesystem/operations.hpp>
36 #include <boost/thread/mutex.hpp>
37
38 #include <cppunit/extensions/HelperMacros.h>
39 #include <cppunit/extensions/TestFactoryRegistry.h>
40 #include <cppunit/ui/text/TestRunner.h>
41
42 #include "bucketdl.h"
43 #include "elementtype.h"
44 #include "stopwatch.cpp"
45
46 #include "bucketreuse.h"
47
48
49 // #undef CPPUNIT_ASSERT
50 // #define CPPUNIT_ASSERT(x)
51
52 using namespace std;
53 using namespace boost;
54 using namespace joblist;
55
56 //Stopwatch timer;
57
58 //------------------------------------------------------------------------------
59 // TestDriver class derived from CppUnit
60 //------------------------------------------------------------------------------
61
62 class BucketReUseDriver : public CppUnit::TestFixture
63 {
64
65 CPPUNIT_TEST_SUITE(BucketReUseDriver);
66 CPPUNIT_TEST(parseConfig);
67 CPPUNIT_TEST(createFiles);
68 CPPUNIT_TEST(reuseFiles);
69 CPPUNIT_TEST(newversion);
70 CPPUNIT_TEST(concurrent);
71 CPPUNIT_TEST(concurrent_newversion);
72 CPPUNIT_TEST(concurrent_race);
73 CPPUNIT_TEST_SUITE_END();
74
75 private:
76 public:
77 //--------------------------------------------------------------------------
78 // setup method run prior to each unit test, inherited from base
79 //--------------------------------------------------------------------------
setUp()80 void setUp()
81 {
82 clock_gettime(CLOCK_REALTIME, &ts);
83 }
84
85 //--------------------------------------------------------------------------
86 // validate results from a unit test, inherited from base
87 //--------------------------------------------------------------------------
validateResults()88 void validateResults() {}
89
90 //--------------------------------------------------------------------------
91 // test functions
92 //--------------------------------------------------------------------------
93 void parseConfig();
94 void createFiles();
95 void reuseFiles();
96 void newversion();
97 void concurrent();
98 void concurrent_newversion();
99 void concurrent_race();
100
101 private:
102 //--------------------------------------------------------------------------
103 // initialize method
104 // oid : reference to the column OID used for test
105 //--------------------------------------------------------------------------
106 void initControl(execplan::CalpontSystemCatalog::OID& oid);
107
108 //--------------------------------------------------------------------------
109 // validate results
110 // files: the filenames to be verified if exist
111 // exist: expect if the files are created or deleted
112 //--------------------------------------------------------------------------
113 void validateFileExist(set<string>& files, bool exist);
114
115 static void* insertThread(void*);
116 static void* readThread(void*);
117
118 static void* scanThread(void*);
119 static void* reuseThread(void*);
120 static void* raceThread(void*);
121
122 struct ThreadArg
123 {
124 uint64_t id; // thread id
125 uint64_t version; // db version
126 uint64_t buckets; // max bucket numbers
127 uint64_t elements; // max number of elem per bucket
128 uint64_t total; // total number of elements
129 execplan::CalpontSystemCatalog::OID oid; // column OID
130 BucketDataList* dl; // datalist
131
132 // for sync threads
133 bool* flag;
134 pthread_mutex_t* mutex;
135 pthread_cond_t* cond;
136
137 // for file status check
138 set<string>* files;
139
ThreadArgBucketReUseDriver::ThreadArg140 ThreadArg() : id(0), version(0), buckets(0), elements(0), total(0), dl(NULL),
141 flag(NULL), mutex(NULL), cond(NULL), files(NULL) {}
142 };
143
144 static const string column;
145
146 public:
147 static struct timespec ts;
148 };
149
150 CPPUNIT_TEST_SUITE_REGISTRATION(BucketReUseDriver);
151
152 const string BucketReUseDriver::column = "tpch.lineitem.l_orderkey";
153 struct timespec BucketReUseDriver::ts;
154
155 //------------------------------------------------------------------------------
156 // main entry point
157 //------------------------------------------------------------------------------
main(int argc,char ** argv)158 int main(int argc, char** argv)
159 {
160 CppUnit::TextUi::TestRunner runner;
161 CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
162 runner.addTest( registry.makeTest() );
163 bool wasSuccessful = runner.run( "", false );
164 return (wasSuccessful ? 0 : 1);
165 }
166
167 struct timespec atTime();
168 ostream& operator<<(ostream& os, const struct timespec& t);
169
parseConfig()170 void BucketReUseDriver::parseConfig()
171 {
172 cout << "ut: parseConfig start...\n" << endl;
173
174 // before run this test, make sure "tpch.lineitem.l_orderkey" is in the Columnstore.xml
175 BucketReuseManager* control = BucketReuseManager::instance();
176 ResourceManager rm;
177 BucketReuseManager::instance()->startup(rm);
178
179 // list all the predicates if any listed in the Columnstore.xml file
180 for (BucketReuseMap::iterator it = control->fControlMap.begin();
181 it != control->fControlMap.end(); it++)
182 cout << *(it->second);
183
184 cout << endl;
185
186 size_t schemap = column.find_first_of(".");
187 size_t columnp = column.find_last_of(".");
188 CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
189
190 execplan::CalpontSystemCatalog::TableColName columnName;
191 columnName.schema = column.substr(0, schemap);
192 columnName.table = column.substr(schemap + 1, columnp - schemap - 1);
193 columnName.column = column.substr(columnp + 1);
194
195 string filter = "allrows";
196
197 CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) !=
198 control->fControlMap.end());
199
200 cout << "ut: parseConfig done!\n" << endl;
201 }
202
203
createFiles()204 void BucketReUseDriver::createFiles()
205 {
206 cout << "ut: createFiles start...\n" << endl;
207
208 ThreadArg arg;
209 arg.id = 1;
210 arg.version = 1;
211 arg.buckets = 2;
212 arg.elements = 2;
213 arg.total = 1000000;
214
215 execplan::CalpontSystemCatalog::OID oid;
216 initControl(oid);
217 arg.oid = oid;
218
219 set<string> files;
220 arg.files = &files;
221
222 pthread_t t;
223 pthread_create(&t, NULL, scanThread, &arg);
224 pthread_join(t, NULL);
225
226 validateFileExist(files, true);
227
228 cout << "ut: createFiles done!\n" << endl;
229 }
230
231
reuseFiles()232 void BucketReUseDriver::reuseFiles()
233 {
234 cout << "ut: reuseFiles start...\n" << endl;
235
236 ThreadArg arg1;
237 arg1.id = 1;
238 arg1.version = 1;
239 arg1.buckets = 2;
240 arg1.elements = 2;
241 arg1.total = 1000000;
242
243 execplan::CalpontSystemCatalog::OID oid;
244 initControl(oid);
245 arg1.oid = oid;
246
247 set<string> files1;
248 arg1.files = &files1;
249
250 // create the files
251 pthread_t t1;
252 pthread_create(&t1, NULL, scanThread, &arg1);
253 pthread_join(t1, NULL);
254
255 validateFileExist(files1, true);
256
257 // use new datalist, reuse case
258 ThreadArg arg2 = arg1;
259 arg2.id = 2;
260
261 set<string> files2;
262 arg2.files = &files2;
263
264 pthread_t t2;
265 pthread_create(&t2, NULL, reuseThread, &arg2);
266 pthread_join(t2, NULL);
267
268 validateFileExist(files2, true);
269
270 cout << "ut: reuseFiles done!\n" << endl;
271 }
272
273
newversion()274 void BucketReUseDriver::newversion()
275 {
276 cout << "ut: newversion start...\n" << endl;
277
278 ThreadArg arg1;
279 arg1.id = 1;
280 arg1.version = 1;
281 arg1.buckets = 2;
282 arg1.elements = 2;
283 arg1.total = 1000000;
284
285 execplan::CalpontSystemCatalog::OID oid;
286 initControl(oid);
287 arg1.oid = oid;
288
289 set<string> files1;
290 arg1.files = &files1;
291
292 // create the files
293 pthread_t t1;
294 pthread_create(&t1, NULL, scanThread, &arg1);
295 pthread_join(t1, NULL);
296
297 validateFileExist(files1, true);
298
299 // new version
300 ThreadArg arg2 = arg1;
301 arg2.id = 2;
302 arg2.version = 2;
303
304 set<string> files2;
305 arg2.files = &files2;
306
307 pthread_t t2;
308 pthread_create(&t2, NULL, scanThread, &arg2);
309 pthread_join(t2, NULL);
310
311 pthread_yield();
312
313 validateFileExist(files1, false);
314 validateFileExist(files2, true);
315
316 // read from the new files with new datalist
317 ThreadArg arg3 = arg2;
318 arg3.id = 3;
319 arg3.dl = NULL;
320
321 set<string> files3;
322 arg3.files = &files3;
323
324 pthread_t t3;
325 pthread_create(&t3, NULL, reuseThread, &arg3);
326 pthread_join(t3, NULL);
327
328 validateFileExist(files3, true);
329
330 cout << "ut: newversion done!\n" << endl;
331 }
332
concurrent()333 void BucketReUseDriver::concurrent()
334 {
335 cout << "ut: concurrent start...\n" << endl;
336
337 ThreadArg arg1;
338 arg1.id = 1;
339 arg1.version = 1;
340 arg1.buckets = 2;
341 arg1.elements = 2;
342 arg1.total = 1000000;
343
344 execplan::CalpontSystemCatalog::OID oid;
345 initControl(oid);
346 arg1.oid = oid;
347
348 set<string> files1;
349 arg1.files = &files1;
350
351 // create the files
352 pthread_t t1;
353 pthread_create(&t1, NULL, scanThread, &arg1);
354
355 sleep(1);
356
357 // reuse case, current thread
358 ThreadArg arg2 = arg1;
359 arg2.id = 2;
360 set<string> files2;
361 arg2.files = &files2;
362
363 pthread_t t2;
364 pthread_create(&t2, NULL, reuseThread, &arg2);
365
366 pthread_join(t1, NULL);
367 validateFileExist(files1, true);
368
369 pthread_join(t2, NULL);
370 validateFileExist(files2, true);
371
372 cout << "ut: concurrent done!\n" << endl;
373 }
374
concurrent_newversion()375 void BucketReUseDriver::concurrent_newversion()
376 {
377 cout << "ut: concurrent_newversion start...\n" << endl;
378
379 bool flag = false;
380 pthread_mutex_t mutex;
381 pthread_mutex_init(&mutex, 0);
382 pthread_cond_t cond;
383 pthread_cond_init(&cond, 0);
384
385 ThreadArg arg1;
386 arg1.id = 1;
387 arg1.version = 1;
388 arg1.buckets = 2;
389 arg1.elements = 2;
390 arg1.total = 1000000;
391
392 execplan::CalpontSystemCatalog::OID oid;
393 initControl(oid);
394 arg1.oid = oid;
395
396 set<string> files1;
397 arg1.files = &files1;
398
399 // create the files
400 pthread_t t1;
401 pthread_create(&t1, NULL, scanThread, &arg1);
402
403 // reuse case, current thread
404 ThreadArg arg2 = arg1;
405 arg2.id = 2;
406 set<string> files2;
407 arg2.files = &files2;
408
409 sleep(1);
410
411 pthread_t t2;
412 pthread_create(&t2, NULL, reuseThread, &arg2);
413
414 // new version
415 ThreadArg arg3 = arg1;
416 arg3.id = 3;
417 arg3.version = 3;
418 arg3.flag = &flag;
419 arg3.mutex = &mutex;
420 arg3.cond = &cond;
421 set<string> files3;
422 arg3.files = &files3;
423
424 pthread_t t3;
425 pthread_create(&t3, NULL, scanThread, &arg3);
426
427 sleep(3);
428
429 pthread_mutex_lock(&mutex);
430 flag = true;
431 pthread_cond_broadcast(&cond);
432 pthread_mutex_unlock(&mutex);
433
434 pthread_join(t1, NULL);
435 pthread_join(t2, NULL);
436 pthread_join(t3, NULL);
437
438 // let cleanup thread do its job
439 pthread_yield();
440
441 validateFileExist(files1, false);
442 validateFileExist(files2, false);
443 validateFileExist(files3, true);
444
445 pthread_mutex_destroy(&mutex);
446 pthread_cond_destroy(&cond);
447
448 cout << "ut: concurrent_newversion done!\n" << endl;
449 }
450
concurrent_race()451 void BucketReUseDriver::concurrent_race()
452 {
453 cout << "ut: concurrent_race start...\n" << endl;
454
455 ThreadArg arg1;
456 arg1.id = 1;
457 arg1.version = 1;
458 arg1.buckets = 2;
459 arg1.elements = 2;
460 arg1.total = 2000000;
461
462 execplan::CalpontSystemCatalog::OID oid;
463 initControl(oid);
464 arg1.oid = oid;
465
466 set<string> files1;
467 arg1.files = &files1;
468
469 ThreadArg arg2 = arg1;
470 arg2.id = 2;
471 set<string> files2;
472 arg2.files = &files2;
473
474 // start the version 1 threads
475 pthread_t t1;
476 pthread_t t2;
477 pthread_create(&t1, NULL, raceThread, &arg1);
478 pthread_create(&t2, NULL, raceThread, &arg2);
479
480 // let the version 1 threads register
481 sleep(1);
482
483 ThreadArg arg3 = arg1;
484 arg3.id = 3;
485 arg3.version = 4;
486 arg3.total = 1000000;
487 set<string> files3;
488 arg3.files = &files3;
489
490 ThreadArg arg4 = arg3;
491 arg4.id = 4;
492 set<string> files4;
493 arg4.files = &files4;
494
495 // start the version 4 threads
496 pthread_t t3;
497 pthread_t t4;
498 pthread_create(&t3, NULL, raceThread, &arg3);
499 pthread_create(&t4, NULL, raceThread, &arg4);
500
501 pthread_join(t3, NULL);
502 pthread_join(t4, NULL);
503 validateFileExist(files1, true);
504 validateFileExist(files4, true);
505
506 pthread_join(t1, NULL);
507 pthread_join(t2, NULL);
508 validateFileExist(files1, false);
509 validateFileExist(files4, true);
510
511 cout << "ut: concurrent_race done!\n" << endl;
512 }
513
initControl(execplan::CalpontSystemCatalog::OID & oid)514 void BucketReUseDriver::initControl(execplan::CalpontSystemCatalog::OID& oid)
515 {
516 // make sure the column for testing is in the map, "column" is a class variable
517 // to test with another column, only one place to change
518 // -- to validate the config parsing, run parseConfig --
519 config::Config* cf = config::Config::makeConfig();
520 vector<string> columns;
521 cf->getConfig("HashBucketReuse", "Predicate", columns);
522
523 bool found = true;
524
525 for (vector<string>::iterator it = columns.begin(); it != columns.end(); ++it)
526 {
527 if (it->compare(0, column.size(), column) == 0)
528 {
529 found = true;
530 break;
531 }
532 }
533
534 string filter = "allrows";
535 BucketReuseManager* control = BucketReuseManager::instance();
536
537 if (found == false)
538 {
539 cout << "tpch.lineitem.l_orderkey is not in the Columnstore.xml!)" << endl;
540 cout << "insert it to countinue unit test" << endl;
541
542 size_t schemap = column.find_first_of(".");
543 size_t columnp = column.find_last_of(".");
544 CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
545
546 execplan::CalpontSystemCatalog::TableColName tcn;
547 tcn.schema = column.substr(0, schemap);
548 tcn.table = column.substr(schemap + 1, columnp - schemap - 1);
549 tcn.column = column.substr(columnp + 1);
550
551 control->fConfigMap.insert(pair<string, BucketFileKey>(column, BucketFileKey(tcn, filter)));
552 }
553
554 ResourceManager rm;
555 // now start the BucketReuseManager
556 BucketReuseManager::instance()->startup(rm);
557
558 // get the oid for registration
559 size_t schemap = column.find_first_of(".");
560 size_t columnp = column.find_last_of(".");
561 CPPUNIT_ASSERT(schemap != string::npos && columnp != string::npos);
562
563 execplan::CalpontSystemCatalog::TableColName columnName;
564 columnName.schema = column.substr(0, schemap);
565 columnName.table = column.substr(schemap + 1, columnp - schemap - 1);
566 columnName.column = column.substr(columnp + 1);
567
568 CPPUNIT_ASSERT(control->fControlMap.find(BucketFileKey(columnName, filter)) !=
569 control->fControlMap.end());
570 }
571
572
validateFileExist(set<string> & files,bool exist)573 void BucketReUseDriver::validateFileExist(set<string>& files, bool exist)
574 {
575 cout << "\ncheck if files exist or not:" << endl;
576
577 for (set<string>::iterator i = files.begin(); i != files.end(); i++)
578 {
579 filesystem::path p(i->c_str());
580 cout << (*i) << "-- ";
581
582 if (exist)
583 {
584 CPPUNIT_ASSERT(filesystem::exists(p));
585 cout << "OK" << endl;
586 }
587 else
588 {
589 CPPUNIT_ASSERT(!filesystem::exists(p));
590 cout << "GONE" << endl;
591 }
592 }
593
594 cout << endl;
595 }
596
597
insertThread(void * arg)598 void* BucketReUseDriver::insertThread(void* arg)
599 {
600 ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
601 BucketDataList* dl = a->dl;
602 CPPUNIT_ASSERT(dl != NULL);
603
604 cout << "thread " << a->id << " start at " << atTime() << endl;
605
606 BucketReuseControlEntry* entry = dl->reuseControl();
607
608 for (uint64_t i = 0; i < a->buckets; i++)
609 {
610 stringstream ss;
611 ss << entry->baseName() << "." << i;
612 filesystem::path p(ss.str().c_str());
613 a->files->insert(ss.str());
614 }
615
616 ElementType e;
617
618 for (uint64_t i = 0; i < a->total; i++)
619 {
620 e.first = i;
621 e.second = i * 10 + a->version; // include the version in values
622 dl->insert(e);
623 }
624
625 cout << "thread[" << a->id << "] last element inserted at " << atTime() << endl;
626 dl->endOfInput();
627
628 cout << "thread " << a->id << " finished at " << atTime() << endl;
629
630 return NULL;
631 }
632
633
readThread(void * arg)634 void* BucketReUseDriver::readThread(void* arg)
635 {
636 ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
637 BucketDataList* dl = a->dl;
638 CPPUNIT_ASSERT(dl != NULL);
639
640 cout << "thread " << a->id << " start at " << atTime() << endl;
641
642 BucketReuseControlEntry* entry = dl->reuseControl();
643
644 for (uint64_t i = 0; i < a->buckets; i++)
645 {
646 stringstream ss;
647 ss << entry->baseName() << "." << i;
648 filesystem::path p(ss.str().c_str());
649 a->files->insert(ss.str());
650 }
651
652 ElementType e;
653 uint64_t min = 0xffffffff, max = 0, count = 0;
654 uint64_t k[a->buckets]; // count of each bucket
655 bool firstRead = true;
656
657 for (uint64_t i = 0; i < a->buckets; i++)
658 {
659 k[i] = 0;
660 uint64_t it = dl->getIterator(i);
661
662 while (dl->next(i, it, &e))
663 {
664 if (firstRead)
665 {
666 cout << "thread[" << a->id << "] first read at " << atTime() << endl;
667 firstRead = false;
668 }
669
670 if (e.second < min) min = e.second;
671
672 if (e.second > max) max = e.second;
673
674 // output the first 10 of each bucket or last 10 of the datalist
675 //if (count < 10 || (a->total - count) < 10 || k[i] < 10)
676 if (count < 2 || (a->total - count) < 2 || k[i] < 2)
677 cout << "thread[" << a->id << "] bucket:" << i
678 << " e(" << e.first << ", " << e.second << ")" << endl;
679
680 count++;
681 k[i]++;
682 }
683 }
684
685 cout << "\nthread[" << a->id << "] element: count = " << count
686 << ", min/max = " << min << "/" << max << ", elements in each bucket: ";
687
688 for (uint64_t i = 0; i < a->buckets; i++)
689 cout << k[i] << " ";
690
691 cout << endl;
692
693 cout << "thread " << a->id << " finished at " << atTime() << endl;
694
695 return NULL;
696 }
697
698
scanThread(void * arg)699 void* BucketReUseDriver::scanThread(void* arg)
700 {
701 ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
702 BucketDataList* dl = a->dl;
703 CPPUNIT_ASSERT(dl == NULL);
704
705 if (a->cond != NULL)
706 {
707 pthread_mutex_lock(a->mutex);
708
709 while (*(a->flag) != true)
710 pthread_cond_wait(a->cond, a->mutex);
711
712 pthread_mutex_unlock(a->mutex);
713 }
714
715 cout << "thread " << a->id << " start at " << atTime() << endl;
716
717 string dummy;
718 bool scan = false;
719 boost::shared_ptr<execplan::CalpontSystemCatalog> c =
720 execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
721 execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
722 BucketReuseControlEntry* entry = BucketReuseManager::instance()->userRegister(
723 tcn, dummy, a->version, a->buckets, scan);
724 CPPUNIT_ASSERT(scan == true);
725
726 ResourceManager rm;
727 dl = new BucketDataList(a->buckets, 1, a->elements, rm);
728 dl->setElementMode(1);
729 dl->reuseControl(entry, !scan);
730
731 ThreadArg arg1 = *a;
732 arg1.id *= 10;
733 arg1.dl = dl;
734
735 pthread_t t1;
736 pthread_create(&t1, NULL, insertThread, &arg1);
737
738 ThreadArg arg2 = arg1;
739 arg2.id += 1;
740 pthread_t t2;
741 pthread_create(&t2, NULL, readThread, &arg2);
742
743 pthread_join(t1, NULL);
744 pthread_join(t2, NULL);
745
746 delete dl;
747
748 cout << "thread " << a->id << " finished at " << atTime() << endl;
749
750 return NULL;
751 }
752
753
reuseThread(void * arg)754 void* BucketReUseDriver::reuseThread(void* arg)
755 {
756 ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
757 BucketDataList* dl = a->dl;
758 CPPUNIT_ASSERT(dl == NULL);
759
760 if (a->cond != NULL)
761 {
762 pthread_mutex_lock(a->mutex);
763
764 while (*(a->flag) != true)
765 pthread_cond_wait(a->cond, a->mutex);
766
767 pthread_mutex_unlock(a->mutex);
768 }
769
770 cout << "thread " << a->id << " start at " << atTime() << endl;
771
772 string dummy;
773 bool scan = true;
774 boost::shared_ptr<execplan::CalpontSystemCatalog> c =
775 execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
776 execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
777 BucketReuseControlEntry* entry = BucketReuseManager::instance()->userRegister(
778 tcn, dummy, a->version, a->buckets, scan);
779 CPPUNIT_ASSERT(scan == false);
780
781 ResourceManager rm;
782 dl = new BucketDataList(a->buckets, 1, a->elements, rm);
783 dl->setElementMode(1);
784 dl->reuseControl(entry, !scan);
785
786 ThreadArg arg1 = *a;
787 arg1.id *= 10;
788 arg1.dl = dl;
789
790 pthread_t t1;
791 pthread_create(&t1, NULL, readThread, &arg1);
792
793 if (entry->fileStatus() == BucketReuseControlEntry::progress_c)
794 {
795 boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
796 dl->reuseControl()->stateChange().wait(lock);
797 }
798 else
799 {
800 CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) ||
801 (entry->fileStatus() == BucketReuseControlEntry::ready_c));
802 }
803
804 // the bucket files are ready
805 dl->restoreBucketInformation();
806 dl->endOfInput();
807
808 pthread_join(t1, NULL);
809
810 delete dl;
811
812 cout << "thread " << a->id << " finished at " << atTime() << endl;
813
814 return NULL;
815 }
816
817
raceThread(void * arg)818 void* BucketReUseDriver::raceThread(void* arg)
819 {
820 ThreadArg* a = reinterpret_cast<ThreadArg*>(arg);
821 BucketDataList* dl = a->dl;
822 CPPUNIT_ASSERT(dl == NULL);
823
824 if (a->cond != NULL)
825 {
826 pthread_mutex_lock(a->mutex);
827
828 while (*(a->flag) != true)
829 pthread_cond_wait(a->cond, a->mutex);
830
831 pthread_mutex_unlock(a->mutex);
832 }
833
834 cout << "thread " << a->id << " start at " << atTime() << endl;
835
836 string dummy;
837 bool scan = true;
838 ResourceManager rm;
839 BucketReuseControlEntry* entry = NULL;
840 {
841 boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
842 boost::shared_ptr<execplan::CalpontSystemCatalog> c =
843 execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(0x80000000);
844 execplan::CalpontSystemCatalog::TableColName tcn = c->colName(a->oid);
845 entry = BucketReuseManager::instance()->userRegister(
846 tcn, dummy, a->version, a->buckets, scan);
847
848 dl = new BucketDataList(a->buckets, 1, a->elements, rm);
849 dl->setElementMode(1);
850 dl->reuseControl(entry, !scan);
851 }
852
853 if (scan == true)
854 {
855 CPPUNIT_ASSERT(entry->fileStatus() == BucketReuseControlEntry::progress_c);
856
857 ThreadArg arg1 = *a;
858 arg1.id *= 10;
859 arg1.dl = dl;
860
861 pthread_t t1;
862 pthread_create(&t1, NULL, insertThread, &arg1);
863
864 ThreadArg arg2 = arg1;
865 arg2.id += 1;
866 pthread_t t2;
867 pthread_create(&t2, NULL, readThread, &arg2);
868
869 pthread_join(t1, NULL);
870 pthread_join(t2, NULL);
871 }
872 else
873 {
874 ThreadArg arg1 = *a;
875 arg1.id *= 10;
876 arg1.dl = dl;
877
878 pthread_t t1;
879 pthread_create(&t1, NULL, readThread, &arg1);
880
881 if (entry->fileStatus() == BucketReuseControlEntry::progress_c)
882 {
883 boost::mutex::scoped_lock lock(BucketReuseManager::instance()->getMutex());
884 dl->reuseControl()->stateChange().wait(lock);
885 }
886 else
887 {
888 CPPUNIT_ASSERT((entry->fileStatus() == BucketReuseControlEntry::using_c) ||
889 (entry->fileStatus() == BucketReuseControlEntry::ready_c));
890 }
891
892 // the bucket files are ready
893 dl->restoreBucketInformation();
894 dl->endOfInput();
895
896 pthread_join(t1, NULL);
897 }
898
899 delete dl;
900
901 cout << "thread " << a->id << " finished at " << atTime() << endl;
902
903 return NULL;
904 }
905
906
atTime()907 timespec atTime()
908 {
909 timespec ts, ts1, ts2;
910 ts1 = BucketReUseDriver::ts;
911 clock_gettime(CLOCK_REALTIME, &ts2);
912
913 if (ts2.tv_nsec < ts1.tv_nsec)
914 {
915 ts.tv_sec = ts2.tv_sec - ts1.tv_sec - 1;
916 ts.tv_nsec = ts2.tv_nsec + 1000000000 - ts1.tv_nsec;
917 }
918 else
919 {
920 ts.tv_sec = ts2.tv_sec - ts1.tv_sec;
921 ts.tv_nsec = ts2.tv_nsec - ts1.tv_nsec;
922 }
923
924 return ts;
925 }
926
operator <<(ostream & os,const struct timespec & t)927 ostream& operator<<(ostream& os, const struct timespec& t)
928 {
929 os << t.tv_sec << "." << setw(9) << setfill('0') << t.tv_nsec << "s";
930 return os;
931 }
932
933