/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016, 2017 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

//
// $Id: batchprimitiveprocessor.cpp 2136 2013-07-24 21:04:30Z pleblanc $
// C++ Implementation: batchprimitiveprocessor
//
// Description:
//
//
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//

#include <stdexcept>
#include <unistd.h>
#include <cstring>
//#define NDEBUG
#include <cassert>
#include <string>
#include <sstream>
#include <set>
using namespace std;

#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
using namespace boost;

#include "bpp.h"
#include "primitiveserver.h"
#include "errorcodes.h"
#include "exceptclasses.h"
#include "pp_logger.h"
#include "funcexpwrapper.h"
#include "fixedallocator.h"
#include "blockcacheclient.h"
#include "MonitorProcMem.h"
#include "threadnaming.h"
#include "vlarray.h"

#define MAX64 0x7fffffffffffffffLL
#define MIN64 0x8000000000000000LL
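// MAX64/MIN64 are the int64 max/min bit patterns.  They're used below as
// "empty" sentinels for the min/max accumulators (minVal/maxVal), apparently
// for casual-partitioning (CP) data (cf. validCPData/lbidForCP), so that any
// real column value replaces them.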

using namespace messageqcpp;
using namespace joiner;
using namespace std::tr1;
using namespace rowgroup;
using namespace funcexp;
using namespace logging;
using namespace utils;
using namespace joblist;

namespace primitiveprocessor
{

#ifdef PRIMPROC_STOPWATCH
#include "poormanprofiler.inc"
#endif

// these are config parms defined in primitiveserver.cpp, initialized by PrimProc main().
extern uint32_t blocksReadAhead;
extern uint32_t dictBufferSize;
extern uint32_t defaultBufferSize;
extern int fCacheCount;
extern uint32_t connectionsPerUM;
extern int noVB;

// copied from https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
uint nextPowOf2(uint x)
{
    x--;
    x |= x >> 1;
    x |= x >> 2;
    x |= x >> 4;
    x |= x >> 8;
    x |= x >> 16;
    x++;
    return x;
}
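// Worked example of the bit trick above: x = 33 -> decrement to 32, the OR
// cascade smears the top bit into every lower bit (32 -> 63), and the final
// increment yields 64.  A value that is already a power of 2 comes back
// unchanged, and note that this variant maps x = 0 to 0, not 1.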

BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
    ot(BPS_ELEMENT_TYPE),
    txnID(0),
    sessionID(0),
    stepID(0),
    uniqueID(0),
    count(1),
    baseRid(0),
    ridCount(0),
    needStrValues(false),
    filterCount(0),
    projectCount(0),
    sendRidsAtDelivery(false),
    ridMap(0),
    gotAbsRids(false),
    gotValues(false),
    hasScan(false),
    validCPData(false),
    minVal(MAX64),
    maxVal(MIN64),
    lbidForCP(0),
    busyLoaderCount(0),
    physIO(0),
    cachedIO(0),
    touchedBlocks(0),
    LBIDTrace(false),
    fBusy(false),
    doJoin(false),
    hasFilterStep(false),
    filtOnString(false),
    prefetchThreshold(0),
    hasDictStep(false),
    sockIndex(0),
    endOfJoinerRan(false),
    processorThreads(0),
    ptMask(0),
    firstInstance(false)
{
    pp.setLogicalBlockMode(true);
    pp.setBlockPtr((int*) blockData);
    pthread_mutex_init(&objLock, NULL);
}

BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
        boost::shared_ptr<BPPSendThread> bppst, uint _processorThreads) :
    ot(BPS_ELEMENT_TYPE),
    txnID(0),
    sessionID(0),
    stepID(0),
    uniqueID(0),
    count(1),
    baseRid(0),
    ridCount(0),
    needStrValues(false),
    filterCount(0),
    projectCount(0),
    sendRidsAtDelivery(false),
    ridMap(0),
    gotAbsRids(false),
    gotValues(false),
    hasScan(false),
    validCPData(false),
    minVal(MAX64),
    maxVal(MIN64),
    lbidForCP(0),
    busyLoaderCount(0),
    physIO(0),
    cachedIO(0),
    touchedBlocks(0),
    LBIDTrace(false),
    fBusy(false),
    doJoin(false),
    hasFilterStep(false),
    filtOnString(false),
    prefetchThreshold(prefetch),
    hasDictStep(false),
    sockIndex(0),
    endOfJoinerRan(false),
    processorThreads(_processorThreads),
    //processorThreads(32),
    //ptMask(processorThreads - 1),
    firstInstance(true)
{
    // promote processorThreads to next power of 2.  also need to change the name to bucketCount or similar
    processorThreads = nextPowOf2(processorThreads);
    ptMask = processorThreads - 1;

    pp.setLogicalBlockMode(true);
    pp.setBlockPtr((int*) blockData);
    sendThread = bppst;
    pthread_mutex_init(&objLock, NULL);
    initBPP(b);
}
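// Rough lifecycle of a BPP instance as driven by UM messages (a sketch
// inferred from the handlers in this file, not a formal protocol spec):
//   1. creation message      -> initBPP()      filters, projections, join defs
//   2. join data (0..n msgs) -> addToJoiner()  build small-side hash tables
//   3. end of join data      -> endOfJoiner()  verify the tables are complete
//   4. each work unit        -> resetBPP()     then execute() on the block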

#if 0
BatchPrimitiveProcessor::BatchPrimitiveProcessor(const BatchPrimitiveProcessor& bpp)
{
    throw logic_error("copy BPP deprecated");
}
#endif

BatchPrimitiveProcessor::~BatchPrimitiveProcessor()
{
    //FIXME: just do a sync fetch
    counterLock.lock(); // need to make sure the loader has exited

    while (busyLoaderCount > 0)
    {
        counterLock.unlock();
        usleep(100000);
        counterLock.lock();
    }

    counterLock.unlock();
    pthread_mutex_destroy(&objLock);
}

/**
 * initBPP() parses the creation messages from BatchPrimitiveProcessorJL::createBPP().
 * Refer to that fcn for message format info.
 */

void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
{
    uint32_t i;
    uint8_t tmp8;
    Command::CommandType type;

    bs.advance(sizeof(ISMPacketHeader));  // skip the header
    bs >> tmp8;
    ot = static_cast<BPSOutputType>(tmp8);
    bs >> txnID;
    bs >> sessionID;
    bs >> stepID;
    bs >> uniqueID;
    bs >> versionInfo;

    bs >> tmp8;
    needStrValues = tmp8 & NEED_STR_VALUES;
    gotAbsRids = tmp8 & GOT_ABS_RIDS;
    gotValues = tmp8 & GOT_VALUES;
    LBIDTrace = tmp8 & LBID_TRACE;
    sendRidsAtDelivery = tmp8 & SEND_RIDS_AT_DELIVERY;
    doJoin = tmp8 & HAS_JOINER;
    hasRowGroup = tmp8 & HAS_ROWGROUP;
    getTupleJoinRowGroupData = tmp8 & JOIN_ROWGROUP_DATA;

    // This used to signify that there was input row data from previous jobsteps, and
    // it never quite worked right. No need to fix it or update it; all BPPs have started
    // with a scan for years.  Took it out.
    assert(!hasRowGroup);

    bs >> bop;
    bs >> forHJ;

    if (ot == ROW_GROUP)
    {
        bs >> outputRG;
        //outputRG.setUseStringTable(true);
        bs >> tmp8;

        if (tmp8)
        {
            fe1.reset(new FuncExpWrapper());
            bs >> *fe1;
            bs >> fe1Input;
        }

        bs >> tmp8;

        if (tmp8)
        {
            fe2.reset(new FuncExpWrapper());
            bs >> *fe2;
            bs >> fe2Output;
        }
    }

    if (doJoin)
    {
        pthread_mutex_lock(&objLock);

        if (ot == ROW_GROUP)
        {
            bs >> joinerCount;
//          cout << "joinerCount = " << joinerCount << endl;
            joinTypes.reset(new JoinType[joinerCount]);

            tJoiners.reset(new boost::shared_array<boost::shared_ptr<TJoiner> >[joinerCount]);
            for (uint j = 0; j < joinerCount; ++j)
                tJoiners[j].reset(new boost::shared_ptr<TJoiner>[processorThreads]);

            //_pools.reset(new boost::shared_ptr<utils::SimplePool>[joinerCount]);
            tlJoiners.reset(new boost::shared_array<boost::shared_ptr<TLJoiner> >[joinerCount]);
            for (uint j = 0; j < joinerCount; ++j)
                tlJoiners[j].reset(new boost::shared_ptr<TLJoiner>[processorThreads]);

            addToJoinerLocks.reset(new boost::scoped_array<boost::mutex>[joinerCount]);
            for (uint j = 0; j < joinerCount; ++j)
                addToJoinerLocks[j].reset(new boost::mutex[processorThreads]);

            smallSideDataLocks.reset(new boost::mutex[joinerCount]);
            tJoinerSizes.reset(new std::atomic<uint32_t>[joinerCount]);
            largeSideKeyColumns.reset(new uint32_t[joinerCount]);
            tlLargeSideKeyColumns.reset(new vector<uint32_t>[joinerCount]);
            typelessJoin.reset(new bool[joinerCount]);
            tlKeyLengths.reset(new uint32_t[joinerCount]);

            storedKeyAllocators.reset(new PoolAllocator[joinerCount]);
            for (uint j = 0; j < joinerCount; ++j)
                storedKeyAllocators[j].setUseLock(true);

            joinNullValues.reset(new uint64_t[joinerCount]);
            doMatchNulls.reset(new bool[joinerCount]);
            joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);
            hasJoinFEFilters = false;
            hasSmallOuterJoin = false;

            for (i = 0; i < joinerCount; i++)
            {
                doMatchNulls[i] = false;
                uint32_t tmp32;
                bs >> tmp32;
                tJoinerSizes[i] = tmp32;
                //bs >> tJoinerSizes[i];
                //cout << "joiner size = " << tJoinerSizes[i] << endl;
                bs >> joinTypes[i];
                bs >> tmp8;
                typelessJoin[i] = (bool) tmp8;

                if (joinTypes[i] & WITHFCNEXP)
                {
                    hasJoinFEFilters = true;
                    joinFEFilters[i].reset(new FuncExpWrapper());
                    bs >> *joinFEFilters[i];
                }

                if (joinTypes[i] & SMALLOUTER)
                    hasSmallOuterJoin = true;

                if (!typelessJoin[i])
                {
                    bs >> joinNullValues[i];
                    bs >> largeSideKeyColumns[i];
                    //cout << "large side key is " << largeSideKeyColumns[i] << endl;
                    for (uint j = 0; j < processorThreads; ++j)
                        tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher()));
                }
                else
                {
                    deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]);
                    bs >> tlKeyLengths[i];
                    //storedKeyAllocators[i] = PoolAllocator();
                    for (uint j = 0; j < processorThreads; ++j)
                        tlJoiners[i][j].reset(new TLJoiner(10,
                                                           TupleJoiner::TypelessDataHasher(&outputRG,
                                                                               &tlLargeSideKeyColumns[i]),
                                                           TupleJoiner::TypelessDataComparator(&outputRG,
                                                                                   &tlLargeSideKeyColumns[i])));
                }
            }

            if (hasJoinFEFilters)
            {
                joinFERG.reset(new RowGroup());
                bs >> *joinFERG;
            }

            if (getTupleJoinRowGroupData)
            {
                deserializeVector(bs, smallSideRGs);
//              cout << "deserialized " << smallSideRGs.size() << " small-side rowgroups\n";
                idbassert(smallSideRGs.size() == joinerCount);
                smallSideRowLengths.reset(new uint32_t[joinerCount]);
                smallSideRowData.reset(new RGData[joinerCount]);
                smallNullRowData.reset(new RGData[joinerCount]);
                smallNullPointers.reset(new Row::Pointer[joinerCount]);
                ssrdPos.reset(new uint64_t[joinerCount]);

                for (i = 0; i < joinerCount; i++)
                {
                    smallSideRowLengths[i] = smallSideRGs[i].getRowSize();
                    smallSideRowData[i] = RGData(smallSideRGs[i], tJoinerSizes[i]);
//                  smallSideRowData[i].reset(new uint8_t[
//                    smallSideRGs[i].getEmptySize() +
//                    (uint64_t) smallSideRowLengths[i] * tJoinerSizes[i]]);
                    smallSideRGs[i].setData(&smallSideRowData[i]);
                    smallSideRGs[i].resetRowGroup(0);
                    ssrdPos[i] = smallSideRGs[i].getEmptySize();

                    if (joinTypes[i] & (LARGEOUTER | SEMI | ANTI))
                    {
                        Row smallRow;
                        smallSideRGs[i].initRow(&smallRow);
                        smallNullRowData[i] = RGData(smallSideRGs[i], 1);
                        smallSideRGs[i].setData(&smallNullRowData[i]);
                        smallSideRGs[i].getRow(0, &smallRow);
                        smallRow.initToNull();
                        smallNullPointers[i] = smallRow.getPointer();
                        smallSideRGs[i].setData(&smallSideRowData[i]);
                    }
                }

                bs >> largeSideRG;
                bs >> joinedRG;
//              cout << "got the joined Rowgroup: " << joinedRG.toString() << "\n";
            }
        }

#ifdef __FreeBSD__
        pthread_mutex_unlock(&objLock);
#endif
    }

    bs >> filterCount;
    filterSteps.resize(filterCount);
    //cout << "deserializing " << filterCount << " filters\n";
    hasScan = false;
    hasPassThru = false;

    for (i = 0; i < filterCount; ++i)
    {
        //cout << "deserializing step " << i << endl;
        filterSteps[i] = SCommand(Command::makeCommand(bs, &type, filterSteps));

        if (type == Command::COLUMN_COMMAND)
        {
            ColumnCommand* col = (ColumnCommand*) filterSteps[i].get();

            if (col->isScan())
                hasScan = true;

            if (bop == BOP_OR)
                col->setScan(true);
        }
        else if (type == Command::FILTER_COMMAND)
        {
            hasFilterStep = true;

            if (dynamic_cast<StrFilterCmd*>(filterSteps[i].get()) != NULL)
                filtOnString = true;
        }
        else if (type == Command::DICT_STEP || type == Command::RID_TO_STRING)
            hasDictStep = true;
    }

    bs >> projectCount;
    //cout << "deserializing " << projectCount << " projected columns\n\n";
    projectSteps.resize(projectCount);

    for (i = 0; i < projectCount; ++i)
    {
        //cout << "deserializing step " << i << endl;
        projectSteps[i] = SCommand(Command::makeCommand(bs, &type, projectSteps));

        if (type == Command::PASS_THRU)
            hasPassThru = true;
        else if (type == Command::DICT_STEP || type == Command::RID_TO_STRING)
            hasDictStep = true;
    }

    if (ot == ROW_GROUP)
    {
        bs >> tmp8;

        if (tmp8 > 0)
        {
            bs >> fAggregateRG;
            fAggregator.reset(new RowAggregation);
            bs >> *(fAggregator.get());

            // If there's UDAF involved, set up for PM processing
            for (uint64_t i = 0; i < fAggregator->getAggFunctions().size(); i++)
            {
                RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fAggregator->getAggFunctions()[i].get());

                if (rowUDAF)
                {
                    // On the PM, the aux column is not sent, but rather is output col + 1.
                    rowUDAF->fAuxColumnIndex = rowUDAF->fOutputColumnIndex + 1;
                    // Set the PM flag in case the UDAF cares.
                    rowUDAF->fUDAFContext.setContextFlags(rowUDAF->fUDAFContext.getContextFlags()
                                                          | mcsv1sdk::CONTEXT_IS_PM);
                }
            }
        }
    }

    initProcessor();
}

/**
 * resetBPP() parses the run messages from BatchPrimitiveProcessorJL::runBPP().
 * Refer to that fcn for message format info.
 */
void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w,
                                       const SP_UM_IOSOCK& s)
{
    uint32_t i;
    vector<uint64_t> preloads;

    pthread_mutex_lock(&objLock);

    writelock = w;
    sock = s;
    newConnection = true;

    // skip the header, sessionID, stepID, uniqueID, and priority
    bs.advance(sizeof(ISMPacketHeader) + 16);
    bs >> dbRoot;
    bs >> count;
    bs >> ridCount;

    if (gotAbsRids)
    {
        assert(0);
        memcpy(absRids.get(), bs.buf(), ridCount << 3);
        bs.advance(ridCount << 3);
        /* TODO: this loop isn't always necessary or sensible */
        ridMap = 0;
        baseRid = absRids[0] & 0xffffffffffffe000ULL;
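        // The loop below rebuilds ridMap as a bitmap over 1024-rid chunks of
        // the logical block: bit k is set when any rid lands in chunk k
        // (relRids[i] >> 10).  The non-absRid path below reads ridMap off the
        // wire instead.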

        for (uint32_t i = 0; i < ridCount; i++)
        {
            relRids[i] = absRids[i] - baseRid;
            ridMap |= 1 << (relRids[i] >> 10);
        }
    }
    else
    {
        bs >> ridMap;
        bs >> baseRid;
        memcpy(relRids, bs.buf(), ridCount << 1);
        bs.advance(ridCount << 1);
    }

    if (gotValues)
    {
        memcpy(values, bs.buf(), ridCount << 3);
        bs.advance(ridCount << 3);
    }

    for (i = 0; i < filterCount; ++i)
    {
        filterSteps[i]->resetCommand(bs);
    }

    for (i = 0; i < projectCount; ++i)
    {
        projectSteps[i]->resetCommand(bs);
    }

    idbassert(bs.length() == 0);

    /* init vars not part of the BS */
    currentBlockOffset = 0;
    memset(relLBID.get(), 0, sizeof(uint64_t) * (projectCount + 1));
    memset(asyncLoaded.get(), 0, sizeof(bool) * (projectCount + 1));

    buildVSSCache(count);
#ifdef __FreeBSD__
    pthread_mutex_unlock(&objLock);
#endif
}

// This version of addToJoiner() is multithreaded.  Values are first
// hashed into thread-local vectors corresponding to the shared hash
// tables.  Once the incoming values are organized locally, it grabs
// the lock for each shared table and inserts them there.
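//
// A minimal sketch of the per-bucket insertion pattern used (three times)
// below; names are illustrative, not real identifiers:
//
//   partition incoming pairs into tmpBuckets[hash(key) & ptMask];
//   while (any tmpBucket is non-empty)
//       for each bucket b:
//           if (lock[b].try_lock())      // never block on a busy bucket
//               move tmpBuckets[b] into table[b], unlock, note progress;
//       if (no progress) usleep(...);    // everything was locked; back off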
void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
{
/*  to get wall-time of hash table construction
    idbassert(processorThreads != 0);
    if (firstCallTime.is_not_a_date_time())
        firstCallTime = boost::posix_time::microsec_clock::universal_time();
*/

    uint32_t count, i, joinerNum, tlIndex, startPos, bucket;
#pragma pack(push,1)
    struct JoinerElements
    {
        uint64_t key;
        uint32_t value;
    } *arr;
#pragma pack(pop)

    /* skip the header */
    bs.advance(sizeof(ISMPacketHeader) + 3 * sizeof(uint32_t));

    bs >> count;
    bs >> startPos;

    if (ot == ROW_GROUP)
    {
        bs >> joinerNum;
        idbassert(joinerNum < joinerCount);
        arr = (JoinerElements*) bs.buf();

        std::atomic<uint32_t> &tJoinerSize = tJoinerSizes[joinerNum];

        // XXXPAT: enormous if stmts are evil.  TODO: move each block into
        // properly-named functions for clarity.
        if (typelessJoin[joinerNum])
        {
            utils::VLArray<vector<pair<TypelessData, uint32_t> > > tmpBuckets(processorThreads);
            TypelessData tlLargeKey;
            uint8_t nullFlag;
            PoolAllocator &storedKeyAllocator = storedKeyAllocators[joinerNum];
            // this first loop hashes incoming values into vectors that parallel the hash tables.
            uint nullCount = 0;
            for (i = 0; i < count; ++i)
            {
                bs >> nullFlag;
                if (nullFlag == 0)
                {
                    tlLargeKey.deserialize(bs, storedKeyAllocator);
                    bs >> tlIndex;
                    bucket = tlLargeKey.hash(outputRG, tlLargeSideKeyColumns[joinerNum]) & ptMask;
                    tmpBuckets[bucket].push_back(make_pair(tlLargeKey, tlIndex));
                }
                else
                    ++nullCount;
            }
            tJoinerSize -= nullCount;

            bool done = false, didSomeWork;
            //uint loopCounter = 0, noWorkCounter = 0;
            // this loop moves the elements from each vector into its corresponding hash table.
            while (!done)
            {
                //++loopCounter;
                done = true;
                didSomeWork = false;
                for (i = 0; i < processorThreads; ++i)
                {
                    if (!tmpBuckets[i].empty())
                    {
                        bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
                        if (!gotIt)
                        {
                            done = false;   // didn't get it, don't block, try the next bucket
                            continue;
                        }
                        for (auto &element : tmpBuckets[i])
                            tlJoiners[joinerNum][i]->insert(element);
                        addToJoinerLocks[joinerNum][i].unlock();
                        tmpBuckets[i].clear();
                        didSomeWork = true;
                    }
                }
                // if this iteration did no useful work, everything we need is locked; wait briefly
                // and try again.
                if (!done && !didSomeWork)
                {
                    ::usleep(500 * processorThreads);
                    //++noWorkCounter;
                }
            }
            //cout << "TL join insert.  Took " << loopCounter << " loops" << endl;

        }
        else
        {
            boost::shared_array<boost::shared_ptr<TJoiner> > tJoiner = tJoiners[joinerNum];
            uint64_t nullValue = joinNullValues[joinerNum];
            bool &l_doMatchNulls = doMatchNulls[joinerNum];
            joblist::JoinType joinType = joinTypes[joinerNum];
            utils::VLArray<vector<pair<uint64_t, uint32_t> > > tmpBuckets(processorThreads);

            if (joinType & MATCHNULLS)
            {
                // this first loop hashes incoming values into vectors that parallel the hash tables.
                for (i = 0; i < count; ++i)
                {
                    /* A minor optimization: the matchnull logic should only be used when
                     * the jointype specifies it and there's a null value in the small side */
                    if (!l_doMatchNulls && arr[i].key == nullValue)
                        l_doMatchNulls = true;
                    bucket = bucketPicker((char *) &arr[i].key, 8, bpSeed) & ptMask;
                    tmpBuckets[bucket].push_back(make_pair(arr[i].key, arr[i].value));
                }


                bool done = false, didSomeWork;
                //uint loopCounter = 0, noWorkCounter = 0;
                // this loop moves the elements from each vector into its corresponding hash table.
                while (!done)
                {
                    //++loopCounter;
                    done = true;
                    didSomeWork = false;
                    for (i = 0; i < processorThreads; ++i)
                    {
                        if (!tmpBuckets[i].empty())
                        {
                            bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
                            if (!gotIt)
                            {
                                done = false;   // didn't get it, don't block, try the next bucket
                                continue;
                            }
                            for (auto &element : tmpBuckets[i])
                                tJoiners[joinerNum][i]->insert(element);
                            addToJoinerLocks[joinerNum][i].unlock();
                            tmpBuckets[i].clear();
                            didSomeWork = true;
                        }
                    }
                    // if this iteration did no useful work, everything we need is locked; wait briefly
                    // and try again.
                    if (!done && !didSomeWork)
                    {
                        ::usleep(500 * processorThreads);
                        //++noWorkCounter;
                    }
                }

                //cout << "T numeric join insert.  Took " << loopCounter << " loops" << endl;
            }
            else
            {
                // this first loop hashes incoming values into vectors that parallel the hash tables.
                for (i = 0; i < count; ++i)
                {
                    bucket = bucketPicker((char *) &arr[i].key, 8, bpSeed) & ptMask;
                    tmpBuckets[bucket].push_back(make_pair(arr[i].key, arr[i].value));
                }

                bool done = false;
                bool didSomeWork;
                //uint loopCounter = 0, noWorkCounter = 0;
                // this loop moves the elements from each vector into its corresponding hash table.
                while (!done)
                {
                    //++loopCounter;
                    done = true;
                    didSomeWork = false;
                    for (i = 0; i < processorThreads; ++i)
                    {
                        if (!tmpBuckets[i].empty())
                        {
                            bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
                            if (!gotIt)
                            {
                                done = false;   // didn't get it, don't block, try the next bucket
                                continue;
                            }
                            for (auto &element : tmpBuckets[i])
                                tJoiners[joinerNum][i]->insert(element);
                            addToJoinerLocks[joinerNum][i].unlock();
                            tmpBuckets[i].clear();
                            didSomeWork = true;
                        }
                    }
                    // if this iteration did no useful work, everything we need is locked; wait briefly
                    // and try again.
                    if (!done && !didSomeWork)
                    {
                        ::usleep(500 * processorThreads);
                        //++noWorkCounter;
                    }

                }
                //cout << "T numeric join insert 2.  Took " << loopCounter << " loops," <<
                //    " unproductive iterations = " << noWorkCounter << endl;
            }
        }

        if (!typelessJoin[joinerNum])
            bs.advance(count * sizeof(JoinerElements));

        if (getTupleJoinRowGroupData)
        {
            RowGroup& smallSide = smallSideRGs[joinerNum];
            RGData offTheWire;

            // TODO: write an RGData fcn to let it interpret data within a ByteStream to avoid
            // the extra copying.
            offTheWire.deserialize(bs);
            boost::mutex::scoped_lock lk(smallSideDataLocks[joinerNum]);
            smallSide.setData(&smallSideRowData[joinerNum]);
            smallSide.append(offTheWire, startPos);

            /*  This prints the row data
            smallSideRGs[joinerNum].initRow(&r);
            for (i = 0; i < (tJoinerSizes[joinerNum] * smallSideRowLengths[joinerNum]); i+=r.getSize()) {
                r.setData(&smallSideRowData[joinerNum][i + smallSideRGs[joinerNum].getEmptySize()]);
                cout << " got row: " << r.toString() << endl;
            }
            */
        }
    }

    idbassert(bs.length() == 0);
}

void BatchPrimitiveProcessor::doneSendingJoinerData()
{
    /*  to get wall-time of hash table construction
    if (!firstCallTime.is_not_a_date_time() && !(sessionID & 0x80000000))
    {
        boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
        Logger logger;
        ostringstream os;
        os << "id " << uniqueID << ": joiner construction time = " << now-firstCallTime;
        logger.logMessage(os.str());
        cout << os.str() << endl;
    }
    */
}

int BatchPrimitiveProcessor::endOfJoiner()
{
    /* Wait for all joiner elements to be added */
    uint32_t i;
    size_t currentSize;
    // it should be safe to run this without grabbing this lock
    //boost::mutex::scoped_lock scoped(addToJoinerLock);

    if (endOfJoinerRan)
        return 0;

    // minor hack / optimization.  The instances not inserting the table data don't
    // need to check that the table is complete.
    if (!firstInstance)
    {
        endOfJoinerRan = true;
        pthread_mutex_unlock(&objLock);
        return 0;
    }

    for (i = 0; i < joinerCount; i++)
    {
        if (!typelessJoin[i])
        {
            currentSize = 0;
            for (uint j = 0; j < processorThreads; ++j)
                if (!tJoiners[i] || !tJoiners[i][j])
                    return -1;
                else
                    currentSize += tJoiners[i][j]->size();
            if (currentSize != tJoinerSizes[i])
                return -1;
            //if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i]))
            //    return -1;
        }
        else
        {
            currentSize = 0;
            for (uint j = 0; j < processorThreads; ++j)
                if (!tlJoiners[i] || !tlJoiners[i][j])
                    return -1;
                else
                    currentSize += tlJoiners[i][j]->size();
            if (currentSize != tJoinerSizes[i])
                return -1;
            //if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i]))
            //    return -1;
        }
    }

    endOfJoinerRan = true;

#ifndef __FreeBSD__
    pthread_mutex_unlock(&objLock);
#endif
    return 0;
}

void BatchPrimitiveProcessor::initProcessor()
{
    uint32_t i, j;

    if (gotAbsRids || needStrValues || hasRowGroup)
        absRids.reset(new uint64_t[LOGICAL_BLOCK_RIDS]);

    if (needStrValues)
        strValues.reset(new string[LOGICAL_BLOCK_RIDS]);

    outMsgSize = defaultBufferSize;
    outputMsg.reset(new uint8_t[outMsgSize]);

    if (ot == ROW_GROUP)
    {
        // calculate the projection -> outputRG mapping
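        // For each projection step, find the first not-yet-claimed output column
        // with a matching tuple key; 'reserved' keeps two steps from mapping to
        // the same column when keys repeat.  Steps with no match get -1.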
        projectionMap.reset(new int[projectCount]);
        bool* reserved = (bool*)alloca(outputRG.getColumnCount() * sizeof(bool));

        for (i = 0; i < outputRG.getColumnCount(); i++)
            reserved[i] = false;

        for (i = 0; i < projectCount; i++)
        {
            for (j = 0; j < outputRG.getColumnCount(); j++)
                if (projectSteps[i]->getTupleKey() == outputRG.getKeys()[j] && !reserved[j])
                {
                    projectionMap[i] = j;
                    reserved[j] = true;
                    break;
                }

            if (j == outputRG.getColumnCount())
                projectionMap[i] = -1;
        }

        if (doJoin)
        {
            outputRG.initRow(&oldRow);
            outputRG.initRow(&newRow);
            tmpKeyAllocators.reset(new FixedAllocator[joinerCount]);

            for (i = 0; i < joinerCount; i++)
                if (typelessJoin[i])
                    tmpKeyAllocators[i] = FixedAllocator(tlKeyLengths[i], true);

            tSmallSideMatches.reset(new MatchedData[joinerCount]);
            keyColumnProj.reset(new bool[projectCount]);

            for (i = 0; i < projectCount; i++)
            {
                keyColumnProj[i] = false;

                for (j = 0; j < joinerCount; j++)
                {
                    if (!typelessJoin[j])
                    {
                        if (projectionMap[i] == (int) largeSideKeyColumns[j])
                        {
                            keyColumnProj[i] = true;
                            break;
                        }
                    }
                    else
                    {
                        for (uint32_t k = 0; k < tlLargeSideKeyColumns[j].size(); k++)
                        {
                            if (projectionMap[i] == (int) tlLargeSideKeyColumns[j][k])
                            {
                                keyColumnProj[i] = true;
                                break;
                            }
                        }
                    }
                }
            }

            if (hasJoinFEFilters)
            {
                joinFERG->initRow(&joinFERow, true);
                joinFERowData.reset(new uint8_t[joinFERow.getSize()]);
                joinFERow.setData(joinFERowData.get());
                joinFEMappings.reset(new shared_array<int>[joinerCount + 1]);

                for (i = 0; i < joinerCount; i++)
                    joinFEMappings[i] = makeMapping(smallSideRGs[i], *joinFERG);

                joinFEMappings[joinerCount] = makeMapping(largeSideRG, *joinFERG);
            }
        }

        /*
        Calculate the FE1 -> projection mapping
        Calculate the projection step -> FE1 input mapping
        */
        if (fe1)
        {
            fe1ToProjection = makeMapping(fe1Input, outputRG);
            projectForFE1.reset(new int[projectCount]);
            bool* reserved = (bool*)alloca(fe1Input.getColumnCount() * sizeof(bool));

            for (i = 0; i < fe1Input.getColumnCount(); i++)
                reserved[i] = false;

            for (i = 0; i < projectCount; i++)
            {
                projectForFE1[i] = -1;

                for (j = 0; j < fe1Input.getColumnCount(); j++)
                {
                    if (projectSteps[i]->getTupleKey() == fe1Input.getKeys()[j] && !reserved[j])
                    {
                        reserved[j] = true;
                        projectForFE1[i] = j;
                        break;
                    }
                }
            }

            fe1Input.initRow(&fe1In);
            outputRG.initRow(&fe1Out);
        }

        if (fe2)
        {
            fe2Input = (doJoin ? &joinedRG : &outputRG);
            fe2Mapping = makeMapping(*fe2Input, fe2Output);
            fe2Input->initRow(&fe2In);
            fe2Output.initRow(&fe2Out);
        }

        if (getTupleJoinRowGroupData)
        {
            gjrgPlaceHolders.reset(new uint32_t[joinerCount]);
            outputRG.initRow(&largeRow);
            joinedRG.initRow(&joinedRow);
            joinedRG.initRow(&baseJRow, true);
            smallRows.reset(new Row[joinerCount]);

            for (i = 0; i < joinerCount; i++)
                smallSideRGs[i].initRow(&smallRows[i], true);

            baseJRowMem.reset(new uint8_t[baseJRow.getSize()]);
            baseJRow.setData(baseJRowMem.get());
            gjrgMappings.reset(new shared_array<int>[joinerCount + 1]);

            for (i = 0; i < joinerCount; i++)
                gjrgMappings[i] = makeMapping(smallSideRGs[i], joinedRG);

            gjrgMappings[joinerCount] = makeMapping(outputRG, joinedRG);
        }
    }

    // @bug 1051
    if (hasFilterStep)
    {
        for (uint64_t i = 0; i < 2; i++)
        {
            fFiltRidCount[i] = 0;
            fFiltCmdRids[i].reset(new uint16_t[LOGICAL_BLOCK_RIDS]);
            fFiltCmdValues[i].reset(new int64_t[LOGICAL_BLOCK_RIDS]);

            if (filtOnString) fFiltStrValues[i].reset(new string[LOGICAL_BLOCK_RIDS]);
        }
    }

    /* init the Commands */
    if (filterCount > 0)
    {
        for (i = 0; i < (uint32_t) filterCount - 1; ++i)
        {
//          cout << "prepping filter " << i << endl;
            filterSteps[i]->setBatchPrimitiveProcessor(this);

            if (filterSteps[i + 1]->getCommandType() == Command::DICT_STEP)
                filterSteps[i]->prep(OT_BOTH, true);
            else if (filterSteps[i]->filterFeeder() != Command::NOT_FEEDER)
                filterSteps[i]->prep(OT_BOTH, false);
            else
                filterSteps[i]->prep(OT_RID, false);
        }

//      cout << "prepping filter " << i << endl;
        filterSteps[i]->setBatchPrimitiveProcessor(this);
        filterSteps[i]->prep(OT_BOTH, false);
    }

    for (i = 0; i < projectCount; ++i)
    {
//      cout << "prepping projection " << i << endl;
        projectSteps[i]->setBatchPrimitiveProcessor(this);

        if (noVB)
            projectSteps[i]->prep(OT_BOTH, false);
        else
            projectSteps[i]->prep(OT_DATAVALUE, false);

        if (0 < filterCount)
        {
            //if there is an rtscommand with a passThru, the passThru must make its own absRids
            //unless there is only one project step, then the last filter step can make absRids
            RTSCommand* rts = dynamic_cast<RTSCommand*>(projectSteps[i].get());

            if (rts && rts->isPassThru())
            {
                if (1 == projectCount)
                    filterSteps[filterCount - 1]->setMakeAbsRids(true);
                else rts->setAbsNull();
            }
        }
    }

    if (fAggregator.get() != NULL)
    {
        //fAggRowGroupData.reset(new uint8_t[fAggregateRG.getMaxDataSize()]);
        fAggRowGroupData.reinit(fAggregateRG);
        fAggregateRG.setData(&fAggRowGroupData);

        if (doJoin)
        {
            fAggregator->setInputOutput(fe2 ? fe2Output : joinedRG, &fAggregateRG);
            fAggregator->setJoinRowGroups(&smallSideRGs, &largeSideRG);
        }
        else
            fAggregator->setInputOutput(fe2 ? fe2Output : outputRG, &fAggregateRG);
    }

    minVal = MAX64;
    maxVal = MIN64;

    // @bug 1269, initialize data used by execute() for async loading blocks
    // +1 for the scan filter step with no predicate, if any
    relLBID.reset(new uint64_t[projectCount + 1]);
    asyncLoaded.reset(new bool[projectCount + 1]);
}

/* This version does a join on projected rows */
void BatchPrimitiveProcessor::executeTupleJoin()
{
    uint32_t newRowCount = 0, i, j;
    vector<uint32_t> matches;
    uint64_t largeKey;
    TypelessData tlLargeKey;

    outputRG.getRow(0, &oldRow);
    outputRG.getRow(0, &newRow);

    //cout << "before join, RG has " << outputRG.getRowCount() << " BPP ridcount= " << ridCount << endl;
    for (i = 0; i < ridCount && !sendThread->aborted(); i++, oldRow.nextRow())
    {
        /* Decide whether this large-side row belongs in the output.  The breaks
         * in the loop mean that it doesn't.
         *
         * In English the logic is:
         *     Reject the row if there's no match and it's not an anti or an outer join,
         *     or if there is a match and it's an anti join with no filter.
         *     If there's an antijoin with a filter, nothing can be eliminated at this stage.
         *     If there's an antijoin where the join op should match NULL values, and there
         *     are NULL values to match against, but there is no filter, all rows can be eliminated.
         */
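        // Example: for a plain inner join (no ANTI/LARGEOUTER flag), a large-side
        // row whose key finds no small-side match hits a break below and is
        // dropped from the output rowgroup.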

        //cout << "large side row: " << oldRow.toString() << endl;
        for (j = 0; j < joinerCount; j++)
        {
            bool found;

            if (UNLIKELY(joinTypes[j] & ANTI))
            {
                if (joinTypes[j] & WITHFCNEXP)
                    continue;
                else if (doMatchNulls[j])
                    break;
            }


            if (LIKELY(!typelessJoin[j]))
            {
                //cout << "not typeless join\n";
                bool isNull;
                uint32_t colIndex = largeSideKeyColumns[j];

                if (oldRow.isUnsigned(colIndex))
                    largeKey = oldRow.getUintField(colIndex);
                else
                    largeKey = oldRow.getIntField(colIndex);
                uint bucket = bucketPicker((char *) &largeKey, 8, bpSeed) & ptMask;

                bool joinerIsEmpty = tJoiners[j][bucket]->empty() ? true : false;

                found = (tJoiners[j][bucket]->find(largeKey) != tJoiners[j][bucket]->end());
                isNull = oldRow.isNullValue(colIndex);
                /* These conditions define when the row is NOT in the result set:
                 *    - if the key is not in the small side, and the join isn't a large-outer or anti join
                 *    - if the key is NULL, and the join isn't anti- or large-outer
                 *    - if it's an anti-join and the key is either in the small side or it's NULL
                 */

                if (((!found || isNull) && !(joinTypes[j] & (LARGEOUTER | ANTI))) ||
                        ((joinTypes[j] & ANTI) && !joinerIsEmpty && ((isNull && (joinTypes[j] & MATCHNULLS)) || (found && !isNull))))
                {
                    //cout << " - not in the result set\n";
                    break;
                }

                //else
                //  cout << " - in the result set\n";
            }
            else
            {
                //cout << " typeless join\n";
                // the null values are not sent by UM in typeless case.  null -> !found
                tlLargeKey = makeTypelessKey(oldRow, tlLargeSideKeyColumns[j], tlKeyLengths[j],
                                             &tmpKeyAllocators[j]);
                uint bucket = tlLargeKey.hash(outputRG, tlLargeSideKeyColumns[j]) & ptMask;
                found = tlJoiners[j][bucket]->find(tlLargeKey) != tlJoiners[j][bucket]->end();

                if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) ||
                        (joinTypes[j] & ANTI))
                {

                    /* Separated the ANTI join logic for readability.
                     *
                     */
                    if (joinTypes[j] & ANTI)
                    {
                        if (found)
                            break;
                        else if (joinTypes[j] & MATCHNULLS)
                        {
                            bool hasNull = false;

                            for (uint32_t z = 0; z < tlLargeSideKeyColumns[j].size(); z++)
                                if (oldRow.isNullValue(tlLargeSideKeyColumns[j][z]))
                                {
                                    hasNull = true;
                                    break;
                                }

                            if (hasNull)  // keys with nulls match everything
                                break;
                            else
                                continue;    // non-null keys not in the small side
                                             // are in the result
                        }
                        else    // signifies a not-exists query
                            continue;
                    }

                    break;
                }
            }
        }

        if (j == joinerCount)
        {
            for (j = 0; j < joinerCount; j++)
            {
                uint32_t matchCount;

                /* The result is already known if...
                 *   -- anti-join with no fcnexp
                 *   -- semi-join with no fcnexp and not scalar
                 *
                 * The ANTI join case isn't just a shortcut.  getJoinResults() will produce results
                 * for a different case and generate the wrong result.  Need to fix that, later.
                 */
                if ((joinTypes[j] & (SEMI | ANTI)) && !(joinTypes[j] & WITHFCNEXP) && !(joinTypes[j] & SCALAR))
                {
                    tSmallSideMatches[j][newRowCount].push_back(-1);
                    continue;
                }

                getJoinResults(oldRow, j, tSmallSideMatches[j][newRowCount]);
                matchCount = tSmallSideMatches[j][newRowCount].size();

                if (joinTypes[j] & WITHFCNEXP)
                {
                    vector<uint32_t> newMatches;
                    applyMapping(joinFEMappings[joinerCount], oldRow, &joinFERow);

                    for (uint32_t k = 0; k < matchCount; k++)
                    {
                        if (tSmallSideMatches[j][newRowCount][k] == (uint32_t) - 1)
                            smallRows[j].setPointer(smallNullPointers[j]);
                        else
                        {
                            smallSideRGs[j].getRow(tSmallSideMatches[j][newRowCount][k], &smallRows[j]);
                            //uint64_t rowOffset = ((uint64_t) tSmallSideMatches[j][newRowCount][k]) *
                            //      smallRows[j].getSize() + smallSideRGs[j].getEmptySize();
                            //smallRows[j].setData(&smallSideRowData[j][rowOffset]);
                        }

                        applyMapping(joinFEMappings[j], smallRows[j], &joinFERow);

                        if (joinFEFilters[j]->evaluate(&joinFERow))
                        {
                            /* The first match includes it in a SEMI join result and excludes it from an ANTI join
                             * result.  If it's SEMI & SCALAR however, it needs to continue.
                             */
                            newMatches.push_back(tSmallSideMatches[j][newRowCount][k]);

                            if ((joinTypes[j] & ANTI) || ((joinTypes[j] & (SEMI | SCALAR)) == SEMI))
                                break;
                        }
                    }

                    tSmallSideMatches[j][newRowCount].swap(newMatches);
                    matchCount = tSmallSideMatches[j][newRowCount].size();
                }

                if (matchCount == 0 && (joinTypes[j] & LARGEOUTER))
                {
                    tSmallSideMatches[j][newRowCount].push_back(-1);
                    matchCount = 1;
                }

                /* Scalar check */
                if ((joinTypes[j] & SCALAR) && matchCount > 1)
                    throw scalar_exception();

                /* Reverse the result for anti-join */
                if (joinTypes[j] & ANTI)
                {
                    if (matchCount == 0)
                    {
                        tSmallSideMatches[j][newRowCount].push_back(-1);
                        matchCount = 1;
                    }
                    else
                    {
                        tSmallSideMatches[j][newRowCount].clear();
                        matchCount = 0;
                    }
                }

                /* For all join types, no matches here means it's not in the result */
                if (matchCount == 0)
                    break;

                /* Pair non-scalar semi-joins with a NULL row */
                if ((joinTypes[j] & SEMI) && !(joinTypes[j] & SCALAR))
                {
                    tSmallSideMatches[j][newRowCount].clear();
                    tSmallSideMatches[j][newRowCount].push_back(-1);
                    matchCount = 1;
                }
            }

            /* Finally, copy the row into the output */
            if (j == joinerCount)
            {
                if (i != newRowCount)
                {
                    values[newRowCount] = values[i];
                    relRids[newRowCount] = relRids[i];
                    copyRow(oldRow, &newRow);
                    //cout << "joined row: " << newRow.toString() << endl;
                    //memcpy(newRow.getData(), oldRow.getData(), oldRow.getSize());
                }

                newRowCount++;
                newRow.nextRow();
            }

            //else
            //  cout << "j != joinerCount\n";
        }
    }

    ridCount = newRowCount;
    outputRG.setRowCount(ridCount);

    /* prints out the whole result set.
    if (ridCount != 0) {
        cout << "RG rowcount=" << outputRG.getRowCount() << " BPP ridcount=" << ridCount << endl;
        for (i = 0; i < joinerCount; i++) {
            for (j = 0; j < ridCount; j++) {
                cout << "joiner " << i << " has " << tSmallSideMatches[i][j].size() << " entries" << endl;
                cout << "row " << j << ":";
                for (uint32_t k = 0; k < tSmallSideMatches[i][j].size(); k++)
                    cout << "  " << tSmallSideMatches[i][j][k];
                cout << endl;
            }
            cout << endl;
        }
    }
    */
}

#ifdef PRIMPROC_STOPWATCH
void BatchPrimitiveProcessor::execute(StopWatch* stopwatch)
#else
void BatchPrimitiveProcessor::execute()
#endif
{
    uint32_t i, j;

    try
    {
        // Check memory up front
        if (MonitorProcMem::checkMemlimit())
        {
            throw logging::IDBExcept(logging::ERR_PRIMPROC_LOW_MEMORY);
        }
#ifdef PRIMPROC_STOPWATCH
        stopwatch->start("BatchPrimitiveProcessor::execute first part");
#endif

        // if only one scan step which has no predicate, async load all columns
        if (filterCount == 1 && hasScan)
        {
            ColumnCommand* col = dynamic_cast<ColumnCommand*>(filterSteps[0].get());

            if ((col != NULL) && (col->getFilterCount() == 0) && (col->getLBID() != 0))
            {
                // stored in last pos in relLBID[] and asyncLoaded[]
                uint64_t p = projectCount;
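                // relLBID[p] tracks how far this scan has read; each time it
                // crosses a blocksReadAhead boundary the asyncLoaded flag is
                // cleared so a fresh read-ahead request is issued below.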
1385                 asyncLoaded[p] = asyncLoaded[p] && (relLBID[p] % blocksReadAhead != 0);
1386                 relLBID[p] += col->getWidth();
1387 
1388                 if (!asyncLoaded[p] && col->willPrefetch())
1389                 {
1390                     loadBlockAsync(col->getLBID(),
1391                                    versionInfo,
1392                                    txnID,
1393                                    col->getCompType(),
1394                                    &cachedIO,
1395                                    &physIO,
1396                                    LBIDTrace,
1397                                    sessionID,
1398                                    &counterLock,
1399                                    &busyLoaderCount,
1400                                    sendThread,
1401                                    &vssCache);
1402                     asyncLoaded[p] = true;
1403                 }
1404 
1405                 asyncLoadProjectColumns();
1406             }
1407         }
1408 
1409 #ifdef PRIMPROC_STOPWATCH
1410         stopwatch->stop("BatchPrimitiveProcessor::execute first part");
1411         stopwatch->start("BatchPrimitiveProcessor::execute second part");
1412 #endif
1413 
1414         // filters use relrids and values for intermediate results.
1415         if (bop == BOP_AND)
1416             for (j = 0; j < filterCount; ++j)
1417             {
1418 #ifdef PRIMPROC_STOPWATCH
1419                 stopwatch->start("- filterSteps[j]->execute()");
1420                 filterSteps[j]->execute();
1421                 stopwatch->stop("- filterSteps[j]->execute()");
1422 #else
1423                 filterSteps[j]->execute();
1424 #endif
1425             }
1426         else  			// BOP_OR
1427         {
1428 
1429             /* XXXPAT: This is a hacky impl of OR logic.  Each filter is configured to
1430             be a scan operation on init.  This code runs each independently and
1431             unions their output ridlists using accumulator.  At the end it turns
1432             accumulator into a final ridlist for subsequent steps.
1433 
1434             If there's a join or a passthru command in the projection list, the
1435             values array has to contain values from the last filter step.  In that
1436             case, the last filter step isn't part of the "OR" filter processing.
1437             JLF has added it to prep those operations, not to be a filter.
1438 
1439             7/7/09 update: the multiple-table join required relocating the join op.  It's
1440             no longer necessary to add the loader columncommand to the filter array.
1441             */

            bool accumulator[LOGICAL_BLOCK_RIDS];
//          uint32_t realFilterCount = ((forHJ || hasPassThru) ? filterCount - 1 : filterCount);
            uint32_t realFilterCount = filterCount;

            for (i = 0; i < LOGICAL_BLOCK_RIDS; i++)
                accumulator[i] = false;

            if (!hasScan)  // there are input rids
                for (i = 0; i < ridCount; i++)
                    accumulator[relRids[i]] = true;

            ridCount = 0;

            for (i = 0; i < realFilterCount; ++i)
            {
                filterSteps[i]->execute();

                if (! filterSteps[i]->filterFeeder())
                {
                    for (j = 0; j < ridCount; j++)
                        accumulator[relRids[j]] = true;

                    ridCount = 0;
                }
            }

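            // Rebuild relRids from the accumulator.  ridMap keeps one presence
            // bit per 1024-rid stripe (rid >> 10), so the 8192 rids of a
            // logical block fold into an 8-bit summary.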
            for (ridMap = 0, i = 0; i < LOGICAL_BLOCK_RIDS; ++i)
            {
                if (accumulator[i])
                {
                    relRids[ridCount] = i;
                    ridMap |= 1 << (relRids[ridCount] >> 10);
                    ++ridCount;
                }
            }
        }

#ifdef PRIMPROC_STOPWATCH
        stopwatch->stop("BatchPrimitiveProcessor::execute second part");
        stopwatch->start("BatchPrimitiveProcessor::execute third part");
#endif

        if (projectCount > 0 || ot == ROW_GROUP)
        {
#ifdef PRIMPROC_STOPWATCH
            stopwatch->start("- writeProjectionPreamble");
            writeProjectionPreamble();
            stopwatch->stop("- writeProjectionPreamble");
#else
            writeProjectionPreamble();
#endif
        }

        // async load blocks for the project phase, if not already loaded
        if (ridCount > 0)
        {
#ifdef PRIMPROC_STOPWATCH
            stopwatch->start("- asyncLoadProjectColumns");
            asyncLoadProjectColumns();
            stopwatch->stop("- asyncLoadProjectColumns");
#else
            asyncLoadProjectColumns();
#endif
        }

#ifdef PRIMPROC_STOPWATCH
        stopwatch->stop("BatchPrimitiveProcessor::execute third part");
        stopwatch->start("BatchPrimitiveProcessor::execute fourth part");
#endif

        // projection commands read relRids and write output directly to a rowgroup
        // or the serialized bytestream
        if (ot != ROW_GROUP)
            for (j = 0; j < projectCount; ++j)
            {
                projectSteps[j]->project();
            }
        else
        {
            /* Function & Expression group 1 processing:
                - project for FE1
                - execute FE1 row by row
                - if the return value is true, map the input row into the
                  projection RG and adjust the ridlist
            */
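            /* In effect, the fe1 block below computes
                   outputRG = [ map(row) | row in fe1Input, fe1->evaluate(row) ]
               while compacting relRids/values in lockstep so the surviving rows
               stay aligned with their rids for the later steps. */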
#ifdef PRIMPROC_STOPWATCH
            stopwatch->start("- if(ot != ROW_GROUP) else");
#endif
            outputRG.resetRowGroup(baseRid);

            if (fe1)
            {
                uint32_t newRidCount = 0;
                fe1Input.resetRowGroup(baseRid);
                fe1Input.setRowCount(ridCount);
                fe1Input.getRow(0, &fe1In);
                outputRG.getRow(0, &fe1Out);

                for (j = 0; j < projectCount; j++)
                    if (projectForFE1[j] != -1)
                        projectSteps[j]->projectIntoRowGroup(fe1Input, projectForFE1[j]);

                for (j = 0; j < ridCount; j++, fe1In.nextRow())
                    if (fe1->evaluate(&fe1In))
                    {
                        applyMapping(fe1ToProjection, fe1In, &fe1Out);
                        relRids[newRidCount] = relRids[j];
                        values[newRidCount++] = values[j];
                        fe1Out.nextRow();
                    }

                ridCount = newRidCount;
            }

            outputRG.setRowCount(ridCount);

            if (sendRidsAtDelivery)
            {
                Row r;
                outputRG.initRow(&r);
                outputRG.getRow(0, &r);

                for (j = 0; j < ridCount; ++j)
                {
                    r.setRid(relRids[j]);
                    r.nextRow();
                }
            }

            /* 7/7/09 PL: I changed the projection alg to reduce block touches when there's
            a join.  The key columns get projected first, the join is executed to further
            reduce the ridlist, then the rest of the columns get projected. */

            if (!doJoin)
            {
                for (j = 0; j < projectCount; ++j)
                {
//                  cout << "projectionMap[" << j << "] = " << projectionMap[j] << endl;
                    if (projectionMap[j] != -1)
                    {
#ifdef PRIMPROC_STOPWATCH
                        stopwatch->start("-- projectIntoRowGroup");
                        projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
                        stopwatch->stop("-- projectIntoRowGroup");
#else
                        projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif
                    }

//                  else
//                      cout << "   no target found for OID " << projectSteps[j]->getOID() << endl;
                }
            }
            else
            {
                /* Project the key columns.  If there are FE filters in the join,
                   project everything.  Also need to project 'long' strings b/c
                   executeTupleJoin may copy entire rows using copyRow(), which
                   would try to interpret the uninit'd string ptr.  Valgrind will
                   legitimately complain about copying uninit'd values for the
                   other types, but that is technically safe. */
                for (j = 0; j < projectCount; j++)
                    if (keyColumnProj[j] || (projectionMap[j] != -1 && (hasJoinFEFilters ||
                        oldRow.isLongString(projectionMap[j]))))
                    {
#ifdef PRIMPROC_STOPWATCH
                        stopwatch->start("-- projectIntoRowGroup");
                        projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
                        stopwatch->stop("-- projectIntoRowGroup");
#else
                        projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif
                    }

#ifdef PRIMPROC_STOPWATCH
                stopwatch->start("-- executeTupleJoin()");
                executeTupleJoin();
                stopwatch->stop("-- executeTupleJoin()");
#else
                executeTupleJoin();
#endif

                /* project the non-key columns */
                for (j = 0; j < projectCount; ++j)
                {
                    if (projectionMap[j] != -1 && !keyColumnProj[j] && !hasJoinFEFilters &&
                        !oldRow.isLongString(projectionMap[j]))
                    {
#ifdef PRIMPROC_STOPWATCH
                        stopwatch->start("-- projectIntoRowGroup");
                        projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
                        stopwatch->stop("-- projectIntoRowGroup");
#else
                        projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif
                    }
                }
            }

            /* The RowGroup is fully joined at this point.
            Add additional RowGroup processing here.
            TODO: Try to clean up all of the switching. */

            if (doJoin && (fe2 || fAggregator))
            {
                bool moreRGs = true;
                ByteStream preamble = *serialized;
                initGJRG();
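                // 'preamble' snapshots everything serialized so far (headers
                // plus the CP add-ons); after each sendResponse() below,
                // 'serialized' is re-seeded from it so every outgoing rowgroup
                // message carries the same preamble.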

                while (moreRGs && !sendThread->aborted())
                {
                    /*
                        generate 1 rowgroup (8192 rows max) of joined rows
                        if there's an FE2, run it
                            - pack results into a new rowgroup
                            - if there are < 8192 rows in the new RG, continue
                        if there's an agg, run it
                        send the result
                    */
                    resetGJRG();
                    moreRGs = generateJoinedRowGroup(baseJRow);
                    *serialized << (uint8_t) !moreRGs;

                    if (fe2)
                    {
                        /* functionize this -> processFE2() */
                        fe2Output.resetRowGroup(baseRid);
                        fe2Output.setDBRoot(dbRoot);
                        fe2Output.getRow(0, &fe2Out);
                        fe2Input->getRow(0, &fe2In);

                        for (j = 0; j < joinedRG.getRowCount(); j++, fe2In.nextRow())
                            if (fe2->evaluate(&fe2In))
                            {
                                applyMapping(fe2Mapping, fe2In, &fe2Out);
                                fe2Out.setRid(fe2In.getRelRid());
                                fe2Output.incRowCount();
                                fe2Out.nextRow();
                            }
                    }

                    RowGroup& nextRG = (fe2 ? fe2Output : joinedRG);
                    nextRG.setDBRoot(dbRoot);

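                    // Three-way @bug4507 policy: on the final rowgroup of the
                    // last block, ship the real aggregation result; otherwise
                    // ship an empty placeholder while memory allows, or flush
                    // the partial result and reset the aggregator.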
                    if (fAggregator)
                    {
                        fAggregator->addRowGroup(&nextRG);

                        if ((currentBlockOffset + 1) == count && moreRGs == false)  // @bug4507, 8k
                        {
                            fAggregator->loadResult(*serialized);                   // @bug4507, 8k
                        }                                                           // @bug4507, 8k
                        else if (utils::MonitorProcMem::isMemAvailable())           // @bug4507, 8k
                        {
                            fAggregator->loadEmptySet(*serialized);                 // @bug4507, 8k
                        }                                                           // @bug4507, 8k
                        else                                                        // @bug4507, 8k
                        {
                            fAggregator->loadResult(*serialized);                   // @bug4507, 8k
                            fAggregator->aggReset();                                // @bug4507, 8k
                        }                                                           // @bug4507, 8k
                    }
                    else
                    {
                        //cerr << " * serializing " << nextRG.toString() << endl;
                        nextRG.serializeRGData(*serialized);
                        if (MonitorProcMem::checkMemlimit())
                        {
                            throw logging::IDBExcept(logging::ERR_PRIMPROC_LOW_MEMORY);
                        }
                    }

                    /* send the msg & reinit the BS */
                    if (moreRGs)
                    {
                        sendResponse();
                        serialized.reset(new ByteStream());
                        *serialized = preamble;
                    }
                }

                if (hasSmallOuterJoin)
                {
                    *serialized << ridCount;

                    for (i = 0; i < joinerCount; i++)
                        for (j = 0; j < ridCount; ++j)
                            serializeInlineVector<uint32_t>(*serialized,
                                                            tSmallSideMatches[i][j]);
                }
            }

            if (!doJoin && fe2)
            {
                /* functionize this -> processFE2() */
                fe2Output.resetRowGroup(baseRid);
                fe2Output.getRow(0, &fe2Out);
                fe2Input->getRow(0, &fe2In);

                //cerr << "input row: " << fe2In.toString() << endl;
                for (j = 0; j < outputRG.getRowCount(); j++, fe2In.nextRow())
                {
                    if (fe2->evaluate(&fe2In))
                    {
                        applyMapping(fe2Mapping, fe2In, &fe2Out);
                        //cerr << "   passed. output row: " << fe2Out.toString() << endl;
                        fe2Out.setRid(fe2In.getRelRid());
                        fe2Output.incRowCount();
                        fe2Out.nextRow();
                    }
                }

                if (!fAggregator)
                {
                    *serialized << (uint8_t) 1;  // the "count this msg" var
                    fe2Output.setDBRoot(dbRoot);
                    fe2Output.serializeRGData(*serialized);
                    if (MonitorProcMem::checkMemlimit())
                    {
                        throw logging::IDBExcept(logging::ERR_PRIMPROC_LOW_MEMORY);
                    }
                    //*serialized << fe2Output.getDataSize();
                    //serialized->append(fe2Output.getData(), fe2Output.getDataSize());
                }
            }

            if (!doJoin && fAggregator)
            {
                *serialized << (uint8_t) 1;  // the "count this msg" var

                RowGroup& toAggregate = (fe2 ? fe2Output : outputRG);
                //toAggregate.convertToInlineDataInPlace();

                if (fe2)
                    fe2Output.setDBRoot(dbRoot);
                else
                    outputRG.setDBRoot(dbRoot);

                fAggregator->addRowGroup(&toAggregate);

                if ((currentBlockOffset + 1) == count)                    // @bug4507, 8k
                {
                    fAggregator->loadResult(*serialized);                 // @bug4507, 8k
                }                                                         // @bug4507, 8k
                else if (utils::MonitorProcMem::isMemAvailable())         // @bug4507, 8k
                {
                    fAggregator->loadEmptySet(*serialized);               // @bug4507, 8k
                }                                                         // @bug4507, 8k
                else                                                      // @bug4507, 8k
                {
                    fAggregator->loadResult(*serialized);                 // @bug4507, 8k
                    fAggregator->aggReset();                              // @bug4507, 8k
                }                                                         // @bug4507, 8k
            }

            if (!fAggregator && !fe2)
            {
                *serialized << (uint8_t) 1;  // the "count this msg" var
                outputRG.setDBRoot(dbRoot);
                //cerr << "serializing " << outputRG.toString() << endl;
                outputRG.serializeRGData(*serialized);
                if (MonitorProcMem::checkMemlimit())
                {
                    throw logging::IDBExcept(logging::ERR_PRIMPROC_LOW_MEMORY);
                }
                //*serialized << outputRG.getDataSize();
                //serialized->append(outputRG.getData(), outputRG.getDataSize());
                if (doJoin)
                {
                    for (i = 0; i < joinerCount; i++)
                    {
                        for (j = 0; j < ridCount; ++j)
                        {
                            serializeInlineVector<uint32_t>(*serialized,
                                                            tSmallSideMatches[i][j]);
                        }
                    }
                }
            }

            // clear the small-side match vectors
            if (doJoin)
            {
                for (i = 0; i < joinerCount; i++)
                    for (j = 0; j < ridCount; ++j)
                        tSmallSideMatches[i][j].clear();
            }

#ifdef PRIMPROC_STOPWATCH
            stopwatch->stop("- if(ot != ROW_GROUP) else");
#endif
        }

        if (projectCount > 0 || ot == ROW_GROUP)
        {
            *serialized << cachedIO;
            cachedIO = 0;
            *serialized << physIO;
            physIO = 0;
            *serialized << touchedBlocks;
            touchedBlocks = 0;
//          cout << "sent physIO=" << physIO << " cachedIO=" << cachedIO <<
//              " touchedBlocks=" << touchedBlocks << endl;
        }

#ifdef PRIMPROC_STOPWATCH
        stopwatch->stop("BatchPrimitiveProcessor::execute fourth part");
#endif

    }
    catch (logging::QueryDataExcept& qex)
    {
        writeErrorMsg(qex.what(), qex.errorCode());
    }
    catch (logging::DictionaryBufferOverflow& db)
    {
        writeErrorMsg(db.what(), db.errorCode());
    }
    catch (scalar_exception& se)
    {
        writeErrorMsg(IDBErrorInfo::instance()->errorMsg(ERR_MORE_THAN_1_ROW), ERR_MORE_THAN_1_ROW, false);
    }
    catch (NeedToRestartJob& n)
    {
#if 0

        /* This block of code would flush the problematic OIDs from the
         * cache.  It seems to have no effect on the problem, so it's commented
         * out for now.
         *
         * This is currently thrown only on syscat queries.  If we find the problem
         * in user tables also, we should avoid dropping entire OIDs if possible.
         *
         * In local testing there was no need for flushing, because DDL flushes
         * the syscat constantly.  However, it can take a long time (>10 s) before
         * that happens.  Doing it locally should make it much more likely that
         * only one restart is necessary.
         */

        try
        {
            vector<uint32_t> oids;
            uint32_t oid;

            for (uint32_t i = 0; i < filterCount; i++)
            {
                oid = filterSteps[i]->getOID();

                if (oid > 0)
                    oids.push_back(oid);
            }

            for (uint32_t i = 0; i < projectCount; i++)
            {
                oid = projectSteps[i]->getOID();

                if (oid > 0)
                    oids.push_back(oid);
            }

#if 0
            Logger logger;
            ostringstream os;
            os << "dropping OIDs: ";

            for (int i = 0; i < oids.size(); i++)
                os << oids[i] << " ";

            logger.logMessage(os.str());
#endif

            for (int i = 0; i < fCacheCount; i++)
            {
                dbbc::blockCacheClient bc(*BRPp[i]);
//              bc.flushCache();
                bc.flushOIDs(&oids[0], oids.size());
            }
        }
        catch (...) { }     // doesn't matter if this fails, just avoid crashing

#endif

#ifndef __FreeBSD__
        pthread_mutex_unlock(&objLock);
#endif
        throw n;   // need to pass this through to BPPSeeder
    }
    catch (IDBExcept& iex)
    {
        writeErrorMsg(iex.what(), iex.errorCode(), true, false);
    }
    catch (const std::exception& ex)
    {
        writeErrorMsg(ex.what(), logging::batchPrimitiveProcessorErr);
    }
    catch (...)
    {
        string msg("BatchPrimitiveProcessor caught an unknown exception");
        writeErrorMsg(msg, logging::batchPrimitiveProcessorErr);
    }
}

void BatchPrimitiveProcessor::writeErrorMsg(const string& error, uint16_t errCode, bool logIt, bool critical)
{
    ISMPacketHeader ism;
    PrimitiveHeader ph;

    // we don't need every field of these headers.  Init'ing them anyway
    // makes memory checkers happy.
    void *ismp = static_cast<void*>(&ism);
    void *php = static_cast<void*>(&ph);
    memset(ismp, 0, sizeof(ISMPacketHeader));
    memset(php, 0, sizeof(PrimitiveHeader));
    ph.SessionID = sessionID;
    ph.StepID = stepID;
    ph.UniqueID = uniqueID;
    ism.Status = errCode;

    serialized.reset(new ByteStream());
    serialized->append((uint8_t*) &ism, sizeof(ism));
    serialized->append((uint8_t*) &ph, sizeof(ph));
    *serialized << error;

    if (logIt)
    {
        Logger log;
        log.logMessage(error, critical);
    }
}

void BatchPrimitiveProcessor::writeProjectionPreamble()
{
    ISMPacketHeader ism;
    PrimitiveHeader ph;

    // we don't need every field of these headers.  Init'ing them anyway
    // makes memory checkers happy.
    void *ismp = static_cast<void*>(&ism);
    void *php = static_cast<void*>(&ph);
    memset(ismp, 0, sizeof(ISMPacketHeader));
    memset(php, 0, sizeof(PrimitiveHeader));
    ph.SessionID = sessionID;
    ph.StepID = stepID;
    ph.UniqueID = uniqueID;

    serialized.reset(new ByteStream());
    serialized->append((uint8_t*) &ism, sizeof(ism));
    serialized->append((uint8_t*) &ph, sizeof(ph));

    /* add-ons */
    if (hasScan)
    {
        if (validCPData)
        {
            *serialized << (uint8_t) 1;
            *serialized << lbidForCP;
            *serialized << (uint64_t) minVal;
            *serialized << (uint64_t) maxVal;
        }
        else
        {
            *serialized << (uint8_t) 0;
            *serialized << lbidForCP;
        }
    }

//  ridsOut += ridCount;
    /* results */

    if (ot != ROW_GROUP)
    {
        *serialized << ridCount;

        if (sendRidsAtDelivery)
        {
            *serialized << baseRid;
            serialized->append((uint8_t*) relRids, ridCount << 1);
        }
    }
}

void BatchPrimitiveProcessor::serializeElementTypes()
{

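    // relRids are 16-bit block-relative rids and values are 64-bit, hence the
    // << 1 and << 3 when computing the byte counts to append.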
    *serialized << baseRid;
    *serialized << ridCount;
    serialized->append((uint8_t*) relRids, ridCount << 1);
    serialized->append((uint8_t*) values, ridCount << 3);
}

void BatchPrimitiveProcessor::serializeStrings()
{

    *serialized << ridCount;
    serialized->append((uint8_t*) absRids.get(), ridCount << 3);

    for (uint32_t i = 0; i < ridCount; ++i)
        *serialized << strValues[i];
}

void BatchPrimitiveProcessor::sendResponse()
{

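    // With flow control enabled, the shared send thread paces the write and
    // tracks the connection; otherwise write synchronously under the
    // per-socket lock.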
    if (sendThread->flowControlEnabled())
    {
        // newConnection should be set only for the first result of a batch job;
        // it tells the send thread to consider this socket for the connection array.
        sendThread->sendResult(BPPSendThread::Msg_t(serialized, sock, writelock, sockIndex), newConnection);
        newConnection = false;
    }
    else
    {
        boost::mutex::scoped_lock lk(*writelock);
        sock->write(*serialized);
    }

    serialized.reset();
}

/* The output of a filter chain is either ELEMENT_TYPE or STRING_ELEMENT_TYPE */
void BatchPrimitiveProcessor::makeResponse()
{
    ISMPacketHeader ism;
    PrimitiveHeader ph;

    // we don't need every field of these headers.  Init'ing them anyway
    // makes memory checkers happy.
    void *ismp = static_cast<void*>(&ism);
    void *php = static_cast<void*>(&ph);
    memset(ismp, 0, sizeof(ISMPacketHeader));
    memset(php, 0, sizeof(PrimitiveHeader));
    ph.SessionID = sessionID;
    ph.StepID = stepID;
    ph.UniqueID = uniqueID;

    serialized.reset(new ByteStream());
    serialized->append((uint8_t*) &ism, sizeof(ism));
    serialized->append((uint8_t*) &ph, sizeof(ph));

    /* add-ons */
    if (hasScan)
    {
        if (validCPData)
        {
            *serialized << (uint8_t) 1;
            *serialized << lbidForCP;
            *serialized << (uint64_t) minVal;
            *serialized << (uint64_t) maxVal;
        }
        else
        {
            *serialized << (uint8_t) 0;
            *serialized << lbidForCP;
        }
    }

    /* results */
    /* Take the rid and value arrays, munge into OutputType ot */
    switch (ot)
    {
        case BPS_ELEMENT_TYPE:
            serializeElementTypes();
            break;

        case STRING_ELEMENT_TYPE:
            serializeStrings();
            break;

        default:
        {
            ostringstream oss;
            oss << "BPP: makeResponse(): Bad output type: " << ot;
            throw logic_error(oss.str());
        }
    }

    *serialized << cachedIO;
    cachedIO = 0;
    *serialized << physIO;
    physIO = 0;
    *serialized << touchedBlocks;
    touchedBlocks = 0;

//  cout << "sent physIO=" << physIO << " cachedIO=" << cachedIO <<
//      " touchedBlocks=" << touchedBlocks << endl;
}

int BatchPrimitiveProcessor::operator()()
{
    utils::setThreadName("PPBatchPrimProc");
#ifdef PRIMPROC_STOPWATCH
    const static std::string msg{"BatchPrimitiveProcessor::operator()"};
    logging::StopWatch* stopwatch = profiler.getTimer();
#endif

    if (currentBlockOffset == 0)
    {
#ifdef PRIMPROC_STOPWATCH
        stopwatch->start(msg);
#endif
        idbassert(count > 0);
    }

    if (fAggregator && currentBlockOffset == 0)                     // @bug4507, 8k
        fAggregator->aggReset();                                    // @bug4507, 8k

    for (; currentBlockOffset < count; currentBlockOffset++)
    {
        if (!(sessionID & 0x80000000))     // can't do this with syscat queries
        {
            if (sendThread->aborted())
                break;

            if (!sendThread->okToProceed())
            {
                freeLargeBuffers();
                return -1; // the reschedule error code
            }
        }

        allocLargeBuffers();
        minVal = MAX64;
        maxVal = MIN64;
        validCPData = false;
#ifdef PRIMPROC_STOPWATCH
        stopwatch->start("BPP() execute");
        execute(stopwatch);
        stopwatch->stop("BPP() execute");
#else
        execute();
#endif

        if (projectCount == 0 && ot != ROW_GROUP)
            makeResponse();

        try
        {
            sendResponse();
        }
        catch (std::exception& e)
        {
            cerr << "BPP::sendResponse(): " << e.what() << endl;
            break;  // If we make this throw, be sure to do the cleanup at the end
        }

        // Bug 4475: Control the outgoing socket so that all messages from a
        // batch go out the same socket
        sockIndex = (sockIndex + 1) % connectionsPerUM;

        nextLBID();

        /* Base RIDs are now a combination of partition#, segment#, extent#, and block#. */
        uint32_t partNum;
        uint16_t segNum;
        uint8_t extentNum;
        uint16_t blockNum;
        rowgroup::getLocationFromRid(baseRid, &partNum, &segNum, &extentNum, &blockNum);
        /*
        cout << "baseRid=" << baseRid << " partNum=" << partNum << " segNum=" << segNum <<
                " extentNum=" << (int) extentNum
                << " blockNum=" << blockNum << endl;
        */
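        // Advance baseRid to the next logical block for the next iteration;
        // partition, segment, and extent carry through unchanged here.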
        blockNum++;
        baseRid = rowgroup::convertToRid(partNum, segNum, extentNum, blockNum);
        /*
        cout << "-- baseRid=" << baseRid << " partNum=" << partNum << " extentNum=" << (int) extentNum
                << " blockNum=" << blockNum << endl;
        */
    }
    vssCache.clear();
#ifndef __FreeBSD__
    pthread_mutex_unlock(&objLock);
#endif
    freeLargeBuffers();
#ifdef PRIMPROC_STOPWATCH
    stopwatch->stop(msg);
#endif
//  cout << "sent " << count << " responses" << endl;
    fBusy = false;
    return 0;
}

void BatchPrimitiveProcessor::allocLargeBuffers()
{
    if (ot == ROW_GROUP && !outRowGroupData)
    {
        //outputRG.setUseStringTable(true);
        outRowGroupData.reset(new RGData(outputRG));
        outputRG.setData(outRowGroupData.get());
    }

    if (fe1 && !fe1Data)
    {
        //fe1Input.setUseStringTable(true);
        fe1Data.reset(new RGData(fe1Input));
        //fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]);
        fe1Input.setData(fe1Data.get());
    }

    if (fe2 && !fe2Data)
    {
        //fe2Output.setUseStringTable(true);
        fe2Data.reset(new RGData(fe2Output));
        fe2Output.setData(fe2Data.get());
    }

    if (getTupleJoinRowGroupData && !joinedRGMem)
    {
        //joinedRG.setUseStringTable(true);
        joinedRGMem.reset(new RGData(joinedRG));
        joinedRG.setData(joinedRGMem.get());
    }
}

void BatchPrimitiveProcessor::freeLargeBuffers()
{
    /* Get rid of the large buffers */
    if (ot == ROW_GROUP && outputRG.getMaxDataSizeWithStrings() > maxIdleBufferSize)
        outRowGroupData.reset();

    if (fe1 && fe1Input.getMaxDataSizeWithStrings() > maxIdleBufferSize)
        fe1Data.reset();

    if (fe2 && fe2Output.getMaxDataSizeWithStrings() > maxIdleBufferSize)
        fe2Data.reset();

    if (getTupleJoinRowGroupData && joinedRG.getMaxDataSizeWithStrings() > maxIdleBufferSize)
        joinedRGMem.reset();
}

void BatchPrimitiveProcessor::nextLBID()
{
    uint32_t i;

    for (i = 0; i < filterCount; i++)
        filterSteps[i]->nextLBID();

    for (i = 0; i < projectCount; i++)
        projectSteps[i]->nextLBID();
}

SBPP BatchPrimitiveProcessor::duplicate()
{
    SBPP bpp;
    uint32_t i;

//  cout << "duplicating a bpp\n";

    bpp.reset(new BatchPrimitiveProcessor());
    bpp->ot = ot;
    bpp->versionInfo = versionInfo;
    bpp->txnID = txnID;
    bpp->sessionID = sessionID;
    bpp->stepID = stepID;
    bpp->uniqueID = uniqueID;
    bpp->needStrValues = needStrValues;
    bpp->gotAbsRids = gotAbsRids;
    bpp->gotValues = gotValues;
    bpp->LBIDTrace = LBIDTrace;
    bpp->hasScan = hasScan;
    bpp->hasFilterStep = hasFilterStep;
    bpp->filtOnString = filtOnString;
    bpp->hasRowGroup = hasRowGroup;
    bpp->getTupleJoinRowGroupData = getTupleJoinRowGroupData;
    bpp->bop = bop;
    bpp->hasPassThru = hasPassThru;
    bpp->forHJ = forHJ;
    bpp->processorThreads = processorThreads;   // a power of 2 at this point
    bpp->ptMask = processorThreads - 1;

    if (ot == ROW_GROUP)
    {
        bpp->outputRG = outputRG;

        if (fe1)
        {
            bpp->fe1.reset(new FuncExpWrapper(*fe1));
            bpp->fe1Input = fe1Input;
        }

        if (fe2)
        {
            bpp->fe2.reset(new FuncExpWrapper(*fe2));
            bpp->fe2Output = fe2Output;
        }
    }

    bpp->doJoin = doJoin;

    if (doJoin)
    {
        pthread_mutex_lock(&bpp->objLock);
        /* There are additional join vars, but only these are necessary for
           processing a join */
        bpp->tJoinerSizes = tJoinerSizes;
        bpp->joinerCount = joinerCount;
        bpp->joinTypes = joinTypes;
        bpp->largeSideKeyColumns = largeSideKeyColumns;
        bpp->tJoiners = tJoiners;
        //bpp->_pools = _pools;
        bpp->typelessJoin = typelessJoin;
        bpp->tlLargeSideKeyColumns = tlLargeSideKeyColumns;
        bpp->tlJoiners = tlJoiners;
        bpp->tlKeyLengths = tlKeyLengths;
        bpp->storedKeyAllocators = storedKeyAllocators;
        bpp->joinNullValues = joinNullValues;
        bpp->doMatchNulls = doMatchNulls;
        bpp->hasJoinFEFilters = hasJoinFEFilters;
        bpp->hasSmallOuterJoin = hasSmallOuterJoin;

        if (hasJoinFEFilters)
        {
            bpp->joinFERG = joinFERG;
            bpp->joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);

            for (i = 0; i < joinerCount; i++)
                if (joinFEFilters[i])
                    bpp->joinFEFilters[i].reset(new FuncExpWrapper(*joinFEFilters[i]));
        }

        if (getTupleJoinRowGroupData)
        {
            bpp->smallSideRGs = smallSideRGs;
            bpp->largeSideRG = largeSideRG;
            bpp->smallSideRowLengths = smallSideRowLengths;
            bpp->smallSideRowData = smallSideRowData;
            bpp->smallNullRowData = smallNullRowData;
            bpp->smallNullPointers = smallNullPointers;
            bpp->joinedRG = joinedRG;
        }

#ifdef __FreeBSD__
        pthread_mutex_unlock(&bpp->objLock);
#endif
    }

    bpp->filterCount = filterCount;
    bpp->filterSteps.resize(filterCount);

    for (i = 0; i < filterCount; ++i)
        bpp->filterSteps[i] = filterSteps[i]->duplicate();

    bpp->projectCount = projectCount;
    bpp->projectSteps.resize(projectCount);

    for (i = 0; i < projectCount; ++i)
        bpp->projectSteps[i] = projectSteps[i]->duplicate();

    if (fAggregator.get() != NULL)
    {
        bpp->fAggregateRG = fAggregateRG;
        bpp->fAggregator.reset(new RowAggregation(
                                   fAggregator->getGroupByCols(), fAggregator->getAggFunctions()));
        bpp->fAggregator->timeZone(fAggregator->timeZone());
    }

    bpp->sendRidsAtDelivery = sendRidsAtDelivery;
    bpp->prefetchThreshold = prefetchThreshold;

    bpp->sock = sock;
    bpp->writelock = writelock;
    bpp->hasDictStep = hasDictStep;
    bpp->sendThread = sendThread;
    bpp->newConnection = true;
    bpp->initProcessor();
    return bpp;
}

#if 0
bool BatchPrimitiveProcessor::operator==(const BatchPrimitiveProcessor& bpp) const
{
    uint32_t i;

    if (ot != bpp.ot)
        return false;

    if (versionInfo != bpp.versionInfo)
        return false;

    if (txnID != bpp.txnID)
        return false;

    if (sessionID != bpp.sessionID)
        return false;

    if (stepID != bpp.stepID)
        return false;

    if (uniqueID != bpp.uniqueID)
        return false;

    if (gotValues != bpp.gotValues)
        return false;

    if (gotAbsRids != bpp.gotAbsRids)
        return false;

    if (needStrValues != bpp.needStrValues)
        return false;

    if (filterCount != bpp.filterCount)
        return false;

    if (projectCount != bpp.projectCount)
        return false;

    if (sendRidsAtDelivery != bpp.sendRidsAtDelivery)
        return false;

    if (hasScan != bpp.hasScan)
        return false;

    if (hasFilterStep != bpp.hasFilterStep)
        return false;

    if (filtOnString != bpp.filtOnString)
        return false;

    if (doJoin != bpp.doJoin)
        return false;

    if (doJoin)

        /* The join equality test is a bit out of date */
        if (joiner != bpp.joiner || joinerSize != bpp.joinerSize)
            return false;

    for (i = 0; i < filterCount; i++)
        if (*filterSteps[i] != *bpp.filterSteps[i])
            return false;

    return true;
}
#endif

void BatchPrimitiveProcessor::asyncLoadProjectColumns()
{
    // relLBID[i] is the LBID related to the primMsg->LBID; it is used to keep
    // the read-ahead boundary for asyncLoads.
    // 1. scan-driven case: load blocks in the # to (# + blocksReadAhead - 1) range,
    //    where # is a multiple of ColScanReadAheadBlocks in Columnstore.xml
    // 2. non-scan-driven case: load the blocks in the logical block.  Because
    //    there is 1 logical block per primMsg, asyncLoad runs only once per message.
    for (uint64_t i = 0; i < projectCount; ++i)
    {
        // only column commands participate
        ColumnCommand* col = dynamic_cast<ColumnCommand*>(projectSteps[i].get());

        if (col != NULL)
        {
            asyncLoaded[i] = asyncLoaded[i] && (relLBID[i] % blocksReadAhead != 0);
            relLBID[i] += col->getWidth();

            if (!asyncLoaded[i] && col->willPrefetch())
            {
                loadBlockAsync(col->getLBID(),
                               versionInfo,
                               txnID,
                               col->getCompType(),
                               &cachedIO,
                               &physIO,
                               LBIDTrace,
                               sessionID,
                               &counterLock,
                               &busyLoaderCount,
                               sendThread,
                               &vssCache);
                asyncLoaded[i] = true;
            }
        }
    }
}

bool BatchPrimitiveProcessor::generateJoinedRowGroup(rowgroup::Row& baseRow, const uint32_t depth)
{
    Row& smallRow = smallRows[depth];
    const bool lowestLvl = (depth == joinerCount - 1);

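    /* Depth-first walk of the per-joiner match lists: depth d iterates
       tSmallSideMatches[d][row] and recurses toward the deepest small side,
       emitting one joined row per combination.  When joinedRG fills (8192
       rows), gjrgFull is set and gjrgPlaceHolders[] keeps the resume point so
       the next call picks up where this one stopped. */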
    while (gjrgRowNumber < ridCount &&
            gjrgPlaceHolders[depth] < tSmallSideMatches[depth][gjrgRowNumber].size() &&
            !gjrgFull)
    {
        const vector<uint32_t>& results = tSmallSideMatches[depth][gjrgRowNumber];
        const uint32_t size = results.size();

        if (depth == 0)
        {
            outputRG.getRow(gjrgRowNumber, &largeRow);
            applyMapping(gjrgMappings[joinerCount], largeRow, &baseRow);
            baseRow.setRid(largeRow.getRelRid());
        }

        //cout << "rowNum = " << gjrgRowNumber << " at depth " << depth << " size is " << size << endl;
        for (uint32_t& i = gjrgPlaceHolders[depth]; i < size && !gjrgFull; i++)
        {
            if (results[i] != (uint32_t) -1)
            {
                smallSideRGs[depth].getRow(results[i], &smallRow);
                //rowOffset = ((uint64_t) results[i]) * smallRowSize;
                //smallRow.setData(&rowDataAtThisLvl.rowData[rowOffset] + emptySize);
            }
            else
                smallRow.setPointer(smallNullPointers[depth]);

            //cout << "small row: " << smallRow.toString() << endl;
            applyMapping(gjrgMappings[depth], smallRow, &baseRow);

            if (!lowestLvl)
                generateJoinedRowGroup(baseRow, depth + 1);
            else
            {
                copyRow(baseRow, &joinedRow);
                //memcpy(joinedRow.getData(), baseRow.getData(), joinedRow.getSize());
                //cerr << "joined row " << joinedRG.getRowCount() << ": " << joinedRow.toString() << endl;
                joinedRow.nextRow();
                joinedRG.incRowCount();

                if (joinedRG.getRowCount() == 8192)
                {
                    i++;
                    gjrgFull = true;
                }
            }

            if (gjrgFull)
                break;
        }

        if (depth == 0 && gjrgPlaceHolders[0] == tSmallSideMatches[0][gjrgRowNumber].size())
        {
            gjrgPlaceHolders[0] = 0;
            gjrgRowNumber++;
        }
    }

//  if (depth == 0)
//      cout << "gjrg returning " << (uint32_t) gjrgFull << endl;
    if (!gjrgFull)
        gjrgPlaceHolders[depth] = 0;

    return gjrgFull;
}

void BatchPrimitiveProcessor::resetGJRG()
{
    gjrgFull = false;
    joinedRG.resetRowGroup(baseRid);
    joinedRG.getRow(0, &joinedRow);
    joinedRG.setDBRoot(dbRoot);
}

void BatchPrimitiveProcessor::initGJRG()
{
    for (uint32_t z = 0; z < joinerCount; z++)
        gjrgPlaceHolders[z] = 0;

    gjrgRowNumber = 0;
}

inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jIndex, vector<uint32_t>& v)
{
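    // The joiners are partitioned into processorThreads hash buckets (a power
    // of two), so hash(key) & ptMask selects the one partition that could hold
    // the key; only the NULL + ANTI cases below must walk every partition.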
    uint bucket;

    if (!typelessJoin[jIndex])
    {
        if (r.isNullValue(largeSideKeyColumns[jIndex]))
        {
            /* Bug 3524. This matches everything. */
            if (joinTypes[jIndex] & ANTI)
            {
                TJoiner::iterator it;

                for (uint i = 0; i < processorThreads; ++i)
                    for (it = tJoiners[jIndex][i]->begin(); it != tJoiners[jIndex][i]->end(); ++it)
                        v.push_back(it->second);

                return;
            }
            else
                return;
        }

        uint64_t largeKey;
        uint32_t colIndex = largeSideKeyColumns[jIndex];

        if (r.isUnsigned(colIndex))
        {
            largeKey = r.getUintField(colIndex);
        }
        else
        {
            largeKey = r.getIntField(colIndex);
        }

        bucket = bucketPicker((char *) &largeKey, 8, bpSeed) & ptMask;
        pair<TJoiner::iterator, TJoiner::iterator> range = tJoiners[jIndex][bucket]->equal_range(largeKey);
        for (; range.first != range.second; ++range.first)
            v.push_back(range.first->second);

        if (doMatchNulls[jIndex])     // add the nulls to the match list
        {
            bucket = bucketPicker((char *) &joinNullValues[jIndex], 8, bpSeed) & ptMask;
            range = tJoiners[jIndex][bucket]->equal_range(joinNullValues[jIndex]);
            for (; range.first != range.second; ++range.first)
                v.push_back(range.first->second);
        }
    }
    else
    {
        /* Bug 3524. Large-side NULL + ANTI join matches everything. */
        if (joinTypes[jIndex] & ANTI)
        {
            bool hasNullValue = false;

            for (uint32_t i = 0; i < tlLargeSideKeyColumns[jIndex].size(); i++)
                if (r.isNullValue(tlLargeSideKeyColumns[jIndex][i]))
                {
                    hasNullValue = true;
                    break;
                }

            if (hasNullValue)
            {
                TLJoiner::iterator it;
                for (uint i = 0; i < processorThreads; ++i)
                    for (it = tlJoiners[jIndex][i]->begin(); it != tlJoiners[jIndex][i]->end(); ++it)
                        v.push_back(it->second);

                return;
            }
        }

        TypelessData largeKey = makeTypelessKey(r, tlLargeSideKeyColumns[jIndex],
                                                tlKeyLengths[jIndex], &tmpKeyAllocators[jIndex]);
        pair<TLJoiner::iterator, TLJoiner::iterator> range;
        bucket = largeKey.hash(outputRG, tlLargeSideKeyColumns[jIndex]) & ptMask;
        range = tlJoiners[jIndex][bucket]->equal_range(largeKey);
        for (; range.first != range.second; ++range.first)
            v.push_back(range.first->second);
    }
}

void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount)
{
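    // Collect every LBID this message will touch and resolve their VSS info
    // in one bulk BRM call; later block loads consult vssCache instead of
    // doing a per-block lookup.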
    vector<int64_t> lbidList;
    vector<BRM::VSSData> vssData;
    uint32_t i;
    int rc;

    for (i = 0; i < filterCount; i++)
        filterSteps[i]->getLBIDList(loopCount, &lbidList);

    for (i = 0; i < projectCount; i++)
        projectSteps[i]->getLBIDList(loopCount, &lbidList);

    rc = brm->bulkVSSLookup(lbidList, versionInfo, (int) txnID, &vssData);

    if (rc == 0)
        for (i = 0; i < vssData.size(); i++)
            vssCache.insert(make_pair(lbidList[i], vssData[i]));

//  cout << "buildVSSCache inserted " << vssCache.size() << " elements" << endl;
}

void BatchPrimitiveProcessor::resetMem()
{
    serialized.reset();
    outputMsg.reset();
    std::vector<SCommand>().swap(filterSteps);
    std::vector<SCommand>().swap(projectSteps);
    vssCache.clear();
#ifndef __FreeBSD__
    pthread_mutex_unlock(&objLock);
#endif
    outRowGroupData.reset();
    fe1Data.reset();
    fe2Data.reset();
    joinedRGMem.reset();
}

}
// vim:ts=4 sw=4: