1 /* Copyright (C) 2014 InfiniDB, Inc.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 /*****************************************************************************
19 * $Id: tdriver-jobstep.cpp 9210 2013-01-21 14:10:42Z rdempsey $
20 *
21 ****************************************************************************/
22
23 #include <iostream>
24
25 #include <cppunit/extensions/HelperMacros.h>
26 #include <cppunit/extensions/TestFactoryRegistry.h>
27 #include <cppunit/ui/text/TestRunner.h>
28
29 #include "joblist.h"
30 #include "jobstep.h"
31 #include "distributedenginecomm.h"
32 #include "calpontsystemcatalog.h"
33 #include "zdl.h"
34
35 using namespace std;
36 using namespace joblist;
37 using namespace execplan;
38
// Number of rows generated for each input data list in the reduce/union tests.
uint64_t count(1000000);
// Batch size: rows are accumulated in a vector and handed to ZDL::insert() once this many are queued.
const uint64_t ZDL_VEC_SIZE(4096);
41
42 class JobStepDriver : public CppUnit::TestFixture
43 {
44
45 CPPUNIT_TEST_SUITE(JobStepDriver);
46
47 /* These rely on Patrick's DB */
48 // CPPUNIT_TEST(pColScan_1);
49 // CPPUNIT_TEST(pColStep_1);
50 // CPPUNIT_TEST(pColStep_2);
51 // CPPUNIT_TEST(pColStep_as_ProjectionStep_1);
52
53 // CPPUNIT_TEST(pnljoin_1); // value list, no rid list, no reduction step
54 // CPPUNIT_TEST(pnljoin_2); // value list, w/rid list, no reduction step
55 // CPPUNIT_TEST(pnljoin_3); // value list + rid list + reduction step
56
57 CPPUNIT_TEST(reduceStep_1); // ElementType
58 CPPUNIT_TEST(reduceStep_2); // StringElementType
59 //CPPUNIT_TEST(reduceStep_3); // DoubleElementType
60 //CPPUNIT_TEST(reduceStep_4); // reduceStep_1 with BucketDLs as inputs
61
62 CPPUNIT_TEST(unionStep_1);
63 //CPPUNIT_TEST(unionStep_2);
64 //CPPUNIT_TEST(unionStep_3);
65
66 CPPUNIT_TEST_SUITE_END();
67
68 ResourceManager fRm;
69
70 private:
71 public:
72
pColScan_1()73 void pColScan_1()
74 {
75 DistributedEngineComm* dec;
76 boost::shared_ptr<CalpontSystemCatalog> cat;
77
78 dec = DistributedEngineComm::instance(fRm);
79 // dec = DistributedEngineComm::instance("./config-dec.xml");
80 cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
81
82 JobStepAssociation inJs;
83 JobStepAssociation outJs;
84
85 AnyDataListSPtr spdl1(new AnyDataList());
86 BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
87 spdl1->bandedDL(dl1);
88 outJs.outAdd(spdl1);
89
90 pColScanStep step0(inJs, outJs, dec, cat, 1003, 1000, 12345, 999, 7, 0, 0, fRm);
91 int8_t cop;
92 int64_t filterValue;
93 cop = COMPARE_GE;
94 filterValue = 3010;
95 step0.addFilter(cop, filterValue);
96 cop = COMPARE_LE;
97 filterValue = 3318;
98 step0.addFilter(cop, filterValue);
99 step0.setBOP(BOP_AND);
100 inJs = outJs;
101
102 step0.run();
103
104 step0.join();
105
106 DeliveryStep step1(inJs, outJs, make_table("CALPONTSYS", "SYSTABLE"), cat, 10000, 0, 0, 0);
107 inJs = outJs;
108
109 step1.run();
110
111 step1.join();
112 }
113
pColStep_1()114 void pColStep_1()
115 {
116 DistributedEngineComm* dec;
117 boost::shared_ptr<CalpontSystemCatalog> cat;
118 ElementType e;
119 int i, it;
120 bool more;
121
122 dec = DistributedEngineComm::instance(fRm);
123 // dec = DistributedEngineComm::instance("./config-dec.xml");
124 cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
125
126 JobStepAssociation inJs;
127 JobStepAssociation outJs;
128
129 AnyDataListSPtr spdl1(new AnyDataList());
130 BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
131 spdl1->bandedDL(dl1);
132 inJs.outAdd(spdl1);
133
134 AnyDataListSPtr spdl2(new AnyDataList());
135 BandedDL<ElementType>* dl2 = new BandedDL<ElementType>(1, fRm);
136 spdl2->bandedDL(dl2);
137 outJs.outAdd(spdl2);
138
139 for (i = 10; i < 15; i++)
140 {
141 e.first = i;
142 dl1->insert(e);
143 }
144
145 dl1->endOfInput();
146
147 pColStep p(inJs, outJs, dec, cat, 1003, 1000, 12346, 11, 11, 1, 0, fRm);
148
149 p.setRidList(dl1); // JSA should do this
150 p.run();
151 p.join();
152
153 it = dl2->getIterator();
154
155 for (more = dl2->next(it, &e), i = 0; more; more = dl2->next(it, &e), i++)
156 #ifdef DEBUG
157 cout << "<rid = " << e.first << ", value = " << e.second << ">" << endl;
158
159 #else
160 ; // walk the list silently
161 #endif
162 CPPUNIT_ASSERT(i == 5);
163 }
164
165 /* make sure it issues multiple primitive msgs correctly */
pColStep_2()166 void pColStep_2()
167 {
168 DistributedEngineComm* dec;
169 boost::shared_ptr<CalpontSystemCatalog> cat;
170 ElementType e;
171 int i, it;
172 bool more;
173
174 dec = DistributedEngineComm::instance(fRm);
175 // dec = DistributedEngineComm::instance("./config-dec.xml");
176 cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
177
178 JobStepAssociation inJs;
179 JobStepAssociation outJs;
180
181 AnyDataListSPtr spdl1(new AnyDataList());
182 BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
183 spdl1->bandedDL(dl1);
184 inJs.outAdd(spdl1);
185
186 AnyDataListSPtr spdl2(new AnyDataList());
187 BandedDL<ElementType>* dl2 = new BandedDL<ElementType>(1, fRm);
188 spdl2->bandedDL(dl2);
189 outJs.outAdd(spdl2);
190
191 for (i = 10; i < 10000; i++)
192 {
193 if (i % 2 == 0) // make it sparse
194 {
195 e.first = i;
196 dl1->insert(e);
197 }
198 }
199
200 dl1->endOfInput();
201
202 pColStep p(inJs, outJs, dec, cat, 1003, 1000, 12347, 11, 11, 1, 0, fRm);
203
204 p.setRidList(dl1); // JSA should do this
205 p.run();
206 p.join();
207
208 it = dl2->getIterator();
209
210 for (more = dl2->next(it, &e), i = 0; more; more = dl2->next(it, &e), i++)
211 #ifdef DEBUG
212 cout << "<rid = " << e.first << ", value = " << e.second << ">" << endl;
213
214 #else
215 ; // walk the list silently
216 #endif
217 CPPUNIT_ASSERT(i == 6);
218 }
219
pColStep_as_ProjectionStep_1()220 void pColStep_as_ProjectionStep_1()
221 {
222 DistributedEngineComm* dec;
223 boost::shared_ptr<CalpontSystemCatalog> cat;
224 ElementType e;
225 int i, it;
226 bool more;
227
228 dec = DistributedEngineComm::instance(fRm);
229 // dec = DistributedEngineComm::instance("./config-dec.xml");
230 cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
231
232 JobStepAssociation inJs;
233 JobStepAssociation outJs;
234
235 AnyDataListSPtr spdl1(new AnyDataList());
236 BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
237 spdl1->bandedDL(dl1);
238 inJs.outAdd(spdl1);
239
240 AnyDataListSPtr spdl2(new AnyDataList());
241 BandedDL<ElementType>* dl2 = new BandedDL<ElementType>(1, fRm);
242 spdl2->bandedDL(dl2);
243 outJs.outAdd(spdl2);
244
245 for (i = 1; i <= 21; i++)
246 {
247 e.first = i;
248 dl1->insert(e);
249 }
250
251 dl1->endOfInput();
252
253 // flushInterval = 8
254 pColStep p(inJs, outJs, dec, cat, 1003, 1000, 12348, 11, 11, 1, 2, fRm);
255
256 p.setRidList(dl1); // JSA should do this
257 p.run();
258 p.join();
259
260 it = dl2->getIterator();
261
262 for (more = dl2->next(it, &e), i = 0; more; more = dl2->next(it, &e), i++)
263 #ifdef DEBUG
264 cout << "<rid = " << e.first << ", value = " << e.second << ">" << endl;
265
266 #else
267 ; // walk the list silently
268 #endif
269 // CPPUNIT_ASSERT(i == 5);
270 }
271
reduceStep_1()272 void reduceStep_1()
273 {
274 JobStepAssociation in, out;
275 AnyDataList* inputADL, *driverADL, *outputADL;
276 AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
277 WSDL<ElementType>* outDL;
278 ElementType e;
279 unsigned i;
280 int it;
281 bool more;
282
283
284
285 ZDL<ElementType>* inDL = new ZDL<ElementType>(1, fRm);
286 ZDL<ElementType>* dDL = new ZDL<ElementType>(1, fRm);
287 outDL = new WSDL<ElementType>(1, 100000, fRm);
288 inputADL = new AnyDataList();
289 driverADL = new AnyDataList();
290 outputADL = new AnyDataList();
291 inputADL->zonedDL(inDL);
292 driverADL->zonedDL(dDL);
293 outputADL->workingSetDL(outDL);
294 inputSPtr.reset(inputADL);
295 driverSPtr.reset(driverADL);
296 outputSPtr.reset(outputADL);
297
298 in.outAdd(inputSPtr);
299 in.outAdd(driverSPtr);
300 out.outAdd(outputSPtr);
301
302 // cout << "making input DataList" << endl;
303 vector <ElementType> vec1;
304 vector <ElementType> vec2;
305
306 for (i = 0; i < ::count; i++)
307 {
308 e.first = i;
309 e.second = i + 1;
310 vec1.push_back(e);
311
312 if (vec1.size() >= ZDL_VEC_SIZE)
313 {
314 inDL->insert(vec1);
315 vec1.clear();
316 }
317
318 // inDL->insert(e);
319 }
320
321 if (!vec1.empty())
322 inDL->insert(vec1);
323
324 inDL->endOfInput();
325
326 // cout << "making driver DataList" << endl;
327
328 for (i = 0; i < ::count; i += 2)
329 {
330 e.first = i;
331 e.second = i + 1;
332 vec2.push_back(e);
333
334 if (vec2.size() >= ZDL_VEC_SIZE)
335 {
336 dDL->insert(vec2);
337 vec2.clear();
338 }
339
340 // dDL->insert(e);
341 }
342
343 if (!vec2.empty())
344 dDL->insert(vec2);
345
346 dDL->endOfInput();
347
348 // cout << "reducing" << endl;
349
350 ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
351 rs.run();
352 rs.join();
353
354 it = outDL->getIterator();
355 more = outDL->next(it, &e);
356 i = 0;
357
358 while (more)
359 {
360 // cout << i << ": first: " << e.first << " second: " << e.second << endl;
361 CPPUNIT_ASSERT(e.first < ::count);
362 CPPUNIT_ASSERT(e.first % 2 == 0);
363 more = outDL->next(it, &e);
364 i++;
365 }
366
367 }
368
reduceStep_2()369 void reduceStep_2()
370 {
371 JobStepAssociation in, out;
372 AnyDataList* inputADL, *driverADL, *outputADL;
373 AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
374 WSDL<StringElementType>* outDL;
375 StringElementType e;
376 unsigned i;
377 int it;
378 bool more;
379
380 // inDL = new WSDL<StringElementType>(1, 10000, fRm);
381 // dDL = new WSDL<StringElementType>(1, 10000, fRm);
382 ZDL<StringElementType>* inDL = new ZDL<StringElementType>(1, fRm);
383 ZDL<StringElementType>* dDL = new ZDL<StringElementType>(1, fRm);
384
385 outDL = new WSDL<StringElementType>(1, 10000, fRm);
386 inputADL = new AnyDataList();
387 driverADL = new AnyDataList();
388 outputADL = new AnyDataList();
389 inputADL->stringZonedDL(inDL);
390 driverADL->stringZonedDL(dDL);
391 outputADL->strDataList(outDL);
392 inputSPtr.reset(inputADL);
393 driverSPtr.reset(driverADL);
394 outputSPtr.reset(outputADL);
395
396 in.outAdd(inputSPtr);
397 in.outAdd(driverSPtr);
398 out.outAdd(outputSPtr);
399 vector <StringElementType> vec1;
400 vector <StringElementType> vec2;
401
402 for (i = 0; i < ::count; i++)
403 {
404 e.first = i;
405 e.second = string("blahblahblahblahblah");
406 vec1.push_back(e);
407
408 if (vec1.size() >= ZDL_VEC_SIZE)
409 {
410 inDL->insert(vec1);
411 vec1.clear();
412 }
413
414 if (0 == i % 2)
415 {
416 e.second = string("blahblahblah");
417 vec2.push_back(e);
418
419 if (vec2.size() >= ZDL_VEC_SIZE)
420 {
421 dDL->insert(vec2);
422 vec2.clear();
423 }
424 }
425
426 // inDL->insert(e);
427 }
428
429 if (!vec1.empty())
430 inDL->insert(vec1);
431
432 inDL->endOfInput();
433
434 /* for (i = 0; i < ::count; i+=2) {
435 e.first = i;
436 e.second = string("blahblahblah");
437 dDL->insert(e);
438 }*/
439 if (!vec2.empty())
440 dDL->insert(vec2);
441
442 dDL->endOfInput();
443
444 ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
445 rs.run();
446 rs.join();
447
448 it = outDL->getIterator();
449 more = outDL->next(it, &e);
450 i = 0;
451
452 while (more)
453 {
454 // cout << i << ": first: " << e.first << " second: " << e.second << endl;
455 CPPUNIT_ASSERT(e.first < ::count);
456 CPPUNIT_ASSERT(e.first % 2 == 0);
457 more = outDL->next(it, &e);
458 i++;
459 }
460 }
461
reduceStep_3()462 void reduceStep_3()
463 {
464 JobStepAssociation in, out;
465 AnyDataList* inputADL, *driverADL, *outputADL;
466 AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
467 WSDL<DoubleElementType>* inDL, *dDL, *outDL;
468 DoubleElementType e;
469 unsigned i;
470 int it;
471 bool more;
472
473 inDL = new WSDL<DoubleElementType>(1, 100000, fRm);
474 dDL = new WSDL<DoubleElementType>(1, 100000, fRm);
475 outDL = new WSDL<DoubleElementType>(1, 100000, fRm);
476 inputADL = new AnyDataList();
477 driverADL = new AnyDataList();
478 outputADL = new AnyDataList();
479 inputADL->doubleDL(inDL);
480 driverADL->doubleDL(dDL);
481 outputADL->doubleDL(outDL);
482 inputSPtr.reset(inputADL);
483 driverSPtr.reset(driverADL);
484 outputSPtr.reset(outputADL);
485
486 in.outAdd(inputSPtr);
487 in.outAdd(driverSPtr);
488 out.outAdd(outputSPtr);
489
490 for (i = 0; i < ::count; i++)
491 {
492 e.first = i;
493 e.second = ((double) i) + 0.1;
494 inDL->insert(e);
495 }
496
497 inDL->endOfInput();
498
499 for (i = 0; i < ::count; i += 2)
500 {
501 e.first = i;
502 e.second = ((double) i) + 0.1;
503 dDL->insert(e);
504 }
505
506 dDL->endOfInput();
507
508 ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
509 rs.run();
510 rs.join();
511
512 it = outDL->getIterator();
513 more = outDL->next(it, &e);
514 i = 0;
515
516 while (more)
517 {
518 // cout << i << ": first: " << e.first << " second: " << e.second << endl;
519 CPPUNIT_ASSERT(e.first < ::count);
520 CPPUNIT_ASSERT(e.first % 2 == 0);
521 more = outDL->next(it, &e);
522 i++;
523 }
524 }
525
reduceStep_4()526 void reduceStep_4()
527 {
528 JobStepAssociation in, out;
529 AnyDataList* inputADL, *driverADL, *outputADL;
530 AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
531 BucketDL<ElementType>* inDL, *dDL;
532 WSDL<ElementType>* outDL;
533 ElementType e;
534 unsigned i;
535 int it;
536 bool more;
537
538 inDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
539 dDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
540 outDL = new WSDL<ElementType>(1, 100000, fRm);
541 inputADL = new AnyDataList();
542 driverADL = new AnyDataList();
543 outputADL = new AnyDataList();
544 inputADL->bucketDL(inDL);
545 driverADL->bucketDL(dDL);
546 outputADL->workingSetDL(outDL);
547 inputSPtr.reset(inputADL);
548 driverSPtr.reset(driverADL);
549 outputSPtr.reset(outputADL);
550
551 in.outAdd(inputSPtr);
552 in.outAdd(driverSPtr);
553 out.outAdd(outputSPtr);
554
555 // cout << "making input DataList" << endl;
556
557 for (i = 0; i < ::count; i++)
558 {
559 e.first = i;
560 e.second = i + 1;
561 inDL->insert(e);
562 }
563
564 inDL->endOfInput();
565
566 // cout << "making driver DataList" << endl;
567
568 for (i = 0; i < ::count; i += 2)
569 {
570 e.first = i;
571 e.second = i + 1;
572 dDL->insert(e);
573 }
574
575 dDL->endOfInput();
576
577 // cout << "reducing" << endl;
578
579 ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
580 rs.run();
581 rs.join();
582
583 it = outDL->getIterator();
584 more = outDL->next(it, &e);
585 i = 0;
586
587 while (more)
588 {
589 // cout << i << ": first: " << e.first << " second: " << e.second << endl;
590 CPPUNIT_ASSERT(e.first < ::count);
591 CPPUNIT_ASSERT(e.first % 2 == 0);
592 more = outDL->next(it, &e);
593 i++;
594 }
595 }
596
pnljoin_1()597 void pnljoin_1()
598 {
599 DistributedEngineComm* dec;
600 boost::shared_ptr<CalpontSystemCatalog> cat;
601 ElementType e;
602 int it;
603 bool more;
604
605 dec = DistributedEngineComm::instance(fRm);
606 // dec = DistributedEngineComm::instance("./config-dec.xml");
607 cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
608
609 JobStepAssociation inJs;
610 JobStepAssociation outJs;
611
612 AnyDataListSPtr spdl1(new AnyDataList());
613 WSDL<ElementType>* dl1 = new WSDL<ElementType>(1, 100, fRm);
614 spdl1->workingSetDL(dl1);
615 inJs.outAdd(spdl1);
616
617 AnyDataListSPtr spdl2(new AnyDataList());
618 WSDL<ElementType>* dl2 = new WSDL<ElementType>(1, 100, fRm);
619 spdl2->workingSetDL(dl2);
620 outJs.outAdd(spdl2);
621
622 /* These values are unique to Pat's DB files unfortunately. */
623 /* Fill in the value list */
624 dl1->insert(ElementType(1, 3179)); // row 10 in the target
625 dl1->insert(ElementType(2, 3191)); // row 12
626 dl1->insert(ElementType(3, 3207)); // row 14
627 dl1->insert(ElementType(4, 3318)); // row 20 OOO
628 dl1->insert(ElementType(5, 3242)); // row 16
629 dl1->insert(ElementType(6, 3289)); // row 18
630 dl1->insert(ElementType(7, 3191)); // duplicate value of row 14
631
632 dl1->endOfInput();
633 dl1->OID(1003);
634
635 PNLJoin joiner(inJs, outJs, dec, cat, 12349, 1000, 1, 0, fRm);
636
637 joiner.run();
638 joiner.join();
639
640 it = dl2->getIterator();
641 more = dl2->next(it, &e);
642
643 while (more)
644 {
645 #ifdef DEBUG
646 cout << "first: " << e.first << " second: " << e.second << endl;
647 #endif
648 more = dl2->next(it, &e);
649 }
650 }
pnljoin_2()651 void pnljoin_2()
652 {
653 DistributedEngineComm* dec;
654 boost::shared_ptr<CalpontSystemCatalog> cat;
655 ElementType e;
656 int i, it;
657 bool more;
658
659 dec = DistributedEngineComm::instance(fRm);
660 // dec = DistributedEngineComm::instance("./config-dec.xml");
661 cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
662
663 JobStepAssociation inJs;
664 JobStepAssociation outJs;
665
666 AnyDataListSPtr spdl1(new AnyDataList());
667 WSDL<ElementType>* valueList = new WSDL<ElementType>(1, 100, fRm);
668 spdl1->workingSetDL(valueList);
669 inJs.outAdd(spdl1);
670
671 AnyDataListSPtr spdl2(new AnyDataList());
672 AnyDataListSPtr spdl3(new AnyDataList());
673 WSDL<ElementType>* colResults = new WSDL<ElementType>(1, 100, fRm);
674 WSDL<ElementType>* inputRidList = new WSDL<ElementType>(1, 100, fRm);
675 spdl2->workingSetDL(colResults);
676 spdl3->workingSetDL(inputRidList);
677 outJs.outAdd(spdl2);
678 inJs.outAdd(spdl3);
679
680
681 /* These values are unique to Pat's DB files unfortunately. */
682 /* Fill in the value list */
683 valueList->insert(ElementType(1, 3179)); // row 10 in the target
684 valueList->insert(ElementType(2, 3191)); // row 12
685 valueList->insert(ElementType(3, 3207)); // row 14
686 valueList->insert(ElementType(4, 3318)); // row 20 OOO
687 valueList->insert(ElementType(5, 3242)); // row 16
688 valueList->insert(ElementType(6, 3289)); // row 18
689 valueList->insert(ElementType(7, 3191)); // duplicate value of row 14
690
691 valueList->endOfInput();
692 valueList->OID(1003);
693
694 // supply a ridlist with row 16 missing; make sure it's missing in the result
695 for (i = 0; i < 25; i++)
696 if (i != 16)
697 inputRidList->insert(ElementType(i, i));
698
699 inputRidList->endOfInput();
700
701 PNLJoin joiner(inJs, outJs, dec, cat, 12349, 1000, 1, 0, fRm);
702
703 joiner.run();
704 joiner.join();
705
706 it = colResults->getIterator();
707 more = colResults->next(it, &e);
708
709 while (more)
710 {
711 #ifdef DEBUG
712 cout << "first: " << e.first << " second: " << e.second << endl;
713 #endif
714 more = colResults->next(it, &e);
715 }
716 }
717
pnljoin_3()718 void pnljoin_3()
719 {
720 DistributedEngineComm* dec;
721 boost::shared_ptr<CalpontSystemCatalog> cat;
722 ElementType e;
723 int i, it;
724 bool more;
725
726 dec = DistributedEngineComm::instance(fRm);
727 // dec = DistributedEngineComm::instance("./config-dec.xml");
728 cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
729
730 AnyDataListSPtr spdl1(new AnyDataList());
731 AnyDataListSPtr spdl3(new AnyDataList());
732 WSDL<ElementType>* valueList = new WSDL<ElementType>(1, 100, fRm);
733 WSDL<ElementType>* inputRidList = new WSDL<ElementType>(1, 100, fRm);
734 spdl1->workingSetDL(valueList);
735 spdl3->workingSetDL(inputRidList);
736
737 JobStepAssociation inJs;
738 JobStepAssociation outJs;
739
740 inJs.outAdd(spdl1);
741 inJs.outAdd(spdl3);
742
743 AnyDataListSPtr spdl2(new AnyDataList());
744 AnyDataListSPtr spdl4(new AnyDataList());
745 WSDL<ElementType>* colResults = new WSDL<ElementType>(1, 100, fRm);
746 WSDL<ElementType>* reducedRidList = new WSDL<ElementType>(1, 100, fRm);
747 spdl2->workingSetDL(colResults);
748 spdl4->workingSetDL(reducedRidList);
749 outJs.outAdd(spdl2);
750 outJs.outAdd(spdl4);
751
752 /* These values are unique to Pat's DB files unfortunately. */
753 /* Fill in the value list */
754 valueList->insert(ElementType(10, 3179)); // row 10 in the target
755 valueList->insert(ElementType(12, 3191)); // row 12
756 valueList->insert(ElementType(14541513, 3207)); // row 14 - on output should be in colResults, not in reducedridlist
757 valueList->insert(ElementType(20, 3318)); // row 20 OOO
758 valueList->insert(ElementType(16, 3242)); // row 16
759 valueList->insert(ElementType(18, 3289)); // row 18
760
761 // XXXPAT: Duplicates here can end up in the reducedRidList. Technically it's
762 // correct, but do we want that or not?
763 // valueList->insert(ElementType(12, 3191)); // duplicate value of row 12
764
765 valueList->endOfInput();
766 valueList->OID(1003);
767
768 // supply a ridlist with row 16 missing; make sure it's missing in the result
769 for (i = 0; i < 25; i++)
770 if (i != 16)
771 inputRidList->insert(ElementType(i, i));
772
773 inputRidList->endOfInput();
774
775 PNLJoin joiner(inJs, outJs, dec, cat, 12349, 1000, 1, 0, fRm);
776
777 joiner.run();
778 joiner.join();
779
780 it = colResults->getIterator();
781 more = colResults->next(it, &e);
782 #ifdef DEBUG
783 cout << "ColResults:" << endl;
784 #endif
785
786 while (more)
787 {
788 #ifdef DEBUG
789 cout << " first: " << e.first << " second: " << e.second << endl;
790 #endif
791 more = colResults->next(it, &e);
792 }
793
794 it = reducedRidList->getIterator();
795 more = reducedRidList->next(it, &e);
796 #ifdef DEBUG
797 cout << "Reduced Rid List:" << endl;
798 #endif
799
800 while (more)
801 {
802 #ifdef DEBUG
803 cout << " first: " << e.first << " second: " << e.second << endl;
804 #endif
805 more = reducedRidList->next(it, &e);
806 }
807
808 }
809
unionStep_1()810 void unionStep_1()
811 {
812 JobStepAssociation in, out;
813 AnyDataList* inputADL, *driverADL, *outputADL;
814 AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
815 WSDL<ElementType>* outDL;
816 set<ElementType> s;
817 set<ElementType>::iterator sIt;
818 ElementType e;
819 unsigned i;
820 int it;
821 bool more;
822
823 ZDL<ElementType>* inDL = new ZDL<ElementType>(1, fRm);
824 ZDL<ElementType>* dDL = new ZDL<ElementType>(1, fRm);
825
826 outDL = new WSDL<ElementType>(1, 100000, fRm);
827 inputADL = new AnyDataList();
828 driverADL = new AnyDataList();
829 outputADL = new AnyDataList();
830 inputADL->zonedDL(inDL);
831 driverADL->zonedDL(dDL);
832
833 outputADL->workingSetDL(outDL);
834 inputSPtr.reset(inputADL);
835 driverSPtr.reset(driverADL);
836 outputSPtr.reset(outputADL);
837
838 in.outAdd(inputSPtr);
839 in.outAdd(driverSPtr);
840 out.outAdd(outputSPtr);
841
842 // cout << "making input DataList" << endl;
843 vector <ElementType> vec1;
844 vector <ElementType> vec2;
845
846 for (i = 0; i < ::count; i++)
847 {
848 e.first = i;
849 e.second = i + 1;
850 vec1.push_back(e);
851
852 if (vec1.size() >= ZDL_VEC_SIZE)
853 {
854 inDL->insert(vec1);
855 vec1.clear();
856 }
857
858 // inDL->insert(e);
859 }
860
861 if (!vec1.empty())
862 inDL->insert(vec1);
863
864 inDL->endOfInput();
865
866 // cout << "making driver DataList" << endl;
867
868 for (i = 0; i < ::count; i += 2)
869 {
870 e.first = i;
871 e.second = i + 1;
872 vec2.push_back(e);
873
874 if (vec2.size() >= ZDL_VEC_SIZE)
875 {
876 dDL->insert(vec2);
877 vec2.clear();
878 }
879
880 // dDL->insert(e);
881 }
882
883 if (!vec2.empty())
884 dDL->insert(vec2);
885
886 dDL->endOfInput();
887
888 // cout << "unionizing" << endl;
889
890 UnionStep rs(in, out, 50, 5, 1, 0, 0, 0);
891 rs.run();
892 rs.join();
893
894 CPPUNIT_ASSERT(outDL->totalSize() == inDL->totalSize());
895
896 it = outDL->getIterator();
897
898 for (i = 0, more = outDL->next(it, &e) ; more; more = outDL->next(it, &e), i++)
899 s.insert(e);
900
901 CPPUNIT_ASSERT(outDL->totalSize() == i);
902 CPPUNIT_ASSERT(i == ::count);
903
904 CPPUNIT_ASSERT(s.size() == outDL->totalSize()); // verifies no duplicates in outDL
905
906 for (i = 0, sIt = s.begin(); sIt != s.end(); sIt++, i++)
907 CPPUNIT_ASSERT(sIt->first == i);
908
909 CPPUNIT_ASSERT(i == ::count); // verifies they all exist.
910 }
911
unionStep_2()912 void unionStep_2()
913 {
914 JobStepAssociation in, out;
915 AnyDataList* inputADL, *driverADL, *outputADL;
916 AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
917 WSDL<ElementType>* inDL, *dDL, *outDL;
918 set<ElementType> s;
919 set<ElementType>::iterator sIt;
920 ElementType e;
921 unsigned i;
922 int it;
923 bool more;
924
925 inDL = new WSDL<ElementType>(1, 100000, fRm);
926 dDL = new WSDL<ElementType>(1, 100000, fRm);
927 outDL = new WSDL<ElementType>(1, 100000, fRm);
928 inputADL = new AnyDataList();
929 driverADL = new AnyDataList();
930 outputADL = new AnyDataList();
931 inputADL->workingSetDL(inDL);
932 driverADL->workingSetDL(dDL);
933 outputADL->workingSetDL(outDL);
934 inputSPtr.reset(inputADL);
935 driverSPtr.reset(driverADL);
936 outputSPtr.reset(outputADL);
937
938 in.outAdd(inputSPtr);
939 in.outAdd(driverSPtr);
940 out.outAdd(outputSPtr);
941
942 // cout << "making input DataList" << endl;
943
944 for (i = 0; i < ::count; i += 2)
945 {
946 e.first = i;
947 e.second = i + 1;
948 inDL->insert(e);
949 }
950
951 inDL->endOfInput();
952
953 // cout << "making driver DataList" << endl;
954
955 for (i = 0; i < ::count; i++)
956 {
957 e.first = i;
958 e.second = i + 1;
959 dDL->insert(e);
960 }
961
962 dDL->endOfInput();
963
964 // cout << "unionizing" << endl;
965
966 UnionStep rs(in, out, 50, 5, 1, 0, 0, 0);
967 rs.run();
968 rs.join();
969
970 CPPUNIT_ASSERT(outDL->totalSize() == dDL->totalSize());
971
972 it = outDL->getIterator();
973
974 for (i = 0, more = outDL->next(it, &e) ; more; more = outDL->next(it, &e), i++)
975 s.insert(e);
976
977 CPPUNIT_ASSERT(outDL->totalSize() == i);
978 CPPUNIT_ASSERT(i == ::count);
979
980 CPPUNIT_ASSERT(s.size() == outDL->totalSize()); // verifies no duplicates in outDL
981
982 for (i = 0, sIt = s.begin(); sIt != s.end(); sIt++, i++)
983 CPPUNIT_ASSERT(sIt->first == i);
984
985 CPPUNIT_ASSERT(i == ::count); // verifies they all exist.
986 }
987
unionStep_3()988 void unionStep_3()
989 {
990 JobStepAssociation in, out;
991 AnyDataList* inputADL, *driverADL, *outputADL;
992 AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
993 WSDL<ElementType>* outDL;
994 BucketDL<ElementType>* inDL, *dDL;
995 set<ElementType> s;
996 set<ElementType>::iterator sIt;
997 ElementType e;
998 unsigned i;
999 int it;
1000 bool more;
1001
1002 inDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
1003 dDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
1004 outDL = new WSDL<ElementType>(1, 100000, fRm);
1005 inputADL = new AnyDataList();
1006 driverADL = new AnyDataList();
1007 outputADL = new AnyDataList();
1008 inputADL->bucketDL(inDL);
1009 driverADL->bucketDL(dDL);
1010 outputADL->workingSetDL(outDL);
1011 inputSPtr.reset(inputADL);
1012 driverSPtr.reset(driverADL);
1013 outputSPtr.reset(outputADL);
1014
1015 in.outAdd(inputSPtr);
1016 in.outAdd(driverSPtr);
1017 out.outAdd(outputSPtr);
1018
1019 // cout << "making input DataList" << endl;
1020
1021 for (i = 0; i < ::count; i++)
1022 {
1023 e.first = i;
1024 e.second = i + 1;
1025 inDL->insert(e);
1026 }
1027
1028 inDL->endOfInput();
1029
1030 // cout << "making driver DataList" << endl;
1031
1032 for (; i < ::count * 2; i++)
1033 {
1034 e.first = i;
1035 e.second = i + 1;
1036 dDL->insert(e);
1037 }
1038
1039 dDL->endOfInput();
1040
1041 // cout << "unionizing" << endl;
1042
1043 UnionStep rs(in, out, 50, 5, 1, 0, 0, 0);
1044 rs.run();
1045 rs.join();
1046
1047 CPPUNIT_ASSERT(outDL->totalSize() == 2 * ::count);
1048
1049 it = outDL->getIterator();
1050
1051 for (i = 0, more = outDL->next(it, &e) ; more; more = outDL->next(it, &e), i++)
1052 s.insert(e);
1053
1054 CPPUNIT_ASSERT(outDL->totalSize() == i);
1055 CPPUNIT_ASSERT(i == 2 * ::count);
1056
1057 CPPUNIT_ASSERT(s.size() == outDL->totalSize()); // verifies no duplicates in outDL
1058
1059 for (i = 0, sIt = s.begin(); sIt != s.end(); sIt++, i++)
1060 CPPUNIT_ASSERT(sIt->first == i);
1061
1062 CPPUNIT_ASSERT(i == 2 * ::count); // verifies they all exist.
1063 }
1064
1065
1066
1067 };
1068
1069 CPPUNIT_TEST_SUITE_REGISTRATION(JobStepDriver);
1070
main(int argc,char ** argv)1071 int main( int argc, char** argv)
1072 {
1073 CppUnit::TextUi::TestRunner runner;
1074 CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
1075 runner.addTest( registry.makeTest() );
1076 bool wasSuccessful = runner.run( "", false );
1077 return (wasSuccessful ? 0 : 1);
1078 }
1079