1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /*****************************************************************************
19  * $Id: tdriver-jobstep.cpp 9210 2013-01-21 14:10:42Z rdempsey $
20  *
21  ****************************************************************************/
22 
23 #include <iostream>
24 
25 #include <cppunit/extensions/HelperMacros.h>
26 #include <cppunit/extensions/TestFactoryRegistry.h>
27 #include <cppunit/ui/text/TestRunner.h>
28 
29 #include "joblist.h"
30 #include "jobstep.h"
31 #include "distributedenginecomm.h"
32 #include "calpontsystemcatalog.h"
33 #include "zdl.h"
34 
35 using namespace std;
36 using namespace joblist;
37 using namespace execplan;
38 
// Number of rids the reduce/union tests generate for their synthetic input
// lists. The tests always spell it ::count (presumably to keep it apart from
// std::count, which `using namespace std` brings into scope).
uint64_t count = 1000 * 1000;

// Staging-vector size at which the ZDL-based tests flush a batch into the list.
const uint64_t ZDL_VEC_SIZE = 4 * 1024;
41 
42 class JobStepDriver : public CppUnit::TestFixture
43 {
44 
45     CPPUNIT_TEST_SUITE(JobStepDriver);
46 
47     /* These rely on Patrick's DB */
48 // CPPUNIT_TEST(pColScan_1);
49 // CPPUNIT_TEST(pColStep_1);
50 // CPPUNIT_TEST(pColStep_2);
51 // CPPUNIT_TEST(pColStep_as_ProjectionStep_1);
52 
53 // CPPUNIT_TEST(pnljoin_1);	// value list, no rid list, no reduction step
54 // CPPUNIT_TEST(pnljoin_2);	// value list, w/rid list, no reduction step
55 // CPPUNIT_TEST(pnljoin_3);	// value list + rid list + reduction step
56 
57     CPPUNIT_TEST(reduceStep_1);	// ElementType
58     CPPUNIT_TEST(reduceStep_2);	// StringElementType
59 //CPPUNIT_TEST(reduceStep_3); // DoubleElementType
60 //CPPUNIT_TEST(reduceStep_4); // reduceStep_1 with BucketDLs as inputs
61 
62     CPPUNIT_TEST(unionStep_1);
63 //CPPUNIT_TEST(unionStep_2);
64 //CPPUNIT_TEST(unionStep_3);
65 
66     CPPUNIT_TEST_SUITE_END();
67 
68     ResourceManager fRm;
69 
70 private:
71 public:
72 
pColScan_1()73     void pColScan_1()
74     {
75         DistributedEngineComm* dec;
76         boost::shared_ptr<CalpontSystemCatalog> cat;
77 
78         dec = DistributedEngineComm::instance(fRm);
79         // 	dec = DistributedEngineComm::instance("./config-dec.xml");
80         cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
81 
82         JobStepAssociation inJs;
83         JobStepAssociation outJs;
84 
85         AnyDataListSPtr spdl1(new AnyDataList());
86         BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
87         spdl1->bandedDL(dl1);
88         outJs.outAdd(spdl1);
89 
90         pColScanStep step0(inJs, outJs, dec, cat, 1003, 1000, 12345, 999, 7, 0, 0, fRm);
91         int8_t cop;
92         int64_t filterValue;
93         cop = COMPARE_GE;
94         filterValue = 3010;
95         step0.addFilter(cop, filterValue);
96         cop = COMPARE_LE;
97         filterValue = 3318;
98         step0.addFilter(cop, filterValue);
99         step0.setBOP(BOP_AND);
100         inJs = outJs;
101 
102         step0.run();
103 
104         step0.join();
105 
106         DeliveryStep step1(inJs, outJs, make_table("CALPONTSYS", "SYSTABLE"), cat, 10000, 0, 0, 0);
107         inJs = outJs;
108 
109         step1.run();
110 
111         step1.join();
112     }
113 
pColStep_1()114     void pColStep_1()
115     {
116         DistributedEngineComm* dec;
117         boost::shared_ptr<CalpontSystemCatalog> cat;
118         ElementType e;
119         int i, it;
120         bool more;
121 
122         dec = DistributedEngineComm::instance(fRm);
123         // 	dec = DistributedEngineComm::instance("./config-dec.xml");
124         cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
125 
126         JobStepAssociation inJs;
127         JobStepAssociation outJs;
128 
129         AnyDataListSPtr spdl1(new AnyDataList());
130         BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
131         spdl1->bandedDL(dl1);
132         inJs.outAdd(spdl1);
133 
134         AnyDataListSPtr spdl2(new AnyDataList());
135         BandedDL<ElementType>* dl2 = new BandedDL<ElementType>(1, fRm);
136         spdl2->bandedDL(dl2);
137         outJs.outAdd(spdl2);
138 
139         for (i = 10; i < 15; i++)
140         {
141             e.first = i;
142             dl1->insert(e);
143         }
144 
145         dl1->endOfInput();
146 
147         pColStep p(inJs, outJs, dec, cat, 1003, 1000, 12346, 11, 11, 1, 0, fRm);
148 
149         p.setRidList(dl1);    // JSA should do this
150         p.run();
151         p.join();
152 
153         it = dl2->getIterator();
154 
155         for (more = dl2->next(it, &e), i = 0; more; more = dl2->next(it, &e), i++)
156 #ifdef DEBUG
157             cout << "<rid = " << e.first << ", value = " << e.second << ">" << endl;
158 
159 #else
160             ;  // walk the list silently
161 #endif
162         CPPUNIT_ASSERT(i == 5);
163     }
164 
165     /* make sure it issues multiple primitive msgs correctly */
pColStep_2()166     void pColStep_2()
167     {
168         DistributedEngineComm* dec;
169         boost::shared_ptr<CalpontSystemCatalog> cat;
170         ElementType e;
171         int i, it;
172         bool more;
173 
174         dec = DistributedEngineComm::instance(fRm);
175         // 	dec = DistributedEngineComm::instance("./config-dec.xml");
176         cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
177 
178         JobStepAssociation inJs;
179         JobStepAssociation outJs;
180 
181         AnyDataListSPtr spdl1(new AnyDataList());
182         BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
183         spdl1->bandedDL(dl1);
184         inJs.outAdd(spdl1);
185 
186         AnyDataListSPtr spdl2(new AnyDataList());
187         BandedDL<ElementType>* dl2 = new BandedDL<ElementType>(1, fRm);
188         spdl2->bandedDL(dl2);
189         outJs.outAdd(spdl2);
190 
191         for (i = 10; i < 10000; i++)
192         {
193             if (i % 2 == 0)    // make it sparse
194             {
195                 e.first = i;
196                 dl1->insert(e);
197             }
198         }
199 
200         dl1->endOfInput();
201 
202         pColStep p(inJs, outJs, dec, cat, 1003, 1000, 12347, 11, 11, 1, 0, fRm);
203 
204         p.setRidList(dl1);    // JSA should do this
205         p.run();
206         p.join();
207 
208         it = dl2->getIterator();
209 
210         for (more = dl2->next(it, &e), i = 0; more; more = dl2->next(it, &e), i++)
211 #ifdef DEBUG
212             cout << "<rid = " << e.first << ", value = " << e.second << ">" << endl;
213 
214 #else
215             ;  // walk the list silently
216 #endif
217         CPPUNIT_ASSERT(i == 6);
218     }
219 
pColStep_as_ProjectionStep_1()220     void pColStep_as_ProjectionStep_1()
221     {
222         DistributedEngineComm* dec;
223         boost::shared_ptr<CalpontSystemCatalog> cat;
224         ElementType e;
225         int i, it;
226         bool more;
227 
228         dec = DistributedEngineComm::instance(fRm);
229         // 	dec = DistributedEngineComm::instance("./config-dec.xml");
230         cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
231 
232         JobStepAssociation inJs;
233         JobStepAssociation outJs;
234 
235         AnyDataListSPtr spdl1(new AnyDataList());
236         BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
237         spdl1->bandedDL(dl1);
238         inJs.outAdd(spdl1);
239 
240         AnyDataListSPtr spdl2(new AnyDataList());
241         BandedDL<ElementType>* dl2 = new BandedDL<ElementType>(1, fRm);
242         spdl2->bandedDL(dl2);
243         outJs.outAdd(spdl2);
244 
245         for (i = 1; i <= 21; i++)
246         {
247             e.first = i;
248             dl1->insert(e);
249         }
250 
251         dl1->endOfInput();
252 
253         // flushInterval = 8
254         pColStep p(inJs, outJs, dec, cat, 1003, 1000, 12348, 11, 11, 1, 2, fRm);
255 
256         p.setRidList(dl1);    // JSA should do this
257         p.run();
258         p.join();
259 
260         it = dl2->getIterator();
261 
262         for (more = dl2->next(it, &e), i = 0; more; more = dl2->next(it, &e), i++)
263 #ifdef DEBUG
264             cout << "<rid = " << e.first << ", value = " << e.second << ">" << endl;
265 
266 #else
267             ;  // walk the list silently
268 #endif
269 // 		CPPUNIT_ASSERT(i == 5);
270     }
271 
reduceStep_1()272     void reduceStep_1()
273     {
274         JobStepAssociation in, out;
275         AnyDataList* inputADL, *driverADL, *outputADL;
276         AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
277         WSDL<ElementType>*  outDL;
278         ElementType e;
279         unsigned i;
280         int it;
281         bool more;
282 
283 
284 
285         ZDL<ElementType>* inDL = new ZDL<ElementType>(1, fRm);
286         ZDL<ElementType>* dDL = new ZDL<ElementType>(1, fRm);
287         outDL = new WSDL<ElementType>(1, 100000, fRm);
288         inputADL = new AnyDataList();
289         driverADL = new AnyDataList();
290         outputADL = new AnyDataList();
291         inputADL->zonedDL(inDL);
292         driverADL->zonedDL(dDL);
293         outputADL->workingSetDL(outDL);
294         inputSPtr.reset(inputADL);
295         driverSPtr.reset(driverADL);
296         outputSPtr.reset(outputADL);
297 
298         in.outAdd(inputSPtr);
299         in.outAdd(driverSPtr);
300         out.outAdd(outputSPtr);
301 
302 // 		cout << "making input DataList" << endl;
303         vector <ElementType> vec1;
304         vector <ElementType> vec2;
305 
306         for (i = 0; i < ::count; i++)
307         {
308             e.first = i;
309             e.second = i + 1;
310             vec1.push_back(e);
311 
312             if (vec1.size() >= ZDL_VEC_SIZE)
313             {
314                 inDL->insert(vec1);
315                 vec1.clear();
316             }
317 
318 // 			inDL->insert(e);
319         }
320 
321         if (!vec1.empty())
322             inDL->insert(vec1);
323 
324         inDL->endOfInput();
325 
326 // 		cout << "making driver DataList" << endl;
327 
328         for (i = 0; i < ::count; i += 2)
329         {
330             e.first = i;
331             e.second = i + 1;
332             vec2.push_back(e);
333 
334             if (vec2.size() >= ZDL_VEC_SIZE)
335             {
336                 dDL->insert(vec2);
337                 vec2.clear();
338             }
339 
340 // 			dDL->insert(e);
341         }
342 
343         if (!vec2.empty())
344             dDL->insert(vec2);
345 
346         dDL->endOfInput();
347 
348 // 		cout << "reducing" << endl;
349 
350         ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
351         rs.run();
352         rs.join();
353 
354         it = outDL->getIterator();
355         more = outDL->next(it, &e);
356         i = 0;
357 
358         while (more)
359         {
360 //  			cout << i << ": first: " << e.first << " second: " << e.second << endl;
361             CPPUNIT_ASSERT(e.first < ::count);
362             CPPUNIT_ASSERT(e.first % 2 == 0);
363             more = outDL->next(it, &e);
364             i++;
365         }
366 
367     }
368 
reduceStep_2()369     void reduceStep_2()
370     {
371         JobStepAssociation in, out;
372         AnyDataList* inputADL, *driverADL, *outputADL;
373         AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
374         WSDL<StringElementType>* outDL;
375         StringElementType e;
376         unsigned i;
377         int it;
378         bool more;
379 
380 // 		inDL = new WSDL<StringElementType>(1, 10000, fRm);
381 // 		dDL = new WSDL<StringElementType>(1, 10000, fRm);
382         ZDL<StringElementType>* inDL = new ZDL<StringElementType>(1, fRm);
383         ZDL<StringElementType>* dDL = new ZDL<StringElementType>(1, fRm);
384 
385         outDL = new WSDL<StringElementType>(1, 10000, fRm);
386         inputADL = new AnyDataList();
387         driverADL = new AnyDataList();
388         outputADL = new AnyDataList();
389         inputADL->stringZonedDL(inDL);
390         driverADL->stringZonedDL(dDL);
391         outputADL->strDataList(outDL);
392         inputSPtr.reset(inputADL);
393         driverSPtr.reset(driverADL);
394         outputSPtr.reset(outputADL);
395 
396         in.outAdd(inputSPtr);
397         in.outAdd(driverSPtr);
398         out.outAdd(outputSPtr);
399         vector <StringElementType> vec1;
400         vector <StringElementType> vec2;
401 
402         for (i = 0; i < ::count; i++)
403         {
404             e.first = i;
405             e.second = string("blahblahblahblahblah");
406             vec1.push_back(e);
407 
408             if (vec1.size() >= ZDL_VEC_SIZE)
409             {
410                 inDL->insert(vec1);
411                 vec1.clear();
412             }
413 
414             if (0 == i % 2)
415             {
416                 e.second = string("blahblahblah");
417                 vec2.push_back(e);
418 
419                 if (vec2.size() >= ZDL_VEC_SIZE)
420                 {
421                     dDL->insert(vec2);
422                     vec2.clear();
423                 }
424             }
425 
426 // 			inDL->insert(e);
427         }
428 
429         if (!vec1.empty())
430             inDL->insert(vec1);
431 
432         inDL->endOfInput();
433 
434         /*		for (i = 0; i < ::count; i+=2) {
435         			e.first = i;
436         			e.second = string("blahblahblah");
437         			dDL->insert(e);
438         		}*/
439         if (!vec2.empty())
440             dDL->insert(vec2);
441 
442         dDL->endOfInput();
443 
444         ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
445         rs.run();
446         rs.join();
447 
448         it = outDL->getIterator();
449         more = outDL->next(it, &e);
450         i = 0;
451 
452         while (more)
453         {
454 //    			cout << i << ": first: " << e.first << " second: " << e.second << endl;
455             CPPUNIT_ASSERT(e.first < ::count);
456             CPPUNIT_ASSERT(e.first % 2 == 0);
457             more = outDL->next(it, &e);
458             i++;
459         }
460     }
461 
reduceStep_3()462     void reduceStep_3()
463     {
464         JobStepAssociation in, out;
465         AnyDataList* inputADL, *driverADL, *outputADL;
466         AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
467         WSDL<DoubleElementType>* inDL, *dDL, *outDL;
468         DoubleElementType e;
469         unsigned i;
470         int it;
471         bool more;
472 
473         inDL = new WSDL<DoubleElementType>(1, 100000, fRm);
474         dDL = new WSDL<DoubleElementType>(1, 100000, fRm);
475         outDL = new WSDL<DoubleElementType>(1, 100000, fRm);
476         inputADL = new AnyDataList();
477         driverADL = new AnyDataList();
478         outputADL = new AnyDataList();
479         inputADL->doubleDL(inDL);
480         driverADL->doubleDL(dDL);
481         outputADL->doubleDL(outDL);
482         inputSPtr.reset(inputADL);
483         driverSPtr.reset(driverADL);
484         outputSPtr.reset(outputADL);
485 
486         in.outAdd(inputSPtr);
487         in.outAdd(driverSPtr);
488         out.outAdd(outputSPtr);
489 
490         for (i = 0; i < ::count; i++)
491         {
492             e.first = i;
493             e.second = ((double) i) + 0.1;
494             inDL->insert(e);
495         }
496 
497         inDL->endOfInput();
498 
499         for (i = 0; i < ::count; i += 2)
500         {
501             e.first = i;
502             e.second = ((double) i) + 0.1;
503             dDL->insert(e);
504         }
505 
506         dDL->endOfInput();
507 
508         ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
509         rs.run();
510         rs.join();
511 
512         it = outDL->getIterator();
513         more = outDL->next(it, &e);
514         i = 0;
515 
516         while (more)
517         {
518 //    			cout << i << ": first: " << e.first << " second: " << e.second << endl;
519             CPPUNIT_ASSERT(e.first < ::count);
520             CPPUNIT_ASSERT(e.first % 2 == 0);
521             more = outDL->next(it, &e);
522             i++;
523         }
524     }
525 
reduceStep_4()526     void reduceStep_4()
527     {
528         JobStepAssociation in, out;
529         AnyDataList* inputADL, *driverADL, *outputADL;
530         AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
531         BucketDL<ElementType>* inDL, *dDL;
532         WSDL<ElementType>* outDL;
533         ElementType e;
534         unsigned i;
535         int it;
536         bool more;
537 
538         inDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
539         dDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
540         outDL = new WSDL<ElementType>(1, 100000, fRm);
541         inputADL = new AnyDataList();
542         driverADL = new AnyDataList();
543         outputADL = new AnyDataList();
544         inputADL->bucketDL(inDL);
545         driverADL->bucketDL(dDL);
546         outputADL->workingSetDL(outDL);
547         inputSPtr.reset(inputADL);
548         driverSPtr.reset(driverADL);
549         outputSPtr.reset(outputADL);
550 
551         in.outAdd(inputSPtr);
552         in.outAdd(driverSPtr);
553         out.outAdd(outputSPtr);
554 
555 // 		cout << "making input DataList" << endl;
556 
557         for (i = 0; i < ::count; i++)
558         {
559             e.first = i;
560             e.second = i + 1;
561             inDL->insert(e);
562         }
563 
564         inDL->endOfInput();
565 
566 // 		cout << "making driver DataList" << endl;
567 
568         for (i = 0; i < ::count; i += 2)
569         {
570             e.first = i;
571             e.second = i + 1;
572             dDL->insert(e);
573         }
574 
575         dDL->endOfInput();
576 
577 // 		cout << "reducing" << endl;
578 
579         ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
580         rs.run();
581         rs.join();
582 
583         it = outDL->getIterator();
584         more = outDL->next(it, &e);
585         i = 0;
586 
587         while (more)
588         {
589 //  			cout << i << ": first: " << e.first << " second: " << e.second << endl;
590             CPPUNIT_ASSERT(e.first < ::count);
591             CPPUNIT_ASSERT(e.first % 2 == 0);
592             more = outDL->next(it, &e);
593             i++;
594         }
595     }
596 
pnljoin_1()597     void pnljoin_1()
598     {
599         DistributedEngineComm* dec;
600         boost::shared_ptr<CalpontSystemCatalog> cat;
601         ElementType e;
602         int it;
603         bool more;
604 
605         dec = DistributedEngineComm::instance(fRm);
606         // 	dec = DistributedEngineComm::instance("./config-dec.xml");
607         cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
608 
609         JobStepAssociation inJs;
610         JobStepAssociation outJs;
611 
612         AnyDataListSPtr spdl1(new AnyDataList());
613         WSDL<ElementType>* dl1 = new WSDL<ElementType>(1, 100, fRm);
614         spdl1->workingSetDL(dl1);
615         inJs.outAdd(spdl1);
616 
617         AnyDataListSPtr spdl2(new AnyDataList());
618         WSDL<ElementType>* dl2 = new WSDL<ElementType>(1, 100, fRm);
619         spdl2->workingSetDL(dl2);
620         outJs.outAdd(spdl2);
621 
622         /* These values are unique to Pat's DB files unfortunately. */
623         /* Fill in the value list */
624         dl1->insert(ElementType(1, 3179));  // row 10 in the target
625         dl1->insert(ElementType(2, 3191));	// row 12
626         dl1->insert(ElementType(3, 3207));	// row 14
627         dl1->insert(ElementType(4, 3318));	// row 20  OOO
628         dl1->insert(ElementType(5, 3242));	// row 16
629         dl1->insert(ElementType(6, 3289));	// row 18
630         dl1->insert(ElementType(7, 3191));  // duplicate value of row 14
631 
632         dl1->endOfInput();
633         dl1->OID(1003);
634 
635         PNLJoin joiner(inJs, outJs, dec, cat, 12349, 1000, 1, 0, fRm);
636 
637         joiner.run();
638         joiner.join();
639 
640         it = dl2->getIterator();
641         more = dl2->next(it, &e);
642 
643         while (more)
644         {
645 #ifdef DEBUG
646             cout << "first: " << e.first << " second: " << e.second << endl;
647 #endif
648             more = dl2->next(it, &e);
649         }
650     }
pnljoin_2()651     void pnljoin_2()
652     {
653         DistributedEngineComm* dec;
654         boost::shared_ptr<CalpontSystemCatalog> cat;
655         ElementType e;
656         int i, it;
657         bool more;
658 
659         dec = DistributedEngineComm::instance(fRm);
660         // 	dec = DistributedEngineComm::instance("./config-dec.xml");
661         cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
662 
663         JobStepAssociation inJs;
664         JobStepAssociation outJs;
665 
666         AnyDataListSPtr spdl1(new AnyDataList());
667         WSDL<ElementType>* valueList = new WSDL<ElementType>(1, 100, fRm);
668         spdl1->workingSetDL(valueList);
669         inJs.outAdd(spdl1);
670 
671         AnyDataListSPtr spdl2(new AnyDataList());
672         AnyDataListSPtr spdl3(new AnyDataList());
673         WSDL<ElementType>* colResults = new WSDL<ElementType>(1, 100, fRm);
674         WSDL<ElementType>* inputRidList = new WSDL<ElementType>(1, 100, fRm);
675         spdl2->workingSetDL(colResults);
676         spdl3->workingSetDL(inputRidList);
677         outJs.outAdd(spdl2);
678         inJs.outAdd(spdl3);
679 
680 
681         /* These values are unique to Pat's DB files unfortunately. */
682         /* Fill in the value list */
683         valueList->insert(ElementType(1, 3179));  // row 10 in the target
684         valueList->insert(ElementType(2, 3191));	// row 12
685         valueList->insert(ElementType(3, 3207));	// row 14
686         valueList->insert(ElementType(4, 3318));	// row 20  OOO
687         valueList->insert(ElementType(5, 3242));	// row 16
688         valueList->insert(ElementType(6, 3289));	// row 18
689         valueList->insert(ElementType(7, 3191));  // duplicate value of row 14
690 
691         valueList->endOfInput();
692         valueList->OID(1003);
693 
694         // supply a ridlist with row 16 missing; make sure it's missing in the result
695         for (i = 0; i < 25; i++)
696             if (i != 16)
697                 inputRidList->insert(ElementType(i, i));
698 
699         inputRidList->endOfInput();
700 
701         PNLJoin joiner(inJs, outJs, dec, cat, 12349, 1000, 1, 0, fRm);
702 
703         joiner.run();
704         joiner.join();
705 
706         it = colResults->getIterator();
707         more = colResults->next(it, &e);
708 
709         while (more)
710         {
711 #ifdef DEBUG
712             cout << "first: " << e.first << " second: " << e.second << endl;
713 #endif
714             more = colResults->next(it, &e);
715         }
716     }
717 
pnljoin_3()718     void pnljoin_3()
719     {
720         DistributedEngineComm* dec;
721         boost::shared_ptr<CalpontSystemCatalog> cat;
722         ElementType e;
723         int i, it;
724         bool more;
725 
726         dec = DistributedEngineComm::instance(fRm);
727         // 	dec = DistributedEngineComm::instance("./config-dec.xml");
728         cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
729 
730         AnyDataListSPtr spdl1(new AnyDataList());
731         AnyDataListSPtr spdl3(new AnyDataList());
732         WSDL<ElementType>* valueList = new WSDL<ElementType>(1, 100, fRm);
733         WSDL<ElementType>* inputRidList = new WSDL<ElementType>(1, 100, fRm);
734         spdl1->workingSetDL(valueList);
735         spdl3->workingSetDL(inputRidList);
736 
737         JobStepAssociation inJs;
738         JobStepAssociation outJs;
739 
740         inJs.outAdd(spdl1);
741         inJs.outAdd(spdl3);
742 
743         AnyDataListSPtr spdl2(new AnyDataList());
744         AnyDataListSPtr spdl4(new AnyDataList());
745         WSDL<ElementType>* colResults = new WSDL<ElementType>(1, 100, fRm);
746         WSDL<ElementType>* reducedRidList = new WSDL<ElementType>(1, 100, fRm);
747         spdl2->workingSetDL(colResults);
748         spdl4->workingSetDL(reducedRidList);
749         outJs.outAdd(spdl2);
750         outJs.outAdd(spdl4);
751 
752         /* These values are unique to Pat's DB files unfortunately. */
753         /* Fill in the value list */
754         valueList->insert(ElementType(10, 3179));  // row 10 in the target
755         valueList->insert(ElementType(12, 3191));	// row 12
756         valueList->insert(ElementType(14541513, 3207));	// row 14 - on output should be in colResults, not in reducedridlist
757         valueList->insert(ElementType(20, 3318));	// row 20  OOO
758         valueList->insert(ElementType(16, 3242));	// row 16
759         valueList->insert(ElementType(18, 3289));	// row 18
760 
761         // XXXPAT: Duplicates here can end up in the reducedRidList.  Technically it's
762         // correct, but do we want that or not?
763 // 		valueList->insert(ElementType(12, 3191));  // duplicate value of row 12
764 
765         valueList->endOfInput();
766         valueList->OID(1003);
767 
768         // supply a ridlist with row 16 missing; make sure it's missing in the result
769         for (i = 0; i < 25; i++)
770             if (i != 16)
771                 inputRidList->insert(ElementType(i, i));
772 
773         inputRidList->endOfInput();
774 
775         PNLJoin joiner(inJs, outJs, dec, cat, 12349, 1000, 1, 0, fRm);
776 
777         joiner.run();
778         joiner.join();
779 
780         it = colResults->getIterator();
781         more = colResults->next(it, &e);
782 #ifdef DEBUG
783         cout << "ColResults:" << endl;
784 #endif
785 
786         while (more)
787         {
788 #ifdef DEBUG
789             cout << "   first: " << e.first << " second: " << e.second << endl;
790 #endif
791             more = colResults->next(it, &e);
792         }
793 
794         it = reducedRidList->getIterator();
795         more = reducedRidList->next(it, &e);
796 #ifdef DEBUG
797         cout << "Reduced Rid List:" << endl;
798 #endif
799 
800         while (more)
801         {
802 #ifdef DEBUG
803             cout << "   first: " << e.first << " second: " << e.second << endl;
804 #endif
805             more = reducedRidList->next(it, &e);
806         }
807 
808     }
809 
unionStep_1()810     void unionStep_1()
811     {
812         JobStepAssociation in, out;
813         AnyDataList* inputADL, *driverADL, *outputADL;
814         AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
815         WSDL<ElementType>* outDL;
816         set<ElementType> s;
817         set<ElementType>::iterator sIt;
818         ElementType e;
819         unsigned i;
820         int it;
821         bool more;
822 
823         ZDL<ElementType>* inDL = new ZDL<ElementType>(1, fRm);
824         ZDL<ElementType>* dDL = new ZDL<ElementType>(1, fRm);
825 
826         outDL = new WSDL<ElementType>(1, 100000, fRm);
827         inputADL = new AnyDataList();
828         driverADL = new AnyDataList();
829         outputADL = new AnyDataList();
830         inputADL->zonedDL(inDL);
831         driverADL->zonedDL(dDL);
832 
833         outputADL->workingSetDL(outDL);
834         inputSPtr.reset(inputADL);
835         driverSPtr.reset(driverADL);
836         outputSPtr.reset(outputADL);
837 
838         in.outAdd(inputSPtr);
839         in.outAdd(driverSPtr);
840         out.outAdd(outputSPtr);
841 
842 //  		cout << "making input DataList" << endl;
843         vector <ElementType> vec1;
844         vector <ElementType> vec2;
845 
846         for (i = 0; i < ::count; i++)
847         {
848             e.first = i;
849             e.second = i + 1;
850             vec1.push_back(e);
851 
852             if (vec1.size() >= ZDL_VEC_SIZE)
853             {
854                 inDL->insert(vec1);
855                 vec1.clear();
856             }
857 
858 // 			inDL->insert(e);
859         }
860 
861         if (!vec1.empty())
862             inDL->insert(vec1);
863 
864         inDL->endOfInput();
865 
866 // 		cout << "making driver DataList" << endl;
867 
868         for (i = 0; i < ::count; i += 2)
869         {
870             e.first = i;
871             e.second = i + 1;
872             vec2.push_back(e);
873 
874             if (vec2.size() >= ZDL_VEC_SIZE)
875             {
876                 dDL->insert(vec2);
877                 vec2.clear();
878             }
879 
880 // 			dDL->insert(e);
881         }
882 
883         if (!vec2.empty())
884             dDL->insert(vec2);
885 
886         dDL->endOfInput();
887 
888 //  		cout << "unionizing" << endl;
889 
890         UnionStep rs(in, out, 50, 5, 1, 0, 0, 0);
891         rs.run();
892         rs.join();
893 
894         CPPUNIT_ASSERT(outDL->totalSize() == inDL->totalSize());
895 
896         it = outDL->getIterator();
897 
898         for (i = 0, more = outDL->next(it, &e) ; more; more = outDL->next(it, &e), i++)
899             s.insert(e);
900 
901         CPPUNIT_ASSERT(outDL->totalSize() == i);
902         CPPUNIT_ASSERT(i == ::count);
903 
904         CPPUNIT_ASSERT(s.size() == outDL->totalSize());  // verifies no duplicates in outDL
905 
906         for (i = 0, sIt = s.begin(); sIt != s.end(); sIt++, i++)
907             CPPUNIT_ASSERT(sIt->first == i);
908 
909         CPPUNIT_ASSERT(i == ::count);	// verifies they all exist.
910     }
911 
unionStep_2()912     void unionStep_2()
913     {
914         JobStepAssociation in, out;
915         AnyDataList* inputADL, *driverADL, *outputADL;
916         AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
917         WSDL<ElementType>* inDL, *dDL, *outDL;
918         set<ElementType> s;
919         set<ElementType>::iterator sIt;
920         ElementType e;
921         unsigned i;
922         int it;
923         bool more;
924 
925         inDL = new WSDL<ElementType>(1, 100000, fRm);
926         dDL = new WSDL<ElementType>(1, 100000, fRm);
927         outDL = new WSDL<ElementType>(1, 100000, fRm);
928         inputADL = new AnyDataList();
929         driverADL = new AnyDataList();
930         outputADL = new AnyDataList();
931         inputADL->workingSetDL(inDL);
932         driverADL->workingSetDL(dDL);
933         outputADL->workingSetDL(outDL);
934         inputSPtr.reset(inputADL);
935         driverSPtr.reset(driverADL);
936         outputSPtr.reset(outputADL);
937 
938         in.outAdd(inputSPtr);
939         in.outAdd(driverSPtr);
940         out.outAdd(outputSPtr);
941 
942 //  		cout << "making input DataList" << endl;
943 
944         for (i = 0; i < ::count; i += 2)
945         {
946             e.first = i;
947             e.second = i + 1;
948             inDL->insert(e);
949         }
950 
951         inDL->endOfInput();
952 
953 //  		cout << "making driver DataList" << endl;
954 
955         for (i = 0; i < ::count; i++)
956         {
957             e.first = i;
958             e.second = i + 1;
959             dDL->insert(e);
960         }
961 
962         dDL->endOfInput();
963 
964 //  		cout << "unionizing" << endl;
965 
966         UnionStep rs(in, out, 50, 5, 1, 0, 0, 0);
967         rs.run();
968         rs.join();
969 
970         CPPUNIT_ASSERT(outDL->totalSize() == dDL->totalSize());
971 
972         it = outDL->getIterator();
973 
974         for (i = 0, more = outDL->next(it, &e) ; more; more = outDL->next(it, &e), i++)
975             s.insert(e);
976 
977         CPPUNIT_ASSERT(outDL->totalSize() == i);
978         CPPUNIT_ASSERT(i == ::count);
979 
980         CPPUNIT_ASSERT(s.size() == outDL->totalSize());  // verifies no duplicates in outDL
981 
982         for (i = 0, sIt = s.begin(); sIt != s.end(); sIt++, i++)
983             CPPUNIT_ASSERT(sIt->first == i);
984 
985         CPPUNIT_ASSERT(i == ::count);	// verifies they all exist.
986     }
987 
unionStep_3()988     void unionStep_3()
989     {
990         JobStepAssociation in, out;
991         AnyDataList* inputADL, *driverADL, *outputADL;
992         AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
993         WSDL<ElementType>* outDL;
994         BucketDL<ElementType>* inDL, *dDL;
995         set<ElementType> s;
996         set<ElementType>::iterator sIt;
997         ElementType e;
998         unsigned i;
999         int it;
1000         bool more;
1001 
1002         inDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
1003         dDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
1004         outDL = new WSDL<ElementType>(1, 100000, fRm);
1005         inputADL = new AnyDataList();
1006         driverADL = new AnyDataList();
1007         outputADL = new AnyDataList();
1008         inputADL->bucketDL(inDL);
1009         driverADL->bucketDL(dDL);
1010         outputADL->workingSetDL(outDL);
1011         inputSPtr.reset(inputADL);
1012         driverSPtr.reset(driverADL);
1013         outputSPtr.reset(outputADL);
1014 
1015         in.outAdd(inputSPtr);
1016         in.outAdd(driverSPtr);
1017         out.outAdd(outputSPtr);
1018 
1019 //  		cout << "making input DataList" << endl;
1020 
1021         for (i = 0; i < ::count; i++)
1022         {
1023             e.first = i;
1024             e.second = i + 1;
1025             inDL->insert(e);
1026         }
1027 
1028         inDL->endOfInput();
1029 
1030 //  		cout << "making driver DataList" << endl;
1031 
1032         for (; i < ::count * 2; i++)
1033         {
1034             e.first = i;
1035             e.second = i + 1;
1036             dDL->insert(e);
1037         }
1038 
1039         dDL->endOfInput();
1040 
1041 //  		cout << "unionizing" << endl;
1042 
1043         UnionStep rs(in, out, 50, 5, 1, 0, 0, 0);
1044         rs.run();
1045         rs.join();
1046 
1047         CPPUNIT_ASSERT(outDL->totalSize() == 2 * ::count);
1048 
1049         it = outDL->getIterator();
1050 
1051         for (i = 0, more = outDL->next(it, &e) ; more; more = outDL->next(it, &e), i++)
1052             s.insert(e);
1053 
1054         CPPUNIT_ASSERT(outDL->totalSize() == i);
1055         CPPUNIT_ASSERT(i == 2 * ::count);
1056 
1057         CPPUNIT_ASSERT(s.size() == outDL->totalSize());  // verifies no duplicates in outDL
1058 
1059         for (i = 0, sIt = s.begin(); sIt != s.end(); sIt++, i++)
1060             CPPUNIT_ASSERT(sIt->first == i);
1061 
1062         CPPUNIT_ASSERT(i == 2 * ::count);	// verifies they all exist.
1063     }
1064 
1065 
1066 
1067 };
1068 
1069 CPPUNIT_TEST_SUITE_REGISTRATION(JobStepDriver);
1070 
main(int argc,char ** argv)1071 int main( int argc, char** argv)
1072 {
1073     CppUnit::TextUi::TestRunner runner;
1074     CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
1075     runner.addTest( registry.makeTest() );
1076     bool wasSuccessful = runner.run( "", false );
1077     return (wasSuccessful ? 0 : 1);
1078 }
1079