1 /* Copyright (C) 2014 InfiniDB, Inc.
2 Copyright (C) 2016, 2017 MariaDB Corporation
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
7 the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 MA 02110-1301, USA. */
18
19 //
20 // $Id: batchprimitiveprocessor.cpp 2136 2013-07-24 21:04:30Z pleblanc $
21 // C++ Implementation: batchprimitiveprocessor
22 //
23 // Description:
24 //
25 //
26 // Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
27 //
28 // Copyright: See COPYING file that comes with this distribution
29 //
30 //
31
32 #include <stdexcept>
33 #include <unistd.h>
34 #include <cstring>
35 //#define NDEBUG
36 #include <cassert>
37 #include <string>
38 #include <sstream>
39 #include <set>
40 using namespace std;
41
42 #include <boost/thread.hpp>
43 #include <boost/shared_ptr.hpp>
44 using namespace boost;
45
46 #include "bpp.h"
47 #include "primitiveserver.h"
48 #include "errorcodes.h"
49 #include "exceptclasses.h"
50 #include "pp_logger.h"
51 #include "funcexpwrapper.h"
52 #include "fixedallocator.h"
53 #include "blockcacheclient.h"
54 #include "MonitorProcMem.h"
55 #include "threadnaming.h"
56 #include "vlarray.h"
57
// Sentinel bounds: minVal is initialized to MAX64 and maxVal to MIN64 (constructors and
// initProcessor()), presumably so the first real value compared replaces both.
// NOTE(review): 0x8000000000000000 does not fit in long long, so MIN64 actually has type
// unsigned long long; confirm the conversion when it is assigned to maxVal is intended.
58 #define MAX64 0x7fffffffffffffffLL
59 #define MIN64 0x8000000000000000LL
60
61 using namespace messageqcpp;
62 using namespace joiner;
63 using namespace std::tr1;
64 using namespace rowgroup;
65 using namespace funcexp;
66 using namespace logging;
67 using namespace utils;
68 using namespace joblist;
69
70 namespace primitiveprocessor
71 {
72
73 #ifdef PRIMPROC_STOPWATCH
74 #include "poormanprofiler.inc"
75 #endif
76
77 // these are config parms defined in primitiveserver.cpp, initialized by PrimProc main().
78 extern uint32_t blocksReadAhead;
79 extern uint32_t dictBufferSize;
// Initial size in bytes of each BPP's output message buffer (outMsgSize in initProcessor()).
80 extern uint32_t defaultBufferSize;
81 extern int fCacheCount;
82 extern uint32_t connectionsPerUM;
// When non-zero, initProcessor() preps projection steps with OT_BOTH instead of OT_DATAVALUE.
83 extern int noVB;
84
// Rounds x up to the nearest power of two; exact powers of two are returned unchanged.
// Technique: "Round up to the next highest power of 2" from
// https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 -- propagate the
// highest set bit of (x - 1) into every lower bit position, then add one.
// Unsigned wrap-around edge cases: nextPowOf2(0) == 0, and any x above 2^31 also yields 0.
uint nextPowOf2(uint x)
{
    uint smeared = x - 1;

    for (unsigned shift = 1; shift <= 16; shift *= 2)
        smeared |= smeared >> shift;

    return smeared + 1;
}
97
// Default constructor. Every member in the initializer list starts at zero/false, including
// prefetchThreshold, processorThreads, ptMask, and firstInstance = false. minVal/maxVal start at
// the MAX64/MIN64 sentinels. Unlike the ByteStream constructor, no creation message is parsed.
// The body puts the primitive processor `pp` in logical-block mode, points it at this object's
// blockData buffer, and initializes objLock (destroyed in the destructor).
// NOTE(review): members not listed here (e.g. hasRowGroup, joinerCount) get only their default
// initialization; confirm against bpp.h before relying on them for objects built this way.
BatchPrimitiveProcessor()98 BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
99 ot(BPS_ELEMENT_TYPE),
100 txnID(0),
101 sessionID(0),
102 stepID(0),
103 uniqueID(0),
104 count(1),
105 baseRid(0),
106 ridCount(0),
107 needStrValues(false),
108 filterCount(0),
109 projectCount(0),
110 sendRidsAtDelivery(false),
111 ridMap(0),
112 gotAbsRids(false),
113 gotValues(false),
114 hasScan(false),
115 validCPData(false),
116 minVal(MAX64),
117 maxVal(MIN64),
118 lbidForCP(0),
119 busyLoaderCount(0),
120 physIO(0),
121 cachedIO(0),
122 touchedBlocks(0),
123 LBIDTrace(false),
124 fBusy(false),
125 doJoin(false),
126 hasFilterStep(false),
127 filtOnString(false),
128 prefetchThreshold(0),
129 hasDictStep(false),
130 sockIndex(0),
131 endOfJoinerRan(false),
132 processorThreads(0),
133 ptMask(0),
134 firstInstance(false)
135 {
136 pp.setLogicalBlockMode(true);
137 pp.setBlockPtr((int*) blockData);
138 pthread_mutex_init(&objLock, NULL);
139 }
140
// Builds a BPP from a creation message.
//   b                 - creation ByteStream; fully parsed by initBPP() at the end of the body.
//   prefetch          - stored as prefetchThreshold.
//   bppst             - stored as sendThread. Presumably the thread that ships results back;
//                       it is also polled via sendThread->aborted().
//   _processorThreads - requested number of hash-table buckets per joiner. It is rounded up to a
//                       power of two so that `hash & ptMask` (ptMask = processorThreads - 1)
//                       selects a bucket (see addToJoiner()).
// firstInstance is set to true; endOfJoiner() only checks join-table completeness for the first
// instance.
// NOTE(review): nextPowOf2(0) returns 0. Passing _processorThreads == 0 would therefore give
// zero-length bucket arrays and an all-ones ptMask. This assumes callers never pass 0 -- confirm.
BatchPrimitiveProcessor(ByteStream & b,double prefetch,boost::shared_ptr<BPPSendThread> bppst,uint _processorThreads)141 BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
142 boost::shared_ptr<BPPSendThread> bppst, uint _processorThreads) :
143 ot(BPS_ELEMENT_TYPE),
144 txnID(0),
145 sessionID(0),
146 stepID(0),
147 uniqueID(0),
148 count(1),
149 baseRid(0),
150 ridCount(0),
151 needStrValues(false),
152 filterCount(0),
153 projectCount(0),
154 sendRidsAtDelivery(false),
155 ridMap(0),
156 gotAbsRids(false),
157 gotValues(false),
158 hasScan(false),
159 validCPData(false),
160 minVal(MAX64),
161 maxVal(MIN64),
162 lbidForCP(0),
163 busyLoaderCount(0),
164 physIO(0),
165 cachedIO(0),
166 touchedBlocks(0),
167 LBIDTrace(false),
168 fBusy(false),
169 doJoin(false),
170 hasFilterStep(false),
171 filtOnString(false),
172 prefetchThreshold(prefetch),
173 hasDictStep(false),
174 sockIndex(0),
175 endOfJoinerRan(false),
176 processorThreads(_processorThreads),
177 //processorThreads(32),
178 //ptMask(processorThreads - 1),
179 firstInstance(true)
180 {
181 // promote processorThreads to next power of 2. also need to change the name to bucketCount or similar
182 processorThreads = nextPowOf2(processorThreads);
// ptMask is only valid as a bucket mask because processorThreads is now a power of two.
183 ptMask = processorThreads - 1;
184
185 pp.setLogicalBlockMode(true);
186 pp.setBlockPtr((int*) blockData);
187 sendThread = bppst;
188 pthread_mutex_init(&objLock, NULL);
// Must run last: initBPP() reads processorThreads/ptMask and may lock objLock.
189 initBPP(b);
190 }
191
// Disabled: copying a BPP is not supported. This former copy constructor only threw
// logic_error and is compiled out.
192 #if 0
193 BatchPrimitiveProcessor::BatchPrimitiveProcessor(const BatchPrimitiveProcessor& bpp)
194 {
195 throw logic_error("copy BPP deprecated");
196 }
197 #endif
198
~BatchPrimitiveProcessor()199 BatchPrimitiveProcessor::~BatchPrimitiveProcessor()
200 {
201 //FIXME: just do a sync fetch
202 counterLock.lock(); // need to make sure the loader has exited
203
204 while (busyLoaderCount > 0)
205 {
206 counterLock.unlock();
207 usleep(100000);
208 counterLock.lock();
209 }
210
211 counterLock.unlock();
212 pthread_mutex_destroy(&objLock);
213 }
214
215 /**
216 * InitBPP Parses the creation messages from BatchPrimitiveProcessor-JL::createBPP()
217 * Refer to that fcn for message format info.
218 */
219
// Field order as consumed below:
//   ISMPacketHeader (skipped), output type, txnID, sessionID, stepID, uniqueID, versionInfo,
//   a flag byte (NEED_STR_VALUES, GOT_ABS_RIDS, GOT_VALUES, LBID_TRACE, SEND_RIDS_AT_DELIVERY,
//   HAS_JOINER, HAS_ROWGROUP, JOIN_ROWGROUP_DATA), bop, forHJ,
//   [ROW_GROUP] output RowGroup, optional FE1 (+ input RG), optional FE2 (+ output RG),
//   [join] per-joiner descriptors, optional join-filter RowGroup, optional small/large/joined RGs,
//   filter command list, projection command list,
//   [ROW_GROUP] optional PM-side aggregation RowGroup + RowAggregation.
// Finishes by calling initProcessor() to allocate working state.
// Locking: when the message has a joiner, objLock is acquired below. Except on FreeBSD it is
// NOT released here; endOfJoiner() releases it once the join tables are complete.
initBPP(ByteStream & bs)220 void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
221 {
222 uint32_t i;
223 uint8_t tmp8;
224 Command::CommandType type;
225
226 bs.advance(sizeof(ISMPacketHeader)); // skip the header
227 bs >> tmp8;
228 ot = static_cast<BPSOutputType>(tmp8);
229 bs >> txnID;
230 bs >> sessionID;
231 bs >> stepID;
232 bs >> uniqueID;
233 bs >> versionInfo;
234
// One byte of feature flags decoded into individual booleans.
235 bs >> tmp8;
236 needStrValues = tmp8 & NEED_STR_VALUES;
237 gotAbsRids = tmp8 & GOT_ABS_RIDS;
238 gotValues = tmp8 & GOT_VALUES;
239 LBIDTrace = tmp8 & LBID_TRACE;
240 sendRidsAtDelivery = tmp8 & SEND_RIDS_AT_DELIVERY;
241 doJoin = tmp8 & HAS_JOINER;
242 hasRowGroup = tmp8 & HAS_ROWGROUP;
243 getTupleJoinRowGroupData = tmp8 & JOIN_ROWGROUP_DATA;
244
245 // This used to signify that there was input row data from previous jobsteps, and
246 // it never quite worked right. No need to fix it or update it; all BPP's have started
247 // with a scan for years. Took it out.
248 assert(!hasRowGroup);
249
250 bs >> bop;
251 bs >> forHJ;
252
253 if (ot == ROW_GROUP)
254 {
255 bs >> outputRG;
256 //outputRG.setUseStringTable(true);
// Optional FE1 expression, preceded by a presence byte.
257 bs >> tmp8;
258
259 if (tmp8)
260 {
261 fe1.reset(new FuncExpWrapper());
262 bs >> *fe1;
263 bs >> fe1Input;
264 }
265
// Optional FE2 expression, preceded by a presence byte.
266 bs >> tmp8;
267
268 if (tmp8)
269 {
270 fe2.reset(new FuncExpWrapper());
271 bs >> *fe2;
272 bs >> fe2Output;
273 }
274 }
275
276 if (doJoin)
277 {
// Acquired here and handed off; see the function header for where it is released.
278 pthread_mutex_lock(&objLock);
279
280 if (ot == ROW_GROUP)
281 {
282 bs >> joinerCount;
283 // cout << "joinerCount = " << joinerCount << endl;
284 joinTypes.reset(new JoinType[joinerCount]);
285
// Each joiner's hash table is split into processorThreads sub-tables ("buckets"), each with
// its own lock in addToJoinerLocks. addToJoiner() picks a bucket with `hash & ptMask`.
286 tJoiners.reset(new boost::shared_array<boost::shared_ptr<TJoiner> >[joinerCount]);
287 for (uint j = 0; j < joinerCount; ++j)
288 tJoiners[j].reset(new boost::shared_ptr<TJoiner>[processorThreads]);
289
290 //_pools.reset(new boost::shared_ptr<utils::SimplePool>[joinerCount]);
291 tlJoiners.reset(new boost::shared_array<boost::shared_ptr<TLJoiner> >[joinerCount]);
292 for (uint j = 0; j < joinerCount; ++j)
293 tlJoiners[j].reset(new boost::shared_ptr<TLJoiner>[processorThreads]);
294
295 addToJoinerLocks.reset(new boost::scoped_array<boost::mutex>[joinerCount]);
296 for (uint j = 0; j < joinerCount; ++j)
297 addToJoinerLocks[j].reset(new boost::mutex[processorThreads]);
298
299 smallSideDataLocks.reset(new boost::mutex[joinerCount]);
300 tJoinerSizes.reset(new std::atomic<uint32_t>[joinerCount]);
301 largeSideKeyColumns.reset(new uint32_t[joinerCount]);
302 tlLargeSideKeyColumns.reset(new vector<uint32_t>[joinerCount]);
303 typelessJoin.reset(new bool[joinerCount]);
304 tlKeyLengths.reset(new uint32_t[joinerCount]);
305
// Typeless keys deserialized in addToJoiner() are allocated from these pools; the lock is
// enabled because addToJoiner() may run on several threads at once.
306 storedKeyAllocators.reset(new PoolAllocator[joinerCount]);
307 for (uint j = 0; j < joinerCount; ++j)
308 storedKeyAllocators[j].setUseLock(true);
309
310 joinNullValues.reset(new uint64_t[joinerCount]);
311 doMatchNulls.reset(new bool[joinerCount]);
312 joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);
313 hasJoinFEFilters = false;
314 hasSmallOuterJoin = false;
315
316 for (i = 0; i < joinerCount; i++)
317 {
318 doMatchNulls[i] = false;
// Expected small-side row count for this joiner. addToJoiner() lowers it for NULL typeless
// keys, and endOfJoiner() compares it with the total size of the bucket tables.
319 uint32_t tmp32;
320 bs >> tmp32;
321 tJoinerSizes[i] = tmp32;
322 //bs >> tJoinerSizes[i];
323 //cout << "joiner size = " << tJoinerSizes[i] << endl;
324 bs >> joinTypes[i];
325 bs >> tmp8;
326 typelessJoin[i] = (bool) tmp8;
327
328 if (joinTypes[i] & WITHFCNEXP)
329 {
330 hasJoinFEFilters = true;
331 joinFEFilters[i].reset(new FuncExpWrapper());
332 bs >> *joinFEFilters[i];
333 }
334
335 if (joinTypes[i] & SMALLOUTER)
336 hasSmallOuterJoin = true;
337
// Typed join: a single integer key column plus its NULL representation.
// Typeless join: a multi-column key hashed/compared against outputRG.
338 if (!typelessJoin[i])
339 {
340 bs >> joinNullValues[i];
341 bs >> largeSideKeyColumns[i];
342 //cout << "large side key is " << largeSideKeyColumns[i] << endl;
343 for (uint j = 0; j < processorThreads; ++j)
344 tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher()));
345 }
346 else
347 {
348 deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]);
349 bs >> tlKeyLengths[i];
350 //storedKeyAllocators[i] = PoolAllocator();
351 for (uint j = 0; j < processorThreads; ++j)
352 tlJoiners[i][j].reset(new TLJoiner(10,
353 TupleJoiner::TypelessDataHasher(&outputRG,
354 &tlLargeSideKeyColumns[i]),
355 TupleJoiner::TypelessDataComparator(&outputRG,
356 &tlLargeSideKeyColumns[i])));
357 }
358 }
359
360 if (hasJoinFEFilters)
361 {
362 joinFERG.reset(new RowGroup());
363 bs >> *joinFERG;
364 }
365
366 if (getTupleJoinRowGroupData)
367 {
368 deserializeVector(bs, smallSideRGs);
369 // cout << "deserialized " << smallSideRGs.size() << " small-side rowgroups\n";
370 idbassert(smallSideRGs.size() == joinerCount);
371 smallSideRowLengths.reset(new uint32_t[joinerCount]);
372 smallSideRowData.reset(new RGData[joinerCount]);
373 smallNullRowData.reset(new RGData[joinerCount]);
374 smallNullPointers.reset(new Row::Pointer[joinerCount]);
375 ssrdPos.reset(new uint64_t[joinerCount]);
376
// Preallocate small-side row storage for tJoinerSizes[i] rows; addToJoiner() appends to it.
377 for (i = 0; i < joinerCount; i++)
378 {
379 smallSideRowLengths[i] = smallSideRGs[i].getRowSize();;
380 smallSideRowData[i] = RGData(smallSideRGs[i], tJoinerSizes[i]);
381 // smallSideRowData[i].reset(new uint8_t[
382 // smallSideRGs[i].getEmptySize() +
383 // (uint64_t) smallSideRowLengths[i] * tJoinerSizes[i]]);
384 smallSideRGs[i].setData(&smallSideRowData[i]);
385 smallSideRGs[i].resetRowGroup(0);
386 ssrdPos[i] = smallSideRGs[i].getEmptySize();
387
// Large-outer/semi/anti joins get a prebuilt all-NULL small-side row (smallNullPointers[i]).
// The RowGroup is pointed back at the real row storage afterwards.
388 if (joinTypes[i] & (LARGEOUTER | SEMI | ANTI))
389 {
390 Row smallRow;
391 smallSideRGs[i].initRow(&smallRow);
392 smallNullRowData[i] = RGData(smallSideRGs[i], 1);
393 smallSideRGs[i].setData(&smallNullRowData[i]);
394 smallSideRGs[i].getRow(0, &smallRow);
395 smallRow.initToNull();
396 smallNullPointers[i] = smallRow.getPointer();
397 smallSideRGs[i].setData(&smallSideRowData[i]);
398 }
399 }
400
401 bs >> largeSideRG;
402 bs >> joinedRG;
403 // cout << "got the joined Rowgroup: " << joinedRG.toString() << "\n";
404 }
405 }
406
// FreeBSD releases objLock right away; elsewhere endOfJoiner() releases it.
407 #ifdef __FreeBSD__
408 pthread_mutex_unlock(&objLock);
409 #endif
410 }
411
412 bs >> filterCount;
413 filterSteps.resize(filterCount);
414 //cout << "deserializing " << filterCount << " filters\n";
415 hasScan = false;
416 hasPassThru = false;
417
// Deserialize filter steps and record which kinds are present. With bop == BOP_OR, every
// column filter is switched into scan mode.
418 for (i = 0; i < filterCount; ++i)
419 {
420 //cout << "deserializing step " << i << endl;
421 filterSteps[i] = SCommand(Command::makeCommand(bs, &type, filterSteps));
422
423 if (type == Command::COLUMN_COMMAND)
424 {
425 ColumnCommand* col = (ColumnCommand*) filterSteps[i].get();
426
427 if (col->isScan())
428 hasScan = true;
429
430 if (bop == BOP_OR)
431 col->setScan(true);
432 }
433 else if (type == Command::FILTER_COMMAND)
434 {
435 hasFilterStep = true;
436
437 if (dynamic_cast<StrFilterCmd*>(filterSteps[i].get()) != NULL)
438 filtOnString = true;
439 }
440 else if (type == Command::DICT_STEP || type == Command::RID_TO_STRING)
441 hasDictStep = true;
442 }
443
444 bs >> projectCount;
445 //cout << "deserializing " << projectCount << " projected columns\n\n";
446 projectSteps.resize(projectCount);
447
448 for (i = 0; i < projectCount; ++i)
449 {
450 //cout << "deserializing step " << i << endl;
451 projectSteps[i] = SCommand(Command::makeCommand(bs, &type, projectSteps));
452
453 if (type == Command::PASS_THRU)
454 hasPassThru = true;
455 else if (type == Command::DICT_STEP || type == Command::RID_TO_STRING)
456 hasDictStep = true;
457 }
458
// Optional PM-side aggregation, preceded by a presence byte.
459 if (ot == ROW_GROUP)
460 {
461 bs >> tmp8;
462
463 if (tmp8 > 0)
464 {
465 bs >> fAggregateRG;
466 fAggregator.reset(new RowAggregation);
467 bs >> *(fAggregator.get());
468
469 // If there's UDAF involved, set up for PM processing
470 for (uint64_t i = 0; i < fAggregator->getAggFunctions().size(); i++)
471 {
472 RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fAggregator->getAggFunctions()[i].get());
473
474 if (rowUDAF)
475 {
476 // On the PM, the aux column is not sent, but rather is output col + 1.
477 rowUDAF->fAuxColumnIndex = rowUDAF->fOutputColumnIndex + 1;
478 // Set the PM flag in case the UDAF cares.
479 rowUDAF->fUDAFContext.setContextFlags(rowUDAF->fUDAFContext.getContextFlags()
480 | mcsv1sdk::CONTEXT_IS_PM);
481 }
482 }
483 }
484 }
485
486 initProcessor();
487 }
488
489 /**
490 * resetBPP Parses the run messages from BatchPrimitiveProcessor-JL::runBPP()
491 * Refer to that fcn for message format info.
492 */
// Prepares this BPP for one run:
//   - stores the reply write lock (w) and socket (s);
//   - reads dbRoot, count and ridCount;
//   - reads the RID list: either absolute 64-bit RIDs, or ridMap + baseRid + 16-bit relative RIDs;
//   - reads optional 64-bit values;
//   - lets every filter and projection Command consume its own per-run parameters.
// Asserts that the message was consumed exactly.
// Locking: acquires objLock and, except on FreeBSD, returns with it still held.
// NOTE(review): the matching unlock is not in this function; presumably it happens after
// execution elsewhere in this file -- confirm.
resetBPP(ByteStream & bs,const SP_UM_MUTEX & w,const SP_UM_IOSOCK & s)493 void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w,
494 const SP_UM_IOSOCK& s)
495 {
496 uint32_t i;
497 vector<uint64_t> preloads;
498
499 pthread_mutex_lock(&objLock);
500
501 writelock = w;
502 sock = s;
503 newConnection = true;
504
505 // skip the header, sessionID, stepID, uniqueID, and priority
506 bs.advance(sizeof(ISMPacketHeader) + 16);
507 bs >> dbRoot;
508 bs >> count;
509 bs >> ridCount;
510
511 if (gotAbsRids)
512 {
// Absolute-RID input is no longer expected; the code below only runs in NDEBUG builds.
513 assert(0);
514 memcpy(absRids.get(), bs.buf(), ridCount << 3);
515 bs.advance(ridCount << 3);
516 /* TODO: this loop isn't always necessary or sensible */
517 ridMap = 0;
// baseRid = first RID rounded down to a multiple of 8192 (low 13 bits cleared).
518 baseRid = absRids[0] & 0xffffffffffffe000ULL;
519
// ridMap gets one bit for each 1024-RID sub-range that contains at least one RID.
520 for (uint32_t i = 0; i < ridCount; i++)
521 {
522 relRids[i] = absRids[i] - baseRid;
523 ridMap |= 1 << (relRids[i] >> 10);
524 }
525 }
526 else
527 {
// Relative RIDs arrive as 2 bytes each.
528 bs >> ridMap;
529 bs >> baseRid;
530 memcpy(relRids, bs.buf(), ridCount << 1);
531 bs.advance(ridCount << 1);
532 }
533
// Optional 8-byte value per RID.
534 if (gotValues)
535 {
536 memcpy(values, bs.buf(), ridCount << 3);
537 bs.advance(ridCount << 3);
538 }
539
540 for (i = 0; i < filterCount; ++i)
541 {
542 filterSteps[i]->resetCommand(bs);
543 }
544
545 for (i = 0; i < projectCount; ++i)
546 {
547 projectSteps[i]->resetCommand(bs);
548 }
549
550 idbassert(bs.length() == 0);
551
// Reset the per-run async-load bookkeeping; both arrays hold projectCount + 1 entries
// (allocated in initProcessor()).
552 /* init vars not part of the BS */
553 currentBlockOffset = 0;
554 memset(relLBID.get(), 0, sizeof(uint64_t) * (projectCount + 1));
555 memset(asyncLoaded.get(), 0, sizeof(bool) * (projectCount + 1));
556
557 buildVSSCache(count);
558 #ifdef __FreeBSD__
559 pthread_mutex_unlock(&objLock);
560 #endif
561 }
562
563 // This version of addToJoiner() is multithreaded. Values are first
564 // hashed into thread-local vectors corresponding to the shared hash
565 // tables. Once the incoming values are organized locally, it grabs
566 // the lock for each shared table and inserts them there.
addToJoiner(ByteStream & bs)567 void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
568 {
569 /* to get wall-time of hash table construction
570 idbassert(processorThreads != 0);
571 if (firstCallTime.is_not_a_date_time())
572 firstCallTime = boost::posix_time::microsec_clock::universal_time();
573 */
574
575 uint32_t count, i, joinerNum, tlIndex, startPos, bucket;
576 #pragma pack(push,1)
577 struct JoinerElements
578 {
579 uint64_t key;
580 uint32_t value;
581 } *arr;
582 #pragma pack(pop)
583
584 /* skip the header */
585 bs.advance(sizeof(ISMPacketHeader) + 3 * sizeof(uint32_t));
586
587 bs >> count;
588 bs >> startPos;
589
590 if (ot == ROW_GROUP)
591 {
592 bs >> joinerNum;
593 idbassert(joinerNum < joinerCount);
594 arr = (JoinerElements*) bs.buf();
595
596 std::atomic<uint32_t> &tJoinerSize = tJoinerSizes[joinerNum];
597
598 // XXXPAT: enormous if stmts are evil. TODO: move each block into
599 // properly-named functions for clarity.
600 if (typelessJoin[joinerNum])
601 {
602 utils::VLArray<vector<pair<TypelessData, uint32_t> > > tmpBuckets(processorThreads);
603 TypelessData tlLargeKey;
604 uint8_t nullFlag;
605 PoolAllocator &storedKeyAllocator = storedKeyAllocators[joinerNum];
606 // this first loop hashes incoming values into vectors that parallel the hash tables.
607 uint nullCount = 0;
608 for (i = 0; i < count; ++i)
609 {
610 bs >> nullFlag;
611 if (nullFlag == 0)
612 {
613 tlLargeKey.deserialize(bs, storedKeyAllocator);
614 bs >> tlIndex;
615 bucket = tlLargeKey.hash(outputRG, tlLargeSideKeyColumns[joinerNum]) & ptMask;
616 tmpBuckets[bucket].push_back(make_pair(tlLargeKey, tlIndex));
617 }
618 else
619 ++nullCount;
620 }
621 tJoinerSize -= nullCount;
622
623 bool done = false, didSomeWork;
624 //uint loopCounter = 0, noWorkCounter = 0;
625 // this loop moves the elements from each vector into its corresponding hash table.
626 while (!done)
627 {
628 //++loopCounter;
629 done = true;
630 didSomeWork = false;
631 for (i = 0; i < processorThreads; ++i)
632 {
633 if (!tmpBuckets[i].empty())
634 {
635 bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
636 if (!gotIt)
637 {
638 done = false; // didn't get it, don't block, try the next bucket
639 continue;
640 }
641 for (auto &element : tmpBuckets[i])
642 tlJoiners[joinerNum][i]->insert(element);
643 addToJoinerLocks[joinerNum][i].unlock();
644 tmpBuckets[i].clear();
645 didSomeWork = true;
646 }
647 }
648 // if this iteration did no useful work, everything we need is locked; wait briefly
649 // and try again.
650 if (!done && !didSomeWork)
651 {
652 ::usleep(500 * processorThreads);
653 //++noWorkCounter;
654 }
655 }
656 //cout << "TL join insert. Took " << loopCounter << " loops" << endl;
657
658 }
659 else
660 {
661 boost::shared_array<boost::shared_ptr<TJoiner> > tJoiner = tJoiners[joinerNum];
662 uint64_t nullValue = joinNullValues[joinerNum];
663 bool &l_doMatchNulls = doMatchNulls[joinerNum];
664 joblist::JoinType joinType = joinTypes[joinerNum];
665 utils::VLArray<vector<pair<uint64_t, uint32_t> > > tmpBuckets(processorThreads);
666
667 if (joinType & MATCHNULLS)
668 {
669 // this first loop hashes incoming values into vectors that parallel the hash tables.
670 for (i = 0; i < count; ++i)
671 {
672 /* A minor optimization: the matchnull logic should only be used with
673 * the jointype specifies it and there's a null value in the small side */
674 if (!l_doMatchNulls && arr[i].key == nullValue)
675 l_doMatchNulls = true;
676 bucket = bucketPicker((char *) &arr[i].key, 8, bpSeed) & ptMask;
677 tmpBuckets[bucket].push_back(make_pair(arr[i].key, arr[i].value));
678 }
679
680
681 bool done = false, didSomeWork;
682 //uint loopCounter = 0, noWorkCounter = 0;
683 // this loop moves the elements from each vector into its corresponding hash table.
684 while (!done)
685 {
686 //++loopCounter;
687 done = true;
688 didSomeWork = false;
689 for (i = 0; i < processorThreads; ++i)
690 {
691 if (!tmpBuckets[i].empty())
692 {
693 bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
694 if (!gotIt)
695 {
696 done = false; // didn't get it, don't block, try the next bucket
697 continue;
698 }
699 for (auto &element : tmpBuckets[i])
700 tJoiners[joinerNum][i]->insert(element);
701 addToJoinerLocks[joinerNum][i].unlock();
702 tmpBuckets[i].clear();
703 didSomeWork = true;
704 }
705 }
706 // if this iteration did no useful work, everything we need is locked; wait briefly
707 // and try again.
708 if (!done && !didSomeWork)
709 {
710 ::usleep(500 * processorThreads);
711 //++noWorkCounter;
712 }
713 }
714
715 //cout << "T numeric join insert. Took " << loopCounter << " loops" << endl;
716 }
717 else
718 {
719 // this first loop hashes incoming values into vectors that parallel the hash tables.
720 for (i = 0; i < count; ++i)
721 {
722 bucket = bucketPicker((char *) &arr[i].key, 8, bpSeed) & ptMask;
723 tmpBuckets[bucket].push_back(make_pair(arr[i].key, arr[i].value));
724 }
725
726 bool done = false;
727 bool didSomeWork;
728 //uint loopCounter = 0, noWorkCounter = 0;
729 // this loop moves the elements from each vector into its corresponding hash table.
730 while (!done)
731 {
732 //++loopCounter;
733 done = true;
734 didSomeWork = false;
735 for (i = 0; i < processorThreads; ++i)
736 {
737 if (!tmpBuckets[i].empty())
738 {
739 bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
740 if (!gotIt)
741 {
742 done = false; // didn't get it, don't block, try the next bucket
743 continue;
744 }
745 for (auto &element : tmpBuckets[i])
746 tJoiners[joinerNum][i]->insert(element);
747 addToJoinerLocks[joinerNum][i].unlock();
748 tmpBuckets[i].clear();
749 didSomeWork = true;
750 }
751 }
752 // if this iteration did no useful work, everything we need is locked; wait briefly
753 // and try again.
754 if (!done && !didSomeWork)
755 {
756 ::usleep(500 * processorThreads);
757 //++noWorkCounter;
758 }
759
760 }
761 //cout << "T numeric join insert 2. Took " << loopCounter << " loops," <<
762 // " unproductive iterations = " << noWorkCounter << endl;
763 }
764 }
765
766 if (!typelessJoin[joinerNum])
767 bs.advance(count * sizeof(JoinerElements));
768
769 if (getTupleJoinRowGroupData)
770 {
771 RowGroup& smallSide = smallSideRGs[joinerNum];
772 RGData offTheWire;
773
774 // TODO: write an RGData fcn to let it interpret data within a ByteStream to avoid
775 // the extra copying.
776 offTheWire.deserialize(bs);
777 boost::mutex::scoped_lock lk(smallSideDataLocks[joinerNum]);
778 smallSide.setData(&smallSideRowData[joinerNum]);
779 smallSide.append(offTheWire, startPos);
780
781 /* This prints the row data
782 smallSideRGs[joinerNum].initRow(&r);
783 for (i = 0; i < (tJoinerSizes[joinerNum] * smallSideRowLengths[joinerNum]); i+=r.getSize()) {
784 r.setData(&smallSideRowData[joinerNum][i + smallSideRGs[joinerNum].getEmptySize()]);
785 cout << " got row: " << r.toString() << endl;
786 }
787 */
788 }
789 }
790
791 idbassert(bs.length() == 0);
792 }
793
doneSendingJoinerData()794 void BatchPrimitiveProcessor::doneSendingJoinerData()
795 {
796 /* to get wall-time of hash table construction
797 if (!firstCallTime.is_not_a_date_time() && !(sessionID & 0x80000000))
798 {
799 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
800 Logger logger;
801 ostringstream os;
802 os << "id " << uniqueID << ": joiner construction time = " << now-firstCallTime;
803 logger.logMessage(os.str());
804 cout << os.str() << endl;
805 }
806 */
807 }
808
endOfJoiner()809 int BatchPrimitiveProcessor::endOfJoiner()
810 {
811 /* Wait for all joiner elements to be added */
812 uint32_t i;
813 size_t currentSize;
814 // it should be safe to run this without grabbing this lock
815 //boost::mutex::scoped_lock scoped(addToJoinerLock);
816
817 if (endOfJoinerRan)
818 return 0;
819
820 // minor hack / optimization. The instances not inserting the table data don't
821 // need to check that the table is complete.
822 if (!firstInstance)
823 {
824 endOfJoinerRan = true;
825 pthread_mutex_unlock(&objLock);
826 return 0;
827 }
828
829 for (i = 0; i < joinerCount; i++)
830 {
831 if (!typelessJoin[i])
832 {
833 currentSize = 0;
834 for (uint j = 0; j < processorThreads; ++j)
835 if (!tJoiners[i] || !tJoiners[i][j])
836 return -1;
837 else
838 currentSize += tJoiners[i][j]->size();
839 if (currentSize != tJoinerSizes[i])
840 return -1;
841 //if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i]))
842 // return -1;
843 }
844 else
845 {
846 currentSize = 0;
847 for (uint j = 0; j < processorThreads; ++j)
848 if (!tlJoiners[i] || !tlJoiners[i][j])
849 return -1;
850 else
851 currentSize += tlJoiners[i][j]->size();
852 if (currentSize != tJoinerSizes[i])
853 return -1;
854 //if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i]))
855 // return -1;
856 }
857 }
858
859 endOfJoinerRan = true;
860
861 #ifndef __FreeBSD__
862 pthread_mutex_unlock(&objLock);
863 #endif
864 return 0;
865 }
866
// Allocates per-instance working state from what initBPP() parsed:
//   - RID/string scratch arrays and the output message buffer (defaultBufferSize bytes);
//   - for ROW_GROUP output: the projection-step -> outputRG column map and join scratch state
//     (key allocators, key-column flags, join-filter rows and mappings);
//   - FE1/FE2 row mappings and getTupleJoinRowGroupData row buffers/mappings;
//   - filter-step RID/value buffers;
//   - prep() of every filter and projection Command;
//   - aggregator I/O RowGroups.
// Also resets minVal/maxVal and allocates the async-load bookkeeping arrays.
initProcessor()867 void BatchPrimitiveProcessor::initProcessor()
868 {
869 uint32_t i, j;
870
871 if (gotAbsRids || needStrValues || hasRowGroup)
872 absRids.reset(new uint64_t[LOGICAL_BLOCK_RIDS]);
873
874 if (needStrValues)
875 strValues.reset(new string[LOGICAL_BLOCK_RIDS]);
876
877 outMsgSize = defaultBufferSize;
878 outputMsg.reset(new uint8_t[outMsgSize]);
879
880 if (ot == ROW_GROUP)
881 {
882 // calculate the projection -> outputRG mapping
// reserved[] (stack memory from alloca) makes two projection steps with the same tuple key
// map to distinct output columns.
883 projectionMap.reset(new int[projectCount]);
884 bool* reserved = (bool*)alloca(outputRG.getColumnCount() * sizeof(bool));
885
886 for (i = 0; i < outputRG.getColumnCount(); i++)
887 reserved[i] = false;
888
889 for (i = 0; i < projectCount; i++)
890 {
891 for (j = 0; j < outputRG.getColumnCount(); j++)
892 if (projectSteps[i]->getTupleKey() == outputRG.getKeys()[j] && !reserved[j])
893 {
894 projectionMap[i] = j;
895 reserved[j] = true;
896 break;
897 }
898
// -1: this projection step has no column in outputRG.
899 if (j == outputRG.getColumnCount())
900 projectionMap[i] = -1;
901 }
902
903 if (doJoin)
904 {
905 outputRG.initRow(&oldRow);
906 outputRG.initRow(&newRow);
907 tmpKeyAllocators.reset(new FixedAllocator[joinerCount]);
908
909 for (i = 0; i < joinerCount; i++)
910 if (typelessJoin[i])
911 tmpKeyAllocators[i] = FixedAllocator(tlKeyLengths[i], true);
912
913 tSmallSideMatches.reset(new MatchedData[joinerCount]);
914 keyColumnProj.reset(new bool[projectCount]);
915
// keyColumnProj[i]: true when projection step i produces a large-side join key column of
// any joiner (typed or typeless).
916 for (i = 0; i < projectCount; i++)
917 {
918 keyColumnProj[i] = false;
919
920 for (j = 0; j < joinerCount; j++)
921 {
922 if (!typelessJoin[j])
923 {
924 if (projectionMap[i] == (int) largeSideKeyColumns[j])
925 {
926 keyColumnProj[i] = true;
927 break;
928 }
929 }
930 else
931 {
932 for (uint32_t k = 0; k < tlLargeSideKeyColumns[j].size(); k++)
933 {
934 if (projectionMap[i] == (int) tlLargeSideKeyColumns[j][k])
935 {
936 keyColumnProj[i] = true;
937 break;
938 }
939 }
940 }
941 }
942 }
943
// Join filter row buffer plus column mappings: one mapping per small side, and the last
// entry (index joinerCount) maps the large side.
944 if (hasJoinFEFilters)
945 {
946 joinFERG->initRow(&joinFERow, true);
947 joinFERowData.reset(new uint8_t[joinFERow.getSize()]);
948 joinFERow.setData(joinFERowData.get());
949 joinFEMappings.reset(new shared_array<int>[joinerCount + 1]);
950
951 for (i = 0; i < joinerCount; i++)
952 joinFEMappings[i] = makeMapping(smallSideRGs[i], *joinFERG);
953
954 joinFEMappings[joinerCount] = makeMapping(largeSideRG, *joinFERG);
955 }
956 }
957
958 /*
959 Calculate the FE1 -> projection mapping
960 Calculate the projection step -> FE1 input mapping
961 */
962 if (fe1)
963 {
964 fe1ToProjection = makeMapping(fe1Input, outputRG);
965 projectForFE1.reset(new int[projectCount]);
966 bool* reserved = (bool*)alloca(fe1Input.getColumnCount() * sizeof(bool));
967
968 for (i = 0; i < fe1Input.getColumnCount(); i++)
969 reserved[i] = false;
970
971 for (i = 0; i < projectCount; i++)
972 {
973 projectForFE1[i] = -1;
974
975 for (j = 0; j < fe1Input.getColumnCount(); j++)
976 {
977 if (projectSteps[i]->getTupleKey() == fe1Input.getKeys()[j] && !reserved[j])
978 {
979 reserved[j] = true;
980 projectForFE1[i] = j;
981 break;
982 }
983 }
984 }
985
986 fe1Input.initRow(&fe1In);
987 outputRG.initRow(&fe1Out);
988 }
989
// FE2 reads the joined rows when joining, otherwise the plain output rows.
990 if (fe2)
991 {
992 fe2Input = (doJoin ? &joinedRG : &outputRG);
993 fe2Mapping = makeMapping(*fe2Input, fe2Output);
994 fe2Input->initRow(&fe2In);
995 fe2Output.initRow(&fe2Out);
996 }
997
// Row buffers and column mappings used to build joined rows. The last mapping (index
// joinerCount) maps outputRG (the large side) into joinedRG.
998 if (getTupleJoinRowGroupData)
999 {
1000 gjrgPlaceHolders.reset(new uint32_t[joinerCount]);
1001 outputRG.initRow(&largeRow);
1002 joinedRG.initRow(&joinedRow);
1003 joinedRG.initRow(&baseJRow, true);
1004 smallRows.reset(new Row[joinerCount]);
1005
1006 for (i = 0; i < joinerCount; i++)
1007 smallSideRGs[i].initRow(&smallRows[i], true);
1008
1009 baseJRowMem.reset(new uint8_t[baseJRow.getSize()]);
1010 baseJRow.setData(baseJRowMem.get());
1011 gjrgMappings.reset(new shared_array<int>[joinerCount + 1]);
1012
1013 for (i = 0; i < joinerCount; i++)
1014 gjrgMappings[i] = makeMapping(smallSideRGs[i], joinedRG);
1015
1016 gjrgMappings[joinerCount] = makeMapping(outputRG, joinedRG);
1017 }
1018 }
1019
// Two sets of RID/value (and, for string filters, string) buffers for filter steps.
// Presumably they alternate between consecutive filter steps -- confirm against the filter code.
1020 // @bug 1051
1021 if (hasFilterStep)
1022 {
1023 for (uint64_t i = 0; i < 2; i++)
1024 {
1025 fFiltRidCount[i] = 0;
1026 fFiltCmdRids[i].reset(new uint16_t[LOGICAL_BLOCK_RIDS]);
1027 fFiltCmdValues[i].reset(new int64_t[LOGICAL_BLOCK_RIDS]);
1028
1029 if (filtOnString) fFiltStrValues[i].reset(new string[LOGICAL_BLOCK_RIDS]);
1030 }
1031 }
1032
// Every filter except the last produces output suited to the next step: RIDs+values if a
// dictionary step follows or this step feeds a filter, otherwise RIDs only. The last filter
// always produces both. The loop leaves i == filterCount - 1 for the final prep below.
1033 /* init the Commands */
1034 if (filterCount > 0)
1035 {
1036 for (i = 0; i < (uint32_t) filterCount - 1; ++i)
1037 {
1038 // cout << "prepping filter " << i << endl;
1039 filterSteps[i]->setBatchPrimitiveProcessor(this);
1040
1041 if (filterSteps[i + 1]->getCommandType() == Command::DICT_STEP)
1042 filterSteps[i]->prep(OT_BOTH, true);
1043 else if (filterSteps[i]->filterFeeder() != Command::NOT_FEEDER)
1044 filterSteps[i]->prep(OT_BOTH, false);
1045 else
1046 filterSteps[i]->prep(OT_RID, false);
1047 }
1048
1049 // cout << "prepping filter " << i << endl;
1050 filterSteps[i]->setBatchPrimitiveProcessor(this);
1051 filterSteps[i]->prep(OT_BOTH, false);
1052 }
1053
1054 for (i = 0; i < projectCount; ++i)
1055 {
1056 // cout << "prepping projection " << i << endl;
1057 projectSteps[i]->setBatchPrimitiveProcessor(this);
1058
// noVB is a PrimProc config parameter (extern, defined in primitiveserver.cpp).
1059 if (noVB)
1060 projectSteps[i]->prep(OT_BOTH, false);
1061 else
1062 projectSteps[i]->prep(OT_DATAVALUE, false);
1063
1064 if (0 < filterCount )
1065 {
1066 //if there is an rtscommand with a passThru, the passThru must make its own absRids
1067 //unless there is only one project step, then the last filter step can make absRids
1068 RTSCommand* rts = dynamic_cast<RTSCommand*>(projectSteps[i].get());
1069
1070 if (rts && rts->isPassThru())
1071 {
1072 if (1 == projectCount)
1073 filterSteps[filterCount - 1]->setMakeAbsRids(true);
1074 else rts->setAbsNull();
1075 }
1076 }
1077 }
1078
// The aggregator reads FE2 output if present, else the joined or plain output rows.
1079 if (fAggregator.get() != NULL)
1080 {
1081 //fAggRowGroupData.reset(new uint8_t[fAggregateRG.getMaxDataSize()]);
1082 fAggRowGroupData.reinit(fAggregateRG);
1083 fAggregateRG.setData(&fAggRowGroupData);
1084
1085 if (doJoin)
1086 {
1087 fAggregator->setInputOutput(fe2 ? fe2Output : joinedRG, &fAggregateRG);
1088 fAggregator->setJoinRowGroups(&smallSideRGs, &largeSideRG);
1089 }
1090 else
1091 fAggregator->setInputOutput(fe2 ? fe2Output : outputRG, &fAggregateRG);
1092 }
1093
1094 minVal = MAX64;
1095 maxVal = MIN64;
1096
1097 // @bug 1269, initialize data used by execute() for async loading blocks
1098 // +1 for the scan filter step with no predicate, if any
1099 relLBID.reset(new uint64_t[projectCount + 1]);
1100 asyncLoaded.reset(new bool[projectCount + 1]);
1101 }
1102
1103 /* This version does a join on projected rows */
executeTupleJoin()1104 void BatchPrimitiveProcessor::executeTupleJoin()
1105 {
1106 uint32_t newRowCount = 0, i, j;
1107 vector<uint32_t> matches;
1108 uint64_t largeKey;
1109 TypelessData tlLargeKey;
1110
1111 outputRG.getRow(0, &oldRow);
1112 outputRG.getRow(0, &newRow);
1113
1114 //cout << "before join, RG has " << outputRG.getRowCount() << " BPP ridcount= " << ridCount << endl;
1115 for (i = 0; i < ridCount && !sendThread->aborted(); i++, oldRow.nextRow())
1116 {
1117 /* Decide whether this large-side row belongs in the output. The breaks
1118 * in the loop mean that it doesn't.
1119 *
1120 * In English the logic is:
1121 * Reject the row if there's no match and it's not an anti or an outer join
1122 * or if there is a match and it's an anti join with no filter.
1123 * If there's an antijoin with a filter nothing can be eliminated at this stage.
1124 * If there's an antijoin where the join op should match NULL values, and there
1125 * are NULL values to match against, but there is no filter, all rows can be eliminated.
1126 */
1127
1128 //cout << "large side row: " << oldRow.toString() << endl;
1129 for (j = 0; j < joinerCount; j++)
1130 {
1131 bool found;
1132
1133 if (UNLIKELY(joinTypes[j] & ANTI))
1134 {
1135 if (joinTypes[j] & WITHFCNEXP)
1136 continue;
1137 else if (doMatchNulls[j])
1138 break;
1139 }
1140
1141
1142 if (LIKELY(!typelessJoin[j]))
1143 {
1144 //cout << "not typeless join\n";
1145 bool isNull;
1146 uint32_t colIndex = largeSideKeyColumns[j];
1147
1148 if (oldRow.isUnsigned(colIndex))
1149 largeKey = oldRow.getUintField(colIndex);
1150 else
1151 largeKey = oldRow.getIntField(colIndex);
1152 uint bucket = bucketPicker((char *) &largeKey, 8, bpSeed) & ptMask;
1153
1154 bool joinerIsEmpty = tJoiners[j][bucket]->empty() ? true : false;
1155
1156 found = (tJoiners[j][bucket]->find(largeKey) != tJoiners[j][bucket]->end());
1157 isNull = oldRow.isNullValue(colIndex);
1158 /* These conditions define when the row is NOT in the result set:
1159 * - if the key is not in the small side, and the join isn't a large-outer or anti join
1160 * - if the key is NULL, and the join isn't anti- or large-outer
1161 * - if it's an anti-join and the key is either in the small side or it's NULL
1162 */
1163
1164 if (((!found || isNull) && !(joinTypes[j] & (LARGEOUTER | ANTI))) ||
1165 ((joinTypes[j] & ANTI) && !joinerIsEmpty && ((isNull && (joinTypes[j] & MATCHNULLS)) || (found && !isNull))))
1166 {
1167 //cout << " - not in the result set\n";
1168 break;
1169 }
1170
1171 //else
1172 // cout << " - in the result set\n";
1173 }
1174 else
1175 {
1176 //cout << " typeless join\n";
1177 // the null values are not sent by UM in typeless case. null -> !found
1178 tlLargeKey = makeTypelessKey(oldRow, tlLargeSideKeyColumns[j], tlKeyLengths[j],
1179 &tmpKeyAllocators[j]);
1180 uint bucket = tlLargeKey.hash(outputRG, tlLargeSideKeyColumns[j]) & ptMask;
1181 found = tlJoiners[j][bucket]->find(tlLargeKey) != tlJoiners[j][bucket]->end();
1182
1183 if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) ||
1184 (joinTypes[j] & ANTI))
1185 {
1186
1187 /* Separated the ANTI join logic for readability.
1188 *
1189 */
1190 if (joinTypes[j] & ANTI)
1191 {
1192 if (found)
1193 break;
1194 else if (joinTypes[j] & MATCHNULLS)
1195 {
1196 bool hasNull = false;
1197
1198 for (uint32_t z = 0; z < tlLargeSideKeyColumns[j].size(); z++)
1199 if (oldRow.isNullValue(tlLargeSideKeyColumns[j][z]))
1200 {
1201 hasNull = true;
1202 break;
1203 }
1204
1205 if (hasNull) // keys with nulls match everything
1206 break;
1207 else
1208 continue; // non-null keys not in the small side
1209
1210 // are in the result
1211 }
1212 else // signifies a not-exists query
1213 continue;
1214 }
1215
1216 break;
1217 }
1218 }
1219 }
1220
1221 if (j == joinerCount)
1222 {
1223 for (j = 0; j < joinerCount; j++)
1224 {
1225 uint32_t matchCount;
1226
1227 /* The result is already known if...
1228 * -- anti-join with no fcnexp
1229 * -- semi-join with no fcnexp and not scalar
1230 *
1231 * The ANTI join case isn't just a shortcut. getJoinResults() will produce results
1232 * for a different case and generate the wrong result. Need to fix that, later.
1233 */
1234 if ((joinTypes[j] & (SEMI | ANTI)) && !(joinTypes[j] & WITHFCNEXP) && !(joinTypes[j] & SCALAR))
1235 {
1236 tSmallSideMatches[j][newRowCount].push_back(-1);
1237 continue;
1238 }
1239
1240 getJoinResults(oldRow, j, tSmallSideMatches[j][newRowCount]);
1241 matchCount = tSmallSideMatches[j][newRowCount].size();
1242
1243 if (joinTypes[j] & WITHFCNEXP)
1244 {
1245 vector<uint32_t> newMatches;
1246 applyMapping(joinFEMappings[joinerCount], oldRow, &joinFERow);
1247
1248 for (uint32_t k = 0; k < matchCount; k++)
1249 {
1250 if (tSmallSideMatches[j][newRowCount][k] == (uint32_t) - 1)
1251 smallRows[j].setPointer(smallNullPointers[j]);
1252 else
1253 {
1254 smallSideRGs[j].getRow(tSmallSideMatches[j][newRowCount][k], &smallRows[j]);
1255 //uint64_t rowOffset = ((uint64_t) tSmallSideMatches[j][newRowCount][k]) *
1256 // smallRows[j].getSize() + smallSideRGs[j].getEmptySize();
1257 //smallRows[j].setData(&smallSideRowData[j][rowOffset]);
1258 }
1259
1260 applyMapping(joinFEMappings[j], smallRows[j], &joinFERow);
1261
1262 if (joinFEFilters[j]->evaluate(&joinFERow))
1263 {
1264 /* The first match includes it in a SEMI join result and excludes it from an ANTI join
1265 * result. If it's SEMI & SCALAR however, it needs to continue.
1266 */
1267 newMatches.push_back(tSmallSideMatches[j][newRowCount][k]);
1268
1269 if ((joinTypes[j] & ANTI) || ((joinTypes[j] & (SEMI | SCALAR)) == SEMI))
1270 break;
1271 }
1272 }
1273
1274 tSmallSideMatches[j][newRowCount].swap(newMatches);
1275 matchCount = tSmallSideMatches[j][newRowCount].size();
1276 }
1277
1278 if (matchCount == 0 && (joinTypes[j] & LARGEOUTER))
1279 {
1280 tSmallSideMatches[j][newRowCount].push_back(-1);
1281 matchCount = 1;
1282 }
1283
1284 /* Scalar check */
1285 if ((joinTypes[j] & SCALAR) && matchCount > 1)
1286 throw scalar_exception();
1287
1288 /* Reverse the result for anti-join */
1289 if (joinTypes[j] & ANTI)
1290 {
1291 if (matchCount == 0)
1292 {
1293 tSmallSideMatches[j][newRowCount].push_back(-1);
1294 matchCount = 1;
1295 }
1296 else
1297 {
1298 tSmallSideMatches[j][newRowCount].clear();
1299 matchCount = 0;
1300 }
1301 }
1302
1303 /* For all join types, no matches here means it's not in the result */
1304 if (matchCount == 0)
1305 break;
1306
1307 /* Pair non-scalar semi-joins with a NULL row */
1308 if ((joinTypes[j] & SEMI) && !(joinTypes[j] & SCALAR))
1309 {
1310 tSmallSideMatches[j][newRowCount].clear();
1311 tSmallSideMatches[j][newRowCount].push_back(-1);
1312 matchCount = 1;
1313 }
1314 }
1315
1316 /* Finally, copy the row into the output */
1317 if (j == joinerCount)
1318 {
1319 if (i != newRowCount)
1320 {
1321 values[newRowCount] = values[i];
1322 relRids[newRowCount] = relRids[i];
1323 copyRow(oldRow, &newRow);
1324 //cout << "joined row: " << newRow.toString() << endl;
1325 //memcpy(newRow.getData(), oldRow.getData(), oldRow.getSize());
1326 }
1327
1328 newRowCount++;
1329 newRow.nextRow();
1330 }
1331
1332 //else
1333 // cout << "j != joinerCount\n";
1334 }
1335 }
1336
1337 ridCount = newRowCount;
1338 outputRG.setRowCount(ridCount);
1339
1340 /* prints out the whole result set.
1341 if (ridCount != 0) {
1342 cout << "RG rowcount=" << outputRG.getRowCount() << " BPP ridcount=" << ridCount << endl;
1343 for (i = 0; i < joinerCount; i++) {
1344 for (j = 0; j < ridCount; j++) {
1345 cout << "joiner " << i << " has " << tSmallSideMatches[i][j].size() << " entries" << endl;
1346 cout << "row " << j << ":";
1347 for (uint32_t k = 0; k < tSmallSideMatches[i][j].size(); k++)
1348 cout << " " << tSmallSideMatches[i][j][k];
1349 cout << endl;
1350 }
1351 cout << endl;
1352 }
1353 }
1354 */
1355 }
1356
1357 #ifdef PRIMPROC_STOPWATCH
execute(StopWatch * stopwatch)1358 void BatchPrimitiveProcessor::execute(StopWatch* stopwatch)
1359 #else
1360 void BatchPrimitiveProcessor::execute()
1361 #endif
1362 {
1363 uint32_t i, j;
1364
1365 try
1366 {
1367 // Check memory up front
1368 if (MonitorProcMem::checkMemlimit())
1369 {
1370 throw logging::IDBExcept(logging::ERR_PRIMPROC_LOW_MEMORY);
1371 }
1372 #ifdef PRIMPROC_STOPWATCH
1373 stopwatch->start("BatchPrimitiveProcessor::execute first part");
1374 #endif
1375
1376 // if only one scan step which has no predicate, async load all columns
1377 if (filterCount == 1 && hasScan)
1378 {
1379 ColumnCommand* col = dynamic_cast<ColumnCommand*>(filterSteps[0].get());
1380
1381 if ((col != NULL) && (col->getFilterCount() == 0) && (col->getLBID() != 0))
1382 {
1383 // stored in last pos in relLBID[] and asyncLoaded[]
1384 uint64_t p = projectCount;
1385 asyncLoaded[p] = asyncLoaded[p] && (relLBID[p] % blocksReadAhead != 0);
1386 relLBID[p] += col->getWidth();
1387
1388 if (!asyncLoaded[p] && col->willPrefetch())
1389 {
1390 loadBlockAsync(col->getLBID(),
1391 versionInfo,
1392 txnID,
1393 col->getCompType(),
1394 &cachedIO,
1395 &physIO,
1396 LBIDTrace,
1397 sessionID,
1398 &counterLock,
1399 &busyLoaderCount,
1400 sendThread,
1401 &vssCache);
1402 asyncLoaded[p] = true;
1403 }
1404
1405 asyncLoadProjectColumns();
1406 }
1407 }
1408
1409 #ifdef PRIMPROC_STOPWATCH
1410 stopwatch->stop("BatchPrimitiveProcessor::execute first part");
1411 stopwatch->start("BatchPrimitiveProcessor::execute second part");
1412 #endif
1413
1414 // filters use relrids and values for intermediate results.
1415 if (bop == BOP_AND)
1416 for (j = 0; j < filterCount; ++j)
1417 {
1418 #ifdef PRIMPROC_STOPWATCH
1419 stopwatch->start("- filterSteps[j]->execute()");
1420 filterSteps[j]->execute();
1421 stopwatch->stop("- filterSteps[j]->execute()");
1422 #else
1423 filterSteps[j]->execute();
1424 #endif
1425 }
1426 else // BOP_OR
1427 {
1428
1429 /* XXXPAT: This is a hacky impl of OR logic. Each filter is configured to
1430 be a scan operation on init. This code runs each independently and
1431 unions their output ridlists using accumulator. At the end it turns
1432 accumulator into a final ridlist for subsequent steps.
1433
1434 If there's a join or a passthru command in the projection list, the
1435 values array has to contain values from the last filter step. In that
1436 case, the last filter step isn't part of the "OR" filter processing.
1437 JLF has added it to prep those operations, not to be a filter.
1438
1439 7/7/09 update: the multiple-table join required relocating the join op. It's
1440 no longer necessary to add the loader columncommand to the filter array.
1441 */
1442
1443 bool accumulator[LOGICAL_BLOCK_RIDS];
1444 // uint32_t realFilterCount = ((forHJ || hasPassThru) ? filterCount - 1 : filterCount);
1445 uint32_t realFilterCount = filterCount;
1446
1447 for (i = 0; i < LOGICAL_BLOCK_RIDS; i++)
1448 accumulator[i] = false;
1449
1450 if (!hasScan) // there are input rids
1451 for (i = 0; i < ridCount; i++)
1452 accumulator[relRids[i]] = true;
1453
1454 ridCount = 0;
1455
1456 for (i = 0; i < realFilterCount; ++i)
1457 {
1458 filterSteps[i]->execute();
1459
1460 if (! filterSteps[i]->filterFeeder())
1461 {
1462 for (j = 0; j < ridCount; j++)
1463 accumulator[relRids[j]] = true;
1464
1465 ridCount = 0;
1466 }
1467 }
1468
1469 for (ridMap = 0, i = 0; i < LOGICAL_BLOCK_RIDS; ++i)
1470 {
1471 if (accumulator[i])
1472 {
1473 relRids[ridCount] = i;
1474 ridMap |= 1 << (relRids[ridCount] >> 10);
1475 ++ridCount;
1476 }
1477 }
1478 }
1479
1480 #ifdef PRIMPROC_STOPWATCH
1481 stopwatch->stop("BatchPrimitiveProcessor::execute second part");
1482 stopwatch->start("BatchPrimitiveProcessor::execute third part");
1483 #endif
1484
1485 if (projectCount > 0 || ot == ROW_GROUP)
1486 {
1487 #ifdef PRIMPROC_STOPWATCH
1488 stopwatch->start("- writeProjectionPreamble");
1489 writeProjectionPreamble();
1490 stopwatch->stop("- writeProjectionPreamble");
1491 #else
1492 writeProjectionPreamble();
1493 #endif
1494 }
1495
1496 // async load blocks for project phase, if not alread loaded
1497 if (ridCount > 0)
1498 {
1499 #ifdef PRIMPROC_STOPWATCH
1500 stopwatch->start("- asyncLoadProjectColumns");
1501 asyncLoadProjectColumns();
1502 stopwatch->stop("- asyncLoadProjectColumns");
1503 #else
1504 asyncLoadProjectColumns();
1505 #endif
1506 }
1507
1508 #ifdef PRIMPROC_STOPWATCH
1509 stopwatch->stop("BatchPrimitiveProcessor::execute third part");
1510 stopwatch->start("BatchPrimitiveProcessor::execute fourth part");
1511 #endif
1512
1513 // projection commands read relrids and write output directly to a rowgroup
1514 // or the serialized bytestream
1515 if (ot != ROW_GROUP)
1516 for (j = 0; j < projectCount; ++j)
1517 {
1518 projectSteps[j]->project();
1519 }
1520 else
1521 {
1522 /* Function & Expression group 1 processing
1523 - project for FE1
1524 - execute FE1 row by row
1525 - if return value = true, map input row into the projection RG, adjust ridlist
1526 */
1527 #ifdef PRIMPROC_STOPWATCH
1528 stopwatch->start("- if(ot != ROW_GROUP) else");
1529 #endif
1530 outputRG.resetRowGroup(baseRid);
1531
1532 if (fe1)
1533 {
1534 uint32_t newRidCount = 0;
1535 fe1Input.resetRowGroup(baseRid);
1536 fe1Input.setRowCount(ridCount);
1537 fe1Input.getRow(0, &fe1In);
1538 outputRG.getRow(0, &fe1Out);
1539
1540 for (j = 0; j < projectCount; j++)
1541 if (projectForFE1[j] != -1)
1542 projectSteps[j]->projectIntoRowGroup(fe1Input, projectForFE1[j]);
1543
1544 for (j = 0; j < ridCount; j++, fe1In.nextRow())
1545 if (fe1->evaluate(&fe1In))
1546 {
1547 applyMapping(fe1ToProjection, fe1In, &fe1Out);
1548 relRids[newRidCount] = relRids[j];
1549 values[newRidCount++] = values[j];
1550 fe1Out.nextRow();
1551 }
1552
1553 ridCount = newRidCount;
1554 }
1555
1556 outputRG.setRowCount(ridCount);
1557
1558 if (sendRidsAtDelivery)
1559 {
1560 Row r;
1561 outputRG.initRow(&r);
1562 outputRG.getRow(0, &r);
1563
1564 for (j = 0; j < ridCount; ++j)
1565 {
1566 r.setRid(relRids[j]);
1567 r.nextRow();
1568 }
1569 }
1570
1571 /* 7/7/09 PL: I Changed the projection alg to reduce block touches when there's
1572 a join. The key columns get projected first, the join is executed to further
1573 reduce the ridlist, then the rest of the columns get projected */
1574
1575 if (!doJoin)
1576 {
1577 for (j = 0; j < projectCount; ++j)
1578 {
1579 // cout << "projectionMap[" << j << "] = " << projectionMap[j] << endl;
1580 if (projectionMap[j] != -1)
1581 {
1582 #ifdef PRIMPROC_STOPWATCH
1583 stopwatch->start("-- projectIntoRowGroup");
1584 projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
1585 stopwatch->stop("-- projectIntoRowGroup");
1586 #else
1587 projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
1588 #endif
1589 }
1590
1591 // else
1592 // cout << " no target found for OID " << projectSteps[j]->getOID() << endl;
1593 }
1594 }
1595 else
1596 {
1597 /* project the key columns. If there's the filter IN the join, project everything.
1598 Also need to project 'long' strings b/c executeTupleJoin may copy entire rows
1599 using copyRow(), which will try to interpret the uninit'd string ptr.
1600 Valgrind will legitimately complain about copying uninit'd values for the
1601 other types but that is technically safe. */
1602 for (j = 0; j < projectCount; j++)
1603 if (keyColumnProj[j] || (projectionMap[j] != -1 && (hasJoinFEFilters ||
1604 oldRow.isLongString(projectionMap[j]))))
1605 {
1606 #ifdef PRIMPROC_STOPWATCH
1607 stopwatch->start("-- projectIntoRowGroup");
1608 projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
1609 stopwatch->stop("-- projectIntoRowGroup");
1610 #else
1611 projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
1612 #endif
1613 }
1614
1615
1616 #ifdef PRIMPROC_STOPWATCH
1617 stopwatch->start("-- executeTupleJoin()");
1618 executeTupleJoin();
1619 stopwatch->stop("-- executeTupleJoin()");
1620 #else
1621 executeTupleJoin();
1622 #endif
1623
1624 /* project the non-key columns */
1625 for (j = 0; j < projectCount; ++j)
1626 {
1627 if (projectionMap[j] != -1 && !keyColumnProj[j] && !hasJoinFEFilters &&
1628 !oldRow.isLongString(projectionMap[j]))
1629 {
1630 #ifdef PRIMPROC_STOPWATCH
1631 stopwatch->start("-- projectIntoRowGroup");
1632 projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
1633 stopwatch->stop("-- projectIntoRowGroup");
1634 #else
1635 projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
1636 #endif
1637 }
1638 }
1639 }
1640
1641 /* The RowGroup is fully joined at this point.
1642 Add additional RowGroup processing here.
1643 TODO: Try to clean up all of the switching */
1644
1645 if (doJoin && (fe2 || fAggregator))
1646 {
1647 bool moreRGs = true;
1648 ByteStream preamble = *serialized;
1649 initGJRG();
1650
1651 while (moreRGs && !sendThread->aborted())
1652 {
1653 /*
1654 generate 1 rowgroup (8192 rows max) of joined rows
1655 if there's an FE2, run it
1656 -pack results into a new rowgroup
1657 -if there are < 8192 rows in the new RG, continue
1658 if there's an agg, run it
1659 send the result
1660 */
1661 resetGJRG();
1662 moreRGs = generateJoinedRowGroup(baseJRow);
1663 *serialized << (uint8_t) !moreRGs;
1664
1665 if (fe2)
1666 {
1667 /* functionize this -> processFE2()*/
1668 fe2Output.resetRowGroup(baseRid);
1669 fe2Output.setDBRoot(dbRoot);
1670 fe2Output.getRow(0, &fe2Out);
1671 fe2Input->getRow(0, &fe2In);
1672
1673 for (j = 0; j < joinedRG.getRowCount(); j++, fe2In.nextRow())
1674 if (fe2->evaluate(&fe2In))
1675 {
1676 applyMapping(fe2Mapping, fe2In, &fe2Out);
1677 fe2Out.setRid(fe2In.getRelRid());
1678 fe2Output.incRowCount();
1679 fe2Out.nextRow();
1680 }
1681 }
1682
1683 RowGroup& nextRG = (fe2 ? fe2Output : joinedRG);
1684 nextRG.setDBRoot(dbRoot);
1685
1686 if (fAggregator)
1687 {
1688 fAggregator->addRowGroup(&nextRG);
1689
1690 if ((currentBlockOffset + 1) == count && moreRGs == false) // @bug4507, 8k
1691 {
1692 fAggregator->loadResult(*serialized); // @bug4507, 8k
1693 } // @bug4507, 8k
1694 else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k
1695 {
1696 fAggregator->loadEmptySet(*serialized); // @bug4507, 8k
1697 } // @bug4507, 8k
1698 else // @bug4507, 8k
1699 {
1700 fAggregator->loadResult(*serialized); // @bug4507, 8k
1701 fAggregator->aggReset(); // @bug4507, 8k
1702 } // @bug4507, 8k
1703 }
1704 else
1705 {
1706 //cerr <<" * serialzing " << nextRG.toString() << endl;
1707 nextRG.serializeRGData(*serialized);
1708 if (MonitorProcMem::checkMemlimit())
1709 {
1710 throw logging::IDBExcept(logging::ERR_PRIMPROC_LOW_MEMORY);
1711 }
1712 }
1713
1714 /* send the msg & reinit the BS */
1715 if (moreRGs)
1716 {
1717 sendResponse();
1718 serialized.reset(new ByteStream());
1719 *serialized = preamble;
1720 }
1721 }
1722
1723 if (hasSmallOuterJoin)
1724 {
1725 *serialized << ridCount;
1726
1727 for (i = 0; i < joinerCount; i++)
1728 for (j = 0; j < ridCount; ++j)
1729 serializeInlineVector<uint32_t>(*serialized,
1730 tSmallSideMatches[i][j]);
1731 }
1732 }
1733
1734 if (!doJoin && fe2)
1735 {
1736 /* functionize this -> processFE2() */
1737 fe2Output.resetRowGroup(baseRid);
1738 fe2Output.getRow(0, &fe2Out);
1739 fe2Input->getRow(0, &fe2In);
1740
1741 //cerr << "input row: " << fe2In.toString() << endl;
1742 for (j = 0; j < outputRG.getRowCount(); j++, fe2In.nextRow())
1743 {
1744 if (fe2->evaluate(&fe2In))
1745 {
1746 applyMapping(fe2Mapping, fe2In, &fe2Out);
1747 //cerr << " passed. output row: " << fe2Out.toString() << endl;
1748 fe2Out.setRid (fe2In.getRelRid());
1749 fe2Output.incRowCount();
1750 fe2Out.nextRow();
1751 }
1752 }
1753
1754 if (!fAggregator)
1755 {
1756 *serialized << (uint8_t) 1; // the "count this msg" var
1757 fe2Output.setDBRoot(dbRoot);
1758 fe2Output.serializeRGData(*serialized);
1759 if (MonitorProcMem::checkMemlimit())
1760 {
1761 throw logging::IDBExcept(logging::ERR_PRIMPROC_LOW_MEMORY);
1762 }
1763 //*serialized << fe2Output.getDataSize();
1764 //serialized->append(fe2Output.getData(), fe2Output.getDataSize());
1765 }
1766 }
1767
1768 if (!doJoin && fAggregator)
1769 {
1770 *serialized << (uint8_t) 1; // the "count this msg" var
1771
1772 RowGroup& toAggregate = (fe2 ? fe2Output : outputRG);
1773 //toAggregate.convertToInlineDataInPlace();
1774
1775 if (fe2)
1776 fe2Output.setDBRoot(dbRoot);
1777 else
1778 outputRG.setDBRoot(dbRoot);
1779
1780 fAggregator->addRowGroup(&toAggregate);
1781
1782 if ((currentBlockOffset + 1) == count) // @bug4507, 8k
1783 {
1784 fAggregator->loadResult(*serialized); // @bug4507, 8k
1785 } // @bug4507, 8k
1786 else if (utils::MonitorProcMem::isMemAvailable()) // @bug4507, 8k
1787 {
1788 fAggregator->loadEmptySet(*serialized); // @bug4507, 8k
1789 } // @bug4507, 8k
1790 else // @bug4507, 8k
1791 {
1792 fAggregator->loadResult(*serialized); // @bug4507, 8k
1793 fAggregator->aggReset(); // @bug4507, 8k
1794 } // @bug4507, 8k
1795 }
1796
1797 if (!fAggregator && !fe2)
1798 {
1799 *serialized << (uint8_t) 1; // the "count this msg" var
1800 outputRG.setDBRoot(dbRoot);
1801 //cerr << "serializing " << outputRG.toString() << endl;
1802 outputRG.serializeRGData(*serialized);
1803 if (MonitorProcMem::checkMemlimit())
1804 {
1805 throw logging::IDBExcept(logging::ERR_PRIMPROC_LOW_MEMORY);
1806 }
1807 //*serialized << outputRG.getDataSize();
1808 //serialized->append(outputRG.getData(), outputRG.getDataSize());
1809 if (doJoin)
1810 {
1811 for (i = 0; i < joinerCount; i++)
1812 {
1813 for (j = 0; j < ridCount; ++j)
1814 {
1815 serializeInlineVector<uint32_t>(*serialized,
1816 tSmallSideMatches[i][j]);
1817 }
1818 }
1819 }
1820 }
1821
1822 // clear small side match vector
1823 if (doJoin)
1824 {
1825 for (i = 0; i < joinerCount; i++)
1826 for (j = 0; j < ridCount; ++j)
1827 tSmallSideMatches[i][j].clear();
1828 }
1829
1830 #ifdef PRIMPROC_STOPWATCH
1831 stopwatch->stop("- if(ot != ROW_GROUP) else");
1832 #endif
1833 }
1834
1835 if (projectCount > 0 || ot == ROW_GROUP)
1836 {
1837 *serialized << cachedIO;
1838 cachedIO = 0;
1839 *serialized << physIO;
1840 physIO = 0;
1841 *serialized << touchedBlocks;
1842 touchedBlocks = 0;
1843 // cout << "sent physIO=" << physIO << " cachedIO=" << cachedIO <<
1844 // " touchedBlocks=" << touchedBlocks << endl;
1845 }
1846
1847 #ifdef PRIMPROC_STOPWATCH
1848 stopwatch->stop("BatchPrimitiveProcessor::execute fourth part");
1849 #endif
1850
1851 }
1852 catch (logging::QueryDataExcept& qex)
1853 {
1854 writeErrorMsg(qex.what(), qex.errorCode());
1855 }
1856 catch (logging::DictionaryBufferOverflow& db)
1857 {
1858 writeErrorMsg(db.what(), db.errorCode());
1859 }
1860 catch (scalar_exception& se)
1861 {
1862 writeErrorMsg(IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW), ERR_MORE_THAN_1_ROW, false);
1863 }
1864 catch (NeedToRestartJob& n)
1865 {
1866 #if 0
1867
1868 /* This block of code will flush the problematic OIDs from the
1869 * cache. It seems to have no effect on the problem, so it's commented
1870 * for now.
1871 *
1872 * This is currently thrown only on syscat queries. If we find the problem
1873 * in user tables also, we should avoid dropping entire OIDs if possible.
1874 *
1875 * In local testing there was no need for flushing, because DDL flushes
1876 * the syscat constantly. However, it can take a long time (>10 s) before
1877 * that happens. Doing it locally should make it much more likely only
1878 * one restart is necessary.
1879 */
1880
1881 try
1882 {
1883 vector<uint32_t> oids;
1884 uint32_t oid;
1885
1886 for (uint32_t i = 0; i < filterCount; i++)
1887 {
1888 oid = filterSteps[i]->getOID();
1889
1890 if (oid > 0)
1891 oids.push_back(oid);
1892 }
1893
1894 for (uint32_t i = 0; i < projectCount; i++)
1895 {
1896 oid = projectSteps[i]->getOID();
1897
1898 if (oid > 0)
1899 oids.push_back(oid);
1900 }
1901
1902 #if 0
1903 Logger logger;
1904 ostringstream os;
1905 os << "dropping OIDs: ";
1906
1907 for (int i = 0; i < oids.size(); i++)
1908 os << oids[i] << " ";
1909
1910 logger.logMessage(os.str());
1911 #endif
1912
1913 for (int i = 0; i < fCacheCount; i++)
1914 {
1915 dbbc::blockCacheClient bc(*BRPp[i]);
1916 // bc.flushCache();
1917 bc.flushOIDs(&oids[0], oids.size());
1918 }
1919 }
1920 catch (...) { } // doesn't matter if this fails, just avoid crashing
1921
1922 #endif
1923
1924 #ifndef __FreeBSD__
1925 pthread_mutex_unlock(&objLock);
1926 #endif
1927 throw n; // need to pass this through to BPPSeeder
1928 }
1929 catch (IDBExcept& iex)
1930 {
1931 writeErrorMsg(iex.what(), iex.errorCode(), true, false);
1932 }
1933 catch (const std::exception& ex)
1934 {
1935 writeErrorMsg(ex.what(), logging::batchPrimitiveProcessorErr);
1936 }
1937 catch (...)
1938 {
1939 string msg("BatchPrimitiveProcessor caught an unknown exception");
1940 writeErrorMsg(msg, logging::batchPrimitiveProcessorErr);
1941 }
1942 }
1943
writeErrorMsg(const string & error,uint16_t errCode,bool logIt,bool critical)1944 void BatchPrimitiveProcessor::writeErrorMsg(const string& error, uint16_t errCode, bool logIt, bool critical)
1945 {
1946 ISMPacketHeader ism;
1947 PrimitiveHeader ph;
1948
1949 // we don't need every field of these headers. Init'ing them anyway
1950 // makes memory checkers happy.
1951 void *ismp = static_cast<void*>(&ism);
1952 void *php = static_cast<void*>(&ph);
1953 memset(ismp, 0, sizeof(ISMPacketHeader));
1954 memset(php, 0, sizeof(PrimitiveHeader));
1955 ph.SessionID = sessionID;
1956 ph.StepID = stepID;
1957 ph.UniqueID = uniqueID;
1958 ism.Status = errCode;
1959
1960 serialized.reset(new ByteStream());
1961 serialized->append((uint8_t*) &ism, sizeof(ism));
1962 serialized->append((uint8_t*) &ph, sizeof(ph));
1963 *serialized << error;
1964
1965 if (logIt)
1966 {
1967 Logger log;
1968 log.logMessage(error, critical);
1969 }
1970 }
1971
writeProjectionPreamble()1972 void BatchPrimitiveProcessor::writeProjectionPreamble()
1973 {
1974 ISMPacketHeader ism;
1975 PrimitiveHeader ph;
1976
1977 // we don't need every field of these headers. Init'ing them anyway
1978 // makes memory checkers happy.
1979 void *ismp = static_cast<void*>(&ism);
1980 void *php = static_cast<void*>(&ph);
1981 memset(ismp, 0, sizeof(ISMPacketHeader));
1982 memset(php, 0, sizeof(PrimitiveHeader));
1983 ph.SessionID = sessionID;
1984 ph.StepID = stepID;
1985 ph.UniqueID = uniqueID;
1986
1987 serialized.reset(new ByteStream());
1988 serialized->append((uint8_t*) &ism, sizeof(ism));
1989 serialized->append((uint8_t*) &ph, sizeof(ph));
1990
1991 /* add-ons */
1992 if (hasScan)
1993 {
1994 if (validCPData)
1995 {
1996 *serialized << (uint8_t) 1;
1997 *serialized << lbidForCP;
1998 *serialized << (uint64_t) minVal;
1999 *serialized << (uint64_t) maxVal;
2000 }
2001 else
2002 {
2003 *serialized << (uint8_t) 0;
2004 *serialized << lbidForCP;
2005 }
2006 }
2007
2008 // ridsOut += ridCount;
2009 /* results */
2010
2011 if (ot != ROW_GROUP)
2012 {
2013 *serialized << ridCount;
2014
2015 if (sendRidsAtDelivery)
2016 {
2017 *serialized << baseRid;
2018 serialized->append((uint8_t*) relRids, ridCount << 1);
2019 }
2020 }
2021 }
2022
serializeElementTypes()2023 void BatchPrimitiveProcessor::serializeElementTypes()
2024 {
2025
2026 *serialized << baseRid;
2027 *serialized << ridCount;
2028 serialized->append((uint8_t*) relRids, ridCount << 1);
2029 serialized->append((uint8_t*) values, ridCount << 3);
2030 }
2031
serializeStrings()2032 void BatchPrimitiveProcessor::serializeStrings()
2033 {
2034
2035 *serialized << ridCount;
2036 serialized->append((uint8_t*) absRids.get(), ridCount << 3);
2037
2038 for (uint32_t i = 0; i < ridCount; ++i)
2039 *serialized << strValues[i];
2040 }
2041
sendResponse()2042 void BatchPrimitiveProcessor::sendResponse()
2043 {
2044
2045 if (sendThread->flowControlEnabled())
2046 {
2047 // newConnection should be set only for the first result of a batch job
2048 // it tells sendthread it should consider it for the connection array
2049 sendThread->sendResult(BPPSendThread::Msg_t(serialized, sock, writelock, sockIndex), newConnection);
2050 newConnection = false;
2051 }
2052 else
2053 {
2054 boost::mutex::scoped_lock lk(*writelock);
2055 sock->write(*serialized);
2056 }
2057
2058 serialized.reset();
2059 }
2060
2061 /* The output of a filter chain is either ELEMENT_TYPE or STRING_ELEMENT_TYPE */
makeResponse()2062 void BatchPrimitiveProcessor::makeResponse()
2063 {
2064 ISMPacketHeader ism;
2065 PrimitiveHeader ph;
2066
2067 // we don't need every field of these headers. Init'ing them anyway
2068 // makes memory checkers happy.
2069 void *ismp = static_cast<void*>(&ism);
2070 void *php = static_cast<void*>(&ph);
2071 memset(ismp, 0, sizeof(ISMPacketHeader));
2072 memset(php, 0, sizeof(PrimitiveHeader));
2073 ph.SessionID = sessionID;
2074 ph.StepID = stepID;
2075 ph.UniqueID = uniqueID;
2076
2077 serialized.reset(new ByteStream());
2078 serialized->append((uint8_t*) &ism, sizeof(ism));
2079 serialized->append((uint8_t*) &ph, sizeof(ph));
2080
2081 /* add-ons */
2082 if (hasScan)
2083 {
2084 if (validCPData)
2085 {
2086 *serialized << (uint8_t) 1;
2087 *serialized << lbidForCP;
2088 *serialized << (uint64_t) minVal;
2089 *serialized << (uint64_t) maxVal;
2090 }
2091 else
2092 {
2093 *serialized << (uint8_t) 0;
2094 *serialized << lbidForCP;
2095 }
2096 }
2097
2098 /* results */
2099 /* Take the rid and value arrays, munge into OutputType ot */
2100 switch (ot)
2101 {
2102 case BPS_ELEMENT_TYPE:
2103 serializeElementTypes();
2104 break;
2105
2106 case STRING_ELEMENT_TYPE:
2107 serializeStrings();
2108 break;
2109
2110 default:
2111 {
2112 ostringstream oss;
2113 oss << "BPP: makeResponse(): Bad output type: " << ot;
2114 throw logic_error(oss.str());
2115 }
2116
2117 //throw logic_error("BPP: makeResponse(): Bad output type");
2118 }
2119
2120 *serialized << cachedIO;
2121 cachedIO = 0;
2122 *serialized << physIO;
2123 physIO = 0;
2124 *serialized << touchedBlocks;
2125 touchedBlocks = 0;
2126
2127 // cout << "sent physIO=" << physIO << " cachedIO=" << cachedIO <<
2128 // " touchedBlocks=" << touchedBlocks << endl;
2129 }
2130
operator ()()2131 int BatchPrimitiveProcessor::operator()()
2132 {
2133 utils::setThreadName("PPBatchPrimProc");
2134 #ifdef PRIMPROC_STOPWATCH
2135 const static std::string msg{"BatchPrimitiveProcessor::operator()"};
2136 logging::StopWatch* stopwatch = profiler.getTimer();
2137 #endif
2138
2139 if (currentBlockOffset == 0)
2140 {
2141 #ifdef PRIMPROC_STOPWATCH
2142 stopwatch->start(msg);
2143 #endif
2144 idbassert(count > 0);
2145 }
2146
2147 if (fAggregator && currentBlockOffset == 0) // @bug4507, 8k
2148 fAggregator->aggReset(); // @bug4507, 8k
2149
2150 for (; currentBlockOffset < count; currentBlockOffset++)
2151 {
2152 if (!(sessionID & 0x80000000)) // can't do this with syscat queries
2153 {
2154 if (sendThread->aborted())
2155 break;
2156
2157 if (!sendThread->okToProceed())
2158 {
2159 freeLargeBuffers();
2160 return -1; // the reschedule error code
2161 }
2162 }
2163
2164 allocLargeBuffers();
2165 minVal = MAX64;
2166 maxVal = MIN64;
2167 validCPData = false;
2168 #ifdef PRIMPROC_STOPWATCH
2169 stopwatch->start("BPP() execute");
2170 execute(stopwatch);
2171 stopwatch->stop("BPP() execute");
2172 #else
2173 execute();
2174 #endif
2175
2176 if (projectCount == 0 && ot != ROW_GROUP)
2177 makeResponse();
2178
2179 try
2180 {
2181 sendResponse();
2182 }
2183 catch (std::exception& e)
2184 {
2185 cerr << "BPP::sendResponse(): " << e.what() << endl;
2186 break; // If we make this throw, be sure to do the cleanup at the end
2187 }
2188
2189 // Bug 4475: Control outgoing socket so that all messages from a
2190 // batch go out the same socket
2191 sockIndex = (sockIndex + 1) % connectionsPerUM;
2192
2193 nextLBID();
2194
2195 /* Base RIDs are now a combination of partition#, segment#, extent#, and block#. */
2196 uint32_t partNum;
2197 uint16_t segNum;
2198 uint8_t extentNum;
2199 uint16_t blockNum;
2200 rowgroup::getLocationFromRid(baseRid, &partNum, &segNum, &extentNum, &blockNum);
2201 /*
2202 cout << "baseRid=" << baseRid << " partNum=" << partNum << " segNum=" << segNum <<
2203 " extentNum=" << (int) extentNum
2204 << " blockNum=" << blockNum << endl;
2205 */
2206 blockNum++;
2207 baseRid = rowgroup::convertToRid(partNum, segNum, extentNum, blockNum);
2208 /*
2209 cout << "-- baseRid=" << baseRid << " partNum=" << partNum << " extentNum=" << (int) extentNum
2210 << " blockNum=" << blockNum << endl;
2211 */
2212 }
2213 vssCache.clear();
2214 #ifndef __FreeBSD__
2215 pthread_mutex_unlock(&objLock);
2216 #endif
2217 freeLargeBuffers();
2218 #ifdef PRIMPROC_STOPWATCH
2219 stopwatch->stop(msg);
2220 #endif
2221 // cout << "sent " << count << " responses" << endl;
2222 fBusy = false;
2223 return 0;
2224 }
2225
allocLargeBuffers()2226 void BatchPrimitiveProcessor::allocLargeBuffers()
2227 {
2228 if (ot == ROW_GROUP && !outRowGroupData)
2229 {
2230 //outputRG.setUseStringTable(true);
2231 outRowGroupData.reset(new RGData(outputRG));
2232 outputRG.setData(outRowGroupData.get());
2233 }
2234
2235 if (fe1 && !fe1Data)
2236 {
2237 //fe1Input.setUseStringTable(true);
2238 fe1Data.reset(new RGData(fe1Input));
2239 //fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]);
2240 fe1Input.setData(fe1Data.get());
2241 }
2242
2243 if (fe2 && !fe2Data)
2244 {
2245 //fe2Output.setUseStringTable(true);
2246 fe2Data.reset(new RGData(fe2Output));
2247 fe2Output.setData(fe2Data.get());
2248 }
2249
2250 if (getTupleJoinRowGroupData && !joinedRGMem)
2251 {
2252 //joinedRG.setUseStringTable(true);
2253 joinedRGMem.reset(new RGData(joinedRG));
2254 joinedRG.setData(joinedRGMem.get());
2255 }
2256 }
2257
freeLargeBuffers()2258 void BatchPrimitiveProcessor::freeLargeBuffers()
2259 {
2260 /* Get rid of large buffers */
2261 if (ot == ROW_GROUP && outputRG.getMaxDataSizeWithStrings() > maxIdleBufferSize)
2262 outRowGroupData.reset();
2263
2264 if (fe1 && fe1Input.getMaxDataSizeWithStrings() > maxIdleBufferSize)
2265 fe1Data.reset();
2266
2267 if (fe2 && fe2Output.getMaxDataSizeWithStrings() > maxIdleBufferSize)
2268 fe2Data.reset();
2269
2270 if (getTupleJoinRowGroupData && joinedRG.getMaxDataSizeWithStrings() > maxIdleBufferSize)
2271 joinedRGMem.reset();
2272 }
2273
nextLBID()2274 void BatchPrimitiveProcessor::nextLBID()
2275 {
2276 uint32_t i;
2277
2278 for (i = 0; i < filterCount; i++)
2279 filterSteps[i]->nextLBID();
2280
2281 for (i = 0; i < projectCount; i++)
2282 projectSteps[i]->nextLBID();
2283 }
2284
duplicate()2285 SBPP BatchPrimitiveProcessor::duplicate()
2286 {
2287 SBPP bpp;
2288 uint32_t i;
2289
2290 // cout << "duplicating a bpp\n";
2291
2292 bpp.reset(new BatchPrimitiveProcessor());
2293 bpp->ot = ot;
2294 bpp->versionInfo = versionInfo;
2295 bpp->txnID = txnID;
2296 bpp->sessionID = sessionID;
2297 bpp->stepID = stepID;
2298 bpp->uniqueID = uniqueID;
2299 bpp->needStrValues = needStrValues;
2300 bpp->gotAbsRids = gotAbsRids;
2301 bpp->gotValues = gotValues;
2302 bpp->LBIDTrace = LBIDTrace;
2303 bpp->hasScan = hasScan;
2304 bpp->hasFilterStep = hasFilterStep;
2305 bpp->filtOnString = filtOnString;
2306 bpp->hasRowGroup = hasRowGroup;
2307 bpp->getTupleJoinRowGroupData = getTupleJoinRowGroupData;
2308 bpp->bop = bop;
2309 bpp->hasPassThru = hasPassThru;
2310 bpp->forHJ = forHJ;
2311 bpp->processorThreads = processorThreads; // is a power-of-2 at this point
2312 bpp->ptMask = processorThreads - 1;
2313
2314 if (ot == ROW_GROUP)
2315 {
2316 bpp->outputRG = outputRG;
2317
2318 if (fe1)
2319 {
2320 bpp->fe1.reset(new FuncExpWrapper(*fe1));
2321 bpp->fe1Input = fe1Input;
2322 }
2323
2324 if (fe2)
2325 {
2326 bpp->fe2.reset(new FuncExpWrapper(*fe2));
2327 bpp->fe2Output = fe2Output;
2328 }
2329 }
2330
2331 bpp->doJoin = doJoin;
2332
2333 if (doJoin)
2334 {
2335 pthread_mutex_lock(&bpp->objLock);
2336 /* There are add'l join vars, but only these are necessary for processing
2337 a join */
2338 bpp->tJoinerSizes = tJoinerSizes;
2339 bpp->joinerCount = joinerCount;
2340 bpp->joinTypes = joinTypes;
2341 bpp->largeSideKeyColumns = largeSideKeyColumns;
2342 bpp->tJoiners = tJoiners;
2343 //bpp->_pools = _pools;
2344 bpp->typelessJoin = typelessJoin;
2345 bpp->tlLargeSideKeyColumns = tlLargeSideKeyColumns;
2346 bpp->tlJoiners = tlJoiners;
2347 bpp->tlKeyLengths = tlKeyLengths;
2348 bpp->storedKeyAllocators = storedKeyAllocators;
2349 bpp->joinNullValues = joinNullValues;
2350 bpp->doMatchNulls = doMatchNulls;
2351 bpp->hasJoinFEFilters = hasJoinFEFilters;
2352 bpp->hasSmallOuterJoin = hasSmallOuterJoin;
2353
2354 if (hasJoinFEFilters)
2355 {
2356 bpp->joinFERG = joinFERG;
2357 bpp->joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);
2358
2359 for (i = 0; i < joinerCount; i++)
2360 if (joinFEFilters[i])
2361 bpp->joinFEFilters[i].reset(new FuncExpWrapper(*joinFEFilters[i]));
2362 }
2363
2364 if (getTupleJoinRowGroupData)
2365 {
2366 bpp->smallSideRGs = smallSideRGs;
2367 bpp->largeSideRG = largeSideRG;
2368 bpp->smallSideRowLengths = smallSideRowLengths;
2369 bpp->smallSideRowData = smallSideRowData;
2370 bpp->smallNullRowData = smallNullRowData;
2371 bpp->smallNullPointers = smallNullPointers;
2372 bpp->joinedRG = joinedRG;
2373 }
2374
2375 #ifdef __FreeBSD__
2376 pthread_mutex_unlock(&bpp->objLock);
2377 #endif
2378 }
2379
2380 bpp->filterCount = filterCount;
2381 bpp->filterSteps.resize(filterCount);
2382
2383 for (i = 0; i < filterCount; ++i)
2384 bpp->filterSteps[i] = filterSteps[i]->duplicate();
2385
2386 bpp->projectCount = projectCount;
2387 bpp->projectSteps.resize(projectCount);
2388
2389 for (i = 0; i < projectCount; ++i)
2390 bpp->projectSteps[i] = projectSteps[i]->duplicate();
2391
2392 if (fAggregator.get() != NULL)
2393 {
2394 bpp->fAggregateRG = fAggregateRG;
2395 bpp->fAggregator.reset(new RowAggregation(
2396 fAggregator->getGroupByCols(), fAggregator->getAggFunctions()));
2397 bpp->fAggregator->timeZone(fAggregator->timeZone());
2398 }
2399
2400 bpp->sendRidsAtDelivery = sendRidsAtDelivery;
2401 bpp->prefetchThreshold = prefetchThreshold;
2402
2403 bpp->sock = sock;
2404 bpp->writelock = writelock;
2405 bpp->hasDictStep = hasDictStep;
2406 bpp->sendThread = sendThread;
2407 bpp->newConnection = true;
2408 bpp->initProcessor();
2409 return bpp;
2410 }
2411
// Disabled equality operator, compiled out by #if 0 and kept for reference only.
// NOTE(review): it compares `joiner`/`joinerSize`, while the live join code in
// this file uses tJoiners/tJoinerSizes. It is stale and may not compile if
// re-enabled. Also, the brace-less `if (doJoin)` below guards only the single
// join comparison that follows it.
#if 0
bool BatchPrimitiveProcessor::operator==(const BatchPrimitiveProcessor& bpp) const
{
    uint32_t i;

    if (ot != bpp.ot)
        return false;

    if (versionInfo != bpp.versionInfo)
        return false;

    if (txnID != bpp.txnID)
        return false;

    if (sessionID != bpp.sessionID)
        return false;

    if (stepID != bpp.stepID)
        return false;

    if (uniqueID != bpp.uniqueID)
        return false;

    if (gotValues != bpp.gotValues)
        return false;

    if (gotAbsRids != bpp.gotAbsRids)
        return false;

    if (needStrValues != bpp.needStrValues)
        return false;

    if (filterCount != bpp.filterCount)
        return false;

    if (projectCount != bpp.projectCount)
        return false;

    if (sendRidsAtDelivery != bpp.sendRidsAtDelivery)
        return false;

    if (hasScan != bpp.hasScan)
        return false;

    if (hasFilterStep != bpp.hasFilterStep)
        return false;

    if (filtOnString != bpp.filtOnString)
        return false;

    if (doJoin != bpp.doJoin)
        return false;

    if (doJoin)

        /* Join equality test is a bit out of date */
        if (joiner != bpp.joiner || joinerSize != bpp.joinerSize)
            return false;

    for (i = 0; i < filterCount; i++)
        if (*filterSteps[i] != *bpp.filterSteps[i])
            return false;

    return true;
}
#endif
2478
2479
asyncLoadProjectColumns()2480 void BatchPrimitiveProcessor::asyncLoadProjectColumns()
2481 {
2482 // relLBID is the LBID related to the primMsg->LBID,
2483 // it is used to keep the read ahead boundary for asyncLoads
2484 // 1. scan driven case: load blocks in # to (# + blocksReadAhead - 1) range,
2485 // where # is a multiple of ColScanReadAheadBlocks in Columnstore.xml
2486 // 2. non-scan driven case: load blocks in the logical block.
2487 // because 1 logical block per primMsg, asyncLoad only once per message.
2488 for (uint64_t i = 0; i < projectCount; ++i)
2489 {
2490 // only care about column commands
2491 ColumnCommand* col = dynamic_cast<ColumnCommand*>(projectSteps[i].get());
2492
2493 if (col != NULL)
2494 {
2495 asyncLoaded[i] = asyncLoaded[i] && (relLBID[i] % blocksReadAhead != 0);
2496 relLBID[i] += col->getWidth();
2497
2498 if (!asyncLoaded[i] && col->willPrefetch())
2499 {
2500 loadBlockAsync(col->getLBID(),
2501 versionInfo,
2502 txnID,
2503 col->getCompType(),
2504 &cachedIO,
2505 &physIO,
2506 LBIDTrace,
2507 sessionID,
2508 &counterLock,
2509 &busyLoaderCount,
2510 sendThread,
2511 &vssCache);
2512 asyncLoaded[i] = true;
2513 }
2514 }
2515 }
2516 }
2517
generateJoinedRowGroup(rowgroup::Row & baseRow,const uint32_t depth)2518 bool BatchPrimitiveProcessor::generateJoinedRowGroup(rowgroup::Row& baseRow, const uint32_t depth)
2519 {
2520 Row& smallRow = smallRows[depth];
2521 const bool lowestLvl = (depth == joinerCount - 1);
2522
2523 while (gjrgRowNumber < ridCount &&
2524 gjrgPlaceHolders[depth] < tSmallSideMatches[depth][gjrgRowNumber].size() &&
2525 !gjrgFull)
2526 {
2527 const vector<uint32_t>& results = tSmallSideMatches[depth][gjrgRowNumber];
2528 const uint32_t size = results.size();
2529
2530 if (depth == 0)
2531 {
2532 outputRG.getRow(gjrgRowNumber, &largeRow);
2533 applyMapping(gjrgMappings[joinerCount], largeRow, &baseRow);
2534 baseRow.setRid(largeRow.getRelRid());
2535 }
2536
2537 //cout << "rowNum = " << gjrgRowNumber << " at depth " << depth << " size is " << size << endl;
2538 for (uint32_t& i = gjrgPlaceHolders[depth]; i < size && !gjrgFull; i++)
2539 {
2540 if (results[i] != (uint32_t) - 1)
2541 {
2542 smallSideRGs[depth].getRow(results[i], &smallRow);
2543 //rowOffset = ((uint64_t) results[i]) * smallRowSize;
2544 //smallRow.setData(&rowDataAtThisLvl.rowData[rowOffset] + emptySize);
2545 }
2546 else
2547 smallRow.setPointer(smallNullPointers[depth]);
2548
2549 //cout << "small row: " << smallRow.toString() << endl;
2550 applyMapping(gjrgMappings[depth], smallRow, &baseRow);
2551
2552 if (!lowestLvl)
2553 generateJoinedRowGroup(baseRow, depth + 1);
2554 else
2555 {
2556 copyRow(baseRow, &joinedRow);
2557 //memcpy(joinedRow.getData(), baseRow.getData(), joinedRow.getSize());
2558 //cerr << "joined row " << joinedRG.getRowCount() << ": " << joinedRow.toString() << endl;
2559 joinedRow.nextRow();
2560 joinedRG.incRowCount();
2561
2562 if (joinedRG.getRowCount() == 8192)
2563 {
2564 i++;
2565 gjrgFull = true;
2566 }
2567 }
2568
2569 if (gjrgFull)
2570 break;
2571 }
2572
2573 if (depth == 0 && gjrgPlaceHolders[0] == tSmallSideMatches[0][gjrgRowNumber].size())
2574 {
2575 gjrgPlaceHolders[0] = 0;
2576 gjrgRowNumber++;
2577 }
2578 }
2579
2580 // if (depth == 0)
2581 // cout << "gjrg returning " << (uint32_t) gjrgFull << endl;
2582 if (!gjrgFull)
2583 gjrgPlaceHolders[depth] = 0;
2584
2585 return gjrgFull;
2586 }
2587
resetGJRG()2588 void BatchPrimitiveProcessor::resetGJRG()
2589 {
2590 gjrgFull = false;
2591 joinedRG.resetRowGroup(baseRid);
2592 joinedRG.getRow(0, &joinedRow);
2593 joinedRG.setDBRoot(dbRoot);
2594 }
2595
initGJRG()2596 void BatchPrimitiveProcessor::initGJRG()
2597 {
2598 for (uint32_t z = 0; z < joinerCount; z++)
2599 gjrgPlaceHolders[z] = 0;
2600
2601 gjrgRowNumber = 0;
2602 }
2603
getJoinResults(const Row & r,uint32_t jIndex,vector<uint32_t> & v)2604 inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jIndex, vector<uint32_t>& v)
2605 {
2606 uint bucket;
2607
2608 if (!typelessJoin[jIndex])
2609 {
2610 if (r.isNullValue(largeSideKeyColumns[jIndex]))
2611 {
2612 /* Bug 3524. This matches everything. */
2613 if (joinTypes[jIndex] & ANTI)
2614 {
2615 TJoiner::iterator it;
2616
2617 for (uint i = 0; i < processorThreads; ++i)
2618 for (it = tJoiners[jIndex][i]->begin(); it != tJoiners[jIndex][i]->end(); ++it)
2619 v.push_back(it->second);
2620
2621 return;
2622 }
2623 else
2624 return;
2625 }
2626
2627 uint64_t largeKey;
2628 uint32_t colIndex = largeSideKeyColumns[jIndex];
2629
2630 if (r.isUnsigned(colIndex))
2631 {
2632 largeKey = r.getUintField(colIndex);
2633 }
2634 else
2635 {
2636 largeKey = r.getIntField(colIndex);
2637 }
2638
2639 bucket = bucketPicker((char *) &largeKey, 8, bpSeed) & ptMask;
2640 pair<TJoiner::iterator, TJoiner::iterator> range = tJoiners[jIndex][bucket]->equal_range(largeKey);
2641 for (; range.first != range.second; ++range.first)
2642 v.push_back(range.first->second);
2643
2644 if (doMatchNulls[jIndex]) // add the nulls to the match list
2645 {
2646 bucket = bucketPicker((char *) &joinNullValues[jIndex], 8, bpSeed) & ptMask;
2647 range = tJoiners[jIndex][bucket]->equal_range(joinNullValues[jIndex]);
2648 for (; range.first != range.second; ++range.first)
2649 v.push_back(range.first->second);
2650 }
2651 }
2652 else
2653 {
2654 /* Bug 3524. Large-side NULL + ANTI join matches everything. */
2655 if (joinTypes[jIndex] & ANTI)
2656 {
2657 bool hasNullValue = false;
2658
2659 for (uint32_t i = 0; i < tlLargeSideKeyColumns[jIndex].size(); i++)
2660 if (r.isNullValue(tlLargeSideKeyColumns[jIndex][i]))
2661 {
2662 hasNullValue = true;
2663 break;
2664 }
2665
2666 if (hasNullValue)
2667 {
2668 TLJoiner::iterator it;
2669 for (uint i = 0; i < processorThreads; ++i)
2670 for (it = tlJoiners[jIndex][i]->begin(); it != tlJoiners[jIndex][i]->end(); ++it)
2671 v.push_back(it->second);
2672
2673 return;
2674 }
2675 }
2676
2677 TypelessData largeKey = makeTypelessKey(r, tlLargeSideKeyColumns[jIndex],
2678 tlKeyLengths[jIndex], &tmpKeyAllocators[jIndex]);
2679 pair<TLJoiner::iterator, TLJoiner::iterator> range;
2680 bucket = largeKey.hash(outputRG, tlLargeSideKeyColumns[jIndex]) & ptMask;
2681 range = tlJoiners[jIndex][bucket]->equal_range(largeKey);
2682 for (; range.first != range.second; ++range.first)
2683 v.push_back(range.first->second);
2684 }
2685 }
2686
buildVSSCache(uint32_t loopCount)2687 void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount)
2688 {
2689 vector<int64_t> lbidList;
2690 vector<BRM::VSSData> vssData;
2691 uint32_t i;
2692 int rc;
2693
2694 for (i = 0; i < filterCount; i++)
2695 filterSteps[i]->getLBIDList(loopCount, &lbidList);
2696
2697 for (i = 0; i < projectCount; i++)
2698 projectSteps[i]->getLBIDList(loopCount, &lbidList);
2699
2700 rc = brm->bulkVSSLookup(lbidList, versionInfo, (int) txnID, &vssData);
2701
2702 if (rc == 0)
2703 for (i = 0; i < vssData.size(); i++)
2704 vssCache.insert(make_pair(lbidList[i], vssData[i]));
2705
2706 // cout << "buildVSSCache inserted " << vssCache.size() << " elements" << endl;
2707 }
2708
resetMem()2709 void BatchPrimitiveProcessor::resetMem()
2710 {
2711 serialized.reset();
2712 outputMsg.reset();
2713 std::vector<SCommand>().swap(filterSteps);
2714 std::vector<SCommand>().swap(projectSteps);
2715 vssCache.clear();
2716 #ifndef __FreeBSD__
2717 pthread_mutex_unlock(&objLock);
2718 #endif
2719 outRowGroupData.reset();
2720 fe1Data.reset();
2721 fe2Data.reset();
2722 joinedRGMem.reset();
2723 }
2724
2725 }
2726 // vim:ts=4 sw=4:
2727