1 /* Copyright (C) 2019 MariaDB Corporation.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18 // $Id: tdriver-filter.cpp 9210 2013-01-21 14:10:42Z rdempsey $
19
20 #include <list>
21 #include <sstream>
22 #include <pthread.h>
23 #include <iomanip>
24 #include <cppunit/extensions/HelperMacros.h>
25 #include <cppunit/extensions/TestFactoryRegistry.h>
26 #include <cppunit/ui/text/TestRunner.h>
27
28 #include "jobstep.h"
29 #include "funcexp.h"
30 #include "jlf_common.h"
31 #include "tupleannexstep.h"
32 #include "calpontsystemcatalog.h"
33 #include "resourcemanager.h"
34 #include <boost/any.hpp>
35 #include <boost/function.hpp>
36 #include "bytestream.h"
37 #include <time.h>
38 #include <sys/time.h>
39 #include <limits.h>
40
41 #define DEBUG
42 #define MEMORY_LIMIT 14983602176
43
44 using namespace std;
45 using namespace joblist;
46 using namespace messageqcpp;
47
// Small wall-clock stopwatch used by this test driver to print progress lines.
//
// start(msg) records the current time and prints "<hh:mm:ss.uuuuuu> Start <msg>";
// stop(msg) prints "<hh:mm:ss.uuuuuu> <elapsed> Stop <msg>" where <elapsed> is the
// number of seconds since the most recent start() (or since construction when
// start() was never called).  A column header is printed once, before the first
// start().  All output goes to std::cout.
class Timer
{
public:
    Timer() : fHeaderDisplayed(false)
    {
        // Give fTvStart a defined value so that a stop() without a preceding
        // start() reports the time since construction instead of reading an
        // uninitialized timeval.
        gettimeofday(&fTvStart, 0);
    }

    // Prints the header (first call only), restarts the stopwatch and logs
    // a "Start" line tagged with `message`.
    void start(const std::string& message)
    {
        if (!fHeaderDisplayed)
        {
            header();
            fHeaderDisplayed = true;
        }

        gettimeofday(&fTvStart, 0);
        std::cout << timestr() << " Start " << message << std::endl;
    }

    // Logs a "Stop" line tagged with `message`, including the seconds elapsed
    // since the stopwatch was last (re)started.  The stopwatch keeps running.
    void stop(const std::string& message)
    {
        std::string secondsElapsed;
        getTimeElapsed(secondsElapsed);
        std::cout << timestr() << " " << secondsElapsed << " Stop " << message << std::endl;
    }

private:
    struct timeval fTvStart;   // time of the last start() (or of construction)
    bool fHeaderDisplayed;     // true once header() has been printed

    // Computes the seconds elapsed since fTvStart.
    //
    // `seconds` receives the value as fixed-point text that is normally exactly
    // 8 characters wide (e.g. "0.000123", "12.34567").  Only fractional digits
    // are ever trimmed; values too large for 8 characters keep their complete
    // integer part.  Returns the same value as a double.
    double getTimeElapsed(std::string& seconds)
    {
        struct timeval tvStop;
        gettimeofday(&tvStop, 0);

        // Subtract the integer fields before converting to double: epoch-sized
        // second counts would otherwise swallow the sub-microsecond precision.
        const double secondsElapsed =
            static_cast<double>(tvStop.tv_sec - fTvStart.tv_sec) +
            static_cast<double>(tvStop.tv_usec - fTvStart.tv_usec) / 1000000.0;

        // Fixed notation guarantees a decimal point and never switches to an
        // exponent, so trimming below cannot produce text such as "9.53674e".
        std::ostringstream oss;
        oss << std::fixed << std::setprecision(6) << secondsElapsed;
        seconds = oss.str();

        const std::string::size_type fieldWidth = 8;
        if (seconds.size() > fieldWidth)
        {
            const std::string::size_type dot = seconds.find('.');
            // Keep 8 characters when that still leaves at least one fractional
            // digit; otherwise keep just the integer part.
            seconds.resize(dot + 1 < fieldWidth ? fieldWidth : dot);
        }

        return secondsElapsed;
    }

    // Returns the current local time of day formatted as "hh:mm:ss.uuuuuu".
    std::string timestr()
    {
        struct timeval tv;
        gettimeofday(&tv, 0);

        const time_t wholeSeconds = tv.tv_sec;
        struct tm tm;
        localtime_r(&wholeSeconds, &tm);

        std::ostringstream oss;
        oss << std::setfill('0')
            << std::setw(2) << tm.tm_hour << ':'
            << std::setw(2) << tm.tm_min << ':'
            << std::setw(2) << tm.tm_sec << '.'
            << std::setw(6) << tv.tv_usec;
        return oss.str();
    }

    // Prints the column header that precedes the first timing line.
    void header()
    {
        std::cout << std::endl;
        std::cout << "Time Seconds Activity" << std::endl;
    }
};
118
119 class FilterDriver : public CppUnit::TestFixture
120 {
121
122 CPPUNIT_TEST_SUITE(FilterDriver);
123
124 CPPUNIT_TEST(ORDERBY_TIME_TEST);
125
126 CPPUNIT_TEST_SUITE_END();
127
128 private:
orderByTest_nRGs(uint64_t numRows,uint64_t limit,uint64_t maxThreads,bool parallelExecution,bool generateRandValues,bool hasDistinct)129 void orderByTest_nRGs(uint64_t numRows, uint64_t limit,
130 uint64_t maxThreads,
131 bool parallelExecution,
132 bool generateRandValues,
133 bool hasDistinct)
134 {
135 Timer timer;
136 // This test creates TAS for single bigint column and run sorting on it.
137
138 cout << endl;
139 cout << "------------------------------------------------------------" << endl;
140 timer.start("insert");
141
142 stringstream ss2;
143 ss2.flush();
144 ss2 << "loading " << numRows << " rows into initial RowGroup.";
145 std::string message = ss2.str();
146 timer.start(message);
147
148 ResourceManager* rm = ResourceManager::instance(true);
149
150 joblist::JobInfo jobInfo = joblist::JobInfo(rm);
151 // 1st column is the sorting key
152 uint8_t tupleKey = 1;
153 uint8_t offset = 0;
154 uint16_t rowsPerRG = 8192; // hardcoded max rows in RowGroup value
155 uint32_t numberOfRGs = numRows / rowsPerRG;
156
157 // set sorting rules
158 // true - ascending order, false otherwise
159 jobInfo.orderByColVec.push_back(make_pair(tupleKey, false));
160 jobInfo.limitStart = offset;
161 jobInfo.limitCount = limit;
162 jobInfo.hasDistinct = hasDistinct;
163 // JobInfo doesn't set these SP in ctor
164 jobInfo.umMemLimit.reset(new int64_t);
165 *(jobInfo.umMemLimit) = MEMORY_LIMIT;
166 SErrorInfo errorInfo(new ErrorInfo());
167 jobInfo.errorInfo = errorInfo;
168 uint32_t oid =3001;
169 // populate JobInfo.nonConstDelCols with dummy shared_pointers
170 // to notify TupleAnnexStep::initialise() about number of non-constant columns
171 execplan::SRCP srcp1, srcp2;
172 jobInfo.nonConstDelCols.push_back(srcp1);
173 jobInfo.nonConstDelCols.push_back(srcp2);
174
175 // create two columns RG. 1st is the sorting key, second is the data column
176 std::vector<uint32_t> offsets, roids, tkeys, cscale, cprecision;
177 std::vector<execplan::CalpontSystemCatalog::ColDataType> types;
178 offsets.push_back(2); offsets.push_back(10); offsets.push_back(18);
179 roids.push_back(oid); roids.push_back(oid);
180 tkeys.push_back(1); tkeys.push_back(1);
181 types.push_back(execplan::CalpontSystemCatalog::UBIGINT);
182 types.push_back(execplan::CalpontSystemCatalog::UBIGINT);
183 cscale.push_back(0); cscale.push_back(0);
184 cprecision.push_back(20); cprecision.push_back(20);
185 rowgroup::RowGroup inRG(2, //column count
186 offsets, //oldOffset
187 roids, // column oids
188 tkeys, //keys
189 types, // types
190 cscale, //scale
191 cprecision, // precision
192 20, // sTableThreshold
193 false //useStringTable
194 );
195 rowgroup::RowGroup jobInfoRG(inRG);
196 joblist::TupleAnnexStep tns = joblist::TupleAnnexStep(jobInfo);
197 tns.addOrderBy(new joblist::LimitedOrderBy());
198 tns.delivery(true);
199 // activate parallel sort
200 if(parallelExecution)
201 {
202 tns.setParallelOp();
203 }
204 tns.setMaxThreads(maxThreads);
205 tns.initialize(jobInfoRG, jobInfo);
206 tns.setLimit(0, limit);
207
208 // Create JobStepAssociation mediator class ins to connect DL and JS
209 joblist::AnyDataListSPtr spdlIn(new AnyDataList());
210 //joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, jobInfo.fifoSize);
211 joblist::RowGroupDL* dlIn = new RowGroupDL(maxThreads, 21001);
212 dlIn->OID(oid);
213 spdlIn->rowGroupDL(dlIn);
214 joblist::JobStepAssociation jsaIn;
215 jsaIn.outAdd(spdlIn);
216 tns.inputAssociation(jsaIn);
217
218 uint64_t base = 42;
219 uint64_t maxInt = 0;
220 if(generateRandValues)
221 ::srand(base);
222 uint64_t nextValue;
223 for(uint32_t i = 1; i <= numberOfRGs; i++)
224 {
225 // create RGData with the RG structure and manually populate RG
226 // Use reinint(numberOfRGs) to preset an array
227 rowgroup::RGData rgD = rowgroup::RGData(inRG);
228 inRG.setData(&rgD);
229 rowgroup::Row r;
230 inRG.initRow(&r);
231 uint32_t rowSize = r.getSize();
232 inRG.getRow(0, &r);
233 // populate RGData
234 //for(uint64_t i = rowsPerRG+1; i > 0; i--)
235 for(uint64_t i = 0; i < rowsPerRG; i++) // Worst case scenario for PQ
236 {
237 // TBD Use set..._offset methods to avoid offset calculation instructions
238 if(generateRandValues)
239 {
240 nextValue = ::rand();
241 }
242 else
243 {
244 nextValue = base + i;
245 }
246
247 r.setUintField<8>(nextValue, 0);
248 r.setUintField<8>(nextValue, 1);
249 if (maxInt < nextValue)
250 {
251 maxInt = nextValue;
252 }
253 r.nextRow(rowSize);
254 }
255 base += 1;
256 inRG.setRowCount(rowsPerRG);
257
258 // Insert RGData into input DL
259 dlIn->insert(rgD);
260 }
261 // end of input signal
262 std::cout << "orderByTest_nRGs input DataList totalSize " << dlIn->totalSize() << std::endl;
263 dlIn->endOfInput();
264 timer.stop(message);
265
266 joblist::AnyDataListSPtr spdlOut(new AnyDataList());
267 // Set the ring buffer big enough to take RGData-s with results b/c
268 // there is nobody to read out of the buffer.
269 joblist::RowGroupDL* dlOut = new RowGroupDL(1, numberOfRGs);
270 dlOut->OID(oid);
271 spdlOut->rowGroupDL(dlOut);
272 joblist::JobStepAssociation jsaOut;
273 jsaOut.outAdd(spdlOut);
274 //uint64_t outputDLIter = dlOut->getIterator();
275 tns.outputAssociation(jsaOut);
276
277 // Run Annex Step
278 message = "Sorting";
279 timer.start(message);
280 tns.run();
281 tns.join();
282 timer.stop(message);
283
284 // serialize RGData into bs and later back
285 // to follow ExeMgr whilst getting TAS result RowGroup
286 messageqcpp::ByteStream bs;
287 uint32_t result = 0;
288 rowgroup::RowGroup outRG(inRG);
289 rowgroup::RGData outRGData(outRG);
290 result = tns.nextBand(bs);
291 /*bool more = false;
292 do
293 {
294 dlOut->next(outputDLIter, &outRGData);
295 } while (more);*/
296 std::cout << "orderByTest_nRGs result " << result << std::endl;
297 //CPPUNIT_ASSERT( result == limit );
298 outRGData.deserialize(bs);
299 outRG.setData(&outRGData);
300
301 //std::cout << "orderByTest_nRGs output RG " << outRG.toString() << std::endl;
302 std::cout << "maxInt " << maxInt << std::endl;
303 {
304 rowgroup::Row r;
305 outRG.initRow(&r);
306 outRG.getRow(0, &r);
307 CPPUNIT_ASSERT(limit == outRG.getRowCount() || outRG.getRowCount() == 8192);
308 CPPUNIT_ASSERT_EQUAL(maxInt, r.getUintField(1));
309 }
310
311 cout << "------------------------------------------------------------" << endl;
312 }
313
314
ORDERBY_TIME_TEST()315 void ORDERBY_TIME_TEST()
316 {
317 uint64_t numRows = 8192;
318 uint64_t maxThreads = 8;
319 // limit == 100000 is still twice as good to sort in parallel
320 // limit == 1000000 however is better to sort using single threaded sorting
321 uint64_t limit = 100000;
322 bool parallel = true;
323 bool woParallel = false;
324 bool generateRandValues = true;
325 bool hasDistinct = true;
326 bool noDistinct = false;
327 orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, noDistinct);
328 orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, noDistinct);
329 orderByTest_nRGs(numRows * 14400, limit, maxThreads, woParallel, generateRandValues, hasDistinct);
330 orderByTest_nRGs(numRows * 14400, limit, maxThreads, parallel, generateRandValues, hasDistinct);
331 }
QUICK_TEST()332 void QUICK_TEST()
333 {
334 float f = 1.1;
335 double d = 1.2;
336 uint64_t i = 1;
337 uint64_t* i_ptr = &i;
338 double* d_ptr = &d;
339 uint64_t* i2_ptr = (uint64_t*) d_ptr;
340 float* f_ptr = &f;
341 i_ptr = (uint64_t*) f_ptr;
342
343 cout << "*i_ptr=" << *i_ptr << endl;
344 cout << "*i2_ptr=" << *i2_ptr << endl;
345 f_ptr = (float*) i_ptr;
346
347 cout << "*f_ptr=" << *f_ptr << endl;
348
349 cout << endl;
350
351 if (d > i)
352 cout << "1.2 is greater than 1." << endl;
353
354 if (f > i)
355 cout << "1.1 is greater than 1." << endl;
356
357 if (d > f)
358 cout << "1.2 is greater than 1.1" << endl;
359
360 if (*i_ptr < *i2_ptr)
361 cout << "1.1 < 1.2 when represented as uint64_t." << endl;
362
363 cout << "sizeof(f) = " << sizeof(f) << endl;
364 cout << "sizeof(i) = " << sizeof(i) << endl;
365 cout << "sizeof(d) = " << sizeof(d) << endl;
366
367 double dbl = 9.7;
368 double dbl2 = 1.3;
369 i_ptr = (uint64_t*) &dbl;
370 i2_ptr = (uint64_t*) &dbl2;
371 cout << endl;
372 cout << "9.7 as int is " << *i_ptr << endl;
373 cout << "9.7 as int is " << *i2_ptr << endl;
374 cout << "1.2 < 9.7 is " << (*i_ptr < *i2_ptr) << endl;
375 }
376 };
377
378 CPPUNIT_TEST_SUITE_REGISTRATION(FilterDriver);
379
380
main(int argc,char ** argv)381 int main( int argc, char** argv)
382 {
383 CppUnit::TextUi::TestRunner runner;
384 CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
385 runner.addTest( registry.makeTest() );
386 bool wasSuccessful = runner.run( "", false );
387 return (wasSuccessful ? 0 : 1);
388 }
389
390
391