/* Copyright (C) 2019 MariaDB Corporation.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: tdriver-filter.cpp 9210 2013-01-21 14:10:42Z rdempsey $
19 
#include <limits.h>
#include <pthread.h>
#include <sys/time.h>
#include <time.h>

#include <cstring>
#include <iomanip>
#include <iostream>
#include <list>
#include <sstream>
#include <string>

#include <boost/any.hpp>
#include <boost/function.hpp>
#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>

#include "jobstep.h"
#include "funcexp.h"
#include "jlf_common.h"
#include "tupleannexstep.h"
#include "calpontsystemcatalog.h"
#include "resourcemanager.h"
#include "bytestream.h"
40 
41 #define DEBUG
42 #define MEMORY_LIMIT 14983602176
43 
44 using namespace std;
45 using namespace joblist;
46 using namespace messageqcpp;
47 
// Timer class used by this tdriver to output elapsed times, etc.
//
// Each start()/stop() call writes one line to std::cout of the form
//   HH:MM:SS.uuuuuu <seconds> <Start|Stop> <message>
// where <seconds> (printed on Stop lines only) is the wall-clock time since the
// most recent start(). A column header is printed once, before the first Start
// line.
class Timer
{
public:
    // The reference time is the moment of construction until start() is
    // called, so a stop() without a preceding start() reports a defined value.
    Timer() : fHeaderDisplayed(false)
    {
        gettimeofday(&fTvStart, 0);
    }

    // Records the current time as the reference point and prints a "Start"
    // line carrying `message`. Prints the column header on the first call.
    void start(const std::string& message)
    {
        if (!fHeaderDisplayed)
        {
            header();
            fHeaderDisplayed = true;
        }

        gettimeofday(&fTvStart, 0);
        std::cout << timestr() << "          Start " << message << std::endl;
    }

    // Prints a "Stop" line carrying `message` and the seconds elapsed since
    // the reference point. The reference point is left unchanged.
    void stop(const std::string& message)
    {
        std::string secondsElapsed;
        getTimeElapsed(secondsElapsed);
        std::cout << timestr() << " " << secondsElapsed << " Stop  " << message << std::endl;
    }

private:

    struct timeval fTvStart;   // reference point set by the ctor / start()
    bool fHeaderDisplayed;     // true once header() has been printed

    // Computes the seconds elapsed since fTvStart.
    //
    // @param seconds  receives the elapsed time as an 8-character column
    //                 ("0.001234"); longer values are cut to 8 characters.
    // @return the elapsed time in seconds as a double.
    double getTimeElapsed(std::string& seconds)
    {
        struct timeval tvStop;
        gettimeofday(&tvStop, 0);
        const double secondsElapsed =
            (tvStop.tv_sec + (tvStop.tv_usec / 1000000.0)) -
            (fTvStart.tv_sec + (fTvStart.tv_usec / 1000000.0));

        // Fixed-point with 6 decimals always yields at least 8 characters, so
        // the column never has to be padded. Default stream formatting could
        // produce "5" or "9.5e-05", which padding with '0' turned into the
        // misleading "50000000" / "9.5e-050".
        std::ostringstream oss;
        oss << std::fixed << std::setprecision(6) << secondsElapsed;
        seconds = oss.str();

        if (seconds.size() > 8)
            seconds.resize(8);

        return secondsElapsed;
    }

    // Returns the current local time formatted as HH:MM:SS.uuuuuu.
    std::string timestr()
    {
        struct tm tm;
        struct timeval tv;

        gettimeofday(&tv, 0);
        // Copy into a time_t so the call does not depend on tv_sec's exact type.
        const time_t nowSeconds = tv.tv_sec;
        localtime_r(&nowSeconds, &tm);

        std::ostringstream oss;
        oss << std::setfill('0')
            << std::setw(2) << tm.tm_hour << ':'
            << std::setw(2) << tm.tm_min << ':'
            << std::setw(2) << tm.tm_sec << '.'
            << std::setw(6) << tv.tv_usec
            ;
        return oss.str();
    }

    // Prints the column header that precedes the first Start line.
    void header()
    {
        std::cout << std::endl;
        std::cout << "Time            Seconds  Activity" << std::endl;
    }
};
118 
119 class FilterDriver : public CppUnit::TestFixture
120 {
121 
122     CPPUNIT_TEST_SUITE(FilterDriver);
123 
124     CPPUNIT_TEST(ORDERBY_TIME_TEST);
125 
126     CPPUNIT_TEST_SUITE_END();
127 
128 private:
orderByTest_nRGs(uint64_t numRows,uint64_t limit,uint64_t maxThreads,bool parallelExecution,bool generateRandValues,bool hasDistinct)129    void orderByTest_nRGs(uint64_t numRows, uint64_t limit,
130     uint64_t maxThreads,
131     bool parallelExecution,
132     bool generateRandValues,
133     bool hasDistinct)
134     {
135         Timer timer;
136         // This test creates TAS for single bigint column and run sorting on it.
137 
138         cout << endl;
139         cout << "------------------------------------------------------------" << endl;
140         timer.start("insert");
141 
142         stringstream ss2;
143         ss2.flush();
144         ss2 << "loading " << numRows << " rows into initial RowGroup.";
145         std::string message = ss2.str();
146         timer.start(message);
147 
148         ResourceManager* rm = ResourceManager::instance(true);
149 
150         joblist::JobInfo jobInfo = joblist::JobInfo(rm);
151         // 1st column is the sorting key
152         uint8_t tupleKey = 1;
153         uint8_t offset = 0;
154         uint16_t rowsPerRG = 8192; // hardcoded max rows in RowGroup value
155         uint32_t numberOfRGs = numRows / rowsPerRG;
156 
157         // set sorting rules
158         // true - ascending order, false otherwise
159         jobInfo.orderByColVec.push_back(make_pair(tupleKey, false));
160         jobInfo.limitStart = offset;
161         jobInfo.limitCount = limit;
162         jobInfo.hasDistinct = hasDistinct;
163         // JobInfo doesn't set these SP in ctor
164         jobInfo.umMemLimit.reset(new int64_t);
165         *(jobInfo.umMemLimit) = MEMORY_LIMIT;
166         SErrorInfo errorInfo(new ErrorInfo());
167         jobInfo.errorInfo = errorInfo;
168         uint32_t oid =3001;
169         // populate JobInfo.nonConstDelCols with dummy shared_pointers
170         // to notify TupleAnnexStep::initialise() about number of non-constant columns
171         execplan::SRCP srcp1, srcp2;
172         jobInfo.nonConstDelCols.push_back(srcp1);
173         jobInfo.nonConstDelCols.push_back(srcp2);
174 
175         // create two columns RG. 1st is the sorting key, second is the data column
176         std::vector<uint32_t> offsets, roids, tkeys, cscale, cprecision;
177         std::vector<execplan::CalpontSystemCatalog::ColDataType> types;
178         offsets.push_back(2); offsets.push_back(10); offsets.push_back(18);
179         roids.push_back(oid); roids.push_back(oid);
180         tkeys.push_back(1); tkeys.push_back(1);
181         types.push_back(execplan::CalpontSystemCatalog::UBIGINT);
182         types.push_back(execplan::CalpontSystemCatalog::UBIGINT);
183         cscale.push_back(0); cscale.push_back(0);
184         cprecision.push_back(20); cprecision.push_back(20);
185         rowgroup::RowGroup inRG(2, //column count
186             offsets, //oldOffset
187             roids, // column oids
188             tkeys, //keys
189             types, // types
190             cscale, //scale
191             cprecision, // precision
192             20, // sTableThreshold
193             false //useStringTable
194         );
195         rowgroup::RowGroup jobInfoRG(inRG);
196         joblist::TupleAnnexStep tns = joblist::TupleAnnexStep(jobInfo);
197         tns.addOrderBy(new joblist::LimitedOrderBy());
198         tns.delivery(true);
199         // activate parallel sort
200         if(parallelExecution)
201         {
202             tns.setParallelOp();
203         }
204         tns.setMaxThreads(maxThreads);
205         tns.initialize(jobInfoRG, jobInfo);
206         tns.setLimit(0, limit);
207 
208         // Create JobStepAssociation mediator class ins to connect DL and JS
209         joblist::AnyDataListSPtr spdlIn(new AnyDataList());
210         //joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, jobInfo.fifoSize);
211         joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, 21001);
212         dlIn->OID(oid);
213         spdlIn->rowGroupDL(dlIn);
214         joblist::JobStepAssociation jsaIn;
215         jsaIn.outAdd(spdlIn);
216         tns.inputAssociation(jsaIn);
217 
218         uint64_t base = 42;
219         uint64_t maxInt = 0;
220         if(generateRandValues)
221             ::srand(base);
222         uint64_t nextValue;
223         for(uint32_t i = 1; i <= numberOfRGs; i++)
224         {
225             // create RGData with the RG structure and manually populate RG
226             // Use reinint(numberOfRGs) to preset an array
227             rowgroup::RGData rgD = rowgroup::RGData(inRG);
228             inRG.setData(&rgD);
229             rowgroup::Row r;
230             inRG.initRow(&r);
231             uint32_t rowSize = r.getSize();
232             inRG.getRow(0, &r);
233             // populate RGData
234             //for(uint64_t i = rowsPerRG+1; i > 0; i--)
235             for(uint64_t i = 0; i < rowsPerRG; i++) // Worst case scenario for PQ
236             {
237                 // TBD Use set..._offset methods to avoid offset calculation instructions
238                 if(generateRandValues)
239                 {
240                     nextValue = ::rand();
241                 }
242                 else
243                 {
244                     nextValue = base + i;
245                 }
246 
247                 r.setUintField<8>(nextValue, 0);
248                 r.setUintField<8>(nextValue, 1);
249                 if (maxInt < nextValue)
250                 {
251                     maxInt = nextValue;
252                 }
253                 r.nextRow(rowSize);
254             }
255             base += 1;
256             inRG.setRowCount(rowsPerRG);
257 
258            // Insert RGData into input DL
259             dlIn->insert(rgD);
260         }
261         // end of input signal
262         std::cout << "orderByTest_nRGs input DataList totalSize " << dlIn->totalSize() << std::endl;
263         dlIn->endOfInput();
264         timer.stop(message);
265 
266         joblist::AnyDataListSPtr spdlOut(new AnyDataList());
267         // Set the ring buffer big enough to take RGData-s with results b/c
268         // there is nobody to read out of the buffer.
269         joblist::RowGroupDL* dlOut = new RowGroupDL(1, numberOfRGs);
270         dlOut->OID(oid);
271         spdlOut->rowGroupDL(dlOut);
272         joblist::JobStepAssociation jsaOut;
273         jsaOut.outAdd(spdlOut);
274         //uint64_t outputDLIter = dlOut->getIterator();
275         tns.outputAssociation(jsaOut);
276 
277         // Run Annex Step
278         message = "Sorting";
279         timer.start(message);
280         tns.run();
281         tns.join();
282         timer.stop(message);
283 
284         // serialize RGData into bs and later back
285         // to follow ExeMgr whilst getting TAS result RowGroup
286         messageqcpp::ByteStream bs;
287         uint32_t result = 0;
288         rowgroup::RowGroup outRG(inRG);
289         rowgroup::RGData outRGData(outRG);
290         result = tns.nextBand(bs);
291         /*bool more = false;
292         do
293         {
294             dlOut->next(outputDLIter, &outRGData);
295         } while (more);*/
296         std::cout << "orderByTest_nRGs result " << result << std::endl;
297         //CPPUNIT_ASSERT( result == limit );
298         outRGData.deserialize(bs);
299         outRG.setData(&outRGData);
300 
301         //std::cout << "orderByTest_nRGs output RG " << outRG.toString() << std::endl;
302         std::cout << "maxInt " << maxInt << std::endl;
303         {
304             rowgroup::Row r;
305             outRG.initRow(&r);
306             outRG.getRow(0, &r);
307             CPPUNIT_ASSERT(limit == outRG.getRowCount() || outRG.getRowCount() == 8192);
308             CPPUNIT_ASSERT_EQUAL(maxInt, r.getUintField(1));
309         }
310 
311         cout << "------------------------------------------------------------" << endl;
312     }
313 
314 
ORDERBY_TIME_TEST()315     void ORDERBY_TIME_TEST()
316     {
317         uint64_t numRows = 8192;
318         uint64_t maxThreads = 8;
319         // limit == 100000 is still twice as good to sort in parallel
320         // limit == 1000000 however is better to sort using single threaded sorting
321         uint64_t limit = 100000;
322         bool parallel = true;
323         bool woParallel = false;
324         bool generateRandValues = true;
325         bool hasDistinct = true;
326         bool noDistinct = false;
327         orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct);
328         orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, noDistinct);
329         orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct);
330         orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, hasDistinct);
331     }
QUICK_TEST()332     void QUICK_TEST()
333     {
334         float f = 1.1;
335         double d = 1.2;
336         uint64_t i = 1;
337         uint64_t* i_ptr = &i;
338         double* d_ptr = &d;
339         uint64_t* i2_ptr = (uint64_t*) d_ptr;
340         float* f_ptr = &f;
341         i_ptr = (uint64_t*) f_ptr;
342 
343         cout << "*i_ptr=" << *i_ptr << endl;
344         cout << "*i2_ptr=" << *i2_ptr << endl;
345         f_ptr = (float*) i_ptr;
346 
347         cout << "*f_ptr=" << *f_ptr << endl;
348 
349         cout << endl;
350 
351         if (d > i)
352             cout << "1.2 is greater than 1." << endl;
353 
354         if (f > i)
355             cout << "1.1 is greater than 1." << endl;
356 
357         if (d > f)
358             cout << "1.2 is greater than 1.1" << endl;
359 
360         if (*i_ptr < *i2_ptr)
361             cout << "1.1 < 1.2 when represented as uint64_t." << endl;
362 
363         cout << "sizeof(f) = " << sizeof(f) << endl;
364         cout << "sizeof(i) = " << sizeof(i) << endl;
365         cout << "sizeof(d) = " << sizeof(d) << endl;
366 
367         double dbl = 9.7;
368         double dbl2 = 1.3;
369         i_ptr = (uint64_t*) &dbl;
370         i2_ptr = (uint64_t*) &dbl2;
371         cout << endl;
372         cout << "9.7 as int is " << *i_ptr << endl;
373         cout << "9.7 as int is " << *i2_ptr << endl;
374         cout << "1.2 < 9.7 is " << (*i_ptr < *i2_ptr) << endl;
375     }
376 };
377 
378 CPPUNIT_TEST_SUITE_REGISTRATION(FilterDriver);
379 
380 
main(int argc,char ** argv)381 int main( int argc, char** argv)
382 {
383     CppUnit::TextUi::TestRunner runner;
384     CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
385     runner.addTest( registry.makeTest() );
386     bool wasSuccessful = runner.run( "", false );
387     return (wasSuccessful ? 0 : 1);
388 }
389 
390 
391