1 /*
2    Copyright (c) 2004, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #define DBSPJ_C
26 #include "Dbspj.hpp"
27 
28 #include <SectionReader.hpp>
29 #include <signaldata/LqhKey.hpp>
30 #include <signaldata/QueryTree.hpp>
31 #include <signaldata/TcKeyRef.hpp>
32 #include <signaldata/RouteOrd.hpp>
33 #include <signaldata/TransIdAI.hpp>
34 #include <signaldata/DiGetNodes.hpp>
35 #include <signaldata/DihScanTab.hpp>
36 #include <signaldata/AttrInfo.hpp>
37 #include <Interpreter.hpp>
38 #include <AttributeHeader.hpp>
39 #include <AttributeDescriptor.hpp>
40 #include <KeyDescriptor.hpp>
41 #include <md5_hash.hpp>
42 #include <signaldata/TcKeyConf.hpp>
43 
44 #include <signaldata/NodeFailRep.hpp>
45 #include <signaldata/ReadNodesConf.hpp>
46 
47 // Use DEBUG to print messages that should be
48 // seen only when we debug the product
49 
50 #ifdef VM_TRACE
51 
52 #define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
53 #define DEBUG_LQHKEYREQ
54 #define DEBUG_SCAN_FRAGREQ
55 
56 #else
57 
58 #define DEBUG(x)
59 
60 #endif
61 
62 #if 1
63 #define DEBUG_CRASH() ndbrequire(false)
64 #else
65 #define DEBUG_CRASH()
66 #endif
67 
68 #if 1
69 #undef DEBUG
70 #define DEBUG(x)
71 #undef DEBUG_LQHKEYREQ
72 #undef DEBUG_SCAN_FRAGREQ
73 #endif
74 
// Sentinel values: "no TreeNode" and "no row reference" respectively.
const Ptr<Dbspj::TreeNode> Dbspj::NullTreeNodePtr = { 0, RNIL };
const Dbspj::RowRef Dbspj::NullRowRef = { RNIL, GLOBAL_PAGE_SIZE_WORDS, { 0 } };
77 
/**
 * READ_CONFIG_REQ: initialize the block's pools and hash tables,
 * then acknowledge with READ_CONFIG_CONF.
 */
void Dbspj::execREAD_CONFIG_REQ(Signal* signal)
{
  jamEntry();
  /* Copy the request out of theData before it is reused for the CONF. */
  const ReadConfigReq req =
    *reinterpret_cast<const ReadConfigReq*>(signal->getDataPtr());

  Pool_context pc;
  pc.m_block = this;

  DEBUG("execREAD_CONFIG_REQ");
  DEBUG("sizeof(Request): " << sizeof(Request) <<
        " sizeof(TreeNode): " << sizeof(TreeNode));

  // All per-request records are arena-allocated, so an entire request
  // can be released as one unit when it completes/aborts.
  m_arenaAllocator.init(1024, RT_SPJ_ARENA_BLOCK, pc);
  m_request_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_REQUEST, pc);
  m_treenode_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_TREENODE, pc);
  m_scanfraghandle_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_SCANFRAG, pc);
  m_lookup_request_hash.setSize(16);
  m_scan_request_hash.setSize(16);
  // Row pages are served straight from the global memory manager's memroot.
  void* ptr = m_ctx.m_mm.get_memroot();
  m_page_pool.set((RowPage*)ptr, (Uint32)~0);

  Record_info ri;
  Dependency_map::createRecordInfo(ri, RT_SPJ_DATABUFFER);
  m_dependency_map_pool.init(&m_arenaAllocator, ri, pc);

  ReadConfigConf* const conf =
    reinterpret_cast<ReadConfigConf*>(signal->getDataPtrSend());
  conf->senderRef = reference();
  conf->senderData = req.senderData;

  sendSignal(req.senderRef, GSN_READ_CONFIG_CONF, signal,
             ReadConfigConf::SignalLength, JBB);
}//Dbspj::execREAD_CONF_REQ()
113 
114 static Uint32 f_STTOR_REF = 0;
115 
/**
 * STTOR: start-phase handling.
 *  - phase 1: arm the first CONTINUEB(0) tick (see execCONTINUEB)
 *  - phase 4: request the node list from NDBCNTR; STTORRY is deferred
 *             until READ_NODESCONF arrives
 *  - all other phases: reply STTORRY immediately
 */
void Dbspj::execSTTOR(Signal* signal)
{
//#define UNIT_TEST_DATABUFFER2

  jamEntry();
  /* START CASE */
  const Uint16 tphase = signal->theData[1];
  f_STTOR_REF = signal->getSendersBlockRef();  // remember who to STTORRY

  ndbout << "Dbspj::execSTTOR() inst:" << instance()
         << " phase=" << tphase << endl;

  if (tphase == 1)
  {
    jam();
    // CONTINUEB(0) -> releaseGlobal() (see execCONTINUEB);
    // presumably a periodic housekeeping tick -- confirm in releaseGlobal().
    signal->theData[0] = 0;
    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 1000, 1);
  }

  if (tphase == 4)
  {
    jam();

    // STTORRY for this phase is sent from execREAD_NODESCONF().
    signal->theData[0] = reference();
    sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
    return;
  }

  sendSTTORRY(signal);

#ifdef UNIT_TEST_DATABUFFER2
  // Ad-hoc smoke test of ArenaPool / DataBuffer2, only compiled when the
  // (normally commented-out) UNIT_TEST_DATABUFFER2 define is enabled.
  if (tphase == 120)
  {
    ndbout_c("basic test of ArenaPool / DataBuffer2");

    for (Uint32 i = 0; i<100; i++)
    {
      ArenaHead ah;
      if (!m_arenaAllocator.seize(ah))
      {
        ndbout_c("Failed to allocate arena");
        break;
      }

      ndbout_c("*** LOOP %u", i);
      Uint32 sum = 0;
      Dependency_map::Head head;
      LocalArenaPoolImpl pool(ah, m_dependency_map_pool);
      for (Uint32 j = 0; j<100; j++)
      {
        Uint32 sz = rand() % 1000;
        if (0)
          ndbout_c("adding %u", sz);
        Local_dependency_map list(pool, head);
        // NOTE(review): this inner 'i' shadows the outer loop variable.
        for (Uint32 i = 0; i<sz; i++)
          signal->theData[i] = sum + i;
        list.append(signal->theData, sz);
        sum += sz;
      }

      {
        // Verify the buffer holds 0..sum-1 in insertion order.
        ndbrequire(head.getSize() == sum);
        Local_dependency_map list(pool, head);
        Dependency_map::ConstDataBufferIterator it;
        Uint32 cnt = 0;
        for (list.first(it); !it.isNull(); list.next(it))
        {
          ndbrequire(* it.data == cnt);
          cnt++;
        }

        ndbrequire(cnt == sum);
      }

      Resource_limit rl;
      if (m_ctx.m_mm.get_resource_limit(7, rl))
      {
        ndbout_c("Resource %d min: %d max: %d curr: %d",
                 7, rl.m_min, rl.m_max, rl.m_curr);
      }

      {
        ndbout_c("release map");
        Local_dependency_map list(pool, head);
        list.release();
      }

      ndbout_c("release all");
      m_arenaAllocator.release(ah);
      ndbout_c("*** LOOP %u sum: %u", i, sum);
    }
  }
#endif
}//Dbspj::execSTTOR()
210 
211 void
sendSTTORRY(Signal * signal)212 Dbspj::sendSTTORRY(Signal* signal)
213 {
214   signal->theData[0] = 0;
215   signal->theData[1] = 0;    /* BLOCK CATEGORY */
216   signal->theData[2] = 0;    /* SIGNAL VERSION NUMBER */
217   signal->theData[3] = 4;
218 #ifdef UNIT_TEST_DATABUFFER2
219   signal->theData[4] = 120;  /* Start phase end*/
220 #else
221   signal->theData[4] = 255;
222 #endif
223   signal->theData[5] = 255;
224   sendSignal(f_STTOR_REF, GSN_STTORRY, signal, 6, JBB);
225 }
226 
227 void
execREAD_NODESCONF(Signal * signal)228 Dbspj::execREAD_NODESCONF(Signal* signal)
229 {
230   jamEntry();
231 
232   ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
233 
234   if (getNodeState().getNodeRestartInProgress())
235   {
236     jam();
237     c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
238     c_alive_nodes.set(getOwnNodeId());
239   }
240   else
241   {
242     jam();
243     c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
244     NdbNodeBitmask tmp;
245     tmp.assign(NdbNodeBitmask::Size, conf->startedNodes);
246     c_alive_nodes.bitOR(tmp);
247   }
248 
249   sendSTTORRY(signal);
250 }
251 
252 void
execINCL_NODEREQ(Signal * signal)253 Dbspj::execINCL_NODEREQ(Signal* signal)
254 {
255   jamEntry();
256   const Uint32 senderRef = signal->theData[0];
257   const Uint32 nodeId  = signal->theData[1];
258 
259   ndbrequire(!c_alive_nodes.get(nodeId));
260   c_alive_nodes.set(nodeId);
261 
262   signal->theData[0] = nodeId;
263   signal->theData[1] = reference();
264   sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB);
265 }
266 
267 void
execNODE_FAILREP(Signal * signal)268 Dbspj::execNODE_FAILREP(Signal* signal)
269 {
270   jamEntry();
271 
272   const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
273   NdbNodeBitmask failed;
274   failed.assign(NdbNodeBitmask::Size, rep->theNodes);
275 
276   c_alive_nodes.bitANDC(failed);
277 
278   signal->theData[0] = 1;
279   signal->theData[1] = 0;
280   failed.copyto(NdbNodeBitmask::Size, signal->theData + 2);
281   sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
282              JBB);
283 }
284 
285 void
execAPI_FAILREQ(Signal * signal)286 Dbspj::execAPI_FAILREQ(Signal* signal)
287 {
288   jamEntry();
289   Uint32 failedApiNode = signal->theData[0];
290   ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR
291 
292   /**
293    * We only need to care about lookups
294    *   as SCAN's are aborted by DBTC
295    */
296 
297   signal->theData[0] = failedApiNode;
298   signal->theData[1] = reference();
299   sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
300 }
301 
302 void
execCONTINUEB(Signal * signal)303 Dbspj::execCONTINUEB(Signal* signal)
304 {
305   jamEntry();
306   switch(signal->theData[0]) {
307   case 0:
308     releaseGlobal(signal);
309     return;
310   case 1:
311     nodeFail_checkRequests(signal);
312     return;
313   case 2:
314     nodeFail_checkRequests(signal);
315     return;
316   }
317 
318   ndbrequire(false);
319 }
320 
321 void
nodeFail_checkRequests(Signal * signal)322 Dbspj::nodeFail_checkRequests(Signal* signal)
323 {
324   jam();
325   const Uint32 type = signal->theData[0];
326   const Uint32 bucket = signal->theData[1];
327 
328   NdbNodeBitmask failed;
329   failed.assign(NdbNodeBitmask::Size, signal->theData+2);
330 
331   Request_iterator iter;
332   Request_hash * hash;
333   switch(type){
334   case 1:
335     hash = &m_lookup_request_hash;
336     break;
337   case 2:
338     hash = &m_scan_request_hash;
339     break;
340   }
341   hash->next(bucket, iter);
342 
343   const Uint32 RT_BREAK = 64;
344   for(Uint32 i = 0; (i<RT_BREAK || iter.bucket == bucket) &&
345         !iter.curr.isNull(); i++)
346   {
347     jam();
348 
349     Ptr<Request> requestPtr = iter.curr;
350     hash->next(iter);
351     i += nodeFail(signal, requestPtr, failed);
352   }
353 
354   if (!iter.curr.isNull())
355   {
356     jam();
357     signal->theData[0] = type;
358     signal->theData[1] = bucket;
359     failed.copyto(NdbNodeBitmask::Size, signal->theData+2);
360     sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
361                JBB);
362   }
363   else if (type == 1)
364   {
365     jam();
366     signal->theData[0] = 2;
367     signal->theData[1] = 0;
368     failed.copyto(NdbNodeBitmask::Size, signal->theData+2);
369     sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
370                JBB);
371   }
372   else if (type == 2)
373   {
374     jam();
375     ndbout_c("Finished with handling node-failure");
376   }
377 }
378 
379 /**
380  * MODULE LQHKEYREQ
381  */
/**
 * LQHKEYREQ: entry point for a pushed (multi-)lookup request.
 * Builds a Request + TreeNode tree from the query tree and parameters
 * carried in the ATTRINFO section, then starts execution.
 */
void Dbspj::execLQHKEYREQ(Signal* signal)
{
  jamEntry();
  c_Counters.incr_counter(CI_READS_RECEIVED, 1);

  const LqhKeyReq* req = reinterpret_cast<const LqhKeyReq*>(signal->getDataPtr());

  /**
   * #0 - KEYINFO contains key for first operation (used for hash in TC)
   * #1 - ATTRINFO contains tree + parameters
   *      (unless StoredProcId is set, when only paramters are sent,
   *       but this is not yet implemented)
   */
  SectionHandle handle = SectionHandle(this, signal);
  SegmentedSectionPtr ssPtr;
  handle.getSection(ssPtr, LqhKeyReq::AttrInfoSectionNum);

  Uint32 err;
  Ptr<Request> requestPtr = { 0, RNIL };
  do
  {
    // All per-request memory comes from one arena, released as a unit.
    ArenaHead ah;
    err = DbspjErr::OutOfQueryMemory;
    if (unlikely(!m_arenaAllocator.seize(ah)))
      break;


    // NOTE(review): seize() result is unchecked here; presumably it cannot
    // fail once the arena was seized -- confirm.
    m_request_pool.seize(ah, requestPtr);

    new (requestPtr.p) Request(ah);
    do_init(requestPtr.p, req, signal->getSendersBlockRef());

    Uint32 len_cnt;

    {
      SectionReader r0(ssPtr, getSectionSegmentPool());

      // First ATTRINFO word packs the query-tree length and node count.
      err = DbspjErr::ZeroLengthQueryTree;
      if (unlikely(!r0.getWord(&len_cnt)))
        break;
    }

    Uint32 len = QueryTree::getLength(len_cnt);
    Uint32 cnt = QueryTree::getNodeCnt(len_cnt);

    {
      // Two readers over the same section: one positioned at the tree,
      // the other stepped past it to the per-node parameters.
      SectionReader treeReader(ssPtr, getSectionSegmentPool());
      SectionReader paramReader(ssPtr, getSectionSegmentPool());
      paramReader.step(len); // skip over tree to parameters

      Build_context ctx;
      ctx.m_resultRef = req->variableData[0];
      ctx.m_savepointId = req->savePointId;
      ctx.m_scanPrio = 1;
      ctx.m_start_signal = signal;
      ctx.m_keyPtr.i = handle.m_ptr[LqhKeyReq::KeyInfoSectionNum].i;
      ctx.m_senderRef = signal->getSendersBlockRef();

      err = build(ctx, requestPtr, treeReader, paramReader);
      if (unlikely(err != 0))
        break;
    }

    /**
     * a query being shipped as a LQHKEYREQ may only return finite rows
     *   i.e be a (multi-)lookup
     */
    ndbassert(requestPtr.p->isLookup());
    ndbassert(requestPtr.p->m_node_cnt == cnt);
    err = DbspjErr::InvalidRequest;
    if (unlikely(!requestPtr.p->isLookup() || requestPtr.p->m_node_cnt != cnt))
      break;

    /**
     * Store request in list(s)/hash(es)
     */
    store_lookup(requestPtr);

    // Sections are fully consumed by build(); release before starting.
    release(ssPtr);
    handle.clear();

    start(signal, requestPtr);
    return;
  } while (0);

  /**
   * Error handling below,
   *  'err' may contain error code.
   */
  if (!requestPtr.isNull())
  {
    jam();
    m_request_pool.release(requestPtr);
  }
  releaseSections(handle);
  handle_early_lqhkey_ref(signal, req, err);
}
479 
/**
 * Initialize a freshly constructed Request from an incoming LQHKEYREQ.
 *
 * Chooses the reply destination/correlation ('tmp'):
 *  - dirty committed reads reply directly to the API client
 *    (variableData[1] holds the API operation record),
 *  - otherwise the reply goes back to the requesting TC, whose operation
 *    record position in variableData depends on the request flags.
 */
void
Dbspj::do_init(Request* requestP, const LqhKeyReq* req, Uint32 senderRef)
{
  requestP->m_bits = 0;
  requestP->m_errCode = 0;
  requestP->m_state = Request::RS_BUILDING;
  requestP->m_node_cnt = 0;
  requestP->m_cnt_active = 0;
  requestP->m_rows = 0;
  requestP->m_active_nodes.clear();
  requestP->m_outstanding = 0;
  requestP->m_transId[0] = req->transId1;
  requestP->m_transId[1] = req->transId2;
  bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
#ifdef SPJ_TRACE_TIME
  requestP->m_cnt_batches = 0;
  requestP->m_sum_rows = 0;
  requestP->m_sum_running = 0;
  requestP->m_sum_waiting = 0;
  requestP->m_save_time = spj_now();
#endif
  const Uint32 reqInfo = req->requestInfo;
  Uint32 tmp = req->clientConnectPtr;  // default correlation: TC connect ptr
  if (LqhKeyReq::getDirtyFlag(reqInfo) &&
      LqhKeyReq::getOperation(reqInfo) == ZREAD)
  {
    jam();

    // Dirty read: result/reply is addressed to the API client directly.
    ndbrequire(LqhKeyReq::getApplicationAddressFlag(reqInfo));
    //const Uint32 apiRef   = lqhKeyReq->variableData[0];
    //const Uint32 apiOpRec = lqhKeyReq->variableData[1];
    tmp = req->variableData[1];
    requestP->m_senderData = tmp;
    requestP->m_senderRef = senderRef;
  }
  else
  {
    // The TC operation record's position in variableData depends on
    // which optional words precede it (see also handle_early_lqhkey_ref).
    if (LqhKeyReq::getSameClientAndTcFlag(reqInfo) == 1)
    {
      if (LqhKeyReq::getApplicationAddressFlag(reqInfo))
        tmp = req->variableData[2];
      else
        tmp = req->variableData[0];
    }
    requestP->m_senderData = tmp;
    requestP->m_senderRef = senderRef;
  }
  requestP->m_rootResultData = tmp;
}
529 
530 void
store_lookup(Ptr<Request> requestPtr)531 Dbspj::store_lookup(Ptr<Request> requestPtr)
532 {
533   ndbassert(requestPtr.p->isLookup());
534   Ptr<Request> tmp;
535   bool found = m_lookup_request_hash.find(tmp, *requestPtr.p);
536   ndbrequire(found == false);
537   m_lookup_request_hash.add(requestPtr);
538 }
539 
/**
 * Reject an LQHKEYREQ before a Request was successfully built.
 * Dirty reads get a TCKEYREF straight to the API client; all other
 * requests get an LQHKEYREF back to the requesting TC.
 *
 * 'lqhKeyReq' may alias signal->theData, so every needed field is read
 * into locals before the REF is constructed in the same buffer.
 */
void
Dbspj::handle_early_lqhkey_ref(Signal* signal,
                               const LqhKeyReq * lqhKeyReq,
                               Uint32 err)
{
  /**
   * Error path...
   */
  ndbrequire(err);
  const Uint32 reqInfo = lqhKeyReq->requestInfo;
  const Uint32 transid[2] = { lqhKeyReq->transId1, lqhKeyReq->transId2 };

  if (LqhKeyReq::getDirtyFlag(reqInfo) &&
      LqhKeyReq::getOperation(reqInfo) == ZREAD)
  {
    jam();
    /* Dirty read sends TCKEYREF direct to client, and nothing to TC */
    ndbrequire(LqhKeyReq::getApplicationAddressFlag(reqInfo));
    const Uint32 apiRef   = lqhKeyReq->variableData[0];
    const Uint32 apiOpRec = lqhKeyReq->variableData[1];

    TcKeyRef* const tcKeyRef = reinterpret_cast<TcKeyRef*>(signal->getDataPtrSend());

    tcKeyRef->connectPtr = apiOpRec;
    tcKeyRef->transId[0] = transid[0];
    tcKeyRef->transId[1] = transid[1];
    tcKeyRef->errorCode = err;
    sendTCKEYREF(signal, apiRef, signal->getSendersBlockRef());
  }
  else
  {
    jam();
    const Uint32 returnref = signal->getSendersBlockRef();
    const Uint32 clientPtr = lqhKeyReq->clientConnectPtr;

    // TC operation record position mirrors the selection in do_init().
    Uint32 TcOprec = clientPtr;
    if (LqhKeyReq::getSameClientAndTcFlag(reqInfo) == 1)
    {
      if (LqhKeyReq::getApplicationAddressFlag(reqInfo))
        TcOprec = lqhKeyReq->variableData[2];
      else
        TcOprec = lqhKeyReq->variableData[0];
    }

    LqhKeyRef* const ref = reinterpret_cast<LqhKeyRef*>(signal->getDataPtrSend());
    ref->userRef = clientPtr;
    ref->connectPtr = TcOprec;
    ref->errorCode = err;
    ref->transId1 = transid[0];
    ref->transId2 = transid[1];
    sendSignal(returnref, GSN_LQHKEYREF, signal,
               LqhKeyRef::SignalLength, JBB);
  }
}
594 
595 void
sendTCKEYREF(Signal * signal,Uint32 ref,Uint32 routeRef)596 Dbspj::sendTCKEYREF(Signal* signal, Uint32 ref, Uint32 routeRef)
597 {
598   const Uint32 nodeId = refToNode(ref);
599   const bool connectedToNode = getNodeInfo(nodeId).m_connected;
600 
601   if (likely(connectedToNode))
602   {
603     jam();
604     sendSignal(ref, GSN_TCKEYREF, signal, TcKeyRef::SignalLength, JBB);
605   }
606   else
607   {
608     jam();
609     memmove(signal->theData+25, signal->theData, 4*TcKeyRef::SignalLength);
610     RouteOrd* ord = (RouteOrd*)signal->getDataPtrSend();
611     ord->dstRef = ref;
612     ord->srcRef = reference();
613     ord->gsn = GSN_TCKEYREF;
614     ord->cnt = 0;
615     LinearSectionPtr ptr[3];
616     ptr[0].p = signal->theData+25;
617     ptr[0].sz = TcKeyRef::SignalLength;
618     sendSignal(routeRef, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBB,
619                ptr, 1);
620   }
621 }
622 
623 void
sendTCKEYCONF(Signal * signal,Uint32 len,Uint32 ref,Uint32 routeRef)624 Dbspj::sendTCKEYCONF(Signal* signal, Uint32 len, Uint32 ref, Uint32 routeRef)
625 {
626   const Uint32 nodeId = refToNode(ref);
627   const bool connectedToNode = getNodeInfo(nodeId).m_connected;
628 
629   if (likely(connectedToNode))
630   {
631     jam();
632     sendSignal(ref, GSN_TCKEYCONF, signal, len, JBB);
633   }
634   else
635   {
636     jam();
637     memmove(signal->theData+25, signal->theData, 4*len);
638     RouteOrd* ord = (RouteOrd*)signal->getDataPtrSend();
639     ord->dstRef = ref;
640     ord->srcRef = reference();
641     ord->gsn = GSN_TCKEYCONF;
642     ord->cnt = 0;
643     LinearSectionPtr ptr[3];
644     ptr[0].p = signal->theData+25;
645     ptr[0].sz = len;
646     sendSignal(routeRef, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBB,
647                ptr, 1);
648   }
649 }
650 
651 /**
652  * END - MODULE LQHKEYREQ
653  */
654 
655 
656 /**
657  * MODULE SCAN_FRAGREQ
658  */
/**
 * SCAN_FRAGREQ: entry point for a pushed-join scan request.
 * Builds a Request + TreeNode tree from the query tree and parameters
 * carried in the ATTRINFO section, then starts execution.
 */
void
Dbspj::execSCAN_FRAGREQ(Signal* signal)
{
  jamEntry();

  /* Reassemble if the request was fragmented */
  if (!assembleFragments(signal))
  {
    jam();
    return;
  }

  const ScanFragReq * req = (ScanFragReq *)&signal->theData[0];

#ifdef DEBUG_SCAN_FRAGREQ
  ndbout_c("Incomming SCAN_FRAGREQ ");
  printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
                    ScanFragReq::SignalLength + 2,
                    DBLQH);
#endif

  /**
   * #0 - ATTRINFO contains tree + parameters
   *      (unless StoredProcId is set, when only paramters are sent,
   *       but this is not yet implemented)
   * #1 - KEYINFO if first op is index scan - contains bounds for first scan
   *              if first op is lookup - contains keyinfo for lookup
   */
  SectionHandle handle = SectionHandle(this, signal);
  SegmentedSectionPtr ssPtr;
  handle.getSection(ssPtr, ScanFragReq::AttrInfoSectionNum);

  Uint32 err;
  Ptr<Request> requestPtr = { 0, RNIL };
  do
  {
    // All per-request memory comes from one arena, released as a unit.
    ArenaHead ah;
    err = DbspjErr::OutOfQueryMemory;
    if (unlikely(!m_arenaAllocator.seize(ah)))
      break;

    m_request_pool.seize(ah, requestPtr);

    new (requestPtr.p) Request(ah);
    do_init(requestPtr.p, req, signal->getSendersBlockRef());

    Uint32 len_cnt;
    {
      SectionReader r0(ssPtr, getSectionSegmentPool());
      // First ATTRINFO word packs the query-tree length and node count.
      err = DbspjErr::ZeroLengthQueryTree;
      if (unlikely(!r0.getWord(&len_cnt)))
        break;
    }

    Uint32 len = QueryTree::getLength(len_cnt);
    Uint32 cnt = QueryTree::getNodeCnt(len_cnt);

    {
      // Two readers over the same section: one positioned at the tree,
      // the other stepped past it to the per-node parameters.
      SectionReader treeReader(ssPtr, getSectionSegmentPool());
      SectionReader paramReader(ssPtr, getSectionSegmentPool());
      paramReader.step(len); // skip over tree to parameters

      Build_context ctx;
      ctx.m_resultRef = req->resultRef;
      ctx.m_scanPrio = ScanFragReq::getScanPrio(req->requestInfo);
      ctx.m_savepointId = req->savePointId;
      ctx.m_batch_size_rows = req->batch_size_rows;
      ctx.m_start_signal = signal;
      ctx.m_senderRef = signal->getSendersBlockRef();

      // The KEYINFO section is optional for scans.
      if (handle.m_cnt > 1)
      {
        jam();
        ctx.m_keyPtr.i = handle.m_ptr[ScanFragReq::KeyInfoSectionNum].i;
      }
      else
      {
        jam();
        ctx.m_keyPtr.i = RNIL;
      }

      err = build(ctx, requestPtr, treeReader, paramReader);
      if (unlikely(err != 0))
        break;
    }

    // A SCAN_FRAGREQ must have produced a scan request, with the node
    // count announced in the tree header.
    ndbassert(requestPtr.p->isScan());
    ndbassert(requestPtr.p->m_node_cnt == cnt);
    err = DbspjErr::InvalidRequest;
    if (unlikely(!requestPtr.p->isScan() || requestPtr.p->m_node_cnt != cnt))
      break;

    /**
     * Store request in list(s)/hash(es)
     */
    store_scan(requestPtr);

    // Sections are fully consumed by build(); release before starting.
    release(ssPtr);
    handle.clear();

    start(signal, requestPtr);
    return;
  } while (0);

  // Error path: 'err' holds the failure code.
  if (!requestPtr.isNull())
  {
    jam();
    m_request_pool.release(requestPtr);
  }
  releaseSections(handle);
  handle_early_scanfrag_ref(signal, req, err);
}
771 
772 void
do_init(Request * requestP,const ScanFragReq * req,Uint32 senderRef)773 Dbspj::do_init(Request* requestP, const ScanFragReq* req, Uint32 senderRef)
774 {
775   requestP->m_bits = 0;
776   requestP->m_errCode = 0;
777   requestP->m_state = Request::RS_BUILDING;
778   requestP->m_node_cnt = 0;
779   requestP->m_cnt_active = 0;
780   requestP->m_rows = 0;
781   requestP->m_active_nodes.clear();
782   requestP->m_outstanding = 0;
783   requestP->m_senderRef = senderRef;
784   requestP->m_senderData = req->senderData;
785   requestP->m_transId[0] = req->transId1;
786   requestP->m_transId[1] = req->transId2;
787   requestP->m_rootResultData = req->resultData;
788   bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
789 #ifdef SPJ_TRACE_TIME
790   requestP->m_cnt_batches = 0;
791   requestP->m_sum_rows = 0;
792   requestP->m_sum_running = 0;
793   requestP->m_sum_waiting = 0;
794   requestP->m_save_time = spj_now();
795 #endif
796 }
797 
798 void
store_scan(Ptr<Request> requestPtr)799 Dbspj::store_scan(Ptr<Request> requestPtr)
800 {
801   ndbassert(requestPtr.p->isScan());
802   Ptr<Request> tmp;
803   bool found = m_scan_request_hash.find(tmp, *requestPtr.p);
804   ndbrequire(found == false);
805   m_scan_request_hash.add(requestPtr);
806 }
807 
808 void
handle_early_scanfrag_ref(Signal * signal,const ScanFragReq * _req,Uint32 err)809 Dbspj::handle_early_scanfrag_ref(Signal* signal,
810                                  const ScanFragReq * _req,
811                                  Uint32 err)
812 {
813   ScanFragReq req = *_req;
814   Uint32 senderRef = signal->getSendersBlockRef();
815 
816   ScanFragRef * ref = (ScanFragRef*)&signal->theData[0];
817   ref->senderData = req.senderData;
818   ref->transId1 = req.transId1;
819   ref->transId2 = req.transId2;
820   ref->errorCode = err;
821   sendSignal(senderRef, GSN_SCAN_FRAGREF, signal,
822              ScanFragRef::SignalLength, JBB);
823 }
824 
825 /**
826  * END - MODULE SCAN_FRAGREQ
827  */
828 
829 /**
830  * MODULE GENERIC
831  */
/**
 * Parse the serialized query tree ('tree') and its parameter stream
 * ('param') and build the TreeNode structure of 'requestPtr'.
 *
 * Nodes and their parameter blocks are consumed pairwise; each pair must
 * agree on op type, and is dispatched to the op's m_build function.
 *
 * @return 0 on success, else a DbspjErr error code.
 */
Uint32
Dbspj::build(Build_context& ctx,
             Ptr<Request> requestPtr,
             SectionReader & tree,
             SectionReader & param)
{
  Uint32 tmp0, tmp1;
  Uint32 err = DbspjErr::ZeroLengthQueryTree;
  ctx.m_cnt = 0;
  ctx.m_scan_cnt = 0;

  // NOTE(review): getWord()'s return value is ignored and 'err' is
  // overwritten just below; an empty section presumably ends up with
  // loop==0 and fails InvalidTreeNodeCount -- confirm.
  tree.getWord(&tmp0);
  Uint32 loop = QueryTree::getNodeCnt(tmp0);

  DEBUG("::build()");
  err = DbspjErr::InvalidTreeNodeCount;
  if (loop == 0 || loop > NDB_SPJ_MAX_TREE_NODES)
  {
    DEBUG_CRASH();
    goto error;
  }

  while (ctx.m_cnt < loop)
  {
    DEBUG(" - loop " << ctx.m_cnt << " pos: " << tree.getPos().currPos);
    // Peek the header word of the next node/parameter block to learn op
    // type and length without consuming; the blocks are read fully below.
    tree.peekWord(&tmp0);
    param.peekWord(&tmp1);
    Uint32 node_op = QueryNode::getOpType(tmp0);
    Uint32 node_len = QueryNode::getLength(tmp0);
    Uint32 param_op = QueryNodeParameters::getOpType(tmp1);
    Uint32 param_len = QueryNodeParameters::getLength(tmp1);

    // Both blocks are staged in the fixed-size scratch buffers
    // m_buffer0 / m_buffer1; reject anything that would not fit.
    err = DbspjErr::QueryNodeTooBig;
    if (unlikely(node_len >= NDB_ARRAY_SIZE(m_buffer0)))
    {
      DEBUG_CRASH();
      goto error;
    }

    err = DbspjErr::QueryNodeParametersTooBig;
    if (unlikely(param_len >= NDB_ARRAY_SIZE(m_buffer1)))
    {
      DEBUG_CRASH();
      goto error;
    }

    err = DbspjErr::InvalidTreeNodeSpecification;
    if (unlikely(tree.getWords(m_buffer0, node_len) == false))
    {
      DEBUG_CRASH();
      goto error;
    }

    err = DbspjErr::InvalidTreeParametersSpecification;
    if (unlikely(param.getWords(m_buffer1, param_len) == false))
    {
      DEBUG_CRASH();
      goto error;
    }

#if defined(DEBUG_LQHKEYREQ) || defined(DEBUG_SCAN_FRAGREQ)
    printf("node: ");
    for (Uint32 i = 0; i<node_len; i++)
      printf("0x%.8x ", m_buffer0[i]);
    printf("\n");

    printf("param: ");
    for (Uint32 i = 0; i<param_len; i++)
      printf("0x%.8x ", m_buffer1[i]);
    printf("\n");
#endif

    // Node and parameter block must describe the same operation.
    err = DbspjErr::UnknowQueryOperation;
    if (unlikely(node_op != param_op))
    {
      DEBUG_CRASH();
      goto error;
    }

    const OpInfo* info = getOpInfo(node_op);
    if (unlikely(info == 0))
    {
      DEBUG_CRASH();
      goto error;
    }

    // Patch the staged copies with the decoded lengths, then dispatch to
    // the operation-specific build function (e.g. lookup or scan).
    QueryNode* qn = (QueryNode*)m_buffer0;
    QueryNodeParameters * qp = (QueryNodeParameters*)m_buffer1;
    qn->len = node_len;
    qp->len = param_len;
    err = (this->*(info->m_build))(ctx, requestPtr, qn, qp);
    if (unlikely(err != 0))
    {
      DEBUG_CRASH();
      goto error;
    }

    /**
     * only first node gets access to signal
     */
    ctx.m_start_signal = 0;

    /**
     * TODO handle error, by aborting request
     */
    ndbrequire(ctx.m_cnt < NDB_ARRAY_SIZE(ctx.m_node_list));
    ctx.m_cnt++;
  }
  requestPtr.p->m_node_cnt = ctx.m_cnt;

  /**
   * Init ROW_BUFFERS for those TreeNodes requiring either
   * T_ROW_BUFFER or T_ROW_BUFFER_MAP.
   */
  if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
  {
    Ptr<TreeNode> treeNodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
    {
      if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)
      {
        jam();
        treeNodePtr.p->m_row_map.init();
      }
      else if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
      {
        jam();
        treeNodePtr.p->m_row_list.init();
      }
    }
  }

  if (ctx.m_scan_cnt > 1)
  {
    jam();
    requestPtr.p->m_bits |= Request::RT_MULTI_SCAN;

    /**
     * Iff, multi-scan is non-bushy (normal case)
     *   we don't strictly need RT_VAR_ALLOC for RT_ROW_BUFFERS
     *   but could instead pop-row stack frame,
     *     however this is not implemented...
     *
     * so, use RT_VAR_ALLOC
     */
    if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
    {
      jam();
      requestPtr.p->m_bits |= Request::RT_VAR_ALLOC;
    }
  }

  return 0;

error:
  jam();
  return err;
}
991 
992 Uint32
createNode(Build_context & ctx,Ptr<Request> requestPtr,Ptr<TreeNode> & treeNodePtr)993 Dbspj::createNode(Build_context& ctx, Ptr<Request> requestPtr,
994                   Ptr<TreeNode> & treeNodePtr)
995 {
996   /**
997    * In the future, we can have different TreeNode-allocation strategies
998    *   that can be setup using the Build_context
999    *
1000    */
1001   if (m_treenode_pool.seize(requestPtr.p->m_arena, treeNodePtr))
1002   {
1003     DEBUG("createNode - seize -> ptrI: " << treeNodePtr.i);
1004     new (treeNodePtr.p) TreeNode(requestPtr.i);
1005     ctx.m_node_list[ctx.m_cnt] = treeNodePtr;
1006     Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1007     list.addLast(treeNodePtr);
1008     treeNodePtr.p->m_node_no = ctx.m_cnt;
1009     return 0;
1010   }
1011   return DbspjErr::OutOfOperations;
1012 }
1013 
void
Dbspj::start(Signal* signal,
             Ptr<Request> requestPtr)
{
  /**
   * Begin executing a built request. If RT_NEED_PREPARE is set, first
   * run each TreeNode's m_prepare callback (state -> RS_PREPARING);
   * otherwise fall straight through to checkPrepareComplete() which
   * starts execution at the root TreeNode.
   */
  if (requestPtr.p->m_bits & Request::RT_NEED_PREPARE)
  {
    jam();
    requestPtr.p->m_outstanding = 0;
    requestPtr.p->m_state = Request::RS_PREPARING;

    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_prepare != 0)
      {
        jam();
        // prepare callbacks are expected to bump m_outstanding when they block
        (this->*(nodePtr.p->m_info->m_prepare))(signal, requestPtr, nodePtr);
      }
    }

    /**
     * preferably RT_NEED_PREPARE should only be set if blocking
     * calls are used, in which case m_outstanding should have been increased
     */
    ndbassert(requestPtr.p->m_outstanding);
  }

  // cnt == 0: only proceeds if nothing is (still) outstanding
  checkPrepareComplete(signal, requestPtr, 0);
}
1046 
void
Dbspj::checkPrepareComplete(Signal * signal, Ptr<Request> requestPtr,
                            Uint32 cnt)
{
  /**
   * Account for 'cnt' completed prepare operations. Once nothing is
   * outstanding the prepare phase is over: either finish an aborting
   * request via batchComplete(), or switch to RS_RUNNING and start
   * execution at the root (first) TreeNode.
   */
  ndbrequire(requestPtr.p->m_outstanding >= cnt);
  requestPtr.p->m_outstanding -= cnt;

  if (requestPtr.p->m_outstanding == 0)
  {
    jam();

    if (unlikely((requestPtr.p->m_state & Request::RS_ABORTING) != 0))
    {
      jam();
      // Request was aborted during prepare - wrap it up instead of starting
      batchComplete(signal, requestPtr);
      return;
    }

    requestPtr.p->m_state = Request::RS_RUNNING;
    Ptr<TreeNode> nodePtr;
    {
      Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
      ndbrequire(list.first(nodePtr));
    }
    // Root TreeNode must provide a start method
    ndbrequire(nodePtr.p->m_info != 0 && nodePtr.p->m_info->m_start != 0);
    (this->*(nodePtr.p->m_info->m_start))(signal, requestPtr, nodePtr);
  }
}
1075 
1076 void
checkBatchComplete(Signal * signal,Ptr<Request> requestPtr,Uint32 cnt)1077 Dbspj::checkBatchComplete(Signal * signal, Ptr<Request> requestPtr,
1078                           Uint32 cnt)
1079 {
1080   ndbrequire(requestPtr.p->m_outstanding >= cnt);
1081   requestPtr.p->m_outstanding -= cnt;
1082 
1083   if (requestPtr.p->m_outstanding == 0)
1084   {
1085     jam();
1086     batchComplete(signal, requestPtr);
1087   }
1088 }
1089 
void
Dbspj::batchComplete(Signal* signal, Ptr<Request> requestPtr)
{
  /**
   * Handle completion of one batch:
   *  - reply to TC (conf/ref) unless a complete-phase must run first,
   *  - run the complete-phase when the request finished and needs one,
   *  - then either clean up the finished request or release/reposition
   *    row buffers in preparation for a SCAN_NEXTREQ.
   */
  ndbrequire(requestPtr.p->m_outstanding == 0); // "definition" of batchComplete

  bool is_complete = requestPtr.p->m_cnt_active == 0;
  bool need_complete_phase = requestPtr.p->m_bits & Request::RT_NEED_COMPLETE;

  if (requestPtr.p->isLookup())
  {
    // Lookups are single-batch: nothing may remain active here
    ndbassert(requestPtr.p->m_cnt_active == 0);
  }

  if (!is_complete || (is_complete && need_complete_phase == false))
  {
    /**
     * one batch complete, and either
     *   - request not complete
     *   - or not complete_phase needed
     */
    jam();

    if ((requestPtr.p->m_state & Request::RS_ABORTING) != 0)
    {
      ndbassert(is_complete);
    }

    prepareNextBatch(signal, requestPtr);
    sendConf(signal, requestPtr, is_complete);
  }
  else if (is_complete && need_complete_phase)
  {
    jam();
    /**
     * run complete-phase
     */
    // complete() will re-enter batchComplete() when done
    complete(signal, requestPtr);
    return;
  }

  if (requestPtr.p->m_cnt_active == 0)
  {
    jam();
    /**
     * request completed
     */
    cleanup(requestPtr);
  }
  else if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
  {
    jam();
    /**
     * release unneeded buffers and position cursor for SCAN_NEXTREQ
     */
    releaseScanBuffers(requestPtr);
  }
  else if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
  {
    jam();
    /**
     * if not multiple scans in request, simply release all pages allocated
     * for row buffers (all rows will be released anyway)
     */
    releaseRequestBuffers(requestPtr, true);
  }
}
1156 
1157 /**
1158  * Locate next TreeNode(s) to retrieve more rows from.
1159  *
1160  *   Calculate set of the 'm_active_nodes' we will receive from in NEXTREQ.
1161  *   Add these TreeNodes to the cursor list to be iterated.
1162  */
1163 void
prepareNextBatch(Signal * signal,Ptr<Request> requestPtr)1164 Dbspj::prepareNextBatch(Signal* signal, Ptr<Request> requestPtr)
1165 {
1166   requestPtr.p->m_cursor_nodes.init();
1167   requestPtr.p->m_active_nodes.clear();
1168 
1169   if (requestPtr.p->m_cnt_active == 0)
1170   {
1171     jam();
1172     return;
1173   }
1174 
1175   if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT)
1176   {
1177     /**
1178      * If REPEAT_SCAN_RESULT we handle bushy scans by return more *new* rows
1179      * from only one of the active child scans. If there are multiple
1180      * bushy scans not being able to return their current result set in
1181      * a single batch, result sets from the other child scans are repeated
1182      * until all rows has been returned to the API client.
1183      *
1184      * Hence, the cross joined results from the bushy scans are partly
1185      * produced within the SPJ block on a 'batchsize granularity',
1186      * and partly is the responsibility of the API-client by iterating
1187      * the result rows within the current result batches.
1188      * (Opposed to non-REPEAT_SCAN_RESULT, the client only have to care about
1189      *  the current batched rows - no buffering is required)
1190      */
1191     jam();
1192     Ptr<TreeNode> nodePtr;
1193     Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1194 
1195     /**
1196      * Locate last 'TN_ACTIVE' TreeNode which is the only one choosen
1197      * to return more *new* rows.
1198      */
1199     for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
1200     {
1201       if (nodePtr.p->m_state == TreeNode::TN_ACTIVE)
1202       {
1203         jam();
1204         DEBUG("Will fetch more from 'active' m_node_no: " << nodePtr.p->m_node_no);
1205         /**
1206          * A later NEXTREQ will request a *new* batch of rows from this TreeNode.
1207          */
1208         registerActiveCursor(requestPtr, nodePtr);
1209         break;
1210       }
1211     }
1212 
1213     /**
1214      *  Restart/repeat other (index scan) child batches which:
1215      *    - Being 'after' nodePtr located above.
1216      *    - Not being an ancestor of (depends on) any 'active' TreeNode.
1217      *      (As these scans are started when rows from these parent nodes
1218      *      arrives.)
1219      */
1220     if (!nodePtr.isNull())
1221     {
1222       jam();
1223       DEBUG("Calculate 'active', w/ cursor on m_node_no: " << nodePtr.p->m_node_no);
1224 
1225       /* Restart any partial index-scans after this 'TN_ACTIVE' TreeNode */
1226       for (list.next(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1227       {
1228         jam();
1229         if (!nodePtr.p->m_ancestors.overlaps (requestPtr.p->m_active_nodes))
1230         {
1231           jam();
1232           ndbrequire(nodePtr.p->m_state != TreeNode::TN_ACTIVE);
1233           ndbrequire(nodePtr.p->m_info != 0);
1234           if (nodePtr.p->m_info->m_parent_batch_repeat != 0)
1235           {
1236             jam();
1237             (this->*(nodePtr.p->m_info->m_parent_batch_repeat))(signal,
1238                                                                 requestPtr,
1239                                                                 nodePtr);
1240           }
1241         }
1242       }
1243     } // if (!nodePtr.isNull()
1244   }
1245   else  // not 'RT_REPEAT_SCAN_RESULT'
1246   {
1247     /**
1248      * If not REPEAT_SCAN_RESULT multiple active TreeNodes may return their
1249      * remaining result simultaneously. In case of bushy-scans, these
1250      * concurrent result streams are cross joins of each other
1251      * in SQL terms. In order to produce the cross joined result, it is
1252      * the responsibility of the API-client to buffer these streams and
1253      * iterate them to produce the cross join.
1254      */
1255     jam();
1256     Ptr<TreeNode> nodePtr;
1257     Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1258     TreeNodeBitMask ancestors_of_active;
1259 
1260     for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
1261     {
1262       /**
1263        * If we are active (i.e not consumed all rows originating
1264        *   from parent rows) and we are not in the set of parents
1265        *   for any active child:
1266        *
1267        * Then, this is a position that execSCAN_NEXTREQ should continue
1268        */
1269       if (nodePtr.p->m_state == TreeNode::TN_ACTIVE &&
1270          !ancestors_of_active.get (nodePtr.p->m_node_no))
1271       {
1272         jam();
1273         DEBUG("Add 'active' m_node_no: " << nodePtr.p->m_node_no);
1274         registerActiveCursor(requestPtr, nodePtr);
1275         ancestors_of_active.bitOR(nodePtr.p->m_ancestors);
1276       }
1277     }
1278   } // if (RT_REPEAT_SCAN_RESULT)
1279 
1280   DEBUG("Calculated 'm_active_nodes': " << requestPtr.p->m_active_nodes.rep.data[0]);
1281 }
1282 
void
Dbspj::sendConf(Signal* signal, Ptr<Request> requestPtr, bool is_complete)
{
  /**
   * Report batch (or request) completion back to the requester:
   *  - scans:   SCAN_FRAGCONF on success, SCAN_FRAGREF on error
   *             (unless RS_WAITING, where TC has not yet asked for more)
   *  - lookups: only errors are reported from here, as TCKEYREF
   *             routed towards the API.
   */
  if (requestPtr.p->isScan())
  {
    if (unlikely((requestPtr.p->m_state & Request::RS_WAITING) != 0))
    {
      jam();
      /**
       * We aborted request ourselves (due to node-failure ?)
       *   but TC haven't contacted us...so we can't reply yet...
       */
      ndbrequire(is_complete);
      ndbrequire((requestPtr.p->m_state & Request::RS_ABORTING) != 0);
      return;
    }

    if (requestPtr.p->m_errCode == 0)
    {
      jam();
      ScanFragConf * conf=
        reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
      conf->senderData = requestPtr.p->m_senderData;
      conf->transId1 = requestPtr.p->m_transId[0];
      conf->transId2 = requestPtr.p->m_transId[1];
      conf->completedOps = requestPtr.p->m_rows;
      conf->fragmentCompleted = is_complete ? 1 : 0;
      // total_len carries the active-node bitmask word to the requester
      conf->total_len = requestPtr.p->m_active_nodes.rep.data[0];

      c_Counters.incr_counter(CI_SCAN_BATCHES_RETURNED, 1);
      c_Counters.incr_counter(CI_SCAN_ROWS_RETURNED, requestPtr.p->m_rows);

#ifdef SPJ_TRACE_TIME
      Uint64 now = spj_now();
      Uint64 then = requestPtr.p->m_save_time;

      requestPtr.p->m_sum_rows += requestPtr.p->m_rows;
      requestPtr.p->m_sum_running += Uint32(now - then);
      requestPtr.p->m_cnt_batches++;
      requestPtr.p->m_save_time = now;

      if (is_complete)
      {
        Uint32 cnt = requestPtr.p->m_cnt_batches;
        ndbout_c("batches: %u avg_rows: %u avg_running: %u avg_wait: %u",
                 cnt,
                 (requestPtr.p->m_sum_rows / cnt),
                 (requestPtr.p->m_sum_running / cnt),
                 cnt == 1 ? 0 : requestPtr.p->m_sum_waiting / (cnt - 1));
      }
#endif

      /**
       * reset for next batch
       */
      requestPtr.p->m_rows = 0;
      if (!is_complete)
      {
        jam();
        // More batches to come: wait for SCAN_NEXTREQ before continuing
        requestPtr.p->m_state |= Request::RS_WAITING;
      }
#ifdef DEBUG_SCAN_FRAGREQ
      ndbout_c("Dbspj::sendConf() sending SCAN_FRAGCONF ");
      printSCAN_FRAGCONF(stdout, signal->getDataPtrSend(),
                         conf->total_len,
                         DBLQH);
#endif
      sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
                 ScanFragConf::SignalLength, JBB);
    }
    else
    {
      jam();
      ndbrequire(is_complete);
      ScanFragRef * ref=
        reinterpret_cast<ScanFragRef*>(signal->getDataPtrSend());
      ref->senderData = requestPtr.p->m_senderData;
      ref->transId1 = requestPtr.p->m_transId[0];
      ref->transId2 = requestPtr.p->m_transId[1];
      ref->errorCode = requestPtr.p->m_errCode;

      sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGREF, signal,
                 ScanFragRef::SignalLength, JBB);
    }
  }
  else
  {
    ndbassert(is_complete);
    if (requestPtr.p->m_errCode)
    {
      jam();
      Uint32 resultRef = getResultRef(requestPtr);
      TcKeyRef* ref = (TcKeyRef*)signal->getDataPtr();
      ref->connectPtr = requestPtr.p->m_senderData;
      ref->transId[0] = requestPtr.p->m_transId[0];
      ref->transId[1] = requestPtr.p->m_transId[1];
      ref->errorCode = requestPtr.p->m_errCode;
      ref->errorData = 0;

      sendTCKEYREF(signal, resultRef, requestPtr.p->m_senderRef);
    }
  }
}
1386 
1387 Uint32
getResultRef(Ptr<Request> requestPtr)1388 Dbspj::getResultRef(Ptr<Request> requestPtr)
1389 {
1390   Ptr<TreeNode> nodePtr;
1391   Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1392   for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1393   {
1394     if (nodePtr.p->m_info == &g_LookupOpInfo)
1395     {
1396       jam();
1397       return nodePtr.p->m_lookup_data.m_api_resultRef;
1398     }
1399   }
1400   ndbrequire(false);
1401   return 0;
1402 }
1403 
void
Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
{
  /**
   * Between batches of a multi-scan: walk the TreeNode list bottom-up,
   * releasing buffered rows and cleaning up child branches of nodes
   * whose rows are no longer needed, i.e. nodes that are not ancestors
   * of any node that is (or will become) active in the next NEXTREQ.
   */
  Ptr<TreeNode> treeNodePtr;
  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
  TreeNodeBitMask ancestors_of_active;

  for (list.last(treeNodePtr); !treeNodePtr.isNull(); list.prev(treeNodePtr))
  {
    /**
     * If there are no active children,
     *   then we can cleanup in our sub-branch
     */
    if (!ancestors_of_active.get(treeNodePtr.p->m_node_no))
    {
      if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
      {
        jam();
        releaseNodeRows(requestPtr, treeNodePtr);
      }

      /**
       * Cleanup ACTIVE nodes fetching more rows in a NEXTREQ,
       * or nodes being in 'm_active_nodes' as they will 'repeat'.
       * (and then become active)
       */
      if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE ||
          requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no))
      {
        jam();
        cleanupChildBranch(requestPtr, treeNodePtr);
      }
    }

    /**
      * Collect ancestors of all nodes which are, or will
      * become active in NEXTREQ (possibly repeated)
      */
    if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE ||
        requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no))
    {
      ancestors_of_active.bitOR(treeNodePtr.p->m_ancestors);
    }
  }
  /**
   * Needs to be atleast 1 active otherwise we should have
   *   taken the cleanup "path" in batchComplete
   */
  ndbrequire(requestPtr.p->m_cnt_active >= 1);
}
1454 
1455 void
registerActiveCursor(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)1456 Dbspj::registerActiveCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
1457 {
1458   Uint32 bit = treeNodePtr.p->m_node_no;
1459   ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
1460   requestPtr.p->m_active_nodes.set(bit);
1461 
1462   Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
1463 #ifdef VM_TRACE
1464   {
1465     Ptr<TreeNode> nodePtr;
1466     for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1467     {
1468       ndbrequire(nodePtr.i != treeNodePtr.i);
1469     }
1470   }
1471 #endif
1472   list.add(treeNodePtr);
1473 }
1474 
1475 void
cleanupChildBranch(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)1476 Dbspj::cleanupChildBranch(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
1477 {
1478   LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
1479   Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
1480   Dependency_map::ConstDataBufferIterator it;
1481   for (list.first(it); !it.isNull(); list.next(it))
1482   {
1483     jam();
1484     Ptr<TreeNode> childPtr;
1485     m_treenode_pool.getPtr(childPtr, *it.data);
1486     if (childPtr.p->m_info->m_parent_batch_cleanup != 0)
1487     {
1488       jam();
1489       (this->*(childPtr.p->m_info->m_parent_batch_cleanup))(requestPtr,
1490                                                             childPtr);
1491     }
1492     cleanupChildBranch(requestPtr,childPtr);
1493   }
1494 }
1495 
void
Dbspj::releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
  /**
   * Release all rows associated with tree node
   */

  // only when var-alloc, or else stack will be popped wo/ consideration
  // to individual rows
  ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER);

  /**
   * Two ways to iterate...
   */
  if ((treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) == 0)
  {
    jam();
    // Rows kept on a singly-linked fifo list
    Uint32 cnt = 0;
    SLFifoRowListIterator iter;
    for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
    {
      jam();
      // Save the ref and advance before releasing: releaseRow()
      // invalidates the row the iterator currently points at
      RowRef pos = iter.m_ref;
      next(iter);
      releaseRow(requestPtr, pos);
      cnt ++;
    }
    treeNodePtr.p->m_row_list.init();
    DEBUG("SLFifoRowListIterator: released " << cnt << " rows!");
  }
  else
  {
    jam();
    // Rows kept in a correlation-id keyed row map
    Uint32 cnt = 0;
    RowMapIterator iter;
    for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
    {
      jam();
      RowRef pos = iter.m_ref;
      // this could be made more efficient by not actually seting up m_row_ptr
      next(iter);
      releaseRow(requestPtr, pos);
      cnt++;
    }
    treeNodePtr.p->m_row_map.init();
    DEBUG("RowMapIterator: released " << cnt << " rows!");
  }
}
1545 
void
Dbspj::releaseRow(Ptr<Request> requestPtr, RowRef pos)
{
  /**
   * Free a single var-allocated row and maintain the request's
   * row-buffer page list: a fully-freed page is released back to
   * the page pool; otherwise the page with most free space is kept
   * last in the list (where allocation looks first).
   */
  ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
  ndbassert(pos.m_allocator == 1);
  Ptr<RowPage> ptr;
  m_page_pool.getPtr(ptr, pos.m_page_id);
  ((Var_page*)ptr.p)->free_record(pos.m_page_pos, Var_page::CHAIN);
  Uint32 free_space = ((Var_page*)ptr.p)->free_space;
  // NOTE(review): free_space == 0 appears to signal "page now empty/reusable"
  // per Var_page semantics - confirm against Var_page::free_record
  if (free_space == 0)
  {
    jam();
    LocalDLFifoList<RowPage> list(m_page_pool,
                                  requestPtr.p->m_rowBuffer.m_page_list);
    list.remove(ptr);
    releasePage(ptr);
  }
  else if (free_space > requestPtr.p->m_rowBuffer.m_var.m_free)
  {
    // Move page to tail and record its free space as the current best
    LocalDLFifoList<RowPage> list(m_page_pool,
                                  requestPtr.p->m_rowBuffer.m_page_list);
    list.remove(ptr);
    list.addLast(ptr);
    requestPtr.p->m_rowBuffer.m_var.m_free = free_space;
  }
}
1572 
void
Dbspj::releaseRequestBuffers(Ptr<Request> requestPtr, bool reset)
{
  /**
   * Release all pages for request
   *
   * When 'reset' is set, also re-initialize each buffering TreeNode's
   * row list/map so the request can buffer a fresh batch.
   */
  {
    {
      LocalDLFifoList<RowPage> list(m_page_pool,
                                    requestPtr.p->m_rowBuffer.m_page_list);
      if (!list.isEmpty())
      {
        jam();
        // Hand the whole page chain back in one operation
        Ptr<RowPage> first, last;
        list.first(first);
        list.last(last);
        releasePages(first.i, last);
        list.remove();
      }
    }
    requestPtr.p->m_rowBuffer.stack_init();
  }

  if (reset)
  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
      {
        jam();
        if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)
        {
          jam();
          nodePtr.p->m_row_map.init();
        }
        else
        {
          nodePtr.p->m_row_list.init();
        }
      }
    }
  }
}
1619 
1620 void
reportBatchComplete(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)1621 Dbspj::reportBatchComplete(Signal * signal, Ptr<Request> requestPtr,
1622                            Ptr<TreeNode> treeNodePtr)
1623 {
1624   LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
1625   Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
1626   Dependency_map::ConstDataBufferIterator it;
1627   for (list.first(it); !it.isNull(); list.next(it))
1628   {
1629     jam();
1630     Ptr<TreeNode> childPtr;
1631     m_treenode_pool.getPtr(childPtr, * it.data);
1632     if (childPtr.p->m_bits & TreeNode::T_NEED_REPORT_BATCH_COMPLETED)
1633     {
1634       jam();
1635       ndbrequire(childPtr.p->m_info != 0 &&
1636                  childPtr.p->m_info->m_parent_batch_complete !=0 );
1637       (this->*(childPtr.p->m_info->m_parent_batch_complete))(signal,
1638                                                              requestPtr,
1639                                                              childPtr);
1640     }
1641   }
1642 }
1643 
void
Dbspj::abort(Signal* signal, Ptr<Request> requestPtr, Uint32 errCode)
{
  /**
   * Abort an executing request with the given error code: mark it
   * RS_ABORTING, run every TreeNode's m_abort callback, then check
   * for batch completion. Idempotent: a request already aborting
   * skips straight to the completion check.
   */
  jam();

  if ((requestPtr.p->m_state & Request::RS_ABORTING) != 0)
  {
    jam();
    goto checkcomplete;
  }

  requestPtr.p->m_state |= Request::RS_ABORTING;
  requestPtr.p->m_errCode = errCode;

  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      /**
       * clear T_REPORT_BATCH_COMPLETE so that child nodes don't get confused
       *   during abort
       */
      nodePtr.p->m_bits &= ~Uint32(TreeNode::T_REPORT_BATCH_COMPLETE);

      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_abort != 0)
      {
        jam();
        (this->*(nodePtr.p->m_info->m_abort))(signal, requestPtr, nodePtr);
      }
    }
  }

checkcomplete:
  checkBatchComplete(signal, requestPtr, 0);
}
1682 
Uint32
Dbspj::nodeFail(Signal* signal, Ptr<Request> requestPtr,
                NdbNodeBitmask nodes)
{
  /**
   * Propagate a node-failure report to every TreeNode that handles
   * them. If any node was affected - or (for scans) if the failed
   * set includes our own TC - the request is aborted.
   *
   * Returns cnt + iter, i.e. zero only when no TreeNode even had a
   * NODE_FAILREP handler.
   */
  Uint32 cnt = 0;   // TreeNodes actually affected by the failure
  Uint32 iter = 0;  // TreeNodes that had a NODE_FAILREP handler

  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_execNODE_FAILREP != 0)
      {
        jam();
        iter ++;
        cnt += (this->*(nodePtr.p->m_info->m_execNODE_FAILREP))(signal,
                                                                requestPtr,
                                                                nodePtr, nodes);
      }
    }
  }

  if (cnt == 0)
  {
    jam();
    /**
     * None of the operations needed NodeFailRep "action"
     *   check if our TC has died...but...only needed in
     *   scan case...for lookup...not so...
     */
    if (requestPtr.p->isScan() &&
        nodes.get(refToNode(requestPtr.p->m_senderRef)))
    {
      jam();
      abort(signal, requestPtr, DbspjErr::NodeFailure);
    }
  }
  else
  {
    jam();
    abort(signal, requestPtr, DbspjErr::NodeFailure);
  }

  return cnt + iter;
}
1731 
void
Dbspj::complete(Signal* signal, Ptr<Request> requestPtr)
{
  /**
   * we need to run complete-phase before sending last SCAN_FRAGCONF
   */
  // Preserve ABORTING/WAITING flags while switching to RS_COMPLETING
  Uint32 flags = requestPtr.p->m_state &
    (Request::RS_ABORTING | Request::RS_WAITING);

  requestPtr.p->m_state = Request::RS_COMPLETING | flags;

  // clear bit so that next batchComplete()
  // will continue to cleanup
  ndbassert((requestPtr.p->m_bits & Request::RT_NEED_COMPLETE) != 0);
  requestPtr.p->m_bits &= ~(Uint32)Request::RT_NEED_COMPLETE;
  requestPtr.p->m_outstanding = 0;
  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_complete != 0)
      {
        jam();
        (this->*(nodePtr.p->m_info->m_complete))(signal, requestPtr, nodePtr);
      }
    }

    /**
     * preferably RT_NEED_COMPLETE should only be set if blocking
     * calls are used, in which case m_outstanding should have been increased
     *
     * BUT: scanIndex does DIH_SCAN_TAB_COMPLETE_REP which does not send reply
     *      so it not really "blocking"
     *      i.e remove assert
     */
    //ndbassert(requestPtr.p->m_outstanding);
  }
  checkBatchComplete(signal, requestPtr, 0);
}
1774 
void
Dbspj::cleanup(Ptr<Request> requestPtr)
{
  /**
   * Tear down a finished request: run each TreeNode's m_cleanup and
   * release it, unlink the request from its scan/lookup hash, release
   * all row-buffer pages, and finally return the Request record and
   * its arena to their pools.
   */
  ndbrequire(requestPtr.p->m_cnt_active == 0);
  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); )
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0 && nodePtr.p->m_info->m_cleanup != 0);
      (this->*(nodePtr.p->m_info->m_cleanup))(requestPtr, nodePtr);

      // Advance before releasing the current node
      Ptr<TreeNode> tmp = nodePtr;
      list.next(nodePtr);
      m_treenode_pool.release(tmp);
    }
    list.remove();
  }
  if (requestPtr.p->isScan())
  {
    jam();

    if (unlikely((requestPtr.p->m_state & Request::RS_WAITING) != 0))
    {
      jam();
      // TC has not fetched the final batch yet - keep the record
      // around as RS_ABORTED until it contacts us
      requestPtr.p->m_state = Request::RS_ABORTED;
      return;
    }

#ifdef VM_TRACE
    {
      // Debug check: the request must still be present in the hash
      Request key;
      key.m_transId[0] = requestPtr.p->m_transId[0];
      key.m_transId[1] = requestPtr.p->m_transId[1];
      key.m_senderData = requestPtr.p->m_senderData;
      Ptr<Request> tmp;
      ndbrequire(m_scan_request_hash.find(tmp, key));
    }
#endif
    m_scan_request_hash.remove(requestPtr);
  }
  else
  {
    jam();
#ifdef VM_TRACE
    {
      Request key;
      key.m_transId[0] = requestPtr.p->m_transId[0];
      key.m_transId[1] = requestPtr.p->m_transId[1];
      key.m_senderData = requestPtr.p->m_senderData;
      Ptr<Request> tmp;
      ndbrequire(m_lookup_request_hash.find(tmp, key));
    }
#endif
    m_lookup_request_hash.remove(requestPtr);
  }
  releaseRequestBuffers(requestPtr, false);
  // Copy the arena head before releasing the record that owns it
  ArenaHead ah = requestPtr.p->m_arena;
  m_request_pool.release(requestPtr);
  m_arenaAllocator.release(ah);
}
1837 
void
Dbspj::cleanup_common(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
  /**
   * Cleanup shared by all TreeNode types: release the dependency map,
   * key/attr-param patterns, and any still-held key/attr signal sections.
   */
  jam();

  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
  {
    Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
    list.release();
  }

  {
    Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
    pattern.release();
  }

  {
    Local_pattern_store pattern(pool, treeNodePtr.p->m_attrParamPattern);
    pattern.release();
  }

  if (treeNodePtr.p->m_send.m_keyInfoPtrI != RNIL)
  {
    jam();
    releaseSection(treeNodePtr.p->m_send.m_keyInfoPtrI);
  }

  if (treeNodePtr.p->m_send.m_attrInfoPtrI != RNIL)
  {
    jam();
    releaseSection(treeNodePtr.p->m_send.m_attrInfoPtrI);
  }
}
1871 
1872 /**
1873  * Processing of signals from LQH
1874  */
1875 void
execLQHKEYREF(Signal * signal)1876 Dbspj::execLQHKEYREF(Signal* signal)
1877 {
1878   jamEntry();
1879 
1880   const LqhKeyRef* ref = reinterpret_cast<const LqhKeyRef*>(signal->getDataPtr());
1881 
1882   DEBUG("execLQHKEYREF, errorCode:" << ref->errorCode);
1883   Ptr<TreeNode> treeNodePtr;
1884   m_treenode_pool.getPtr(treeNodePtr, ref->connectPtr);
1885 
1886   Ptr<Request> requestPtr;
1887   m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1888 
1889   ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execLQHKEYREF);
1890   (this->*(treeNodePtr.p->m_info->m_execLQHKEYREF))(signal,
1891                                                     requestPtr,
1892                                                     treeNodePtr);
1893 }
1894 
1895 void
execLQHKEYCONF(Signal * signal)1896 Dbspj::execLQHKEYCONF(Signal* signal)
1897 {
1898   jamEntry();
1899 
1900   DEBUG("execLQHKEYCONF");
1901 
1902   const LqhKeyConf* conf = reinterpret_cast<const LqhKeyConf*>(signal->getDataPtr());
1903   Ptr<TreeNode> treeNodePtr;
1904   m_treenode_pool.getPtr(treeNodePtr, conf->opPtr);
1905 
1906   Ptr<Request> requestPtr;
1907   m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1908 
1909   ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execLQHKEYCONF);
1910   (this->*(treeNodePtr.p->m_info->m_execLQHKEYCONF))(signal,
1911                                                      requestPtr,
1912                                                      treeNodePtr);
1913 }
1914 
1915 void
execSCAN_FRAGREF(Signal * signal)1916 Dbspj::execSCAN_FRAGREF(Signal* signal)
1917 {
1918   jamEntry();
1919   const ScanFragRef* ref = reinterpret_cast<const ScanFragRef*>(signal->getDataPtr());
1920 
1921   DEBUG("execSCAN_FRAGREF, errorCode:" << ref->errorCode);
1922 
1923   Ptr<ScanFragHandle> scanFragHandlePtr;
1924   m_scanfraghandle_pool.getPtr(scanFragHandlePtr, ref->senderData);
1925   Ptr<TreeNode> treeNodePtr;
1926   m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
1927   Ptr<Request> requestPtr;
1928   m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1929 
1930   ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->m_execSCAN_FRAGREF);
1931   (this->*(treeNodePtr.p->m_info->m_execSCAN_FRAGREF))(signal,
1932                                                        requestPtr,
1933                                                        treeNodePtr,
1934                                                        scanFragHandlePtr);
1935 }
1936 
/**
 * SCAN_HBREP: heart-beat for an ongoing fragment scan.
 * SPJ relays it to the original requestor (with the requestor's own
 * senderData) so that the scan does not time out there.
 */
void
Dbspj::execSCAN_HBREP(Signal* signal)
{
  jamEntry();

  Uint32 senderData = signal->theData[0];
  //Uint32 transId[2] = { signal->theData[1], signal->theData[2] };

  // senderData is the i-value of our ScanFragHandle; follow its
  // back-pointers to the owning TreeNode and Request.
  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, senderData);
  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  Uint32 ref = requestPtr.p->m_senderRef;
  signal->theData[0] = requestPtr.p->m_senderData;
  // theData[1..2] still hold the transId from the incoming signal,
  // so sending with length 3 forwards them unchanged.
  sendSignal(ref, GSN_SCAN_HBREP, signal, 3, JBB);
}
1956 
/**
 * SCAN_FRAGCONF: a fragment scan confirmed completion of a batch (or
 * of the whole scan).  Dispatch to the TreeNode-type specific handler.
 */
void
Dbspj::execSCAN_FRAGCONF(Signal* signal)
{
  jamEntry();
  DEBUG("execSCAN_FRAGCONF");

  const ScanFragConf* conf = reinterpret_cast<const ScanFragConf*>(signal->getDataPtr());

#ifdef DEBUG_SCAN_FRAGREQ
  ndbout_c("Dbspj::execSCAN_FRAGCONF() receiveing SCAN_FRAGCONF ");
  printSCAN_FRAGCONF(stdout, signal->getDataPtrSend(),
                     conf->total_len,
                     DBLQH);
#endif

  // senderData is the i-value of our ScanFragHandle; follow its
  // back-pointers to the owning TreeNode and Request.
  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, conf->senderData);
  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->m_execSCAN_FRAGCONF);
  (this->*(treeNodePtr.p->m_info->m_execSCAN_FRAGCONF))(signal,
                                                        requestPtr,
                                                        treeNodePtr,
                                                        scanFragHandlePtr);
}
1985 
/**
 * SCAN_NEXTREQ: the requestor asks for the next batch of an ongoing
 * scan Request, or for its close (ZCLOSE).  The Request is located by
 * (transId, senderData) in the scan-request hash; its RS_WAITING state
 * is cleared and each cursor TreeNode is either advanced (if active)
 * or restarted (repeat-scan-result mode).
 */
void
Dbspj::execSCAN_NEXTREQ(Signal* signal)
{
  jamEntry();
  const ScanFragNextReq * req = (ScanFragNextReq*)&signal->theData[0];

  DEBUG("Incomming SCAN_NEXTREQ");
#ifdef DEBUG_SCAN_FRAGREQ
  printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
                       ScanFragNextReq::SignalLength, DBLQH);
#endif

  Request key;
  key.m_transId[0] = req->transId1;
  key.m_transId[1] = req->transId2;
  key.m_senderData = req->senderData;

  Ptr<Request> requestPtr;
  if (unlikely(!m_scan_request_hash.find(requestPtr, key)))
  {
    jam();
    // Request already gone: only acceptable for a (redundant) close.
    ndbrequire(req->requestInfo == ScanFragNextReq::ZCLOSE);
    return;
  }

#ifdef SPJ_TRACE_TIME
  // Account the time spent waiting for the requestor between batches.
  Uint64 now = spj_now();
  Uint64 then = requestPtr.p->m_save_time;
  requestPtr.p->m_sum_waiting += Uint32(now - then);
  requestPtr.p->m_save_time = now;
#endif

  // No longer waiting for the requestor.
  Uint32 state = requestPtr.p->m_state;
  requestPtr.p->m_state = state & ~Uint32(Request::RS_WAITING);

  if (unlikely(state == Request::RS_ABORTED))
  {
    jam();
    batchComplete(signal, requestPtr);
    return;
  }

  if (unlikely((state & Request::RS_ABORTING) != 0))
  {
    jam();
    /**
     * abort is already in progress...
     *   since RS_WAITING is cleared...it will end this request
     */
    return;
  }

  if (req->requestInfo == ScanFragNextReq::ZCLOSE)  // Requested close scan
  {
    jam();
    abort(signal, requestPtr, 0);
    return;
  }

  ndbrequire((state & Request::RS_WAITING) != 0);
  ndbrequire(requestPtr.p->m_outstanding == 0);

  {
    /**
     * Scroll all relevant cursors...
     */
    Ptr<TreeNode> treeNodePtr;
    Local_TreeNodeCursor_list list(m_treenode_pool,
                                   requestPtr.p->m_cursor_nodes);
    Uint32 cnt_active = 0;

    for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
    {
      if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE)
      {
        jam();
        DEBUG("SCAN_NEXTREQ on TreeNode: " << treeNodePtr.i
           << ",  m_node_no: " << treeNodePtr.p->m_node_no
           << ", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);

        // Ask the still-active cursor node for its next batch.
        ndbrequire(treeNodePtr.p->m_info != 0 &&
                   treeNodePtr.p->m_info->m_execSCAN_NEXTREQ != 0);
        (this->*(treeNodePtr.p->m_info->m_execSCAN_NEXTREQ))(signal,
                                                             requestPtr,
                                                             treeNodePtr);
        cnt_active++;
      }
      else
      {
        /**
         * Restart any other scans not being 'TN_ACTIVE'
         * (Only effective if 'RT_REPEAT_SCAN_RESULT')
         */
        jam();
        ndbrequire(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT);
        DEBUG("  Restart TreeNode: " << treeNodePtr.i
           << ",  m_node_no: " << treeNodePtr.p->m_node_no
           << ", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);

        ndbrequire(treeNodePtr.p->m_info != 0 &&
                   treeNodePtr.p->m_info->m_parent_batch_complete !=0 );
        (this->*(treeNodePtr.p->m_info->m_parent_batch_complete))(signal,
                                                                  requestPtr,
                                                                  treeNodePtr);
      }
    }
    /* Expected only a single ACTIVE TreeNode among the cursors */
    ndbrequire(cnt_active == 1 ||
               !(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT));
  }
}
2097 
/**
 * TRANSID_AI: a row of attribute data arriving for some TreeNode.
 * Builds a per-attribute offset header for the row, extracts its
 * correlation value, optionally copies the row into the request's
 * row buffer, and dispatches to the node-type specific handler.
 */
void
Dbspj::execTRANSID_AI(Signal* signal)
{
  jamEntry();
  DEBUG("execTRANSID_AI");
  TransIdAI * req = (TransIdAI *)signal->getDataPtr();
  Uint32 ptrI = req->connectPtr;
  //Uint32 transId[2] = { req->transId[0], req->transId[1] };

  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, ptrI);
  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(signal->getNoOfSections() != 0); // TODO check if this can happen

  // Detach the data section from the signal; released at the end.
  SegmentedSectionPtr dataPtr;
  {
    SectionHandle handle(this, signal);
    handle.getSection(dataPtr, 0);
    handle.clear();
  }

#if defined(DEBUG_LQHKEYREQ) || defined(DEBUG_SCAN_FRAGREQ)
  printf("execTRANSID_AI: ");
  print(dataPtr, stdout);
#endif

  /**
   * build easy-access-array for row
   */
  Uint32 tmp[2+MAX_ATTRIBUTES_IN_TABLE];
  RowPtr::Header* header = CAST_PTR(RowPtr::Header, &tmp[0]);

  Uint32 cnt = buildRowHeader(header, dataPtr);
  ndbassert(header->m_len < NDB_ARRAY_SIZE(tmp));

  struct RowPtr row;
  row.m_type = RowPtr::RT_SECTION;
  row.m_src_node_ptrI = treeNodePtr.i;
  row.m_row_data.m_section.m_header = header;
  row.m_row_data.m_section.m_dataPtr.assign(dataPtr);

  // The correlation value is carried in the row's last attribute (cnt-1).
  getCorrelationData(row.m_row_data.m_section,
                     cnt - 1,
                     row.m_src_correlation);

  if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
  {
    jam();
    // Buffer the row; storeRow() rewrites 'row' to RT_LINEAR
    // pointing at the buffered copy.
    Uint32 err = storeRow(requestPtr, treeNodePtr, row);
    ndbrequire(err == 0);
  }

  ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->m_execTRANSID_AI);

  (this->*(treeNodePtr.p->m_info->m_execTRANSID_AI))(signal,
                                                     requestPtr,
                                                     treeNodePtr,
                                                     row);
  release(dataPtr);
}
2160 
/**
 * Copy a received (RT_SECTION) row into this request's row buffer.
 * Buffered layout: [link word (2 words, list mode only)][header][data].
 * On success 'row' is rewritten to RT_LINEAR, pointing at the copy.
 *
 * @return 0 on success, else DbspjErr::OutOfRowMemory.
 */
Uint32
Dbspj::storeRow(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, RowPtr &row)
{
  ndbassert(row.m_type == RowPtr::RT_SECTION);
  SegmentedSectionPtr dataPtr = row.m_row_data.m_section.m_dataPtr;
  Uint32 * headptr = (Uint32*)row.m_row_data.m_section.m_header;
  Uint32 headlen = 1 + row.m_row_data.m_section.m_header->m_len;

  /**
   * If rows are not in map, then they are kept in linked list
   */
  Uint32 linklen = (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)?
    0 : 2;

  Uint32 totlen = 0;
  totlen += dataPtr.sz;
  totlen += headlen;
  totlen += linklen;

  RowRef ref;
  Uint32 * dstptr = 0;
  // Allocation strategy (stack vs. var) is a per-request property.
  if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
  {
    jam();
    dstptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
  }
  else
  {
    jam();
    dstptr = varAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
  }

  if (unlikely(dstptr == 0))
  {
    jam();
    return DbspjErr::OutOfRowMemory;
  }

  // Retarget 'row' at the buffered copy (copy happens just below).
  row.m_type = RowPtr::RT_LINEAR;
  row.m_row_data.m_linear.m_row_ref = ref;
  row.m_row_data.m_linear.m_header = (RowPtr::Header*)(dstptr + linklen);
  row.m_row_data.m_linear.m_data = dstptr + linklen + headlen;

  memcpy(dstptr + linklen, headptr, 4 * headlen);
  copy(dstptr + linklen + headlen, dataPtr);

  if (linklen)
  {
    jam();
    NullRowRef.copyto_link(dstptr); // Null terminate list...
    add_to_list(treeNodePtr.p->m_row_list, ref);
  }
  else
  {
    jam();
    // Map mode: index the row by its correlation value instead.
    return add_to_map(requestPtr, treeNodePtr, row.m_src_correlation, ref);
  }

  return 0;
}
2221 
2222 void
setupRowPtr(Ptr<TreeNode> treeNodePtr,RowPtr & row,RowRef ref,const Uint32 * src)2223 Dbspj::setupRowPtr(Ptr<TreeNode> treeNodePtr,
2224                    RowPtr& row, RowRef ref, const Uint32 * src)
2225 {
2226   Uint32 linklen = (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)?
2227     0 : 2;
2228   const RowPtr::Header * headptr = (RowPtr::Header*)(src + linklen);
2229   Uint32 headlen = 1 + headptr->m_len;
2230 
2231   row.m_type = RowPtr::RT_LINEAR;
2232   row.m_row_data.m_linear.m_row_ref = ref;
2233   row.m_row_data.m_linear.m_header = headptr;
2234   row.m_row_data.m_linear.m_data = (Uint32*)headptr + headlen;
2235 }
2236 
2237 void
add_to_list(SLFifoRowList & list,RowRef rowref)2238 Dbspj::add_to_list(SLFifoRowList & list, RowRef rowref)
2239 {
2240   if (list.isNull())
2241   {
2242     jam();
2243     list.m_first_row_page_id = rowref.m_page_id;
2244     list.m_first_row_page_pos = rowref.m_page_pos;
2245   }
2246   else
2247   {
2248     jam();
2249     /**
2250      * add last to list
2251      */
2252     RowRef last;
2253     last.m_allocator = rowref.m_allocator;
2254     last.m_page_id = list.m_last_row_page_id;
2255     last.m_page_pos = list.m_last_row_page_pos;
2256     Uint32 * rowptr;
2257     if (rowref.m_allocator == 0)
2258     {
2259       jam();
2260       rowptr = get_row_ptr_stack(last);
2261     }
2262     else
2263     {
2264       jam();
2265       rowptr = get_row_ptr_var(last);
2266     }
2267     rowref.copyto_link(rowptr);
2268   }
2269 
2270   list.m_last_row_page_id = rowref.m_page_id;
2271   list.m_last_row_page_pos = rowref.m_page_pos;
2272 }
2273 
2274 Uint32 *
get_row_ptr_stack(RowRef pos)2275 Dbspj::get_row_ptr_stack(RowRef pos)
2276 {
2277   ndbassert(pos.m_allocator == 0);
2278   Ptr<RowPage> ptr;
2279   m_page_pool.getPtr(ptr, pos.m_page_id);
2280   return ptr.p->m_data + pos.m_page_pos;
2281 }
2282 
2283 Uint32 *
get_row_ptr_var(RowRef pos)2284 Dbspj::get_row_ptr_var(RowRef pos)
2285 {
2286   ndbassert(pos.m_allocator == 1);
2287   Ptr<RowPage> ptr;
2288   m_page_pool.getPtr(ptr, pos.m_page_id);
2289   return ((Var_page*)ptr.p)->get_ptr(pos.m_page_pos);
2290 }
2291 
2292 bool
first(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,SLFifoRowListIterator & iter)2293 Dbspj::first(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2294              SLFifoRowListIterator& iter)
2295 {
2296   Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2297   SLFifoRowList & list = treeNodePtr.p->m_row_list;
2298   if (list.isNull())
2299   {
2300     jam();
2301     iter.setNull();
2302     return false;
2303   }
2304 
2305   iter.m_ref.m_allocator = var;
2306   iter.m_ref.m_page_id = list.m_first_row_page_id;
2307   iter.m_ref.m_page_pos = list.m_first_row_page_pos;
2308   if (var == 0)
2309   {
2310     jam();
2311     iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2312   }
2313   else
2314   {
2315     jam();
2316     iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2317   }
2318 
2319   return true;
2320 }
2321 
2322 bool
next(SLFifoRowListIterator & iter)2323 Dbspj::next(SLFifoRowListIterator& iter)
2324 {
2325   iter.m_ref.assign_from_link(iter.m_row_ptr);
2326   if (iter.m_ref.isNull())
2327   {
2328     jam();
2329     return false;
2330   }
2331 
2332   if (iter.m_ref.m_allocator == 0)
2333   {
2334     jam();
2335     iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2336   }
2337   else
2338   {
2339     jam();
2340     iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2341   }
2342   return true;
2343 }
2344 
2345 bool
next(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,SLFifoRowListIterator & iter,SLFifoRowListIteratorPtr start)2346 Dbspj::next(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2347             SLFifoRowListIterator& iter, SLFifoRowListIteratorPtr start)
2348 {
2349   Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2350   (void)var;
2351   ndbassert(var == iter.m_ref.m_allocator);
2352   if (iter.m_ref.m_allocator == 0)
2353   {
2354     jam();
2355     iter.m_row_ptr = get_row_ptr_stack(start.m_ref);
2356   }
2357   else
2358   {
2359     jam();
2360     iter.m_row_ptr = get_row_ptr_var(start.m_ref);
2361   }
2362   return next(iter);
2363 }
2364 
/**
 * Insert 'rowref' into this TreeNode's row map, keyed by the low 16
 * bits of the row's correlation value.  The map storage is allocated
 * lazily on first insert, sized for m_batch_size entries.
 *
 * @return 0 on success, else DbspjErr::OutOfRowMemory.
 */
Uint32
Dbspj::add_to_map(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
                  Uint32 corrVal, RowRef rowref)
{
  Uint32 * mapptr;
  RowMap& map = treeNodePtr.p->m_row_map;
  if (map.isNull())
  {
    jam();
    // First insert: allocate and clear the map storage.
    Uint16 batchsize = treeNodePtr.p->m_batch_size;
    Uint32 sz16 = RowMap::MAP_SIZE_PER_REF_16 * batchsize;
    Uint32 sz32 = (sz16 + 1) / 2;  // round 16-bit size up to whole words
    RowRef ref;
    if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
    {
      jam();
      mapptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
    }
    else
    {
      jam();
      mapptr = varAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
    }
    if (unlikely(mapptr == 0))
    {
      jam();
      return DbspjErr::OutOfRowMemory;
    }
    map.assign(ref);
    map.m_elements = 0;
    map.m_size = batchsize;
    map.clear(mapptr);
  }
  else
  {
    jam();
    // Map already exists: re-resolve its storage pointer.
    RowRef ref;
    map.copyto(ref);
    if (ref.m_allocator == 0)
    {
      jam();
      mapptr = get_row_ptr_stack(ref);
    }
    else
    {
      jam();
      mapptr = get_row_ptr_var(ref);
    }
  }

  Uint32 pos = corrVal & 0xFFFF;
  ndbrequire(pos < map.m_size);
  ndbrequire(map.m_elements < map.m_size);

  if (1)
  {
    /**
     * Check that *pos* is empty
     */
    RowRef check;
    map.load(mapptr, pos, check);
    ndbrequire(check.m_page_pos == 0xFFFF);
  }

  map.store(mapptr, pos, rowref);

  return 0;
}
2433 
2434 bool
first(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,RowMapIterator & iter)2435 Dbspj::first(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2436              RowMapIterator & iter)
2437 {
2438   Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2439   RowMap& map = treeNodePtr.p->m_row_map;
2440   if (map.isNull())
2441   {
2442     jam();
2443     iter.setNull();
2444     return false;
2445   }
2446 
2447   if (var == 0)
2448   {
2449     jam();
2450     iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
2451   }
2452   else
2453   {
2454     jam();
2455     iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
2456   }
2457   iter.m_size = map.m_size;
2458   iter.m_ref.m_allocator = var;
2459 
2460   Uint32 pos = 0;
2461   while (RowMap::isNull(iter.m_map_ptr, pos) && pos < iter.m_size)
2462     pos++;
2463 
2464   if (pos == iter.m_size)
2465   {
2466     jam();
2467     iter.setNull();
2468     return false;
2469   }
2470   else
2471   {
2472     jam();
2473     RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
2474     iter.m_element_no = pos;
2475     if (var == 0)
2476     {
2477       jam();
2478       iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2479     }
2480     else
2481     {
2482       jam();
2483       iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2484     }
2485     return true;
2486   }
2487 }
2488 
2489 bool
next(RowMapIterator & iter)2490 Dbspj::next(RowMapIterator & iter)
2491 {
2492   Uint32 pos = iter.m_element_no + 1;
2493   while (RowMap::isNull(iter.m_map_ptr, pos) && pos < iter.m_size)
2494     pos++;
2495 
2496   if (pos == iter.m_size)
2497   {
2498     jam();
2499     iter.setNull();
2500     return false;
2501   }
2502   else
2503   {
2504     jam();
2505     RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
2506     iter.m_element_no = pos;
2507     if (iter.m_ref.m_allocator == 0)
2508     {
2509       jam();
2510       iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2511     }
2512     else
2513     {
2514       jam();
2515       iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2516     }
2517     return true;
2518   }
2519 }
2520 
2521 bool
next(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,RowMapIterator & iter,RowMapIteratorPtr start)2522 Dbspj::next(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2523             RowMapIterator & iter, RowMapIteratorPtr start)
2524 {
2525   Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2526   RowMap& map = treeNodePtr.p->m_row_map;
2527   ndbrequire(!map.isNull());
2528 
2529   if (var == 0)
2530   {
2531     jam();
2532     iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
2533   }
2534   else
2535   {
2536     jam();
2537     iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
2538   }
2539   iter.m_size = map.m_size;
2540 
2541   RowMap::load(iter.m_map_ptr, start.m_element_no, iter.m_ref);
2542   iter.m_element_no = start.m_element_no;
2543   return next(iter);
2544 }
2545 
2546 Uint32 *
stackAlloc(RowBuffer & buffer,RowRef & dst,Uint32 sz)2547 Dbspj::stackAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
2548 {
2549   Ptr<RowPage> ptr;
2550   LocalDLFifoList<RowPage> list(m_page_pool, buffer.m_page_list);
2551 
2552   Uint32 pos = buffer.m_stack.m_pos;
2553   const Uint32 SIZE = RowPage::SIZE;
2554   if (list.isEmpty() || (pos + sz) > SIZE)
2555   {
2556     jam();
2557     bool ret = allocPage(ptr);
2558     if (unlikely(ret == false))
2559     {
2560       jam();
2561       return 0;
2562     }
2563 
2564     pos = 0;
2565     list.addLast(ptr);
2566   }
2567   else
2568   {
2569     list.last(ptr);
2570   }
2571 
2572   dst.m_page_id = ptr.i;
2573   dst.m_page_pos = pos;
2574   dst.m_allocator = 0;
2575   buffer.m_stack.m_pos = pos + sz;
2576   return ptr.p->m_data + pos;
2577 }
2578 
2579 Uint32 *
varAlloc(RowBuffer & buffer,RowRef & dst,Uint32 sz)2580 Dbspj::varAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
2581 {
2582   Ptr<RowPage> ptr;
2583   LocalDLFifoList<RowPage> list(m_page_pool, buffer.m_page_list);
2584 
2585   Uint32 free_space = buffer.m_var.m_free;
2586   if (list.isEmpty() || free_space < (sz + 1))
2587   {
2588     jam();
2589     bool ret = allocPage(ptr);
2590     if (unlikely(ret == false))
2591     {
2592       jam();
2593       return 0;
2594     }
2595 
2596     list.addLast(ptr);
2597     ((Var_page*)ptr.p)->init();
2598   }
2599   else
2600   {
2601     jam();
2602     list.last(ptr);
2603   }
2604 
2605   Var_page * vp = (Var_page*)ptr.p;
2606   Uint32 pos = vp->alloc_record(sz, (Var_page*)m_buffer0, Var_page::CHAIN);
2607 
2608   dst.m_page_id = ptr.i;
2609   dst.m_page_pos = pos;
2610   dst.m_allocator = 1;
2611   buffer.m_var.m_free = vp->free_space;
2612   return vp->get_ptr(pos);
2613 }
2614 
2615 bool
allocPage(Ptr<RowPage> & ptr)2616 Dbspj::allocPage(Ptr<RowPage> & ptr)
2617 {
2618   if (m_free_page_list.firstItem == RNIL)
2619   {
2620     jam();
2621     ptr.p = (RowPage*)m_ctx.m_mm.alloc_page(RT_SPJ_DATABUFFER,
2622                                             &ptr.i,
2623                                             Ndbd_mem_manager::NDB_ZONE_ANY);
2624     if (ptr.p == 0)
2625     {
2626       return false;
2627     }
2628     return true;
2629   }
2630   else
2631   {
2632     jam();
2633     LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2634     bool ret = list.remove_front(ptr);
2635     ndbrequire(ret);
2636     return ret;
2637   }
2638 }
2639 
2640 void
releasePage(Ptr<RowPage> ptr)2641 Dbspj::releasePage(Ptr<RowPage> ptr)
2642 {
2643   LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2644   list.add(ptr);
2645 }
2646 
2647 void
releasePages(Uint32 first,Ptr<RowPage> last)2648 Dbspj::releasePages(Uint32 first, Ptr<RowPage> last)
2649 {
2650   LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2651   list.add(first, last);
2652 }
2653 
2654 void
releaseGlobal(Signal * signal)2655 Dbspj::releaseGlobal(Signal * signal)
2656 {
2657   Uint32 delay = 100;
2658   LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2659   if (list.empty())
2660   {
2661     jam();
2662     delay = 300;
2663   }
2664   else
2665   {
2666     Ptr<RowPage> ptr;
2667     list.remove_front(ptr);
2668     m_ctx.m_mm.release_page(RT_SPJ_DATABUFFER, ptr.i);
2669   }
2670 
2671   signal->theData[0] = 0;
2672   sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, delay, 1);
2673 }
2674 
2675 /**
2676  * END - MODULE GENERIC
2677  */
2678 
2679 /**
2680  * MODULE LOOKUP
2681  */
/**
 * Dispatch table for TreeNodes performing a single-row lookup.
 * Entries left as 0 are callbacks that never occur for a lookup node
 * (e.g. the SCAN_FRAG protocol and SCAN_NEXTREQ handling).
 */
const Dbspj::OpInfo
Dbspj::g_LookupOpInfo =
{
  &Dbspj::lookup_build,
  0, // prepare
  &Dbspj::lookup_start,
  &Dbspj::lookup_execTRANSID_AI,
  &Dbspj::lookup_execLQHKEYREF,
  &Dbspj::lookup_execLQHKEYCONF,
  0, // execSCAN_FRAGREF
  0, // execSCAN_FRAGCONF
  &Dbspj::lookup_parent_row,
  &Dbspj::lookup_parent_batch_complete,
  0, // Dbspj::lookup_parent_batch_repeat,
  0, // Dbspj::lookup_parent_batch_cleanup,
  0, // Dbspj::lookup_execSCAN_NEXTREQ
  0, // Dbspj::lookup_complete
  &Dbspj::lookup_abort,
  &Dbspj::lookup_execNODE_FAILREP,
  &Dbspj::lookup_cleanup
};
2703 
/**
 * Build a lookup TreeNode from its QueryNode and parameters in the
 * serialized query tree.  The node's cached LqhKeyReq template is
 * pre-filled with all static fields; per-row fields (hash value,
 * fragment) are filled in at send time.
 *
 * @return 0 on success, else a DbspjErr error code.
 */
Uint32
Dbspj::lookup_build(Build_context& ctx,
                    Ptr<Request> requestPtr,
                    const QueryNode* qn,
                    const QueryNodeParameters* qp)
{
  Uint32 err = 0;
  Ptr<TreeNode> treeNodePtr;
  const QN_LookupNode * node = (const QN_LookupNode*)qn;
  const QN_LookupParameters * param = (const QN_LookupParameters*)qp;
  do
  {
    err = createNode(ctx, requestPtr, treeNodePtr);
    if (unlikely(err != 0))
    {
      DEBUG_CRASH();
      break;
    }

    treeNodePtr.p->m_info = &g_LookupOpInfo;
    Uint32 transId1 = requestPtr.p->m_transId[0];
    Uint32 transId2 = requestPtr.p->m_transId[1];
    Uint32 savePointId = ctx.m_savepointId;

    Uint32 treeBits = node->requestInfo;
    Uint32 paramBits = param->requestInfo;
    //ndbout_c("Dbspj::lookup_build() treeBits=%.8x paramBits=%.8x",
    //         treeBits, paramBits);
    LqhKeyReq* dst = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
    {
      /**
       * static variables
       */
      dst->tcBlockref = reference();
      dst->clientConnectPtr = treeNodePtr.i;

      /**
       * TODO reference()+treeNodePtr.i is passed twice
       *   this can likely be optimized using the requestInfo-bits
       * UPDATE: This can be accomplished by *not* setApplicationAddressFlag
       *         and patch LQH to then instead use tcBlockref/clientConnectPtr
       */
      dst->transId1 = transId1;
      dst->transId2 = transId2;
      dst->savePointId = savePointId;
      dst->scanInfo = 0;
      dst->attrLen = 0;
      /** Initially set reply ref to client, do_send will set SPJ refs if non-LEAF */
      dst->variableData[0] = ctx.m_resultRef;
      dst->variableData[1] = param->resultData;
      Uint32 requestInfo = 0;
      LqhKeyReq::setOperation(requestInfo, ZREAD);
      LqhKeyReq::setApplicationAddressFlag(requestInfo, 1);
      LqhKeyReq::setDirtyFlag(requestInfo, 1);
      LqhKeyReq::setSimpleFlag(requestInfo, 1);
      LqhKeyReq::setNormalProtocolFlag(requestInfo, 0);  // Assume T_LEAF
      LqhKeyReq::setCorrFactorFlag(requestInfo, 1);
      // Disk access only needed if tree or params reference disk attrs.
      LqhKeyReq::setNoDiskFlag(requestInfo,
                               (treeBits & DABits::NI_LINKED_DISK) == 0 &&
                               (paramBits & DABits::PI_DISK_ATTR) == 0);
      dst->requestInfo = requestInfo;
    }

    err = DbspjErr::InvalidTreeNodeSpecification;
    if (unlikely(node->len < QN_LookupNode::NodeSize))
    {
      DEBUG_CRASH();
      break;
    }

    if (treeBits & QN_LookupNode::L_UNIQUE_INDEX)
    {
      jam();
      treeNodePtr.p->m_bits |= TreeNode::T_UNIQUE_INDEX_LOOKUP;
    }

    Uint32 tableId = node->tableId;
    Uint32 schemaVersion = node->tableVersion;

    // Pack table id (low 16 bits) and schema version (high 16 bits).
    Uint32 tableSchemaVersion = tableId + ((schemaVersion << 16) & 0xFFFF0000);
    dst->tableSchemaVersion = tableSchemaVersion;

    err = DbspjErr::InvalidTreeParametersSpecification;
    DEBUG("param len: " << param->len);
    if (unlikely(param->len < QN_LookupParameters::NodeSize))
    {
      DEBUG_CRASH();
      break;
    }

    ctx.m_resultData = param->resultData;
    treeNodePtr.p->m_lookup_data.m_api_resultRef = ctx.m_resultRef;
    treeNodePtr.p->m_lookup_data.m_api_resultData = param->resultData;
    treeNodePtr.p->m_lookup_data.m_outstanding = 0;
    treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;

    /**
     * Parse stuff common lookup/scan-frag
     */
    struct DABuffer nodeDA, paramDA;
    nodeDA.ptr = node->optional;
    nodeDA.end = nodeDA.ptr + (node->len - QN_LookupNode::NodeSize);
    paramDA.ptr = param->optional;
    paramDA.end = paramDA.ptr + (param->len - QN_LookupParameters::NodeSize);
    err = parseDA(ctx, requestPtr, treeNodePtr,
                  nodeDA, treeBits, paramDA, paramBits);
    if (unlikely(err != 0))
    {
      DEBUG_CRASH();
      break;
    }

    if (treeNodePtr.p->m_bits & TreeNode::T_ATTR_INTERPRETED)
    {
      jam();
      LqhKeyReq::setInterpretedFlag(dst->requestInfo, 1);
    }

    /**
     * Inherit batch size from parent
     */
    treeNodePtr.p->m_batch_size = 1;
    if (treeNodePtr.p->m_parentPtrI != RNIL)
    {
      jam();
      Ptr<TreeNode> parentPtr;
      m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
      treeNodePtr.p->m_batch_size = parentPtr.p->m_batch_size;
    }

    if (ctx.m_start_signal)
    {
      jam();
      // Root node built directly from an incoming LQHKEYREQ:
      // copy its routing/key fields into our template.
      Signal * signal = ctx.m_start_signal;
      const LqhKeyReq* src = (const LqhKeyReq*)signal->getDataPtr();
#if NOT_YET
      Uint32 instanceNo =
        blockToInstance(signal->header.theReceiversBlockNumber);
      treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH,
                                                instanceNo, getOwnNodeId());
#else
      treeNodePtr.p->m_send.m_ref =
        numberToRef(DBLQH, getInstanceKey(src->tableSchemaVersion & 0xFFFF,
                                          src->fragmentData & 0xFFFF),
                    getOwnNodeId());
#endif

      Uint32 hashValue = src->hashValue;
      Uint32 fragId = src->fragmentData;
      Uint32 requestInfo = src->requestInfo;
      Uint32 attrLen = src->attrLen; // fragdist-key is in here

      /**
       * assertions
       */
      ndbassert(LqhKeyReq::getAttrLen(attrLen) == 0);         // Only long
      ndbassert(LqhKeyReq::getScanTakeOverFlag(attrLen) == 0);// Not supported
      ndbassert(LqhKeyReq::getReorgFlag(attrLen) == 0);       // Not supported
      ndbassert(LqhKeyReq::getOperation(requestInfo) == ZREAD);
      ndbassert(LqhKeyReq::getKeyLen(requestInfo) == 0);      // Only long
      ndbassert(LqhKeyReq::getMarkerFlag(requestInfo) == 0);  // Only read
      ndbassert(LqhKeyReq::getAIInLqhKeyReq(requestInfo) == 0);
      ndbassert(LqhKeyReq::getSeqNoReplica(requestInfo) == 0);
      ndbassert(LqhKeyReq::getLastReplicaNo(requestInfo) == 0);
      ndbassert(LqhKeyReq::getApplicationAddressFlag(requestInfo) != 0);
      ndbassert(LqhKeyReq::getSameClientAndTcFlag(requestInfo) == 0);

#if TODO
      /**
       * Handle various lock-modes
       */
      static Uint8 getDirtyFlag(const UintR & requestInfo);
      static Uint8 getSimpleFlag(const UintR & requestInfo);
#endif

      Uint32 dst_requestInfo = dst->requestInfo;
      ndbassert(LqhKeyReq::getInterpretedFlag(requestInfo) ==
                LqhKeyReq::getInterpretedFlag(dst_requestInfo));
      ndbassert(LqhKeyReq::getNoDiskFlag(requestInfo) ==
                LqhKeyReq::getNoDiskFlag(dst_requestInfo));

      dst->hashValue = hashValue;
      dst->fragmentData = fragId;
      dst->attrLen = attrLen; // fragdist is in here

      // Keep the incoming keyinfo section; mark node as single-use.
      treeNodePtr.p->m_send.m_keyInfoPtrI = ctx.m_keyPtr.i;
      treeNodePtr.p->m_bits |= TreeNode::T_ONE_SHOT;
    }
    return 0;
  } while (0);

  return err;
}
2897 
2898 void
lookup_start(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)2899 Dbspj::lookup_start(Signal* signal,
2900                     Ptr<Request> requestPtr,
2901                     Ptr<TreeNode> treeNodePtr)
2902 {
2903   lookup_send(signal, requestPtr, treeNodePtr);
2904 }
2905 
/**
 * Build and send the LQHKEYREQ for this lookup TreeNode, together with
 * its KEYINFO and ATTRINFO sections, and update the outstanding-signal
 * accounting used for batch completion and node-failure handling.
 */
void
Dbspj::lookup_send(Signal* signal,
                   Ptr<Request> requestPtr,
                   Ptr<TreeNode> treeNodePtr)
{
  jam();

  /**
   * 'cnt' is the number of reply signals we expect back to SPJ for this
   * LQHKEYREQ (matches the decrements in lookup_execTRANSID_AI /
   * lookup_execLQHKEYCONF / lookup_execLQHKEYREF):
   *   2 for a non-leaf node,
   *   1 for a leaf node in a scan request,
   *   0 for a leaf node in a lookup request (replies go to the API).
   */
  Uint32 cnt = 2;
  if (treeNodePtr.p->isLeaf())
  {
    jam();
    if (requestPtr.p->isLookup())
    {
      jam();
      cnt = 0;
    }
    else
    {
      jam();
      cnt = 1;
    }
  }

  LqhKeyReq* req = reinterpret_cast<LqhKeyReq*>(signal->getDataPtrSend());

  // Start from the LQHKEYREQ template prepared at build time,
  // then patch in the per-send fields.
  memcpy(req, treeNodePtr.p->m_lookup_data.m_lqhKeyReq,
         sizeof(treeNodePtr.p->m_lookup_data.m_lqhKeyReq));
  req->variableData[2] = treeNodePtr.p->m_send.m_correlation;
  req->variableData[3] = requestPtr.p->m_rootResultData;

  if (!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()))
  {
    // Non-LEAF want reply to SPJ instead of ApiClient.
    LqhKeyReq::setNormalProtocolFlag(req->requestInfo, 1);
    req->variableData[0] = reference();
    req->variableData[1] = treeNodePtr.i;
  }
  else
  {
    jam();
    /**
     * Fake that TC sent this request,
     *   so that it can route a maybe TCKEYREF
     */
    req->tcBlockref = requestPtr.p->m_senderRef;
  }

  SectionHandle handle(this);

  Uint32 ref = treeNodePtr.p->m_send.m_ref;
  Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI;
  Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;

  if (treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT)
  {
    jam();
    /**
     * One-shot node: pass ownership of the sections to the send;
     * clear our references so cleanup won't release them twice.
     */
    treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
    treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
  }
  else
  {
    /**
     * The node may be resent (once per parent row).  A section that is
     * NOT per-row constructed must be duplicated (the stored copy is
     * kept); a per-row CONSTRUCTED section is consumed by this send.
     */
    if ((treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED) == 0)
    {
      jam();
      Uint32 tmp = RNIL;
      ndbrequire(dupSection(tmp, keyInfoPtrI)); // TODO handle error
      keyInfoPtrI = tmp;
    }
    else
    {
      jam();
      treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
    }

    if ((treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED) == 0)
    {
      jam();
      Uint32 tmp = RNIL;
      ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
      attrInfoPtrI = tmp;
    }
    else
    {
      jam();
      treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
    }
  }

  // Attach KEYINFO (section 0) and ATTRINFO (section 1) to the send.
  getSection(handle.m_ptr[0], keyInfoPtrI);
  getSection(handle.m_ptr[1], attrInfoPtrI);
  handle.m_cnt = 2;

#if defined DEBUG_LQHKEYREQ
  ndbout_c("LQHKEYREQ to %x", ref);
  printLQHKEYREQ(stdout, signal->getDataPtrSend(),
                 NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq),
                 DBLQH);
  printf("KEYINFO: ");
  print(handle.m_ptr[0], stdout);
  printf("ATTRINFO: ");
  print(handle.m_ptr[1], stdout);
#endif

  Uint32 Tnode = refToNode(ref);
  if (Tnode == getOwnNodeId())
  {
    c_Counters.incr_counter(CI_LOCAL_READS_SENT, 1);
  }
  else
  {
    c_Counters.incr_counter(CI_REMOTE_READS_SENT, 1);
  }

  if (unlikely(!c_alive_nodes.get(Tnode)))
  {
    jam();
    // Destination node is dead: free the sections and abort the request.
    releaseSections(handle);
    abort(signal, requestPtr, DbspjErr::NodeFailure);
    return;
  }
  else if (! (treeNodePtr.p->isLeaf() && requestPtr.p->isLookup()))
  {
    jam();
    // Register the expected replies, per request and per destination node.
    ndbassert(Tnode < NDB_ARRAY_SIZE(requestPtr.p->m_lookup_node_data));
    requestPtr.p->m_outstanding += cnt;
    requestPtr.p->m_lookup_node_data[Tnode] += cnt;
    // number wrapped
    ndbrequire(! (requestPtr.p->m_lookup_node_data[Tnode] == 0));
  }

  sendSignal(ref, GSN_LQHKEYREQ, signal,
             NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq),
             JBB, &handle);

  treeNodePtr.p->m_lookup_data.m_outstanding += cnt;
  if (requestPtr.p->isLookup() && treeNodePtr.p->isLeaf())
  {
    jam();
    /**
     * Send TCKEYCONF with DirtyReadBit + Tnode,
     *   so that API can discover if Tnode (the data node we read from)
     *   fails while it is waiting for the result.
     */
    Uint32 resultRef = req->variableData[0];
    Uint32 resultData = req->variableData[1];

    TcKeyConf* conf = (TcKeyConf*)signal->getDataPtrSend();
    conf->apiConnectPtr = RNIL; // lookup transaction from operations...
    conf->confInfo = 0;
    TcKeyConf::setNoOfOperations(conf->confInfo, 1);
    conf->transId1 = requestPtr.p->m_transId[0];
    conf->transId2 = requestPtr.p->m_transId[1];
    conf->operations[0].apiOperationPtr = resultData;
    conf->operations[0].attrInfoLen = TcKeyConf::DirtyReadBit | Tnode;
    Uint32 sigLen = TcKeyConf::StaticLength + TcKeyConf::OperationLength;
    sendTCKEYCONF(signal, sigLen, resultRef, requestPtr.p->m_senderRef);
  }
}
3066 
/**
 * Handle a TRANSID_AI (row data) reply for a lookup TreeNode:
 * feed the row to all dependent child nodes, then decrement the
 * outstanding-signal counters and check for batch completion.
 */
void
Dbspj::lookup_execTRANSID_AI(Signal* signal,
                             Ptr<Request> requestPtr,
                             Ptr<TreeNode> treeNodePtr,
                             const RowPtr & rowRef)
{
  jam();

  Uint32 Tnode = refToNode(signal->getSendersBlockRef());

  {
    // Propagate this row to every dependent child via its
    // m_parent_row callback (e.g. to trigger child lookups).
    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
    Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
    Dependency_map::ConstDataBufferIterator it;
    for (list.first(it); !it.isNull(); list.next(it))
    {
      jam();
      Ptr<TreeNode> childPtr;
      m_treenode_pool.getPtr(childPtr, * it.data);
      ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0);
      (this->*(childPtr.p->m_info->m_parent_row))(signal,
                                                  requestPtr, childPtr,rowRef);
    }
  }
  // A leaf of a lookup request never gets TRANSID_AI routed to SPJ
  // (its results go directly to the API, see lookup_send()).
  ndbrequire(!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()));

  // One expected reply from node 'Tnode' has now arrived.
  ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1);
  requestPtr.p->m_lookup_node_data[Tnode] -= 1;

  treeNodePtr.p->m_lookup_data.m_outstanding--;

  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
      && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
      && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
  {
    jam();
    // We have received all rows for this operation in this batch.
    reportBatchComplete(signal, requestPtr, treeNodePtr);

    // Prepare for next batch.
    treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
    treeNodePtr.p->m_lookup_data.m_outstanding = 0;
  }

  checkBatchComplete(signal, requestPtr, 1);
}
3113 
3114 void
lookup_execLQHKEYREF(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)3115 Dbspj::lookup_execLQHKEYREF(Signal* signal,
3116                             Ptr<Request> requestPtr,
3117                             Ptr<TreeNode> treeNodePtr)
3118 {
3119   const LqhKeyRef * rep = (LqhKeyRef*)signal->getDataPtr();
3120   Uint32 errCode = rep->errorCode;
3121   Uint32 Tnode = refToNode(signal->getSendersBlockRef());
3122 
3123   c_Counters.incr_counter(CI_READS_NOT_FOUND, 1);
3124 
3125   if (requestPtr.p->isLookup())
3126   {
3127     jam();
3128 
3129     /* CONF/REF not requested for lookup-Leaf: */
3130     ndbrequire(!treeNodePtr.p->isLeaf());
3131 
3132     /**
3133      * Scan-request does not need to
3134      *   send TCKEYREF...
3135      */
3136     /**
3137      * Return back to api...
3138      *   NOTE: assume that signal is tampered with
3139      */
3140     Uint32 resultRef = treeNodePtr.p->m_lookup_data.m_api_resultRef;
3141     Uint32 resultData = treeNodePtr.p->m_lookup_data.m_api_resultData;
3142     TcKeyRef* ref = (TcKeyRef*)signal->getDataPtr();
3143     ref->connectPtr = resultData;
3144     ref->transId[0] = requestPtr.p->m_transId[0];
3145     ref->transId[1] = requestPtr.p->m_transId[1];
3146     ref->errorCode = errCode;
3147     ref->errorData = 0;
3148 
3149     DEBUG("lookup_execLQHKEYREF, errorCode:" << errCode);
3150 
3151     sendTCKEYREF(signal, resultRef, requestPtr.p->m_senderRef);
3152 
3153     if (treeNodePtr.p->m_bits & TreeNode::T_UNIQUE_INDEX_LOOKUP)
3154     {
3155       /**
3156        * If this is a "leaf" unique index lookup
3157        *   emit extra TCKEYCONF as would have been done with ordinary
3158        *   operation
3159        */
3160       LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
3161       Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
3162       Dependency_map::ConstDataBufferIterator it;
3163       ndbrequire(list.first(it));
3164       ndbrequire(list.getSize() == 1); // should only be 1 child
3165       Ptr<TreeNode> childPtr;
3166       m_treenode_pool.getPtr(childPtr, * it.data);
3167       if (childPtr.p->m_bits & TreeNode::T_LEAF)
3168       {
3169         jam();
3170         Uint32 resultRef = childPtr.p->m_lookup_data.m_api_resultRef;
3171         Uint32 resultData = childPtr.p->m_lookup_data.m_api_resultData;
3172         TcKeyConf* conf = (TcKeyConf*)signal->getDataPtr();
3173         conf->apiConnectPtr = RNIL;
3174         conf->confInfo = 0;
3175         conf->gci_hi = 0;
3176         TcKeyConf::setNoOfOperations(conf->confInfo, 1);
3177         conf->transId1 = requestPtr.p->m_transId[0];
3178         conf->transId2 = requestPtr.p->m_transId[1];
3179         conf->operations[0].apiOperationPtr = resultData;
3180         conf->operations[0].attrInfoLen =
3181           TcKeyConf::DirtyReadBit |getOwnNodeId();
3182         sendTCKEYCONF(signal, TcKeyConf::StaticLength + 2, resultRef, requestPtr.p->m_senderRef);
3183       }
3184     }
3185   }
3186   else
3187   {
3188     jam();
3189     switch(errCode){
3190     case 626: // Row not found
3191     case 899: // Interpreter_exit_nok
3192       jam();
3193       break;
3194     default:
3195       jam();
3196       abort(signal, requestPtr, errCode);
3197     }
3198   }
3199 
3200   Uint32 cnt = 2;
3201   if (treeNodePtr.p->isLeaf())  // Can't be a lookup-Leaf, asserted above
3202     cnt = 1;
3203 
3204   ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= cnt);
3205   requestPtr.p->m_lookup_node_data[Tnode] -= cnt;
3206 
3207   treeNodePtr.p->m_lookup_data.m_outstanding -= cnt;
3208 
3209   if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
3210       && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
3211       && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
3212   {
3213     jam();
3214     // We have received all rows for this operation in this batch.
3215     reportBatchComplete(signal, requestPtr, treeNodePtr);
3216 
3217     // Prepare for next batch.
3218     treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
3219     treeNodePtr.p->m_lookup_data.m_outstanding = 0;
3220   }
3221 
3222   checkBatchComplete(signal, requestPtr, cnt);
3223 }
3224 
/**
 * Handle a LQHKEYCONF (key operation succeeded) for a lookup TreeNode:
 * count the produced row (if this node has a user projection), then
 * decrement the outstanding-signal counters and check batch completion.
 */
void
Dbspj::lookup_execLQHKEYCONF(Signal* signal,
                             Ptr<Request> requestPtr,
                             Ptr<TreeNode> treeNodePtr)
{
  // A leaf of a lookup request never sends CONF to SPJ (see lookup_send()).
  ndbrequire(!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()));

  Uint32 Tnode = refToNode(signal->getSendersBlockRef());

  if (treeNodePtr.p->m_bits & TreeNode::T_USER_PROJECTION)
  {
    jam();
    requestPtr.p->m_rows++;
  }

  // One expected reply from node 'Tnode' has now arrived.
  ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1);
  requestPtr.p->m_lookup_node_data[Tnode] -= 1;

  treeNodePtr.p->m_lookup_data.m_outstanding--;

  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
      && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
      && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
  {
    jam();
    // We have received all rows for this operation in this batch.
    reportBatchComplete(signal, requestPtr, treeNodePtr);

    // Prepare for next batch.
    treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
    treeNodePtr.p->m_lookup_data.m_outstanding = 0;
  }

  checkBatchComplete(signal, requestPtr, 1);
}
3260 
/**
 * A parent row has arrived for this lookup TreeNode: build the key for
 * the child lookup from the parent row, route it, and send the LQHKEYREQ.
 */
void
Dbspj::lookup_parent_row(Signal* signal,
                         Ptr<Request> requestPtr,
                         Ptr<TreeNode> treeNodePtr,
                         const RowPtr & rowRef)
{
  /**
   * Here we need to...
   *   1) construct a key
   *   2) compute hash     (normally TC)
   *   3) get node for row (normally TC)
   */
  Uint32 err;
  const LqhKeyReq* src = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
  const Uint32 tableId = LqhKeyReq::getTableId(src->tableSchemaVersion);
  const Uint32 corrVal = rowRef.m_src_correlation;

  DEBUG("::lookup_parent_row");

  do
  {
    Uint32 ptrI = RNIL;
    if (treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED)
    {
      jam();
      DEBUG("parent_row w/ T_KEYINFO_CONSTRUCTED");
      /**
       * Get key-pattern and expand it against the parent row to
       * produce the KEYINFO section for this child lookup.
       */
      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);

      bool keyIsNull;
      err = expand(ptrI, pattern, rowRef, keyIsNull);
      if (unlikely(err != 0))
        break;

      if (keyIsNull)
      {
        jam();
        DEBUG("Key contain NULL values");
        /**
         * When the key contains NULL values, an EQ-match is impossible!
         * Entire lookup request can therefore be eliminate as it is known
         * to be REFused with errorCode = 626 (Row not found).
         * Different handling is required depening of request being a
         * scan or lookup:
         */
        if (requestPtr.p->isScan())
        {
          /**
           * Scan request: We can simply ignore lookup operation:
           * As rowCount in SCANCONF will not include this KEYREQ,
           * we dont have to send a KEYREF either.
           */
          jam();
          DEBUG("..Ignore impossible KEYREQ");
          if (ptrI != RNIL)
          {
            releaseSection(ptrI);
          }
          return;  // Bailout, KEYREQ would have returned KEYREF(626) anyway
        }
        else  // isLookup()
        {
          /**
           * Ignored lookup request need a faked KEYREF for the lookup operation.
           * Furthermore, if this is a leaf treeNode, a KEYCONF is also
           * expected by the API.
           *
           * TODO: Not implemented yet as we believe
           *       elimination of NULL key access for scan request
           *       will have the most performance impact.
           */
          jam();
        }
      } // keyIsNull

      /**
       * NOTE:
       *    The logic below contradicts 'keyIsNull' logic above and should
       *    be removed.
       *    However, it's likely that scanIndex should have similar
       *    logic as 'Null as wildcard' may make sense for a range bound.
       * NOTE2:
       *    Until 'keyIsNull' also cause bailout for request->isLookup()
       *    createEmptySection *is* require to avoid crash due to empty keys.
       */
      if (ptrI == RNIL)  // TODO: remove when keyIsNull is completely handled
      {
        jam();
        /**
         * We constructed a null-key...construct a zero-length key (even if we don't support it *now*)
         *
         *   (we actually did prior to joining mysql where null was treated as any other
         *   value in a key). But mysql treats null in unique key as *wildcard*
         *   which we don't support so well...and do nasty tricks in handler
         *
         * NOTE: should be *after* check for error
         */
        err = createEmptySection(ptrI);
        if (unlikely(err != 0))
          break;
      }

      treeNodePtr.p->m_send.m_keyInfoPtrI = ptrI;
    }

    // Route the request: hash the key, then ask DIH which node/fragment owns it.
    BuildKeyReq tmp;
    err = computeHash(signal, tmp, tableId, treeNodePtr.p->m_send.m_keyInfoPtrI);
    if (unlikely(err != 0))
      break;

    err = getNodes(signal, tmp, tableId);
    if (unlikely(err != 0))
      break;

    Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
    if (treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED)
    {
      jam();
      // Duplicate the stored ATTRINFO, then append the per-row
      // parameters expanded from the parent row.
      Uint32 tmp = RNIL;
      ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error

      Uint32 org_size;
      {
        SegmentedSectionPtr ptr;
        getSection(ptr, tmp);
        org_size = ptr.sz;
      }

      bool hasNull;
      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      Local_pattern_store pattern(pool, treeNodePtr.p->m_attrParamPattern);
      err = expand(tmp, pattern, rowRef, hasNull);
      if (unlikely(err != 0))
        break;
//    ndbrequire(!hasNull);

      /**
       * Update size of subsrouting section, which contains arguments
       */
      SegmentedSectionPtr ptr;
      getSection(ptr, tmp);
      Uint32 new_size = ptr.sz;
      Uint32 * sectionptrs = ptr.p->theData;
      sectionptrs[4] = new_size - org_size;

      treeNodePtr.p->m_send.m_attrInfoPtrI = tmp;
    }

    /**
     * Now send...
     */

    /**
     * TODO merge better with lookup_start (refactor)
     */
    {
      /* We set the upper half word of m_correlation to the tuple ID
       * of the parent, such that the API can match this tuple with its
       * parent.
       * Then we re-use the tuple ID of the parent as the
       * tuple ID for this tuple also. Since the tuple ID
       * is unique within this batch and SPJ block for the parent operation,
       * it must also be unique for this operation.
       * This ensures that lookup operations with no user projection will
       * work, since such operations will have the same tuple ID as their
       * parents. The API will then be able to match a tuple with its
       * grandparent, even if it gets no tuple for the parent operation.*/
      treeNodePtr.p->m_send.m_correlation =
        (corrVal << 16) + (corrVal & 0xffff);

      treeNodePtr.p->m_send.m_ref = tmp.receiverRef;
      LqhKeyReq * dst = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
      dst->hashValue = tmp.hashInfo[0];
      dst->fragmentData = tmp.fragId;
      Uint32 attrLen = 0;
      LqhKeyReq::setDistributionKey(attrLen, tmp.fragDistKey);
      dst->attrLen = attrLen;
      lookup_send(signal, requestPtr, treeNodePtr);

      if (treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED)
      {
        jam();
        // restore the stored (un-expanded) ATTRINFO for the next parent row
        treeNodePtr.p->m_send.m_attrInfoPtrI = attrInfoPtrI;
      }
    }
    return;
  } while (0);

  // TODO: proper error handling; for now any error above is fatal.
  ndbrequire(false);
}
3455 
/**
 * The parent node has delivered its last row for this batch.
 * Record that fact and, if all our own outstanding LQHKEYREQs have
 * been answered, report batch completion for this TreeNode.
 */
void
Dbspj::lookup_parent_batch_complete(Signal* signal,
                                    Ptr<Request> requestPtr,
                                    Ptr<TreeNode> treeNodePtr)
{
  jam();

  /**
   * lookups are performed directly...so we're not really interested in
   *   parent_batch_complete...we only pass-through
   */

  /**
   * but this method should only be called if we have T_REPORT_BATCH_COMPLETE
   */
  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE);

  ndbassert(!treeNodePtr.p->m_lookup_data.m_parent_batch_complete);
  treeNodePtr.p->m_lookup_data.m_parent_batch_complete = true;
  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
      && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
  {
    jam();
    // We have received all rows for this operation in this batch.
    reportBatchComplete(signal, requestPtr, treeNodePtr);

    // Prepare for next batch.
    treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
    treeNodePtr.p->m_lookup_data.m_outstanding = 0;
  }
}
3487 
/**
 * Abort hook for a lookup TreeNode.
 * Intentionally a no-op (besides the jam trace): presumably an
 * in-flight LQHKEYREQ cannot be recalled, so outstanding replies are
 * simply drained via the normal CONF/REF/TRANSID_AI accounting —
 * TODO confirm against Request-level abort handling.
 */
void
Dbspj::lookup_abort(Signal* signal,
                    Ptr<Request> requestPtr,
                    Ptr<TreeNode> treeNodePtr)
{
  jam();
}
3495 
3496 Uint32
lookup_execNODE_FAILREP(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,NdbNodeBitmask mask)3497 Dbspj::lookup_execNODE_FAILREP(Signal* signal,
3498                                Ptr<Request> requestPtr,
3499                                Ptr<TreeNode> treeNodePtr,
3500                                NdbNodeBitmask mask)
3501 {
3502   jam();
3503   Uint32 node = 0;
3504   Uint32 sum = 0;
3505   while (requestPtr.p->m_outstanding &&
3506          ((node = mask.find(node + 1)) != NdbNodeBitmask::NotFound))
3507   {
3508     Uint32 cnt = requestPtr.p->m_lookup_node_data[node];
3509     sum += cnt;
3510     requestPtr.p->m_lookup_node_data[node] = 0;
3511   }
3512 
3513   if (sum)
3514   {
3515     jam();
3516     ndbrequire(requestPtr.p->m_outstanding >= sum);
3517     requestPtr.p->m_outstanding -= sum;
3518   }
3519 
3520   return sum;
3521 }
3522 
/**
 * Release resources held by a lookup TreeNode.
 * Lookups have no node-type-specific state; the shared
 * cleanup_common() handles everything.
 */
void
Dbspj::lookup_cleanup(Ptr<Request> requestPtr,
                      Ptr<TreeNode> treeNodePtr)
{
  cleanup_common(requestPtr, treeNodePtr);
}
3529 
3530 
/**
 * Compute the MD5 key hash for tables that need key normalisation
 * (character-set attributes) and/or a distribution key differing from
 * the primary key.
 *
 * @param tableId  table whose key layout 'desc' describes
 * @param dstHash  out: 4-word hash; dstHash[1] carries the distribution hash
 * @param src      64-bit aligned packed key
 * @param srcLen   key length in 32-bit words
 * @param desc     key descriptor for the table
 * @return 0 on success, 290 if the key could not be normalised
 */
Uint32
Dbspj::handle_special_hash(Uint32 tableId, Uint32 dstHash[4],
                           const Uint64* src,
                           Uint32 srcLen,       // Len in #32bit words
                           const KeyDescriptor* desc)
{
  // Workspace is Uint64 to guarantee the 64-bit alignment md5_hash needs.
  const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
    (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
  Uint64 alignedWorkspace[MAX_KEY_SIZE_IN_LONG_WORDS * MAX_XFRM_MULTIPLY];
  const bool hasVarKeys = desc->noOfVarKeys > 0;
  const bool hasCharAttr = desc->hasCharAttr;
  const bool compute_distkey = desc->noOfDistrKeys > 0;

  const Uint64 *hashInput = 0;
  Uint32 inputLen = 0;
  Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
  Uint32 * keyPartLenPtr;

  /* Normalise KeyInfo into workspace if necessary */
  if (hasCharAttr || (compute_distkey && hasVarKeys))
  {
    hashInput = alignedWorkspace;
    keyPartLenPtr = keyPartLen;
    inputLen = xfrm_key(tableId,
                        (Uint32*)src,
                        (Uint32*)alignedWorkspace,
                        sizeof(alignedWorkspace) >> 2,
                        keyPartLenPtr);
    if (unlikely(inputLen == 0))
    {
      return 290;  // 'Corrupt key in TC, unable to xfrm'
    }
  }
  else
  {
    /* Keyinfo already suitable for hash */
    hashInput = src;
    inputLen = srcLen;
    keyPartLenPtr = 0;
  }

  /* Calculate primary key hash */
  md5_hash(dstHash, hashInput, inputLen);

  /* If the distribution key != primary key then we have to
   * form a distribution key from the primary key and calculate
   * a separate distribution hash based on this
   */
  if (compute_distkey)
  {
    jam();

    Uint32 distrKeyHash[4];
    /* Reshuffle primary key columns to get just distribution key */
    Uint32 len = create_distr_key(tableId, (Uint32*)hashInput, (Uint32*)alignedWorkspace, keyPartLenPtr);
    /* Calculate distribution key hash */
    md5_hash(distrKeyHash, alignedWorkspace, len);

    /* Just one word used for distribution */
    dstHash[1] = distrKeyHash[1];
  }
  return 0;
}
3594 
3595 Uint32
computeHash(Signal * signal,BuildKeyReq & dst,Uint32 tableId,Uint32 ptrI)3596 Dbspj::computeHash(Signal* signal,
3597                    BuildKeyReq& dst, Uint32 tableId, Uint32 ptrI)
3598 {
3599   /**
3600    * Essentially the same code as in Dbtc::hash().
3601    * The code for user defined partitioning has been removed though.
3602    */
3603   SegmentedSectionPtr ptr;
3604   getSection(ptr, ptrI);
3605 
3606   /* NOTE:  md5_hash below require 64-bit alignment
3607    */
3608   const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
3609     (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
3610   Uint64 tmp64[MAX_KEY_SIZE_IN_LONG_WORDS];
3611   Uint32 *tmp32 = (Uint32*)tmp64;
3612   copy(tmp32, ptr);
3613 
3614   const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3615   ndbrequire(desc != NULL);
3616 
3617   bool need_special_hash = desc->hasCharAttr | (desc->noOfDistrKeys > 0);
3618   if (need_special_hash)
3619   {
3620     jam();
3621     return handle_special_hash(tableId, dst.hashInfo, tmp64, ptr.sz, desc);
3622   }
3623   else
3624   {
3625     jam();
3626     md5_hash(dst.hashInfo, tmp64, ptr.sz);
3627     return 0;
3628   }
3629 }
3630 
3631 /**
3632  * This function differs from computeHash in that *ptrI*
3633  * only contains partition key (packed) and not full primary key
3634  */
/**
 * This function differs from computeHash in that *ptrI*
 * only contains partition key (packed) and not full primary key
 */
Uint32
Dbspj::computePartitionHash(Signal* signal,
                            BuildKeyReq& dst, Uint32 tableId, Uint32 ptrI)
{
  SegmentedSectionPtr ptr;
  getSection(ptr, ptrI);

  /* NOTE:  md5_hash below require 64-bit alignment
   */
  const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
    (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
  Uint64 _space[MAX_KEY_SIZE_IN_LONG_WORDS];
  Uint64 *tmp64 = _space;
  Uint32 *tmp32 = (Uint32*)tmp64;
  Uint32 sz = ptr.sz;
  copy(tmp32, ptr);

  const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
  ndbrequire(desc != NULL);

  bool need_xfrm = desc->hasCharAttr || desc->noOfVarKeys;
  if (need_xfrm)
  {
    jam();
    /**
     * xfrm distribution key
     */
    Uint32 srcPos = 0;
    Uint32 dstPos = 0;
    Uint32 * src = tmp32;
    // NOTE: this local 'dst' (scratch area in the signal buffer)
    // shadows the BuildKeyReq& parameter 'dst' within this block.
    Uint32 * dst = signal->theData+24;
    for (Uint32 i = 0; i < desc->noOfKeyAttr; i++)
    {
      // Only distribution-key attributes are transformed; presumably the
      // section holds only the packed distribution key — TODO confirm
      // that non-dkey attributes never appear between dkey values here.
      const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
      if (AttributeDescriptor::getDKey(keyAttr.attributeDescriptor))
      {
        xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
                  src, srcPos, dst, dstPos,
                  NDB_ARRAY_SIZE(signal->theData) - 24);
      }
    }
    // Hash the transformed copy instead of the raw key.
    tmp64 = (Uint64*)dst;
    sz = dstPos;
  }

  md5_hash(dst.hashInfo, tmp64, sz);
  return 0;
}
3683 
/**
 * Ask DBDIH which node/fragment owns the row with the hash computed in
 * 'dst', and fill in dst.fragId / dst.fragDistKey / dst.receiverRef
 * (the DBLQH instance to send the LQHKEYREQ to).
 *
 * @return 0 on success; on error the request currently crashes (TODO).
 */
Uint32
Dbspj::getNodes(Signal* signal, BuildKeyReq& dst, Uint32 tableId)
{
  Uint32 err;
  DiGetNodesReq * req = (DiGetNodesReq *)&signal->theData[0];
  req->tableId = tableId;
  req->hashValue = dst.hashInfo[1];
  req->distr_key_indicator = 0; // userDefinedPartitioning not supported!
  * (EmulatedJamBuffer**)req->jamBuffer = jamBuffer();

#if 1
  EXECUTE_DIRECT(DBDIH, GSN_DIGETNODESREQ, signal,
                 DiGetNodesReq::SignalLength, 0);
#else
  sendSignal(DBDIH_REF, GSN_DIGETNODESREQ, signal,
             DiGetNodesReq::SignalLength, JBB);
  jamEntry();

#endif

  // The CONF overlays the same signal buffer; theData[0] carries the
  // error code, so read it before interpreting the rest as a CONF.
  DiGetNodesConf * conf = (DiGetNodesConf *)&signal->theData[0];
  err = signal->theData[0];
  Uint32 Tdata2 = conf->reqinfo;
  Uint32 nodeId = conf->nodes[0];      // primary replica node
  Uint32 instanceKey = (Tdata2 >> 24) & 127;

  DEBUG("HASH to nodeId:" << nodeId << ", instanceKey:" << instanceKey);

  jamEntry();
  if (unlikely(err != 0))
    goto error;

  dst.fragId = conf->fragId;
  dst.fragDistKey = (Tdata2 >> 16) & 255;
  dst.receiverRef = numberToRef(DBLQH, instanceKey, nodeId);

  return 0;

error:
  /**
   * TODO handle error
   */
  ndbrequire(false);
  return err;
}
3729 
3730 /**
3731  * END - MODULE LOOKUP
3732  */
3733 
3734 /**
3735  * MODULE SCAN FRAG
3736  *
3737  * NOTE: This may only be root node
3738  */
/**
 * Dispatch table for scan-frag TreeNodes: maps each generic TreeNode
 * event to the scanFrag_* handler (0 = event not applicable).
 */
const Dbspj::OpInfo
Dbspj::g_ScanFragOpInfo =
{
  &Dbspj::scanFrag_build,
  0, // prepare
  &Dbspj::scanFrag_start,
  &Dbspj::scanFrag_execTRANSID_AI,
  0, // execLQHKEYREF
  0, // execLQHKEYCONF
  &Dbspj::scanFrag_execSCAN_FRAGREF,
  &Dbspj::scanFrag_execSCAN_FRAGCONF,
  0, // parent row
  0, // parent batch complete
  0, // parent batch repeat
  0, // Dbspj::scanFrag_parent_batch_cleanup,
  &Dbspj::scanFrag_execSCAN_NEXTREQ,
  0, // Dbspj::scanFrag_complete
  &Dbspj::scanFrag_abort,
  0, // execNODE_FAILREP,
  &Dbspj::scanFrag_cleanup
};
3760 
3761 Uint32
scanFrag_build(Build_context & ctx,Ptr<Request> requestPtr,const QueryNode * qn,const QueryNodeParameters * qp)3762 Dbspj::scanFrag_build(Build_context& ctx,
3763                       Ptr<Request> requestPtr,
3764                       const QueryNode* qn,
3765                       const QueryNodeParameters* qp)
3766 {
3767   Uint32 err = 0;
3768   Ptr<TreeNode> treeNodePtr;
3769   const QN_ScanFragNode * node = (const QN_ScanFragNode*)qn;
3770   const QN_ScanFragParameters * param = (const QN_ScanFragParameters*)qp;
3771 
3772   do
3773   {
3774     err = createNode(ctx, requestPtr, treeNodePtr);
3775     if (unlikely(err != 0))
3776       break;
3777 
3778     treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI = RNIL;
3779     Ptr<ScanFragHandle> scanFragHandlePtr;
3780     if (unlikely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena,
3781                                              scanFragHandlePtr) != true))
3782     {
3783       err = DbspjErr::OutOfQueryMemory;
3784       break;
3785     }
3786 
3787     scanFragHandlePtr.p->m_treeNodePtrI = treeNodePtr.i;
3788     scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
3789     treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI = scanFragHandlePtr.i;
3790 
3791     requestPtr.p->m_bits |= Request::RT_SCAN;
3792     treeNodePtr.p->m_info = &g_ScanFragOpInfo;
3793     treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
3794     treeNodePtr.p->m_batch_size = ctx.m_batch_size_rows;
3795 
3796     ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
3797     dst->senderData = scanFragHandlePtr.i;
3798     dst->resultRef = reference();
3799     dst->resultData = treeNodePtr.i;
3800     dst->savePointId = ctx.m_savepointId;
3801 
3802     Uint32 transId1 = requestPtr.p->m_transId[0];
3803     Uint32 transId2 = requestPtr.p->m_transId[1];
3804     dst->transId1 = transId1;
3805     dst->transId2 = transId2;
3806 
3807     Uint32 treeBits = node->requestInfo;
3808     Uint32 paramBits = param->requestInfo;
3809     //ndbout_c("Dbspj::scanFrag_build() treeBits=%.8x paramBits=%.8x",
3810     //         treeBits, paramBits);
3811     Uint32 requestInfo = 0;
3812     ScanFragReq::setReadCommittedFlag(requestInfo, 1);
3813     ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio);
3814     ScanFragReq::setCorrFactorFlag(requestInfo, 1);
3815     ScanFragReq::setNoDiskFlag(requestInfo,
3816                                (treeBits & DABits::NI_LINKED_DISK) == 0 &&
3817                                (paramBits & DABits::PI_DISK_ATTR) == 0);
3818     dst->requestInfo = requestInfo;
3819 
3820     err = DbspjErr::InvalidTreeNodeSpecification;
3821     DEBUG("scanFrag_build: len=" << node->len);
3822     if (unlikely(node->len < QN_ScanFragNode::NodeSize))
3823       break;
3824 
3825     dst->tableId = node->tableId;
3826     dst->schemaVersion = node->tableVersion;
3827 
3828     err = DbspjErr::InvalidTreeParametersSpecification;
3829     DEBUG("param len: " << param->len);
3830     if (unlikely(param->len < QN_ScanFragParameters::NodeSize))
3831     {
3832       jam();
3833       DEBUG_CRASH();
3834       break;
3835     }
3836 
3837     ctx.m_resultData = param->resultData;
3838 
3839     /**
3840      * Parse stuff common lookup/scan-frag
3841      */
3842     struct DABuffer nodeDA, paramDA;
3843     nodeDA.ptr = node->optional;
3844     nodeDA.end = nodeDA.ptr + (node->len - QN_ScanFragNode::NodeSize);
3845     paramDA.ptr = param->optional;
3846     paramDA.end = paramDA.ptr + (param->len - QN_ScanFragParameters::NodeSize);
3847     err = parseDA(ctx, requestPtr, treeNodePtr,
3848                   nodeDA, treeBits, paramDA, paramBits);
3849     if (unlikely(err != 0))
3850     {
3851       jam();
3852       DEBUG_CRASH();
3853       break;
3854     }
3855 
3856     ctx.m_scan_cnt++;
3857     ctx.m_scans.set(treeNodePtr.p->m_node_no);
3858 
3859     if (ctx.m_start_signal)
3860     {
3861       jam();
3862       Signal* signal = ctx.m_start_signal;
3863       const ScanFragReq* src = (const ScanFragReq*)(signal->getDataPtr());
3864 
3865 #if NOT_YET
3866       Uint32 instanceNo =
3867         blockToInstance(signal->header.theReceiversBlockNumber);
3868       treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH,
3869                                                 instanceNo, getOwnNodeId());
3870 #else
3871       treeNodePtr.p->m_send.m_ref =
3872         numberToRef(DBLQH, getInstanceKey(src->tableId,
3873                                           src->fragmentNoKeyLen),
3874                     getOwnNodeId());
3875 #endif
3876 
3877       Uint32 fragId = src->fragmentNoKeyLen;
3878       Uint32 requestInfo = src->requestInfo;
3879       Uint32 batch_size_bytes = src->batch_size_bytes;
3880       Uint32 batch_size_rows = src->batch_size_rows;
3881 
3882 #ifdef VM_TRACE
3883       Uint32 savePointId = src->savePointId;
3884       Uint32 tableId = src->tableId;
3885       Uint32 schemaVersion = src->schemaVersion;
3886       Uint32 transId1 = src->transId1;
3887       Uint32 transId2 = src->transId2;
3888 #endif
3889       ndbassert(ScanFragReq::getLockMode(requestInfo) == 0);
3890       ndbassert(ScanFragReq::getHoldLockFlag(requestInfo) == 0);
3891       ndbassert(ScanFragReq::getKeyinfoFlag(requestInfo) == 0);
3892       ndbassert(ScanFragReq::getReadCommittedFlag(requestInfo) == 1);
3893       ndbassert(ScanFragReq::getLcpScanFlag(requestInfo) == 0);
3894       //ScanFragReq::getAttrLen(requestInfo); // ignore
3895       ndbassert(ScanFragReq::getReorgFlag(requestInfo) == 0);
3896 
3897       Uint32 tupScanFlag = ScanFragReq::getTupScanFlag(requestInfo);
3898       Uint32 rangeScanFlag = ScanFragReq::getRangeScanFlag(requestInfo);
3899       Uint32 descendingFlag = ScanFragReq::getDescendingFlag(requestInfo);
3900       Uint32 scanPrio = ScanFragReq::getScanPrio(requestInfo);
3901 
3902       Uint32 dst_requestInfo = dst->requestInfo;
3903 
3904       ScanFragReq::setTupScanFlag(dst_requestInfo,tupScanFlag);
3905       ScanFragReq::setRangeScanFlag(dst_requestInfo,rangeScanFlag);
3906       ScanFragReq::setDescendingFlag(dst_requestInfo,descendingFlag);
3907       ScanFragReq::setScanPrio(dst_requestInfo,scanPrio);
3908 
3909       /**
3910        * 'NoDiskFlag' should agree with information in treeNode
3911        */
3912       ndbassert(ScanFragReq::getNoDiskFlag(requestInfo) ==
3913                 ScanFragReq::getNoDiskFlag(dst_requestInfo));
3914 
3915       dst->fragmentNoKeyLen = fragId;
3916       dst->requestInfo = dst_requestInfo;
3917       dst->batch_size_bytes = batch_size_bytes;
3918       dst->batch_size_rows = batch_size_rows;
3919 
3920 #ifdef VM_TRACE
3921       ndbassert(dst->savePointId == savePointId);
3922       ndbassert(dst->tableId == tableId);
3923       ndbassert(dst->schemaVersion == schemaVersion);
3924       ndbassert(dst->transId1 == transId1);
3925       ndbassert(dst->transId2 == transId2);
3926 #endif
3927 
3928       treeNodePtr.p->m_send.m_keyInfoPtrI = ctx.m_keyPtr.i;
3929       treeNodePtr.p->m_bits |= TreeNode::T_ONE_SHOT;
3930 
3931       if (rangeScanFlag)
3932       {
3933         c_Counters.incr_counter(CI_RANGE_SCANS_RECEIVED, 1);
3934       }
3935       else
3936       {
3937         c_Counters.incr_counter(CI_TABLE_SCANS_RECEIVED, 1);
3938       }
3939     }
3940     else
3941     {
3942       ndbrequire(false);
3943     }
3944 
3945     return 0;
3946   } while (0);
3947 
3948   return err;
3949 }
3950 
/**
 * Start a ScanFrag treeNode: delegates directly to scanFrag_send(),
 * which builds and sends the SCAN_FRAGREQ.
 */
void
Dbspj::scanFrag_start(Signal* signal,
                      Ptr<Request> requestPtr,
                      Ptr<TreeNode> treeNodePtr)
{
  scanFrag_send(signal, requestPtr, treeNodePtr);
}
3958 
/**
 * Send the SCAN_FRAGREQ for a ScanFrag treeNode to the local DBLQH.
 *
 * Copies the prepared ScanFragReq (stored in m_scanfrag_data), attaches
 * the attrInfo section (and the keyInfo section if present), and sends
 * the request.  ScanFrag may only be used as a root node (T_ONE_SHOT),
 * so ownership of the sections is passed to the signal rather than kept.
 */
void
Dbspj::scanFrag_send(Signal* signal,
                     Ptr<Request> requestPtr,
                     Ptr<TreeNode> treeNodePtr)
{
  jam();

  // Account for the outstanding scan before sending
  requestPtr.p->m_outstanding++;
  requestPtr.p->m_cnt_active++;
  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p->
                               m_scanfrag_data.m_scanFragHandlePtrI);

  ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());

  // Copy the pre-built request into the signal, then fill in the
  // per-send variable part (correlation id + root result data)
  memcpy(req, treeNodePtr.p->m_scanfrag_data.m_scanFragReq,
         sizeof(treeNodePtr.p->m_scanfrag_data.m_scanFragReq));
  req->variableData[0] = treeNodePtr.p->m_send.m_correlation;
  req->variableData[1] = requestPtr.p->m_rootResultData;

  SectionHandle handle(this);

  Uint32 ref = treeNodePtr.p->m_send.m_ref;
  Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI;
  Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;

  /**
   * ScanFrag may only be used as root-node, i.e T_ONE_SHOT
   */
  ndbrequire(treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT);

  /**
   * Pass sections to send
   */
  treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
  treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;

  // Section 0 is always attrInfo; section 1 (keyInfo) is optional
  getSection(handle.m_ptr[0], attrInfoPtrI);
  handle.m_cnt = 1;

  if (keyInfoPtrI != RNIL)
  {
    jam();
    getSection(handle.m_ptr[1], keyInfoPtrI);
    handle.m_cnt = 2;
  }

#ifdef DEBUG_SCAN_FRAGREQ
  ndbout_c("SCAN_FRAGREQ to %x", ref);
  printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
                    NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
                    DBLQH);
  printf("ATTRINFO: ");
  print(handle.m_ptr[0], stdout);
  if (handle.m_cnt > 1)
  {
    printf("KEYINFO: ");
    print(handle.m_ptr[1], stdout);
  }
#endif

  if (ScanFragReq::getRangeScanFlag(req->requestInfo))
  {
    c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1);
  }
  else
  {
    c_Counters.incr_counter(CI_LOCAL_TABLE_SCANS_SENT, 1);
  }

  // The receiver must be a block on this node
  ndbrequire(refToNode(ref) == getOwnNodeId());
  sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
             NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
             JBB, &handle);

  scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_SCANNING;
  treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;
  // ~0: row count unknown until SCAN_FRAGCONF tells us what to expect
  treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
}
4039 
4040 void
scanFrag_execTRANSID_AI(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,const RowPtr & rowRef)4041 Dbspj::scanFrag_execTRANSID_AI(Signal* signal,
4042                                Ptr<Request> requestPtr,
4043                                Ptr<TreeNode> treeNodePtr,
4044                                const RowPtr & rowRef)
4045 {
4046   jam();
4047   treeNodePtr.p->m_scanfrag_data.m_rows_received++;
4048 
4049   LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
4050   Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
4051   Dependency_map::ConstDataBufferIterator it;
4052 
4053   {
4054     for (list.first(it); !it.isNull(); list.next(it))
4055     {
4056       jam();
4057       Ptr<TreeNode> childPtr;
4058       m_treenode_pool.getPtr(childPtr, * it.data);
4059       ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0);
4060       (this->*(childPtr.p->m_info->m_parent_row))(signal,
4061                                                   requestPtr, childPtr,rowRef);
4062     }
4063   }
4064 
4065   if (treeNodePtr.p->m_scanfrag_data.m_rows_received ==
4066       treeNodePtr.p->m_scanfrag_data.m_rows_expecting)
4067   {
4068     jam();
4069 
4070     if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
4071     {
4072       jam();
4073       reportBatchComplete(signal, requestPtr, treeNodePtr);
4074     }
4075 
4076     checkBatchComplete(signal, requestPtr, 1);
4077     return;
4078   }
4079 }
4080 
4081 void
scanFrag_execSCAN_FRAGREF(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,Ptr<ScanFragHandle> scanFragHandlePtr)4082 Dbspj::scanFrag_execSCAN_FRAGREF(Signal* signal,
4083                                  Ptr<Request> requestPtr,
4084                                  Ptr<TreeNode> treeNodePtr,
4085                                  Ptr<ScanFragHandle> scanFragHandlePtr)
4086 {
4087   const ScanFragRef* rep =
4088     reinterpret_cast<const ScanFragRef*>(signal->getDataPtr());
4089   Uint32 errCode = rep->errorCode;
4090 
4091   DEBUG("scanFrag_execSCAN_FRAGREF, rep->senderData:" << rep->senderData
4092         << ", requestPtr.p->m_senderData:" << requestPtr.p->m_senderData);
4093   scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
4094   ndbrequire(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE);
4095   ndbrequire(requestPtr.p->m_cnt_active);
4096   requestPtr.p->m_cnt_active--;
4097   ndbrequire(requestPtr.p->m_outstanding);
4098   requestPtr.p->m_outstanding--;
4099   treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
4100 
4101   abort(signal, requestPtr, errCode);
4102 }
4103 

void
Dbspj::scanFrag_execSCAN_FRAGCONF(Signal* signal,
                                  Ptr<Request> requestPtr,
                                  Ptr<TreeNode> treeNodePtr,
                                  Ptr<ScanFragHandle> scanFragHandlePtr)
{
  const ScanFragConf * conf =
    reinterpret_cast<const ScanFragConf*>(signal->getDataPtr());
  Uint32 rows = conf->completedOps;
  Uint32 done = conf->fragmentCompleted;

  Uint32 state = scanFragHandlePtr.p->m_state;
  if (state == ScanFragHandle::SFH_WAIT_CLOSE && done == 0)
  {
    jam();
    /**
     * We sent an explicit close request...ignore this...a close will come later
     */
    return;
  }

  ndbrequire(done <= 2); // 0, 1, 2 (=ZSCAN_FRAG_CLOSED)

  // m_rows_expecting must still be 'unknown' (~0) from scanFrag_send
  ndbassert(treeNodePtr.p->m_scanfrag_data.m_rows_expecting == ~Uint32(0));
  treeNodePtr.p->m_scanfrag_data.m_rows_expecting = rows;
  if (treeNodePtr.p->isLeaf())
  {
    /**
     * If this is a leaf node, then no rows will be sent to the SPJ block,
     * as there are no child operations to instantiate.
     */
    treeNodePtr.p->m_scanfrag_data.m_rows_received = rows;
  }

  requestPtr.p->m_rows += rows;
  if (done)
  {
    jam();

    // Fragment scan complete: this treeNode is no longer active
    ndbrequire(requestPtr.p->m_cnt_active);
    requestPtr.p->m_cnt_active--;
    treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
    scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
  }
  else
  {
    jam();
    // More rows available; wait for SCAN_NEXTREQ from the API
    scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_WAIT_NEXTREQ;
  }

  // Batch is complete if all announced rows already arrived,
  // or if this CONF answered an explicit close request
  if (treeNodePtr.p->m_scanfrag_data.m_rows_expecting ==
      treeNodePtr.p->m_scanfrag_data.m_rows_received ||
      (state == ScanFragHandle::SFH_WAIT_CLOSE))
  {
    jam();

    if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
    {
      jam();
      reportBatchComplete(signal, requestPtr, treeNodePtr);
    }

    checkBatchComplete(signal, requestPtr, 1);
    return;
  }
}
4171 
4172 void
scanFrag_execSCAN_NEXTREQ(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)4173 Dbspj::scanFrag_execSCAN_NEXTREQ(Signal* signal,
4174                                  Ptr<Request> requestPtr,
4175                                  Ptr<TreeNode> treeNodePtr)
4176 {
4177   jamEntry();
4178 
4179   Ptr<ScanFragHandle> scanFragHandlePtr;
4180   m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p->
4181                                m_scanfrag_data.m_scanFragHandlePtrI);
4182 
4183   const ScanFragReq * org =
4184     (ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
4185 
4186   ScanFragNextReq* req =
4187     reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
4188   req->senderData = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
4189   req->requestInfo = 0;
4190   req->transId1 = requestPtr.p->m_transId[0];
4191   req->transId2 = requestPtr.p->m_transId[1];
4192   req->batch_size_rows = org->batch_size_rows;
4193   req->batch_size_bytes = org->batch_size_bytes;
4194 
4195   DEBUG("scanFrag_execSCAN_NEXTREQ to: " << hex << treeNodePtr.p->m_send.m_ref
4196         << ", senderData: " << req->senderData);
4197 #ifdef DEBUG_SCAN_FRAGREQ
4198   printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
4199                        ScanFragNextReq::SignalLength, DBLQH);
4200 #endif
4201 
4202   sendSignal(treeNodePtr.p->m_send.m_ref,
4203              GSN_SCAN_NEXTREQ,
4204              signal,
4205              ScanFragNextReq::SignalLength,
4206              JBB);
4207 
4208   treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;
4209   treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
4210   requestPtr.p->m_outstanding++;
4211   scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_SCANNING;
4212 }//Dbspj::scanFrag_execSCAN_NEXTREQ()
4213 
/**
 * Abort an active ScanFrag treeNode by sending an explicit close
 * (ScanFragNextReq::ZCLOSE) to LQH.  If a close has already been sent
 * (SFH_WAIT_CLOSE) nothing more is done; the close CONF will arrive
 * later.  If the node is not TN_ACTIVE there is nothing to abort.
 */
void
Dbspj::scanFrag_abort(Signal* signal,
                      Ptr<Request> requestPtr,
                      Ptr<TreeNode> treeNodePtr)
{
  jam();

  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p->
                               m_scanfrag_data.m_scanFragHandlePtrI);
  if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE)
  {
    jam();

    switch(scanFragHandlePtr.p->m_state){
    case ScanFragHandle::SFH_NOT_STARTED:
    case ScanFragHandle::SFH_COMPLETE:
      // No break needed: ndbrequire(false) does not return
      ndbrequire(false); // we shouldnt be TN_ACTIVE then...

    case ScanFragHandle::SFH_WAIT_CLOSE:
      jam();
      // close already sent
      return;
    case ScanFragHandle::SFH_WAIT_NEXTREQ:
      jam();
      // we were idle
      requestPtr.p->m_outstanding++;
      break;
    case ScanFragHandle::SFH_SCANNING:
      jam();
      break;
    }

    // Mark row count unknown again; the close CONF terminates the batch
    treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
    scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_WAIT_CLOSE;

    // Build and send the explicit close request (batch sizes 0)
    ScanFragNextReq* req =
      reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
    req->senderData = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
    req->requestInfo = ScanFragNextReq::ZCLOSE;
    req->transId1 = requestPtr.p->m_transId[0];
    req->transId2 = requestPtr.p->m_transId[1];
    req->batch_size_rows = 0;
    req->batch_size_bytes = 0;

    sendSignal(treeNodePtr.p->m_send.m_ref,
               GSN_SCAN_NEXTREQ,
               signal,
               ScanFragNextReq::SignalLength,
               JBB);
  }
}
4266 
4267 
4268 void
scanFrag_cleanup(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)4269 Dbspj::scanFrag_cleanup(Ptr<Request> requestPtr,
4270                         Ptr<TreeNode> treeNodePtr)
4271 {
4272   Uint32 ptrI = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
4273   if (ptrI != RNIL)
4274   {
4275     m_scanfraghandle_pool.release(ptrI);
4276   }
4277   cleanup_common(requestPtr, treeNodePtr);
4278 }
4279 
4280 /**
4281  * END - MODULE SCAN FRAG
4282  */
4283 
4284 /**
4285  * MODULE SCAN INDEX
4286  *
4287  * NOTE: This may not be root-node
4288  */
/**
 * Dispatch table mapping the generic TreeNode operations onto the
 * scanIndex_* implementations.  A 0 entry means the operation is not
 * applicable for this node type (the comment names the slot).
 */
const Dbspj::OpInfo
Dbspj::g_ScanIndexOpInfo =
{
  &Dbspj::scanIndex_build,
  &Dbspj::scanIndex_prepare,
  0, // start
  &Dbspj::scanIndex_execTRANSID_AI,
  0, // execLQHKEYREF
  0, // execLQHKEYCONF
  &Dbspj::scanIndex_execSCAN_FRAGREF,
  &Dbspj::scanIndex_execSCAN_FRAGCONF,
  &Dbspj::scanIndex_parent_row,
  &Dbspj::scanIndex_parent_batch_complete,
  &Dbspj::scanIndex_parent_batch_repeat,
  &Dbspj::scanIndex_parent_batch_cleanup,
  &Dbspj::scanIndex_execSCAN_NEXTREQ,
  &Dbspj::scanIndex_complete,
  &Dbspj::scanIndex_abort,
  &Dbspj::scanIndex_execNODE_FAILREP,
  &Dbspj::scanIndex_cleanup
};
4310 
/**
 * Build a ScanIndex treeNode from the serialized QN_ScanIndexNode and
 * QN_ScanIndexParameters of the query tree.
 *
 * Initializes the ScanFragReq template kept in m_scanindex_data,
 * unpacks the packed batch size (rows in the low BatchRowBits bits,
 * bytes in the remaining high bits), parses the node/parameter data
 * via parseScanIndex(), and propagates T_REPORT_BATCH_COMPLETE(D)
 * to all ancestors.
 *
 * @return 0 on success, otherwise a DbspjErr error code.
 */
Uint32
Dbspj::scanIndex_build(Build_context& ctx,
                       Ptr<Request> requestPtr,
                       const QueryNode* qn,
                       const QueryNodeParameters* qp)
{
  Uint32 err = 0;
  Ptr<TreeNode> treeNodePtr;
  const QN_ScanIndexNode * node = (const QN_ScanIndexNode*)qn;
  const QN_ScanIndexParameters * param = (const QN_ScanIndexParameters*)qp;

  do
  {
    err = createNode(ctx, requestPtr, treeNodePtr);
    if (unlikely(err != 0))
      break;

    Uint32 batchSize = param->batchSize;

    // ScanIndex needs both a prepare phase (DIH lookups) and a
    // complete phase for the request
    requestPtr.p->m_bits |= Request::RT_SCAN;
    requestPtr.p->m_bits |= Request::RT_NEED_PREPARE;
    requestPtr.p->m_bits |= Request::RT_NEED_COMPLETE;
    treeNodePtr.p->m_info = &g_ScanIndexOpInfo;
    treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
    treeNodePtr.p->m_bits |= TreeNode::T_NEED_REPORT_BATCH_COMPLETED;
    // Low BatchRowBits bits of batchSize hold the row count
    treeNodePtr.p->m_batch_size =
      batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits);

    ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq;
    dst->senderData = treeNodePtr.i;
    dst->resultRef = reference();
    dst->resultData = treeNodePtr.i;
    dst->savePointId = ctx.m_savepointId;
    dst->batch_size_rows  =
      batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits);
    // Remaining high bits hold the byte limit
    dst->batch_size_bytes = batchSize >> QN_ScanIndexParameters::BatchRowBits;

    Uint32 transId1 = requestPtr.p->m_transId[0];
    Uint32 transId2 = requestPtr.p->m_transId[1];
    dst->transId1 = transId1;
    dst->transId2 = transId2;

    Uint32 treeBits = node->requestInfo;
    Uint32 paramBits = param->requestInfo;
    Uint32 requestInfo = 0;
    ScanFragReq::setRangeScanFlag(requestInfo, 1);
    ScanFragReq::setReadCommittedFlag(requestInfo, 1);
    ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio);
    // Disk access only needed if either tree or params reference disk attrs
    ScanFragReq::setNoDiskFlag(requestInfo,
                               (treeBits & DABits::NI_LINKED_DISK) == 0 &&
                               (paramBits & DABits::PI_DISK_ATTR) == 0);
    ScanFragReq::setCorrFactorFlag(requestInfo, 1);
    dst->requestInfo = requestInfo;

    err = DbspjErr::InvalidTreeNodeSpecification;
    DEBUG("scanIndex_build: len=" << node->len);
    if (unlikely(node->len < QN_ScanIndexNode::NodeSize))
      break;

    dst->tableId = node->tableId;
    dst->schemaVersion = node->tableVersion;

    err = DbspjErr::InvalidTreeParametersSpecification;
    DEBUG("param len: " << param->len);
    if (unlikely(param->len < QN_ScanIndexParameters::NodeSize))
    {
      jam();
      DEBUG_CRASH();
      break;
    }

    ctx.m_resultData = param->resultData;

    /**
     * Parse stuff
     */
    struct DABuffer nodeDA, paramDA;
    nodeDA.ptr = node->optional;
    nodeDA.end = nodeDA.ptr + (node->len - QN_ScanIndexNode::NodeSize);
    paramDA.ptr = param->optional;
    paramDA.end = paramDA.ptr + (param->len - QN_ScanIndexParameters::NodeSize);

    err = parseScanIndex(ctx, requestPtr, treeNodePtr,
                         nodeDA, treeBits, paramDA, paramBits);

    if (unlikely(err != 0))
    {
      jam();
      DEBUG_CRASH();
      break;
    }

    /**
     * Since we T_NEED_REPORT_BATCH_COMPLETED, we set
     *   this on all our parents...
     */
    Ptr<TreeNode> nodePtr;
    nodePtr.i = treeNodePtr.p->m_parentPtrI;
    while (nodePtr.i != RNIL)
    {
      jam();
      m_treenode_pool.getPtr(nodePtr);
      nodePtr.p->m_bits |= TreeNode::T_REPORT_BATCH_COMPLETE;
      nodePtr.p->m_bits |= TreeNode::T_NEED_REPORT_BATCH_COMPLETED;
      nodePtr.i = nodePtr.p->m_parentPtrI;
    }

    /**
     * If there exists other scan TreeNodes not being among
     * my ancestors, results from this scanIndex may be repeated
     * as part of an X-scan.
     *
     * NOTE: The scan nodes being along the left deep ancestor chain
     *       are not 'repeatable' as they are driving the
     *       repeated X-scan and are thus not repeated themself.
     */
    if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT &&
       !treeNodePtr.p->m_ancestors.contains(ctx.m_scans))
    {
      treeNodePtr.p->m_bits |= TreeNode::T_SCAN_REPEATABLE;
    }

    ctx.m_scan_cnt++;
    ctx.m_scans.set(treeNodePtr.p->m_node_no);

    return 0;
  } while (0);

  return err;
}
4441 
/**
 * Parse the ScanIndex-specific parts of the serialized node/parameter
 * data (after the common parseDA() parts):
 *
 *  - SI_PRUNE_PATTERN + SI_PRUNE_LINKED: a pruning pattern depending on
 *    values from parent rows; stored for later expansion per row.
 *  - SI_PRUNE_PATTERN without SI_PRUNE_LINKED: a fixed pruning pattern,
 *    expanded immediately into a key section (T_CONST_PRUNE -> scan
 *    guaranteed to hit a single partition).
 *  - SI_PARALLEL / SIP_PARALLEL: full parallelism requested (only
 *    honored when the scan is not const-pruned).
 *
 * @return 0 on success, otherwise a DbspjErr error code.
 */
Uint32
Dbspj::parseScanIndex(Build_context& ctx,
                      Ptr<Request> requestPtr,
                      Ptr<TreeNode> treeNodePtr,
                      DABuffer tree, Uint32 treeBits,
                      DABuffer param, Uint32 paramBits)
{
  Uint32 err = 0;

  typedef QN_ScanIndexNode Node;
  typedef QN_ScanIndexParameters Params;

  do
  {
    jam();

    // Reset all per-scan bookkeeping before parsing
    ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
    data.m_fragments.init();
    data.m_frags_outstanding = 0;
    data.m_frags_complete = 0;
    data.m_frags_not_started = 0;
    data.m_parallelismStat.init();
    data.m_firstExecution = true;
    data.m_batch_chunks = 0;

    err = parseDA(ctx, requestPtr, treeNodePtr,
                  tree, treeBits, param, paramBits);
    if (unlikely(err != 0))
      break;

    if (treeBits & Node::SI_PRUNE_PATTERN)
    {
      Uint32 len_cnt = * tree.ptr ++;
      Uint32 len = len_cnt & 0xFFFF; // length of pattern in words
      Uint32 cnt = len_cnt >> 16;    // no of parameters

      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      // Parameter presence flags must agree with the parameter count
      ndbrequire((cnt==0) == ((treeBits & Node::SI_PRUNE_PARAMS) ==0));
      ndbrequire((cnt==0) == ((paramBits & Params::SIP_PRUNE_PARAMS)==0));

      if (treeBits & Node::SI_PRUNE_LINKED)
      {
        jam();
        DEBUG("LINKED-PRUNE PATTERN w/ " << cnt << " PARAM values");

        data.m_prunePattern.init();
        Local_pattern_store pattern(pool, data.m_prunePattern);

        /**
         * Expand pattern into a new pattern (with linked values)
         */
        err = expand(pattern, treeNodePtr, tree, len, param, cnt);
        if (unlikely(err != 0))
          break;

        treeNodePtr.p->m_bits |= TreeNode::T_PRUNE_PATTERN;
        c_Counters.incr_counter(CI_PRUNED_RANGE_SCANS_RECEIVED, 1);
      }
      else
      {
        jam();
        DEBUG("FIXED-PRUNE w/ " << cnt << " PARAM values");

        /**
         * Expand pattern directly into
         *   This means a "fixed" pruning from here on
         *   i.e guaranteed single partition
         */
        Uint32 prunePtrI = RNIL;
        bool hasNull;
        err = expand(prunePtrI, tree, len, param, cnt, hasNull);
        if (unlikely(err != 0))
          break;

        if (unlikely(hasNull))
        {
          /* API should have elliminated requests w/ const-NULL keys */
          jam();
          DEBUG("BEWARE: T_CONST_PRUNE-key contain NULL values");
//        treeNodePtr.p->m_bits |= TreeNode::T_NULL_PRUNE;
//        break;
          ndbrequire(false);
        }
        ndbrequire(prunePtrI != RNIL);  /* todo: can we allow / take advantage of NULLs in range scan? */
        data.m_constPrunePtrI = prunePtrI;

        /**
         * We may not compute the partition for the hash-key here
         *   as we have not yet opened a read-view
         */
        treeNodePtr.p->m_bits |= TreeNode::T_CONST_PRUNE;
        c_Counters.incr_counter(CI_CONST_PRUNED_RANGE_SCANS_RECEIVED, 1);
      }
    } //SI_PRUNE_PATTERN

    // Parallelism only applies to scans that are not const-pruned
    if ((treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE) == 0 &&
        ((treeBits & Node::SI_PARALLEL) ||
         ((paramBits & Params::SIP_PARALLEL))))
    {
      jam();
      treeNodePtr.p->m_bits |= TreeNode::T_SCAN_PARALLEL;
    }

    return 0;
  } while(0);

  DEBUG_CRASH();
  return err;
}
4551 
4552 void
scanIndex_prepare(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)4553 Dbspj::scanIndex_prepare(Signal * signal,
4554                          Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
4555 {
4556   jam();
4557 
4558   treeNodePtr.p->m_state = TreeNode::TN_PREPARING;
4559   ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq;
4560 
4561   DihScanTabReq * req = (DihScanTabReq*)signal->getDataPtrSend();
4562   req->senderRef = reference();
4563   req->senderData = treeNodePtr.i;
4564   req->tableId = dst->tableId;
4565   req->schemaTransId = 0;
4566   sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
4567              DihScanTabReq::SignalLength, JBB);
4568 
4569   requestPtr.p->m_outstanding++;
4570 }
4571 
/**
 * DIH_SCAN_TAB_REF is not handled: a failed table lookup in DBDIH is
 * treated as a fatal condition here.
 */
void
Dbspj::execDIH_SCAN_TAB_REF(Signal* signal)
{
  jamEntry();
  // NOTE(review): crashes the node instead of failing the request —
  // presumably DIH refusal is considered impossible here; confirm.
  ndbrequire(false);
}
4578 
/**
 * DIH_SCAN_TAB_CONF: DBDIH returned the scan cookie and fragment count
 * for a ScanIndex treeNode.
 *
 * Seizes one ScanFragHandle per fragment to scan.  For T_CONST_PRUNE
 * the single partition is computed from the stored prune key.  A scan
 * with fragCount == 1 is converted into the const-pruned form.  For
 * non-pruned scans, DIH_SCAN_GET_NODES_REQ is sent per fragment to
 * resolve the DBLQH references; the answers complete the prepare phase.
 */
void
Dbspj::execDIH_SCAN_TAB_CONF(Signal* signal)
{
  jamEntry();
  DihScanTabConf * conf = (DihScanTabConf*)signal->getDataPtr();

  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, conf->senderData);
  ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo);

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;

  Uint32 cookie = conf->scanCookie;
  Uint32 fragCount = conf->fragmentCount;
  ScanFragReq * dst = (ScanFragReq*)data.m_scanFragReq;

  if (conf->reorgFlag)
  {
    jam();
    ScanFragReq::setReorgFlag(dst->requestInfo, 1);
  }

  data.m_fragCount = fragCount;
  data.m_scanCookie = cookie;

  const Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
  bool pruned = (treeNodePtr.p->m_bits & prunemask) != 0;

  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  // Always seize the first fragment handle (fragment 0)
  Ptr<ScanFragHandle> fragPtr;
  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
  if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr)))
  {
    jam();
    fragPtr.p->init(0);
    fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
    list.addLast(fragPtr);
  }
  else
  {
    jam();
    goto error1;
  }

  if (treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE)
  {
    jam();

    // TODO we need a different variant of computeHash here,
    // since m_constPrunePtrI does not contain full primary key
    // but only parts in distribution key

    BuildKeyReq tmp;
    Uint32 indexId = dst->tableId;
    Uint32 tableId = g_key_descriptor_pool.getPtr(indexId)->primaryTableId;
    Uint32 err = computePartitionHash(signal, tmp, tableId, data.m_constPrunePtrI);
    if (unlikely(err != 0))
      goto error;

    releaseSection(data.m_constPrunePtrI);
    data.m_constPrunePtrI = RNIL;

    err = getNodes(signal, tmp, tableId);
    if (unlikely(err != 0))
      goto error;

    // Const-pruned: scan exactly the one partition the key hashes to
    fragPtr.p->m_fragId = tmp.fragId;
    fragPtr.p->m_ref = tmp.receiverRef;
    data.m_fragCount = 1;
  }
  else if (fragCount == 1)
  {
    jam();
    /**
     * This is roughly equivalent to T_CONST_PRUNE
     *   pretend that it is const-pruned
     */
    if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
    {
      jam();
      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      Local_pattern_store pattern(pool, data.m_prunePattern);
      pattern.release();
    }
    data.m_constPrunePtrI = RNIL;
    Uint32 clear = TreeNode::T_PRUNE_PATTERN | TreeNode::T_SCAN_PARALLEL;
    treeNodePtr.p->m_bits &= ~clear;
    treeNodePtr.p->m_bits |= TreeNode::T_CONST_PRUNE;

    /**
     * We must get fragPtr.p->m_ref...so set pruned=false
     */
    pruned = false;
  }
  else
  {
    // Seize handles for the remaining fragments 1..fragCount-1
    for (Uint32 i = 1; i<fragCount; i++)
    {
      jam();
      Ptr<ScanFragHandle> fragPtr;
      if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr)))
      {
        jam();
        fragPtr.p->init(i);
        fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
        list.addLast(fragPtr);
      }
      else
      {
        goto error1;
      }
    }
  }
  data.m_frags_complete = data.m_fragCount;

  if (!pruned)
  {
    jam();
    // Resolve the DBLQH reference for every fragment via DBDIH
    Uint32 tableId = ((ScanFragReq*)data.m_scanFragReq)->tableId;
    DihScanGetNodesReq * req = (DihScanGetNodesReq*)signal->getDataPtrSend();
    req->senderRef = reference();
    req->tableId = tableId;
    req->scanCookie = cookie;

    Uint32 cnt = 0;
    for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
    {
      jam();
      req->senderData = fragPtr.i;
      req->fragId = fragPtr.p->m_fragId;
      sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
                 DihScanGetNodesReq::SignalLength, JBB);
      cnt++;
    }
    data.m_frags_outstanding = cnt;
    requestPtr.p->m_outstanding++;
  }
  else
  {
    jam();
    treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
  }
  checkPrepareComplete(signal, requestPtr, 1);

  return;

error1:
error:
  // NOTE(review): out-of-memory / hash errors crash the node here;
  // a graceful request abort might be preferable — confirm intent.
  ndbrequire(false);
}
4731 
void
Dbspj::execDIH_SCAN_GET_NODES_REF(Signal* signal)
{
  jamEntry();
  // DIH could not resolve the fragment -> node mapping requested in
  // DIH_SCAN_GET_NODES_REQ. No recovery path is implemented here, so
  // treat it as a fatal condition and crash the node.
  ndbrequire(false);
}
4738 
/**
 * Reply from DIH carrying the node/instance location of one fragment
 * that was asked for during scanIndex prepare. When the last outstanding
 * reply arrives, the prepare phase for this tree node is complete.
 */
void
Dbspj::execDIH_SCAN_GET_NODES_CONF(Signal* signal)
{
  jamEntry();

  DihScanGetNodesConf * conf = (DihScanGetNodesConf*)signal->getDataPtr();

  // senderData was set to fragPtr.i in the REQ; use it to locate the handle
  Uint32 senderData = conf->senderData;
  Uint32 node = conf->nodes[0];
  Uint32 instanceKey = conf->instanceKey;

  Ptr<ScanFragHandle> fragPtr;
  m_scanfraghandle_pool.getPtr(fragPtr, senderData);
  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, fragPtr.p->m_treeNodePtrI);
  ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo);
  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  ndbrequire(data.m_frags_outstanding > 0);
  data.m_frags_outstanding--;

  // Resolve the DBLQH block reference that will receive SCAN_FRAGREQ
  // for this fragment.
  fragPtr.p->m_ref = numberToRef(DBLQH, instanceKey, node);

  if (data.m_frags_outstanding == 0)
  {
    jam();

    // All fragment locations resolved: this tree node is prepared.
    treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;

    Ptr<Request> requestPtr;
    m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
    checkPrepareComplete(signal, requestPtr, 1);
  }
}
4772 
4773 Uint32
scanIndex_findFrag(Local_ScanFragHandle_list & list,Ptr<ScanFragHandle> & fragPtr,Uint32 fragId)4774 Dbspj::scanIndex_findFrag(Local_ScanFragHandle_list & list,
4775                           Ptr<ScanFragHandle> & fragPtr, Uint32 fragId)
4776 {
4777   for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
4778   {
4779     jam();
4780     if (fragPtr.p->m_fragId == fragId)
4781     {
4782       jam();
4783       return 0;
4784     }
4785   }
4786 
4787   return 99; // TODO
4788 }
4789 
/**
 * A row was received from the parent tree node. Build the index range
 * (bound) that this row generates for the child index-scan, and attach it
 * to the appropriate ScanFragHandle:
 *  - with T_PRUNE_PATTERN the distribution key is computed from the row
 *    and the bound is stored on the single fragment it hashes to,
 *  - otherwise the bound is stored on the first fragment handle (shared
 *    by all fragments when the scan is sent).
 * Any failure inside the do{}-block breaks out and crashes (no error
 * handling path implemented yet).
 */
void
Dbspj::scanIndex_parent_row(Signal* signal,
                            Ptr<Request> requestPtr,
                            Ptr<TreeNode> treeNodePtr,
                            const RowPtr & rowRef)
{
  jam();

  Uint32 err;
  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;

  /**
   * Construct range definition,
   *   and if prune pattern enabled
   *   stuff it onto correct scanindexFrag
   */
  do
  {
    Ptr<ScanFragHandle> fragPtr;
    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
    if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
    {
      jam();

      /**
       * TODO: Expand into linear memory instead
       *       of expanding into sections, and then copy
       *       section into linear
       */
      Local_pattern_store pattern(pool, data.m_prunePattern);
      Uint32 pruneKeyPtrI = RNIL;
      bool hasNull;
      // Expand the prune pattern against the parent row into a section
      // holding the distribution key values.
      err = expand(pruneKeyPtrI, pattern, rowRef, hasNull);
      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }

      if (unlikely(hasNull))
      {
        jam();
        DEBUG("T_PRUNE_PATTERN-key contain NULL values");

        // Ignore this request as 'NULL == <column>' will never give a match
        if (pruneKeyPtrI != RNIL)
        {
          releaseSection(pruneKeyPtrI);
        }
        return;  // Bailout, SCANREQ would have returned 0 rows anyway
      }

      // TODO we need a different variant of computeHash here,
      // since pruneKeyPtrI does not contain full primary key
      // but only parts in distribution key

      BuildKeyReq tmp;
      ScanFragReq * dst = (ScanFragReq*)data.m_scanFragReq;
      Uint32 indexId = dst->tableId;
      // Hash against the base table, not the index itself.
      Uint32 tableId = g_key_descriptor_pool.getPtr(indexId)->primaryTableId;
      err = computePartitionHash(signal, tmp, tableId, pruneKeyPtrI);
      releaseSection(pruneKeyPtrI); // see ^ TODO
      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }

      err = getNodes(signal, tmp, tableId);
      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }

      // Locate the ScanFragHandle for the fragment the key hashed to.
      err = scanIndex_findFrag(list, fragPtr, tmp.fragId);
      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }

      /**
       * NOTE: We can get different receiverRef's here
       *       for different keys. E.g during node-recovery where
       *       primary-fragment is switched.
       *
       *       Use latest that we receive
       *
       * TODO: Also double check table-reorg
       */
      fragPtr.p->m_ref = tmp.receiverRef;
    }
    else
    {
      jam();
      /**
       * If const prune, or no-prune, store on first fragment,
       * and send to 1 or all resp.
       */
      list.first(fragPtr);
    }

    // Append the new bound to this fragment's accumulated range section.
    Uint32 ptrI = fragPtr.p->m_rangePtrI;
    bool hasNull;
    if (treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED)
    {
      jam();
      Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
      err = expand(ptrI, pattern, rowRef, hasNull);
      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }
    }
    else
    {
      jam();
      // Fixed key...fix later...
      ndbrequire(false);
    }
//  ndbrequire(!hasNull);  // FIXME, can't ignore request as we already added it to keyPattern
    fragPtr.p->m_rangePtrI = ptrI;
    // Patch bound length/number and the correlation value into the section.
    scanIndex_fixupBound(fragPtr, ptrI, rowRef.m_src_correlation);

    if (treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT)
    {
      jam();
      /**
       * We being a T_ONE_SHOT means that we're only be called
       *   with parent_row once, i.e batch is complete
       */
      scanIndex_parent_batch_complete(signal, requestPtr, treeNodePtr);
    }

    return;
  } while (0);

  // Reached only via 'break' above, i.e. an unhandled error.
  ndbrequire(false);
}
4932 
4933 
/**
 * Post-process the bound that was just appended to the range section
 * 'ptrI': patch in the bound size and per-bound correlation value, and
 * renumber the attribute headers from 0. Updates the fragment's range
 * builder bookkeeping (count/size) so the next bound is located correctly.
 */
void
Dbspj::scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr,
                            Uint32 ptrI, Uint32 corrVal)
{
  /**
   * Index bounds...need special tender and care...
   *
   * 1) Set #bound no, bound-size, and renumber attributes
   */
  SectionReader r0(ptrI, getSectionSegmentPool());
  // Skip past previously appended bounds, positioning the reader at the
  // first word of the newest bound.
  ndbrequire(r0.step(fragPtr.p->m_range_builder.m_range_size));
  Uint32 boundsz = r0.getSize() - fragPtr.p->m_range_builder.m_range_size;
  Uint32 boundno = fragPtr.p->m_range_builder.m_range_cnt + 1;

  Uint32 tmp;
  ndbrequire(r0.peekWord(&tmp));
  // OR bound length into the upper 16 bits and the 12-bit correlation
  // value into bits 4..15 of the bound's first word.
  tmp |= (boundsz << 16) | ((corrVal & 0xFFF) << 4);
  ndbrequire(r0.updateWord(tmp));
  ndbrequire(r0.step(1));    // Skip first BoundType

  // TODO: Renumbering below assume there are only EQ-bounds !!
  Uint32 id = 0;
  Uint32 len32;
  do
  {
    ndbrequire(r0.peekWord(&tmp));
    AttributeHeader ah(tmp);
    Uint32 len = ah.getByteSize();
    // Rewrite the header so attributes are numbered 0,1,2,... in-order.
    AttributeHeader::init(&tmp, id++, len);
    ndbrequire(r0.updateWord(tmp));
    len32 = (len + 3) >> 2;  // attribute length rounded up to words
  } while (r0.step(2 + len32));  // Skip AttributeHeader(1) + Attribute(len32) + next BoundType(1)

  fragPtr.p->m_range_builder.m_range_cnt = boundno;
  fragPtr.p->m_range_builder.m_range_size = r0.getSize();
}
4970 
/**
 * The parent tree node has delivered its complete batch of rows, so all
 * range bounds are now built. Decide which fragments must be scanned,
 * pick a parallelism (how many fragments to scan concurrently, based on
 * T_SCAN_PARALLEL, first-execution heuristics, or collected statistics),
 * and send the first round of SCAN_FRAGREQs via scanIndex_send().
 */
void
Dbspj::scanIndex_parent_batch_complete(Signal* signal,
                                       Ptr<Request> requestPtr,
                                       Ptr<TreeNode> treeNodePtr)
{
  jam();

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  data.m_rows_received = 0;
  data.m_rows_expecting = 0;
  ndbassert(data.m_frags_outstanding == 0);
  ndbassert(data.m_frags_complete == data.m_fragCount);
  data.m_frags_complete = 0;

  Ptr<ScanFragHandle> fragPtr;
  {
    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
    list.first(fragPtr);

    if ((treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN) == 0)
    {
      // Unpruned (or const-pruned): all bounds live on the first handle.
      // If no bounds were built there is nothing to scan at all.
      if (fragPtr.p->m_rangePtrI == RNIL)
      {
        // No keys found
        jam();
        data.m_frags_complete = data.m_fragCount;
      }
    }
    else
    {
      // Pruned scan: only fragments that received at least one bound
      // need to be scanned; mark the rest complete up front.
      while(!fragPtr.isNull())
      {
        if (fragPtr.p->m_rangePtrI == RNIL)
        {
          jam();
          /**
           * This is a pruned scan, so we must scan those fragments that
           * some distribution key hashed to.
           */
          fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
          data.m_frags_complete++;
        }
        list.next(fragPtr);
      }
    }
  }
  data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;

  if (data.m_frags_complete == data.m_fragCount)
  {
    jam();
    /**
     * No keys was produced...
     */
    return;
  }

  /**
   * When parent's batch is complete, we send our batch
   */
  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
  ndbrequire(org->batch_size_rows > 0);

  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
  {
    jam();
    // Full parallelism requested: scan as many fragments concurrently
    // as the row batch size allows.
    data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
                             org->batch_size_rows);
  }
  else if (data.m_firstExecution)
  {
    /**
     * Having a high parallelism would allow us to fetch data from many
     * fragments in parallel and thus reduce the number of round trips.
     * On the other hand, we should set parallelism so low that we can fetch
     * all data from a fragment in one batch if possible.
     * Since this is the first execution, we do not know how many rows or bytes
     * this operation is likely to return. Therefore we set parallelism to 1,
     * since this gives the lowest penalty if our guess is wrong.
     */
    jam();
    data.m_parallelism = 1;
  }
  else
  {
    jam();
    /**
     * Use statistics from earlier runs of this operation to estimate the
     * initial parallelism. We use the mean minus two times the standard
     * deviation to have a low risk of setting parallelism to high (as erring
     * in the other direction is more costly).
     */
    Int32 parallelism =
      static_cast<Int32>(MIN(data.m_parallelismStat.getMean()
                             - 2 * data.m_parallelismStat.getStdDev(),
                             org->batch_size_rows));

    if (parallelism < 1)
    {
      jam();
      parallelism = 1;
    }
    else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
    {
      jam();
      /**
       * Set parallelism such that we can expect to have similar
       * parallelism in each batch. For example if there are 8 remaining
       * fragments, then we should fecth 2 times 4 fragments rather than
       * 7+1.
       */
      const Int32 roundTrips =
        1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
      parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
    }

    data.m_parallelism = static_cast<Uint32>(parallelism);

#ifdef DEBUG_SCAN_FRAGREQ
    DEBUG("::scanIndex_send() starting index scan with parallelism="
          << data.m_parallelism);
#endif
  }
  ndbrequire(data.m_parallelism > 0);

  // Split the available batch budget evenly among the parallel fragments.
  const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism;
  const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
  ndbassert(bs_rows > 0);
  ndbassert(bs_bytes > 0);

  // Reset per-execution statistics used for adaptive parallelism.
  data.m_largestBatchRows = 0;
  data.m_largestBatchBytes = 0;
  data.m_totalRows = 0;
  data.m_totalBytes = 0;

  {
    // All remaining fragments start in NOT_STARTED state.
    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
    Ptr<ScanFragHandle> fragPtr;
    list.first(fragPtr);

    while(!fragPtr.isNull())
    {
      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
      fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
      list.next(fragPtr);
    }
  }

  Uint32 batchRange = 0;
  scanIndex_send(signal,
                 requestPtr,
                 treeNodePtr,
                 data.m_parallelism,
                 bs_bytes,
                 bs_rows,
                 batchRange);

  data.m_firstExecution = false;

  ndbrequire(static_cast<Uint32>(data.m_frags_outstanding +
                                 data.m_frags_complete) <=
             data.m_fragCount);

  data.m_batch_chunks = 1;
  requestPtr.p->m_cnt_active++;
  requestPtr.p->m_outstanding++;
  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
}
5140 
5141 void
scanIndex_parent_batch_repeat(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)5142 Dbspj::scanIndex_parent_batch_repeat(Signal* signal,
5143                                       Ptr<Request> requestPtr,
5144                                       Ptr<TreeNode> treeNodePtr)
5145 {
5146   jam();
5147   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5148 
5149   DEBUG("scanIndex_parent_batch_repeat(), m_node_no: " << treeNodePtr.p->m_node_no
5150         << ", m_batch_chunks: " << data.m_batch_chunks);
5151 
5152   ndbassert(treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE);
5153 
5154   /**
5155    * Register index-scans to be restarted if we didn't get all
5156    * previously fetched parent related child rows in a single batch.
5157    */
5158   if (data.m_batch_chunks > 1)
5159   {
5160     jam();
5161     DEBUG("Register TreeNode for restart, m_node_no: " << treeNodePtr.p->m_node_no);
5162     ndbrequire(treeNodePtr.p->m_state != TreeNode::TN_ACTIVE);
5163     registerActiveCursor(requestPtr, treeNodePtr);
5164     data.m_batch_chunks = 0;
5165   }
5166 }
5167 
5168 /**
5169  * Ask for the first batch for a number of fragments.
5170  */
5171 void
scanIndex_send(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,Uint32 noOfFrags,Uint32 bs_bytes,Uint32 bs_rows,Uint32 & batchRange)5172 Dbspj::scanIndex_send(Signal* signal,
5173                       Ptr<Request> requestPtr,
5174                       Ptr<TreeNode> treeNodePtr,
5175                       Uint32 noOfFrags,
5176                       Uint32 bs_bytes,
5177                       Uint32 bs_rows,
5178                       Uint32& batchRange)
5179 {
5180   /**
5181    * if (m_bits & prunemask):
5182    * - Range keys sliced out to each ScanFragHandle
5183    * - Else, range keys kept on first (and only) ScanFragHandle
5184    */
5185   const bool prune = treeNodePtr.p->m_bits &
5186     (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE);
5187 
5188   /**
5189    * If scan is repeatable, we must make sure not to release range keys so
5190    * that we canuse them again in the next repetition.
5191    */
5192   const bool repeatable =
5193     (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0;
5194 
5195   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5196   ndbassert(noOfFrags > 0);
5197   ndbassert(data.m_frags_not_started >= noOfFrags);
5198   ScanFragReq* const req =
5199     reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
5200   const ScanFragReq * const org
5201     = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq);
5202   memcpy(req, org, sizeof(data.m_scanFragReq));
5203   // req->variableData[0] // set below
5204   req->variableData[1] = requestPtr.p->m_rootResultData;
5205   req->batch_size_bytes = bs_bytes;
5206   req->batch_size_rows = bs_rows;
5207 
5208   Uint32 requestsSent = 0;
5209   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5210   Ptr<ScanFragHandle> fragPtr;
5211   list.first(fragPtr);
5212   Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
5213   ndbrequire(prune || keyInfoPtrI != RNIL);
5214   /**
5215    * Iterate over the list of fragments until we have sent as many
5216    * SCAN_FRAGREQs as we should.
5217    */
5218   while (requestsSent < noOfFrags)
5219   {
5220     jam();
5221     ndbassert(!fragPtr.isNull());
5222 
5223     if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
5224     {
5225       // Skip forward to the frags that we should send.
5226       jam();
5227       list.next(fragPtr);
5228       continue;
5229     }
5230 
5231     const Uint32 ref = fragPtr.p->m_ref;
5232 
5233     if (noOfFrags==1 && !prune &&
5234         data.m_frags_not_started == data.m_fragCount &&
5235         refToNode(ref) != getOwnNodeId() &&
5236         list.hasNext(fragPtr))
5237     {
5238       /**
5239        * If we are doing a scan with adaptive parallelism and start with
5240        * parallelism=1 then it makes sense to fetch a batch from a fragment on
5241        * the local data node. The reason for this is that if that fragment
5242        * contains few rows, we may be able to read from several fragments in
5243        * parallel. Then we minimize the total number of round trips (to remote
5244        * data nodes) if we fetch the first fragment batch locally.
5245        */
5246       jam();
5247       list.next(fragPtr);
5248       continue;
5249     }
5250 
5251     SectionHandle handle(this);
5252 
5253     Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
5254 
5255     /**
5256      * Set data specific for this fragment
5257      */
5258     req->senderData = fragPtr.i;
5259     req->fragmentNoKeyLen = fragPtr.p->m_fragId;
5260 
5261     if (prune)
5262     {
5263       jam();
5264       keyInfoPtrI = fragPtr.p->m_rangePtrI;
5265       if (keyInfoPtrI == RNIL)
5266       {
5267         /**
5268          * Since we use pruning, we can see that no parent rows would hash
5269          * to this fragment.
5270          */
5271         jam();
5272         fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
5273         list.next(fragPtr);
5274         continue;
5275       }
5276 
5277       if (!repeatable)
5278       {
5279         /**
5280          * If we'll use sendSignal() and we need to send the attrInfo several
5281          * times, we need to copy them. (For repeatable or unpruned scans
5282          * we use sendSignalNoRelease(), so then we do not need to copy.)
5283          */
5284         jam();
5285         Uint32 tmp = RNIL;
5286         ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
5287         attrInfoPtrI = tmp;
5288       }
5289     }
5290 
5291     req->variableData[0] = batchRange;
5292     getSection(handle.m_ptr[0], attrInfoPtrI);
5293     getSection(handle.m_ptr[1], keyInfoPtrI);
5294     handle.m_cnt = 2;
5295 
5296 #if defined DEBUG_SCAN_FRAGREQ
5297     ndbout_c("SCAN_FRAGREQ to %x", ref);
5298     printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
5299                       NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
5300                       DBLQH);
5301     printf("ATTRINFO: ");
5302     print(handle.m_ptr[0], stdout);
5303     printf("KEYINFO: ");
5304     print(handle.m_ptr[1], stdout);
5305 #endif
5306 
5307     if (refToNode(ref) == getOwnNodeId())
5308     {
5309       c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1);
5310     }
5311     else
5312     {
5313       c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
5314     }
5315 
5316     if (prune && !repeatable)
5317     {
5318       /**
5319        * For a non-repeatable pruned scan, key info is unique for each
5320        * fragment and therefore cannot be reused, so we release key info
5321        * right away.
5322        */
5323       jam();
5324       sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
5325                  NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
5326       fragPtr.p->m_rangePtrI = RNIL;
5327       fragPtr.p->reset_ranges();
5328     }
5329     else
5330     {
5331       /**
5332        * Reuse key info for multiple fragments and/or multiple repetitions
5333        * of the scan.
5334        */
5335       jam();
5336       sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal,
5337                           NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
5338     }
5339     handle.clear();
5340 
5341     fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running
5342     data.m_frags_outstanding++;
5343     batchRange += bs_rows;
5344     requestsSent++;
5345     list.next(fragPtr);
5346   } // while (requestsSent < noOfFrags)
5347 
5348   data.m_frags_not_started -= requestsSent;
5349 }
5350 
/**
 * A result row (TRANSID_AI) arrived for this index-scan tree node.
 * Forward the row to all dependent child tree nodes via their
 * m_parent_row callbacks, then check whether the batch is complete
 * (no fragments outstanding and all expected rows received).
 */
void
Dbspj::scanIndex_execTRANSID_AI(Signal* signal,
                                Ptr<Request> requestPtr,
                                Ptr<TreeNode> treeNodePtr,
                                const RowPtr & rowRef)
{
  jam();

  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
  Dependency_map::ConstDataBufferIterator it;

  {
    // Fan the row out to every dependent (child) tree node.
    for (list.first(it); !it.isNull(); list.next(it))
    {
      jam();
      Ptr<TreeNode> childPtr;
      m_treenode_pool.getPtr(childPtr, * it.data);
      ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0);
      (this->*(childPtr.p->m_info->m_parent_row))(signal,
                                                  requestPtr, childPtr,rowRef);
    }
  }

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  data.m_rows_received++;

  if (data.m_frags_outstanding == 0 &&
      data.m_rows_received == data.m_rows_expecting)
  {
    jam();
    /**
     * Finished...
     */
    if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
    {
      jam();
      reportBatchComplete(signal, requestPtr, treeNodePtr);
    }

    checkBatchComplete(signal, requestPtr, 1);
    return;
  }
}
5395 
/**
 * SCAN_FRAGCONF received for one fragment of this index-scan: a batch
 * from that fragment is done (or the fragment is fully scanned).
 * Updates row/byte statistics, fragment completion accounting, and when
 * the last outstanding fragment answers, updates the parallelism
 * statistics and checks/reports batch completion.
 */
void
Dbspj::scanIndex_execSCAN_FRAGCONF(Signal* signal,
                                   Ptr<Request> requestPtr,
                                   Ptr<TreeNode> treeNodePtr,
                                   Ptr<ScanFragHandle> fragPtr)
{
  jam();

  const ScanFragConf * conf = (const ScanFragConf*)(signal->getDataPtr());

  Uint32 rows = conf->completedOps;
  Uint32 done = conf->fragmentCompleted;

  Uint32 state = fragPtr.p->m_state;
  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;

  if (state == ScanFragHandle::SFH_WAIT_CLOSE && done == 0)
  {
    jam();
    /**
     * We sent an explicit close request...ignore this...a close will come later
     */
    return;
  }

  // Accumulate statistics used for adaptive parallelism decisions.
  requestPtr.p->m_rows += rows;
  data.m_totalRows += rows;
  data.m_totalBytes += conf->total_len;
  data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
  data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);

  if (!treeNodePtr.p->isLeaf())
  {
    jam();
    // Non-leaf: each row will also be delivered via TRANSID_AI, so
    // account for them in the expected-rows counter.
    data.m_rows_expecting += rows;
  }
  ndbrequire(data.m_frags_outstanding);
  ndbrequire(state == ScanFragHandle::SFH_SCANNING ||
             state == ScanFragHandle::SFH_WAIT_CLOSE);

  data.m_frags_outstanding--;
  fragPtr.p->m_state = ScanFragHandle::SFH_WAIT_NEXTREQ;

  if (done)
  {
    jam();
    fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
    ndbrequire(data.m_frags_complete < data.m_fragCount);
    data.m_frags_complete++;

    // The tree node is inactive when every fragment completed, or when
    // aborting and the only remaining fragments were never started.
    if (data.m_frags_complete == data.m_fragCount ||
        ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
         data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
    {
      jam();
      ndbrequire(requestPtr.p->m_cnt_active);
      requestPtr.p->m_cnt_active--;
      treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
    }
  }


  if (data.m_frags_outstanding == 0)
  {
    const ScanFragReq * const org
      = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq);

    if (data.m_frags_complete == data.m_fragCount)
    {
      jam();
      /**
       * Calculate what would have been the optimal parallelism for the
       * scan instance that we have just completed, and update
       * 'parallelismStat' with this value. We then use this statistics to set
       * the initial parallelism for the next instance of this operation.
       */
      double parallelism = data.m_fragCount;
      if (data.m_totalRows > 0)
      {
        parallelism = MIN(parallelism,
                          double(org->batch_size_rows) / data.m_totalRows);
      }
      if (data.m_totalBytes > 0)
      {
        parallelism = MIN(parallelism,
                          double(org->batch_size_bytes) / data.m_totalBytes);
      }
      data.m_parallelismStat.update(parallelism);
    }

    /**
     * Don't reportBatchComplete to children if we're aborting...
     */
    if (state == ScanFragHandle::SFH_WAIT_CLOSE)
    {
      jam();
      ndbrequire((requestPtr.p->m_state & Request::RS_ABORTING) != 0);
    }
    else if (! (data.m_rows_received == data.m_rows_expecting))
    {
      jam();
      // Still waiting for TRANSID_AI rows; completion is checked there.
      return;
    }
    else
    {
      if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
      {
        jam();
        reportBatchComplete(signal, requestPtr, treeNodePtr);
      }
    }

    checkBatchComplete(signal, requestPtr, 1);
    return;
  }
}
5512 
/**
 * SCAN_FRAGREF received: a fragment scan failed. Mark the fragment
 * complete, update fragment/outstanding accounting, and abort the whole
 * request with the reported error code.
 */
void
Dbspj::scanIndex_execSCAN_FRAGREF(Signal* signal,
                                  Ptr<Request> requestPtr,
                                  Ptr<TreeNode> treeNodePtr,
                                  Ptr<ScanFragHandle> fragPtr)
{
  jam();

  const ScanFragRef * rep = CAST_CONSTPTR(ScanFragRef, signal->getDataPtr());
  const Uint32 errCode = rep->errorCode;

  Uint32 state = fragPtr.p->m_state;
  ndbrequire(state == ScanFragHandle::SFH_SCANNING ||
             state == ScanFragHandle::SFH_WAIT_CLOSE);

  // A refused fragment will produce no more results: treat as complete.
  fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  ndbrequire(data.m_frags_complete < data.m_fragCount);
  data.m_frags_complete++;
  ndbrequire(data.m_frags_outstanding > 0);
  data.m_frags_outstanding--;

  // If only never-started fragments remain, this tree node is done.
  if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
  {
    jam();
    ndbrequire(requestPtr.p->m_cnt_active);
    requestPtr.p->m_cnt_active--;
    treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
  }

  if (data.m_frags_outstanding == 0)
  {
    jam();
    ndbrequire(requestPtr.p->m_outstanding);
    requestPtr.p->m_outstanding--;
  }

  abort(signal, requestPtr, errCode);
}
5553 
5554 void
scanIndex_execSCAN_NEXTREQ(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)5555 Dbspj::scanIndex_execSCAN_NEXTREQ(Signal* signal,
5556                                   Ptr<Request> requestPtr,
5557                                   Ptr<TreeNode> treeNodePtr)
5558 {
5559   jam();
5560 
5561   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5562   const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
5563 
5564   data.m_rows_received = 0;
5565   data.m_rows_expecting = 0;
5566   ndbassert(data.m_frags_outstanding == 0);
5567 
5568   ndbrequire(data.m_frags_complete < data.m_fragCount);
5569   if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0)
5570   {
5571     jam();
5572     /**
5573      * Since fetching few but large batches is more efficient, we
5574      * set parallelism to the lowest value where we can still expect each
5575      * batch to be full.
5576      */
5577     if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
5578         data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
5579     {
5580       jam();
5581       data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5582                                org->batch_size_rows);
5583       if (data.m_largestBatchRows > 0)
5584       {
5585         jam();
5586         data.m_parallelism =
5587           MIN(org->batch_size_rows / data.m_largestBatchRows,
5588               data.m_parallelism);
5589       }
5590       if (data.m_largestBatchBytes > 0)
5591       {
5592         jam();
5593         data.m_parallelism =
5594           MIN(data.m_parallelism,
5595               org->batch_size_bytes/data.m_largestBatchBytes);
5596       }
5597       if (data.m_frags_complete == 0 &&
5598           data.m_frags_not_started % data.m_parallelism != 0)
5599       {
5600         jam();
5601         /**
5602          * Set parallelism such that we can expect to have similar
5603          * parallelism in each batch. For example if there are 8 remaining
5604          * fragments, then we should fecth 2 times 4 fragments rather than
5605          * 7+1.
5606          */
5607         const Uint32 roundTrips =
5608           1 + data.m_frags_not_started / data.m_parallelism;
5609         data.m_parallelism = data.m_frags_not_started / roundTrips;
5610       }
5611     }
5612     else
5613     {
5614       jam();
5615       // We get full batches, so we should lower parallelism.
5616       data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5617                                MAX(1, data.m_parallelism/2));
5618     }
5619     ndbassert(data.m_parallelism > 0);
5620 #ifdef DEBUG_SCAN_FRAGREQ
5621     DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
5622           data.m_parallelism <<
5623           " fragments with " << org->batch_size_rows/data.m_parallelism <<
5624           " rows and " << org->batch_size_bytes/data.m_parallelism <<
5625           " bytes.");
5626 #endif
5627   }
5628   else
5629   {
5630     jam();
5631     data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
5632                              org->batch_size_rows);
5633   }
5634 
5635   const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
5636   ndbassert(bs_rows > 0);
5637   ScanFragNextReq* req =
5638     reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
5639   req->requestInfo = 0;
5640   ScanFragNextReq::setCorrFactorFlag(req->requestInfo);
5641   req->transId1 = requestPtr.p->m_transId[0];
5642   req->transId2 = requestPtr.p->m_transId[1];
5643   req->batch_size_rows = bs_rows;
5644   req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;
5645 
5646   Uint32 batchRange = 0;
5647   Ptr<ScanFragHandle> fragPtr;
5648   Uint32 sentFragCount = 0;
5649   {
5650     /**
5651      * First, ask for more data from fragments that are already started.
5652      */
5653   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5654   list.first(fragPtr);
5655     while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
5656   {
5657     jam();
5658       ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
5659                 fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
5660                 fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
5661     if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
5662     {
5663       jam();
5664 
5665       data.m_frags_outstanding++;
5666       req->variableData[0] = batchRange;
5667       fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
5668       batchRange += bs_rows;
5669 
5670       DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex
5671             << treeNodePtr.p->m_send.m_ref
5672               << ", m_node_no=" << treeNodePtr.p->m_node_no
5673             << ", senderData: " << req->senderData);
5674 
5675 #ifdef DEBUG_SCAN_FRAGREQ
5676       printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
5677                            ScanFragNextReq:: SignalLength + 1, DBLQH);
5678 #endif
5679 
5680       req->senderData = fragPtr.i;
5681       sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
5682                  ScanFragNextReq::SignalLength + 1,
5683                  JBB);
5684         sentFragCount++;
5685       }
5686       list.next(fragPtr);
5687     }
5688   }
5689 
5690   if (sentFragCount < data.m_parallelism)
5691   {
5692     /**
5693      * Then start new fragments until we reach data.m_parallelism.
5694      */
5695     jam();
5696     ndbassert(data.m_frags_not_started != 0);
5697     scanIndex_send(signal,
5698                    requestPtr,
5699                    treeNodePtr,
5700                    data.m_parallelism - sentFragCount,
5701                    org->batch_size_bytes/data.m_parallelism,
5702                    bs_rows,
5703                    batchRange);
5704   }
5705   /**
5706    * cursor should not have been positioned here...
5707    *   unless we actually had something more to send.
5708    *   so require that we did actually send something
5709    */
5710   ndbrequire(data.m_frags_outstanding > 0);
5711   ndbrequire(data.m_batch_chunks > 0);
5712   data.m_batch_chunks++;
5713 
5714   requestPtr.p->m_outstanding++;
5715   ndbassert(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE);
5716 }
5717 
5718 void
scanIndex_complete(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)5719 Dbspj::scanIndex_complete(Signal* signal,
5720                           Ptr<Request> requestPtr,
5721                           Ptr<TreeNode> treeNodePtr)
5722 {
5723   jam();
5724   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5725   ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq;
5726   if (!data.m_fragments.isEmpty())
5727   {
5728     jam();
5729     DihScanTabCompleteRep* rep=(DihScanTabCompleteRep*)signal->getDataPtrSend();
5730     rep->tableId = dst->tableId;
5731     rep->scanCookie = data.m_scanCookie;
5732     sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_COMPLETE_REP,
5733                signal, DihScanTabCompleteRep::SignalLength, JBB);
5734   }
5735 }
5736 
/**
 * Abort an active index-scan tree node.
 *
 * Every fragment scan that has been started gets a SCAN_NEXTREQ with
 * ZCLOSE asking the data node to close it; those fragments move to
 * SFH_WAIT_CLOSE and are counted as outstanding until the close is
 * confirmed. Fragments never started or already complete need no action.
 */
void
Dbspj::scanIndex_abort(Signal* signal,
                       Ptr<Request> requestPtr,
                       Ptr<TreeNode> treeNodePtr)
{
  jam();

  switch(treeNodePtr.p->m_state){
  case TreeNode::TN_BUILDING:
  case TreeNode::TN_PREPARING:
  case TreeNode::TN_INACTIVE:
  case TreeNode::TN_COMPLETING:
  case TreeNode::TN_END:
    // Nothing is executing for this node - nothing to close.
    ndbout_c("H'%.8x H'%.8x scanIndex_abort state: %u",
             requestPtr.p->m_transId[0],
             requestPtr.p->m_transId[1],
             treeNodePtr.p->m_state);
    return;

  case TreeNode::TN_ACTIVE:
    jam();
    break;
  }

  // Prepare a ZCLOSE request once; only senderData differs per fragment.
  ScanFragNextReq* req = CAST_PTR(ScanFragNextReq, signal->getDataPtrSend());
  req->requestInfo = ScanFragNextReq::ZCLOSE;
  req->transId1 = requestPtr.p->m_transId[0];
  req->transId2 = requestPtr.p->m_transId[1];
  req->batch_size_rows = 0;
  req->batch_size_bytes = 0;

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
  Ptr<ScanFragHandle> fragPtr;

  Uint32 cnt_waiting = 0;   // fragments that were idle (SFH_WAIT_NEXTREQ)
  Uint32 cnt_scanning = 0;  // fragments with a request already in flight
  for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
  {
    switch(fragPtr.p->m_state){
    case ScanFragHandle::SFH_NOT_STARTED:
    case ScanFragHandle::SFH_COMPLETE:
    case ScanFragHandle::SFH_WAIT_CLOSE:
      jam();
      break;
    case ScanFragHandle::SFH_WAIT_NEXTREQ:
      jam();
      cnt_waiting++;              // was idle...
      data.m_frags_outstanding++; // is closing
      goto do_abort;
    case ScanFragHandle::SFH_SCANNING:
      jam();
      cnt_scanning++;
      goto do_abort;
    do_abort:
      // Common tail for both started states: send the close request.
      req->senderData = fragPtr.i;
      sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
                 ScanFragNextReq::SignalLength, JBB);

      fragPtr.p->m_state = ScanFragHandle::SFH_WAIT_CLOSE;
      break;
    }
  }

  if (cnt_scanning == 0)
  {
    if (cnt_waiting > 0)
    {
      /**
       * If all were waiting...this should increase m_outstanding
       */
      jam();
      requestPtr.p->m_outstanding++;
    }
    else
    {
      /**
       * All fragments are either complete or not yet started, so there is
       * nothing to abort.
       */
      jam();
      ndbassert(data.m_frags_not_started > 0);
      ndbrequire(requestPtr.p->m_cnt_active);
      requestPtr.p->m_cnt_active--;
      treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
    }
  }
}
5825 
/**
 * Handle data-node failure for an index-scan tree node.
 *
 * Every fragment scan located on a failed node (per the 'nodes' bitmask)
 * is forcibly marked SFH_COMPLETE, adjusting the outstanding/complete
 * counters to match.
 *
 * @return the number of fragments affected by the failure (caller aborts
 *         the request when non-zero). For states where nothing executes,
 *         a fixed 1/0 tells the caller whether to abort regardless.
 */
Uint32
Dbspj::scanIndex_execNODE_FAILREP(Signal* signal,
                                  Ptr<Request> requestPtr,
                                  Ptr<TreeNode> treeNodePtr,
                                  NdbNodeBitmask nodes)
{
  jam();

  switch(treeNodePtr.p->m_state){
  case TreeNode::TN_PREPARING:
  case TreeNode::TN_INACTIVE:
    return 1;

  case TreeNode::TN_BUILDING:
  case TreeNode::TN_COMPLETING:
  case TreeNode::TN_END:
    return 0;

  case TreeNode::TN_ACTIVE:
    jam();
    break;
  }


  Uint32 sum = 0;
  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
  Ptr<ScanFragHandle> fragPtr;

  // Snapshot counters so transitions caused by this failure can be
  // detected after the loop.
  Uint32 save0 = data.m_frags_outstanding;
  Uint32 save1 = data.m_frags_complete;

  for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
  {
    if (nodes.get(refToNode(fragPtr.p->m_ref)) == false)
    {
      jam();
      /**
       * No action needed
       */
      continue;
    }

    switch(fragPtr.p->m_state){
    case ScanFragHandle::SFH_NOT_STARTED:
      jam();
      ndbrequire(data.m_frags_complete < data.m_fragCount);
      data.m_frags_complete++;
      ndbrequire(data.m_frags_not_started > 0);
      data.m_frags_not_started--;
      // fall through
    case ScanFragHandle::SFH_COMPLETE:
      jam();
      sum++; // indicate that we should abort
      /**
       * we could keep list of all fragments...
       *   or execute DIGETNODES again...
       *   but for now, we don't
       */
      break;
    case ScanFragHandle::SFH_WAIT_CLOSE:
    case ScanFragHandle::SFH_SCANNING:
      jam();
      // A request was in flight to the failed node: it will never be
      // answered, so it is no longer outstanding.
      ndbrequire(data.m_frags_outstanding > 0);
      data.m_frags_outstanding--;
      // fall through
    case ScanFragHandle::SFH_WAIT_NEXTREQ:
      jam();
      sum++;
      ndbrequire(data.m_frags_complete < data.m_fragCount);
      data.m_frags_complete++;
      break;
    }
    fragPtr.p->m_ref = 0;
    fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
  }

  // If the failure cleared the last outstanding fragment request, the
  // request no longer waits on this tree node.
  if (save0 != 0 && data.m_frags_outstanding == 0)
  {
    jam();
    ndbrequire(requestPtr.p->m_outstanding);
    requestPtr.p->m_outstanding--;
  }

  // Deactivate the node when no fragment remains running.
  // NOTE(review): also requires some fragment to have been complete
  // before the failure (save1 != 0) - confirm intent of that guard.
  if (save1 != 0 &&
      data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
  {
    jam();
    ndbrequire(requestPtr.p->m_cnt_active);
    requestPtr.p->m_cnt_active--;
    treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
  }

  return sum;
}
5921 
5922 void
scanIndex_release_rangekeys(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)5923 Dbspj::scanIndex_release_rangekeys(Ptr<Request> requestPtr,
5924                                    Ptr<TreeNode> treeNodePtr)
5925 {
5926   jam();
5927   DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
5928           << " m_node_no: " << treeNodePtr.p->m_node_no);
5929 
5930   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5931   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5932   Ptr<ScanFragHandle> fragPtr;
5933 
5934   if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
5935   {
5936     jam();
5937     for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
5938     {
5939       if (fragPtr.p->m_rangePtrI != RNIL)
5940       {
5941         releaseSection(fragPtr.p->m_rangePtrI);
5942         fragPtr.p->m_rangePtrI = RNIL;
5943       }
5944       fragPtr.p->reset_ranges();
5945     }
5946   }
5947   else
5948   {
5949     jam();
5950     list.first(fragPtr);
5951     if (fragPtr.p->m_rangePtrI != RNIL)
5952     {
5953       releaseSection(fragPtr.p->m_rangePtrI);
5954       fragPtr.p->m_rangePtrI = RNIL;
5955     }
5956     fragPtr.p->reset_ranges();
5957   }
5958 }
5959 
5960 /**
5961  * Parent batch has completed, and will not refetch (X-joined) results
5962  * from its childs. Release & reset range keys which are unsent or we
5963  * have kept for possible resubmits.
5964  */
5965 void
scanIndex_parent_batch_cleanup(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)5966 Dbspj::scanIndex_parent_batch_cleanup(Ptr<Request> requestPtr,
5967                                       Ptr<TreeNode> treeNodePtr)
5968 {
5969   DEBUG("scanIndex_parent_batch_cleanup");
5970   scanIndex_release_rangekeys(requestPtr,treeNodePtr);
5971 }
5972 
/**
 * Final cleanup of an index-scan tree node: release buffered range keys,
 * the fragment-handle list and any prune pattern/section, then run the
 * cleanup steps shared by all tree-node types.
 */
void
Dbspj::scanIndex_cleanup(Ptr<Request> requestPtr,
                         Ptr<TreeNode> treeNodePtr)
{
  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  DEBUG("scanIndex_cleanup");

  /**
   * Range keys has been collected wherever there are uncompleted
   * parent batches...release them to avoid memleak.
   */
  scanIndex_release_rangekeys(requestPtr,treeNodePtr);

  {
    // Return all fragment handles to the pool.
    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
    list.remove();
  }
  if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
  {
    jam();
    // Dynamic prune pattern: release its arena-backed pattern store.
    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
    Local_pattern_store pattern(pool, data.m_prunePattern);
    pattern.release();
  }
  else if (treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE)
  {
    jam();
    // Constant prune key: release its section, if one was built.
    if (data.m_constPrunePtrI != RNIL)
    {
      jam();
      releaseSection(data.m_constPrunePtrI);
      data.m_constPrunePtrI = RNIL;
    }
  }

  cleanup_common(requestPtr, treeNodePtr);
}
6010 
6011 /**
6012  * END - MODULE SCAN INDEX
6013  */
6014 
6015 /**
6016  * Static OpInfo handling
6017  */
6018 const Dbspj::OpInfo*
getOpInfo(Uint32 op)6019 Dbspj::getOpInfo(Uint32 op)
6020 {
6021   DEBUG("getOpInfo(" << op << ")");
6022   switch(op){
6023   case QueryNode::QN_LOOKUP:
6024     return &Dbspj::g_LookupOpInfo;
6025   case QueryNode::QN_SCAN_FRAG:
6026     return &Dbspj::g_ScanFragOpInfo;
6027   case QueryNode::QN_SCAN_INDEX:
6028     return &Dbspj::g_ScanIndexOpInfo;
6029   default:
6030     return 0;
6031   }
6032 }
6033 
6034 /**
6035  * MODULE COMMON PARSE/UNPACK
6036  */
6037 
/**
 * Unpack a list of 16-bit items from the serialized query tree.
 *
 * Packed format:
 *   word 0:  (first item << 16) | item count
 *   word 1…: remaining items, two per word, low half first
 *
 * On success the buffer read position is advanced past the list and the
 * item count is returned (0 for an exhausted buffer).
 *
 *  @returns dstLen + 1 on error (more than dstLen items, or truncated
 *           buffer)
 */
static
Uint32
unpackList(Uint32 dstLen, Uint32 * dst, Dbspj::DABuffer & buffer)
{
  const Uint32 * ptr = buffer.ptr;
  if (likely(ptr != buffer.end))
  {
    Uint32 tmp = * ptr++;
    Uint32 cnt = tmp & 0xFFFF;

    * dst ++ = (tmp >> 16); // Store first
    DEBUG("cnt: " << cnt << " first: " << (tmp >> 16));

    if (cnt > 1)
    {
      // cnt-1 items remain, packed two per word => cnt/2 whole words.
      Uint32 len = cnt / 2;
      if (unlikely(cnt >= dstLen || (ptr + len > buffer.end)))
        goto error;

      cnt --; // subtract item stored in header

      for (Uint32 i = 0; i < cnt/2; i++)
      {
        * dst++ = (* ptr) & 0xFFFF;
        * dst++ = (* ptr) >> 16;
        ptr++;
      }

      if (cnt & 1)
      {
        // Odd remainder: last item sits in the low half of the last word.
        * dst ++ = * ptr & 0xFFFF;
        ptr++;
      }

      cnt ++; // readd item stored in header
    }
    buffer.ptr = ptr;
    return cnt;
  }
  return 0;

error:
  return dstLen + 1;
}
6085 
/**
 * This function takes an array of attrinfo, and builds "header"
 *   which can be used to do random access inside the row.
 *
 * Each attribute occupies one AttributeHeader word followed by its data
 * words; the header records the word offset of each attribute.
 *
 * @return the number of attributes found (also stored in header->m_len)
 */
Uint32
Dbspj::buildRowHeader(RowPtr::Header * header, SegmentedSectionPtr ptr)
{
  Uint32 tmp, len;
  Uint32 * dst = header->m_offset;
  const Uint32 * const save = dst;
  SectionReader r0(ptr, getSectionSegmentPool());
  Uint32 offset = 0;
  do
  {
    * dst++ = offset;              // offset of this attribute's header word
    r0.getWord(&tmp);              // read the AttributeHeader
    len = AttributeHeader::getDataSize(tmp);
    offset += 1 + len;             // header word + data words
  } while (r0.step(len));          // skip data; loop ends at end of section

  return header->m_len = static_cast<Uint32>(dst - save);
}
6108 
6109 /**
6110  * This fuctions takes an array of attrinfo, and builds "header"
6111  *   which can be used to do random access inside the row
6112  */
6113 Uint32
buildRowHeader(RowPtr::Header * header,const Uint32 * & src,Uint32 len)6114 Dbspj::buildRowHeader(RowPtr::Header * header, const Uint32 *& src, Uint32 len)
6115 {
6116   Uint32 * dst = header->m_offset;
6117   const Uint32 * save = dst;
6118   Uint32 offset = 0;
6119   for (Uint32 i = 0; i<len; i++)
6120   {
6121     * dst ++ = offset;
6122     Uint32 tmp = * src++;
6123     Uint32 tmp_len = AttributeHeader::getDataSize(tmp);
6124     offset += 1 + tmp_len;
6125     src += tmp_len;
6126   }
6127 
6128   return header->m_len = static_cast<Uint32>(dst - save);
6129 }
6130 
6131 Uint32
appendToPattern(Local_pattern_store & pattern,DABuffer & tree,Uint32 len)6132 Dbspj::appendToPattern(Local_pattern_store & pattern,
6133                        DABuffer & tree, Uint32 len)
6134 {
6135   if (unlikely(tree.ptr + len > tree.end))
6136     return DbspjErr::InvalidTreeNodeSpecification;
6137 
6138   if (unlikely(pattern.append(tree.ptr, len)==0))
6139     return  DbspjErr::OutOfQueryMemory;
6140 
6141   tree.ptr += len;
6142   return 0;
6143 }
6144 
6145 Uint32
appendParamToPattern(Local_pattern_store & dst,const RowPtr::Linear & row,Uint32 col)6146 Dbspj::appendParamToPattern(Local_pattern_store& dst,
6147                             const RowPtr::Linear & row, Uint32 col)
6148 {
6149   /**
6150    * TODO handle errors
6151    */
6152   Uint32 offset = row.m_header->m_offset[col];
6153   const Uint32 * ptr = row.m_data + offset;
6154   Uint32 len = AttributeHeader::getDataSize(* ptr ++);
6155   /* Param COL's converted to DATA when appended to pattern */
6156   Uint32 info = QueryPattern::data(len);
6157   return dst.append(&info,1) && dst.append(ptr,len) ? 0 : DbspjErr::OutOfQueryMemory;
6158 }
6159 
6160 Uint32
appendParamHeadToPattern(Local_pattern_store & dst,const RowPtr::Linear & row,Uint32 col)6161 Dbspj::appendParamHeadToPattern(Local_pattern_store& dst,
6162                                 const RowPtr::Linear & row, Uint32 col)
6163 {
6164   /**
6165    * TODO handle errors
6166    */
6167   Uint32 offset = row.m_header->m_offset[col];
6168   const Uint32 * ptr = row.m_data + offset;
6169   Uint32 len = AttributeHeader::getDataSize(*ptr);
6170   /* Param COL's converted to DATA when appended to pattern */
6171   Uint32 info = QueryPattern::data(len+1);
6172   return dst.append(&info,1) && dst.append(ptr,len+1) ? 0 : DbspjErr::OutOfQueryMemory;
6173 }
6174 
6175 Uint32
appendTreeToSection(Uint32 & ptrI,SectionReader & tree,Uint32 len)6176 Dbspj::appendTreeToSection(Uint32 & ptrI, SectionReader & tree, Uint32 len)
6177 {
6178   /**
6179    * TODO handle errors
6180    */
6181   Uint32 SZ = 16;
6182   Uint32 tmp[16];
6183   while (len > SZ)
6184   {
6185     jam();
6186     tree.getWords(tmp, SZ);
6187     ndbrequire(appendToSection(ptrI, tmp, SZ));
6188     len -= SZ;
6189   }
6190 
6191   tree.getWords(tmp, len);
6192   return appendToSection(ptrI, tmp, len) ? 0 : /** todo error code */ 1;
6193 #if TODO
6194 err:
6195   return 1;
6196 #endif
6197 }
6198 
/**
 * Read the 32-bit correlation value from column 'col' of a
 * segmented-section row. The column must be the CORR_FACTOR32 pseudo
 * column holding exactly one data word.
 */
void
Dbspj::getCorrelationData(const RowPtr::Section & row,
                          Uint32 col,
                          Uint32& correlationNumber)
{
  /**
   * TODO handle errors
   */
  SegmentedSectionPtr ptr(row.m_dataPtr);
  SectionReader reader(ptr, getSectionSegmentPool());
  Uint32 offset = row.m_header->m_offset[col];
  ndbrequire(reader.step(offset));      // position at the column
  Uint32 tmp;
  ndbrequire(reader.getWord(&tmp));     // AttributeHeader word
  Uint32 len = AttributeHeader::getDataSize(tmp);
  ndbrequire(len == 1);
  ndbrequire(AttributeHeader::getAttributeId(tmp) == AttributeHeader::CORR_FACTOR32);
  ndbrequire(reader.getWord(&correlationNumber));
}
6218 
6219 void
getCorrelationData(const RowPtr::Linear & row,Uint32 col,Uint32 & correlationNumber)6220 Dbspj::getCorrelationData(const RowPtr::Linear & row,
6221                           Uint32 col,
6222                           Uint32& correlationNumber)
6223 {
6224   /**
6225    * TODO handle errors
6226    */
6227   Uint32 offset = row.m_header->m_offset[col];
6228   Uint32 tmp = row.m_data[offset];
6229   Uint32 len = AttributeHeader::getDataSize(tmp);
6230   ndbrequire(len == 1);
6231   ndbrequire(AttributeHeader::getAttributeId(tmp) == AttributeHeader::CORR_FACTOR32);
6232   correlationNumber = row.m_data[offset+1];
6233 }
6234 
6235 Uint32
appendColToSection(Uint32 & dst,const RowPtr::Section & row,Uint32 col,bool & hasNull)6236 Dbspj::appendColToSection(Uint32 & dst, const RowPtr::Section & row,
6237                           Uint32 col, bool& hasNull)
6238 {
6239   /**
6240    * TODO handle errors
6241    */
6242   SegmentedSectionPtr ptr(row.m_dataPtr);
6243   SectionReader reader(ptr, getSectionSegmentPool());
6244   Uint32 offset = row.m_header->m_offset[col];
6245   ndbrequire(reader.step(offset));
6246   Uint32 tmp;
6247   ndbrequire(reader.getWord(&tmp));
6248   Uint32 len = AttributeHeader::getDataSize(tmp);
6249   if (unlikely(len==0))
6250   {
6251     jam();
6252     hasNull = true;  // NULL-value in key
6253     return 0;
6254   }
6255   return appendTreeToSection(dst, reader, len);
6256 }
6257 
6258 Uint32
appendColToSection(Uint32 & dst,const RowPtr::Linear & row,Uint32 col,bool & hasNull)6259 Dbspj::appendColToSection(Uint32 & dst, const RowPtr::Linear & row,
6260                           Uint32 col, bool& hasNull)
6261 {
6262   /**
6263    * TODO handle errors
6264    */
6265   Uint32 offset = row.m_header->m_offset[col];
6266   const Uint32 * ptr = row.m_data + offset;
6267   Uint32 len = AttributeHeader::getDataSize(* ptr ++);
6268   if (unlikely(len==0))
6269   {
6270     jam();
6271     hasNull = true;  // NULL-value in key
6272     return 0;
6273   }
6274   return appendToSection(dst, ptr, len) ? 0 : DbspjErr::InvalidPattern;
6275 }
6276 
6277 Uint32
appendAttrinfoToSection(Uint32 & dst,const RowPtr::Linear & row,Uint32 col,bool & hasNull)6278 Dbspj::appendAttrinfoToSection(Uint32 & dst, const RowPtr::Linear & row,
6279                                Uint32 col, bool& hasNull)
6280 {
6281   /**
6282    * TODO handle errors
6283    */
6284   Uint32 offset = row.m_header->m_offset[col];
6285   const Uint32 * ptr = row.m_data + offset;
6286   Uint32 len = AttributeHeader::getDataSize(* ptr);
6287   if (unlikely(len==0))
6288   {
6289     jam();
6290     hasNull = true;  // NULL-value in key
6291   }
6292   return appendToSection(dst, ptr, 1 + len) ? 0 : DbspjErr::InvalidPattern;
6293 }
6294 
/**
 * Append column 'col' of a segmented-section row to section 'dst'
 * INCLUDING its AttributeHeader word. A NULL column (zero data words)
 * is still appended (header only) but is reported through 'hasNull'.
 */
Uint32
Dbspj::appendAttrinfoToSection(Uint32 & dst, const RowPtr::Section & row,
                               Uint32 col, bool& hasNull)
{
  /**
   * TODO handle errors
   */
  SegmentedSectionPtr ptr(row.m_dataPtr);
  SectionReader reader(ptr, getSectionSegmentPool());
  Uint32 offset = row.m_header->m_offset[col];
  ndbrequire(reader.step(offset));
  Uint32 tmp;
  // peekWord (not getWord): the header word must stay unconsumed so
  // that appendTreeToSection() below copies it together with the data.
  ndbrequire(reader.peekWord(&tmp));
  Uint32 len = AttributeHeader::getDataSize(tmp);
  if (unlikely(len==0))
  {
    jam();
    hasNull = true;  // NULL-value in key
  }
  return appendTreeToSection(dst, reader, 1 + len);
}
6316 
6317 /**
6318  * 'PkCol' is the composite NDB$PK column in an unique index consisting of
6319  * a fragment id and the composite PK value (all PK columns concatenated)
6320  */
6321 Uint32
appendPkColToSection(Uint32 & dst,const RowPtr::Section & row,Uint32 col)6322 Dbspj::appendPkColToSection(Uint32 & dst, const RowPtr::Section & row, Uint32 col)
6323 {
6324   /**
6325    * TODO handle errors
6326    */
6327   SegmentedSectionPtr ptr(row.m_dataPtr);
6328   SectionReader reader(ptr, getSectionSegmentPool());
6329   Uint32 offset = row.m_header->m_offset[col];
6330   ndbrequire(reader.step(offset));
6331   Uint32 tmp;
6332   ndbrequire(reader.getWord(&tmp));
6333   Uint32 len = AttributeHeader::getDataSize(tmp);
6334   ndbrequire(len>1);  // NULL-value in PkKey is an error
6335   ndbrequire(reader.step(1)); // Skip fragid
6336   return appendTreeToSection(dst, reader, len-1);
6337 }
6338 
/**
 * 'PkCol' is the composite NDB$PK column in a unique index consisting of
 * a fragment id and the composite PK value (all PK columns concatenated)
 */
Uint32
Dbspj::appendPkColToSection(Uint32 & dst, const RowPtr::Linear & row, Uint32 col)
{
  Uint32 offset = row.m_header->m_offset[col];
  Uint32 tmp = row.m_data[offset];
  Uint32 len = AttributeHeader::getDataSize(tmp);
  ndbrequire(len>1);  // NULL-value in PkKey is an error
  // '+2' skips the AttributeHeader word and the fragment-id word;
  // 'len - 1' is the remaining PK data ('len' includes the fragment id).
  return appendToSection(dst, row.m_data+offset+2, len - 1) ? 0 : /** todo error code */ 1;
}
6352 
/**
 * Resolve a P_PARENT-prefixed pattern token: walk 'levels' steps up the
 * ancestor chain of the tree node that produced 'rowptr' (using the
 * buffered row-maps and the upper half of each row's correlation value)
 * to locate the ancestor row, then append the column addressed by the
 * following pattern token from that row.
 *
 * @return 0 on success, or DbspjErr::InvalidPattern
 */
Uint32
Dbspj::appendFromParent(Uint32 & dst, Local_pattern_store& pattern,
                        Local_pattern_store::ConstDataBufferIterator& it,
                        Uint32 levels, const RowPtr & rowptr,
                        bool& hasNull)
{
  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, rowptr.m_src_node_ptrI);
  Uint32 corrVal = rowptr.m_src_correlation;
  RowPtr targetRow;
  while (levels--)
  {
    jam();
    if (unlikely(treeNodePtr.p->m_parentPtrI == RNIL))
    {
      // Asked to step above the root of the tree.
      DEBUG_CRASH();
      return DbspjErr::InvalidPattern;
    }
    m_treenode_pool.getPtr(treeNodePtr, treeNodePtr.p->m_parentPtrI);
    if (unlikely((treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) == 0))
    {
      // Ancestor rows can only be located if they were buffered in a map.
      DEBUG_CRASH();
      return DbspjErr::InvalidPattern;
    }

    RowRef ref;
    treeNodePtr.p->m_row_map.copyto(ref);
    Uint32 allocator = ref.m_allocator;
    const Uint32 * mapptr;
    if (allocator == 0)
    {
      jam();
      mapptr = get_row_ptr_stack(ref);
    }
    else
    {
      jam();
      mapptr = get_row_ptr_var(ref);
    }

    Uint32 pos = corrVal >> 16; // parent corr-val
    if (unlikely(! (pos < treeNodePtr.p->m_row_map.m_size)))
    {
      DEBUG_CRASH();
      return DbspjErr::InvalidPattern;
    }

    // load ref to parent row
    treeNodePtr.p->m_row_map.load(mapptr, pos, ref);

    const Uint32 * rowptr;  // NOTE: shadows the 'rowptr' parameter
    if (allocator == 0)
    {
      jam();
      rowptr = get_row_ptr_stack(ref);
    }
    else
    {
      jam();
      rowptr = get_row_ptr_var(ref);
    }
    setupRowPtr(treeNodePtr, targetRow, ref, rowptr);

    if (levels)
    {
      jam();
      // More levels to climb: fetch this row's own correlation value
      // (kept in its last column) to locate the next ancestor.
      getCorrelationData(targetRow.m_row_data.m_linear,
                         targetRow.m_row_data.m_linear.m_header->m_len - 1,
                         corrVal);
    }
  }

  if (unlikely(it.isNull()))
  {
    // P_PARENT must be followed by the token it applies to.
    DEBUG_CRASH();
    return DbspjErr::InvalidPattern;
  }

  // Apply the following pattern token against the located ancestor row.
  Uint32 info = *it.data;
  Uint32 type = QueryPattern::getType(info);
  Uint32 val = QueryPattern::getLength(info);
  pattern.next(it);
  switch(type){
  case QueryPattern::P_COL:
    jam();
    return appendColToSection(dst, targetRow.m_row_data.m_linear, val, hasNull);
    break;
  case QueryPattern::P_UNQ_PK:
    jam();
    return appendPkColToSection(dst, targetRow.m_row_data.m_linear, val);
    break;
  case QueryPattern::P_ATTRINFO:
    jam();
    return appendAttrinfoToSection(dst, targetRow.m_row_data.m_linear, val, hasNull);
    break;
  case QueryPattern::P_DATA:
    jam();
    // retrieving DATA from parent...is...an error
    break;
  case QueryPattern::P_PARENT:
    jam();
    // no point in nesting P_PARENT...an error
    break;
  case QueryPattern::P_PARAM:
  case QueryPattern::P_PARAM_HEADER:
    jam();
    // should have been expanded during build
    break;
  }

  DEBUG_CRASH();
  return DbspjErr::InvalidPattern;
}
6466 
/**
 * Append 'len' words of literal data (the payload of a P_DATA pattern
 * token) from the pattern store to section 'ptrI'. A zero length is
 * treated as a NULL value and only flagged through 'hasNull'.
 *
 * @return 0 on success, or DbspjErr::InvalidPattern if the pattern ends
 *         prematurely or the section append fails
 */
Uint32
Dbspj::appendDataToSection(Uint32 & ptrI,
                           Local_pattern_store& pattern,
                           Local_pattern_store::ConstDataBufferIterator& it,
                           Uint32 len, bool& hasNull)
{
  if (unlikely(len==0))
  {
    jam();
    hasNull = true;
    return 0;
  }

#if 0
  /**
   * TODO handle errors
   */
  Uint32 tmp[NDB_SECTION_SEGMENT_SZ];
  while (len > NDB_SECTION_SEGMENT_SZ)
  {
    pattern.copyout(tmp, NDB_SECTION_SEGMENT_SZ, it);
    appendToSection(ptrI, tmp, NDB_SECTION_SEGMENT_SZ);
    len -= NDB_SECTION_SEGMENT_SZ;
  }

  pattern.copyout(tmp, len, it);
  appendToSection(ptrI, tmp, len);
  return 0;
#else
  // Copy words one at a time from the pattern iterator into a stack
  // buffer, flushing to the section whenever the buffer fills or the
  // requested length has been reached.
  Uint32 remaining = len;
  Uint32 dstIdx = 0;
  Uint32 tmp[NDB_SECTION_SEGMENT_SZ];

  while (remaining > 0 && !it.isNull())
  {
    tmp[dstIdx] = *it.data;
    remaining--;
    dstIdx++;
    pattern.next(it);
    if (dstIdx == NDB_SECTION_SEGMENT_SZ || remaining == 0)
    {
      if (!appendToSection(ptrI, tmp, dstIdx))
      {
        DEBUG_CRASH();
        return DbspjErr::InvalidPattern;
      }
      dstIdx = 0;
    }
  }
  if (remaining > 0)
  {
    // Pattern ended before 'len' words could be read.
    DEBUG_CRASH();
    return DbspjErr::InvalidPattern;
  }
  else
  {
    return 0;
  }
#endif
}
6527 
6528 Uint32
createEmptySection(Uint32 & dst)6529 Dbspj::createEmptySection(Uint32 & dst)
6530 {
6531   Uint32 tmp;
6532   SegmentedSectionPtr ptr;
6533   if (likely(import(ptr, &tmp, 0)))
6534   {
6535     jam();
6536     dst = ptr.i;
6537     return 0;
6538   }
6539 
6540   jam();
6541   return DbspjErr::OutOfSectionMemory;
6542 }
6543 
/**
 * This function takes a pattern and a row and expands it into a section.
 *
 * Segmented-section row variant (expandL() handles linear rows): each
 * pattern token either contributes literal data from the pattern itself
 * (P_DATA) or selects a column from 'row' - possibly from an ancestor
 * row when prefixed by P_PARENT. The result is appended to section
 * '_dst'; 'hasNull' is set if any NULL column was encountered.
 *
 * @return 0 on success, or a DbspjErr error code
 */
Uint32
Dbspj::expandS(Uint32 & _dst, Local_pattern_store& pattern,
               const RowPtr & row, bool& hasNull)
{
  Uint32 err;
  Uint32 dst = _dst;
  hasNull = false;
  Local_pattern_store::ConstDataBufferIterator it;
  pattern.first(it);
  while (!it.isNull())
  {
    Uint32 info = *it.data;
    Uint32 type = QueryPattern::getType(info);
    Uint32 val = QueryPattern::getLength(info);
    pattern.next(it);
    switch(type){
    case QueryPattern::P_COL:
      jam();
      err = appendColToSection(dst, row.m_row_data.m_section, val, hasNull);
      break;
    case QueryPattern::P_UNQ_PK:
      jam();
      err = appendPkColToSection(dst, row.m_row_data.m_section, val);
      break;
    case QueryPattern::P_ATTRINFO:
      jam();
      err = appendAttrinfoToSection(dst, row.m_row_data.m_section, val, hasNull);
      break;
    case QueryPattern::P_DATA:
      jam();
      err = appendDataToSection(dst, pattern, it, val, hasNull);
      break;
    case QueryPattern::P_PARENT:
      jam();
      // P_PARENT is a prefix to another pattern token
      // that permits code to access rows from earlier than immediate parent.
      // val is no of levels to move up the tree
      err = appendFromParent(dst, pattern, it, val, row, hasNull);
      break;
      // PARAM's was converted to DATA by ::expand(pattern...)
    case QueryPattern::P_PARAM:
    case QueryPattern::P_PARAM_HEADER:
    default:
      jam();
      err = DbspjErr::InvalidPattern;
      DEBUG_CRASH();
    }
    if (unlikely(err != 0))
    {
      jam();
      DEBUG_CRASH();
      goto error;
    }
  }

  // Publish the (possibly newly created) section only on full success.
  _dst = dst;
  return 0;
error:
  jam();
  return err;
}
6608 
6609 /**
6610  * This function takes a pattern and a row and expands it into a section
6611  */
6612 Uint32
expandL(Uint32 & _dst,Local_pattern_store & pattern,const RowPtr & row,bool & hasNull)6613 Dbspj::expandL(Uint32 & _dst, Local_pattern_store& pattern,
6614                const RowPtr & row, bool& hasNull)
6615 {
6616   Uint32 err;
6617   Uint32 dst = _dst;
6618   hasNull = false;
6619   Local_pattern_store::ConstDataBufferIterator it;
6620   pattern.first(it);
6621   while (!it.isNull())
6622   {
6623     Uint32 info = *it.data;
6624     Uint32 type = QueryPattern::getType(info);
6625     Uint32 val = QueryPattern::getLength(info);
6626     pattern.next(it);
6627     switch(type){
6628     case QueryPattern::P_COL:
6629       jam();
6630       err = appendColToSection(dst, row.m_row_data.m_linear, val, hasNull);
6631       break;
6632     case QueryPattern::P_UNQ_PK:
6633       jam();
6634       err = appendPkColToSection(dst, row.m_row_data.m_linear, val);
6635       break;
6636     case QueryPattern::P_ATTRINFO:
6637       jam();
6638       err = appendAttrinfoToSection(dst, row.m_row_data.m_linear, val, hasNull);
6639       break;
6640     case QueryPattern::P_DATA:
6641       jam();
6642       err = appendDataToSection(dst, pattern, it, val, hasNull);
6643       break;
6644     case QueryPattern::P_PARENT:
6645       jam();
6646       // P_PARENT is a prefix to another pattern token
6647       // that permits code to access rows from earlier than immediate parent
6648       // val is no of levels to move up the tree
6649       err = appendFromParent(dst, pattern, it, val, row, hasNull);
6650       break;
6651       // PARAM's was converted to DATA by ::expand(pattern...)
6652     case QueryPattern::P_PARAM:
6653     case QueryPattern::P_PARAM_HEADER:
6654     default:
6655       jam();
6656       err = DbspjErr::InvalidPattern;
6657       DEBUG_CRASH();
6658     }
6659     if (unlikely(err != 0))
6660     {
6661       jam();
6662       DEBUG_CRASH();
6663       goto error;
6664     }
6665   }
6666 
6667   _dst = dst;
6668   return 0;
6669 error:
6670   jam();
6671   return err;
6672 }
6673 
6674 Uint32
expand(Uint32 & ptrI,DABuffer & pattern,Uint32 len,DABuffer & param,Uint32 paramCnt,bool & hasNull)6675 Dbspj::expand(Uint32 & ptrI, DABuffer& pattern, Uint32 len,
6676               DABuffer& param, Uint32 paramCnt, bool& hasNull)
6677 {
6678   /**
6679    * TODO handle error
6680    */
6681   Uint32 err;
6682   Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE];
6683   struct RowPtr::Linear row;
6684   row.m_data = param.ptr;
6685   row.m_header = CAST_PTR(RowPtr::Header, &tmp[0]);
6686   buildRowHeader(CAST_PTR(RowPtr::Header, &tmp[0]), param.ptr, paramCnt);
6687 
6688   Uint32 dst = ptrI;
6689   const Uint32 * ptr = pattern.ptr;
6690   const Uint32 * end = ptr + len;
6691   hasNull = false;
6692 
6693   for (; ptr < end; )
6694   {
6695     Uint32 info = * ptr++;
6696     Uint32 type = QueryPattern::getType(info);
6697     Uint32 val = QueryPattern::getLength(info);
6698     switch(type){
6699     case QueryPattern::P_PARAM:
6700       jam();
6701       ndbassert(val < paramCnt);
6702       err = appendColToSection(dst, row, val, hasNull);
6703       break;
6704     case QueryPattern::P_PARAM_HEADER:
6705       jam();
6706       ndbassert(val < paramCnt);
6707       err = appendAttrinfoToSection(dst, row, val, hasNull);
6708       break;
6709     case QueryPattern::P_DATA:
6710       if (unlikely(val==0))
6711       {
6712         jam();
6713         hasNull = true;
6714         err = 0;
6715       }
6716       else if (likely(appendToSection(dst, ptr, val)))
6717       {
6718         jam();
6719         err = 0;
6720       }
6721       else
6722       {
6723         jam();
6724         err = DbspjErr::InvalidPattern;
6725       }
6726       ptr += val;
6727       break;
6728     case QueryPattern::P_COL:    // (linked) COL's not expected here
6729     case QueryPattern::P_PARENT: // Prefix to P_COL
6730     case QueryPattern::P_ATTRINFO:
6731     case QueryPattern::P_UNQ_PK:
6732     default:
6733       jam();
6734       jamLine(type);
6735       err = DbspjErr::InvalidPattern;
6736       DEBUG_CRASH();
6737     }
6738     if (unlikely(err != 0))
6739     {
6740       jam();
6741       DEBUG_CRASH();
6742       goto error;
6743     }
6744   }
6745 
6746   /**
6747    * Iterate forward
6748    */
6749   pattern.ptr = end;
6750 
6751 error:
6752   jam();
6753   ptrI = dst;
6754   return err;
6755 }
6756 
/**
 * Expand a serialized pattern into a Local_pattern_store.  Parameter
 * references (P_PARAM / P_PARAM_HEADER) are resolved immediately and
 * stored as P_DATA tokens; row references (P_COL / P_UNQ_PK /
 * P_ATTRINFO / P_PARENT) are copied verbatim for later per-row
 * expansion by ::expandS()/::expandL().
 *
 * pattern.ptr is advanced past the consumed words: either by the
 * append helpers or explicitly (P_PARAM cases) — the loop terminates
 * only through this advancement.
 *
 * @param dst         Pattern store receiving the converted pattern.
 * @param treeNodePtr Node owning the pattern; P_PARENT handling updates
 *                    ancestor nodes' buffering mode.
 * @param pattern     Pattern buffer, 'len' words consumed.
 * @param param       Parameter buffer with 'paramCnt' values.
 * @return 0 on success, else a DbspjErr error code.
 */
Uint32
Dbspj::expand(Local_pattern_store& dst, Ptr<TreeNode> treeNodePtr,
              DABuffer& pattern, Uint32 len,
              DABuffer& param, Uint32 paramCnt)
{
  /**
   * TODO handle error
   */
  Uint32 err;
  Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE];
  struct RowPtr::Linear row;
  row.m_header = CAST_PTR(RowPtr::Header, &tmp[0]);
  row.m_data = param.ptr;
  // View the parameter list as a linear row so P_PARAM tokens can be
  // looked up like ordinary columns.
  buildRowHeader(CAST_PTR(RowPtr::Header, &tmp[0]), param.ptr, paramCnt);

  const Uint32 * end = pattern.ptr + len;
  for (; pattern.ptr < end; )
  {
    Uint32 info = *pattern.ptr;
    Uint32 type = QueryPattern::getType(info);
    Uint32 val = QueryPattern::getLength(info);
    switch(type){
    case QueryPattern::P_COL:
    case QueryPattern::P_UNQ_PK:
    case QueryPattern::P_ATTRINFO:
      jam();
      // Copy the single token as-is (appendToPattern consumes it from
      // 'pattern').
      err = appendToPattern(dst, pattern, 1);
      break;
    case QueryPattern::P_DATA:
      jam();
      // Token plus 'val' words of immediate data
      err = appendToPattern(dst, pattern, val+1);
      break;
    case QueryPattern::P_PARAM:
      jam();
      // NOTE: Converted to P_DATA by appendParamToPattern
      ndbassert(val < paramCnt);
      err = appendParamToPattern(dst, row, val);
      pattern.ptr++;
      break;
    case QueryPattern::P_PARAM_HEADER:
      jam();
      // NOTE: Converted to P_DATA by appendParamHeadToPattern
      ndbassert(val < paramCnt);
      err = appendParamHeadToPattern(dst, row, val);
      pattern.ptr++;
      break;
    case QueryPattern::P_PARENT: // Prefix to P_COL
    {
      jam();
      err = appendToPattern(dst, pattern, 1);

      // Locate requested grandparent and request it to
      // T_ROW_BUFFER its result rows
      Ptr<TreeNode> parentPtr;
      m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
      while (val--)
      {
        jam();
        ndbassert(parentPtr.p->m_parentPtrI != RNIL);
        m_treenode_pool.getPtr(parentPtr, parentPtr.p->m_parentPtrI);
        parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER;
        parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER_MAP;
      }
      // Any row buffering at all turns on request-level buffering
      Ptr<Request> requestPtr;
      m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
      requestPtr.p->m_bits |= Request::RT_ROW_BUFFERS;
      break;
    }
    default:
      jam();
      err = DbspjErr::InvalidPattern;
      DEBUG_CRASH();
    }

    if (unlikely(err != 0))
    {
      DEBUG_CRASH();
      goto error;
    }
  }
  return 0;

error:
  jam();
  return err;
}
6843 
/**
 * Parse the common "Data Access" part of one serialized query-tree node
 * together with its matching parameter section, populating 'treeNodePtr'
 * (parent links, key pattern, assembled ATTRINFO section, bits).
 *
 * @param ctx         Build context (earlier nodes, result refs).
 * @param requestPtr  Request being built; request-level bits may be set.
 * @param treeNodePtr Node being parsed, updated in place.
 * @param tree        Query-tree buffer; advanced past consumed words.
 * @param treeBits    DABits::NI_* flags describing optional tree parts.
 * @param param       Parameter buffer; advanced past consumed words.
 * @param paramBits   DABits::PI_* flags describing optional param parts.
 * @return 0 on success, else a DbspjErr error code.
 *
 * NOTE(review): on the error paths (break out of the do/while) the
 * sections referenced by attrInfoPtrI/attrParamPtrI are not released
 * here — presumably the caller releases the request's resources;
 * verify to rule out a section leak.
 */
Uint32
Dbspj::parseDA(Build_context& ctx,
               Ptr<Request> requestPtr,
               Ptr<TreeNode> treeNodePtr,
               DABuffer& tree, Uint32 treeBits,
               DABuffer& param, Uint32 paramBits)
{
  Uint32 err;
  Uint32 attrInfoPtrI = RNIL;   // ATTRINFO section assembled below
  Uint32 attrParamPtrI = RNIL;  // temporary attr-parameter section

  do
  {
    if (treeBits & DABits::NI_REPEAT_SCAN_RESULT)
    {
      jam();
      DEBUG("use REPEAT_SCAN_RESULT when returning results");
      requestPtr.p->m_bits |= Request::RT_REPEAT_SCAN_RESULT;
    } // DABits::NI_HAS_PARENT

    if (treeBits & DABits::NI_HAS_PARENT)
    {
      jam();
      DEBUG("NI_HAS_PARENT");
      /**
       * OPTIONAL PART 1:
       *
       * Parent nodes are stored first in optional part
       *   this is a list of 16-bit numbers refering to
       *   *earlier* nodes in tree
       *   the list stores length of list as first 16-bit
       */
      err = DbspjErr::InvalidTreeNodeSpecification;
      Uint32 dst[63];
      Uint32 cnt = unpackList(NDB_ARRAY_SIZE(dst), dst, tree);
      if (unlikely(cnt > NDB_ARRAY_SIZE(dst)))
      {
        DEBUG_CRASH();
        break;
      }

      err = 0;

      if (unlikely(cnt!=1))
      {
        /**
         * Only a single parent supported for now, i.e only trees
         */
        // NOTE(review): debug-only crash — in release builds parsing
        // continues with 'cnt' parents; confirm this is intended.
        DEBUG_CRASH();
      }

      for (Uint32 i = 0; i<cnt; i++)
      {
        DEBUG("adding " << dst[i] << " as parent");
        Ptr<TreeNode> parentPtr = ctx.m_node_list[dst[i]];
        LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
        Local_dependency_map map(pool, parentPtr.p->m_dependent_nodes);
        if (unlikely(!map.append(&treeNodePtr.i, 1)))
        {
          err = DbspjErr::OutOfQueryMemory;
          DEBUG_CRASH();
          break;
        }
        // Having a child, the parent is no longer a leaf
        parentPtr.p->m_bits &= ~(Uint32)TreeNode::T_LEAF;
        treeNodePtr.p->m_parentPtrI = parentPtr.i;

        // Build Bitmask of all ancestors to treeNode
        treeNodePtr.p->m_ancestors = parentPtr.p->m_ancestors;
        treeNodePtr.p->m_ancestors.set(parentPtr.p->m_node_no);
      }

      if (unlikely(err != 0))
        break;
    } // DABits::NI_HAS_PARENT

    // NI_KEY_PARAMS in the tree and PI_KEY_PARAMS in the params must agree
    err = DbspjErr::InvalidTreeParametersSpecificationKeyParamBitsMissmatch;
    if (unlikely( ((treeBits  & DABits::NI_KEY_PARAMS)==0) !=
                  ((paramBits & DABits::PI_KEY_PARAMS)==0)))
    {
      DEBUG_CRASH();
      break;
    }

    if (treeBits & (DABits::NI_KEY_PARAMS
                    | DABits::NI_KEY_LINKED
                    | DABits::NI_KEY_CONSTS))
    {
      jam();
      DEBUG("NI_KEY_PARAMS | NI_KEY_LINKED | NI_KEY_CONSTS");

      /**
       * OPTIONAL PART 2:
       *
       * If keys are parametrized or linked
       *   DATA0[LO/HI] - Length of key pattern/#parameters to key
       */
      Uint32 len_cnt = * tree.ptr ++;
      Uint32 len = len_cnt & 0xFFFF; // length of pattern in words
      Uint32 cnt = len_cnt >> 16;    // no of parameters

      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);

      // A parameter count of zero must match the PARAMS bits being unset
      err = DbspjErr::InvalidTreeParametersSpecificationIncorrectKeyParamCount;
      if (unlikely( ((cnt==0) != ((treeBits & DABits::NI_KEY_PARAMS) == 0)) ||
                    ((cnt==0) != ((paramBits & DABits::PI_KEY_PARAMS) == 0))))
      {
        DEBUG_CRASH();
        break;
      }

      if (treeBits & DABits::NI_KEY_LINKED)
      {
        jam();
        DEBUG("LINKED-KEY PATTERN w/ " << cnt << " PARAM values");
        /**
         * Expand pattern into a new pattern (with linked values)
         */
        err = expand(pattern, treeNodePtr, tree, len, param, cnt);

        /**
         * This node constructs a new key for each send
         */
        treeNodePtr.p->m_bits |= TreeNode::T_KEYINFO_CONSTRUCTED;
      }
      else
      {
        jam();
        DEBUG("FIXED-KEY w/ " << cnt << " PARAM values");
        /**
         * Expand pattern directly into keyinfo
         *   This means a "fixed" key from here on
         */
        bool hasNull;
        Uint32 keyInfoPtrI = RNIL;
        err = expand(keyInfoPtrI, tree, len, param, cnt, hasNull);
        if (unlikely(hasNull))
        {
          /* API should have elliminated requests w/ const-NULL keys */
          jam();
          DEBUG("BEWARE: FIXED-key contain NULL values");
//        treeNodePtr.p->m_bits |= TreeNode::T_NULL_PRUNE;
//        break;
          ndbrequire(false);
        }
        treeNodePtr.p->m_send.m_keyInfoPtrI = keyInfoPtrI;
      }

      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }
    } // DABits::NI_KEY_...

    const Uint32 mask =
      DABits::NI_LINKED_ATTR | DABits::NI_ATTR_INTERPRET |
      DABits::NI_ATTR_LINKED | DABits::NI_ATTR_PARAMS;

    if (((treeBits & mask) | (paramBits & DABits::PI_ATTR_LIST)) != 0)
    {
      jam();
      /**
       * OPTIONAL PART 3: attrinfo handling
       * - NI_LINKED_ATTR - these are attributes to be passed to children
       * - PI_ATTR_LIST   - this is "user-columns" (passed as parameters)

       * - NI_ATTR_INTERPRET - tree contains interpreted program
       * - NI_ATTR_LINKED - means that the attr-info contains linked-values
       * - NI_ATTR_PARAMS - means that the attr-info is parameterized
       *   PI_ATTR_PARAMS - means that the parameters contains attr parameters
       *
       * IF NI_ATTR_INTERPRET
       *   DATA0[LO/HI] = Length of program / total #arguments to program
       *   DATA1..N     = Program
       *
       * IF NI_ATTR_PARAMS
       *   DATA0[LO/HI] = Length / #param
       *   DATA1..N     = PARAM-0...PARAM-M
       *
       * IF PI_ATTR_INTERPRET
       *   DATA0[LO/HI] = Length of program / Length of subroutine-part
       *   DATA1..N     = Program (scan filter)
       *
       * IF NI_ATTR_LINKED
       *   DATA0[LO/HI] = Length / #
       *
       *
       */
      Uint32 sections[5] = { 0, 0, 0, 0, 0 };
      Uint32 * sectionptrs = 0;

      bool interpreted =
        (treeBits & DABits::NI_ATTR_INTERPRET) ||
        (paramBits & DABits::PI_ATTR_INTERPRET) ||
        (treeNodePtr.p->m_bits & TreeNode::T_ATTR_INTERPRETED);

      if (interpreted)
      {
        /**
         * Add section headers for interpreted execution
         *   and create pointer so that they can be updated later
         */
        jam();
        err = DbspjErr::OutOfSectionMemory;
        if (unlikely(!appendToSection(attrInfoPtrI, sections, 5)))
        {
          DEBUG_CRASH();
          break;
        }

        // 'sectionptrs' points into the first segment of the section just
        // written; the 5 length words are patched in place further below.
        SegmentedSectionPtr ptr;
        getSection(ptr, attrInfoPtrI);
        sectionptrs = ptr.p->theData;

        if (treeBits & DABits::NI_ATTR_INTERPRET)
        {
          jam();

          /**
           * Having two interpreter programs is an error.
           */
          err = DbspjErr::BothTreeAndParametersContainInterpretedProgram;
          if (unlikely(paramBits & DABits::PI_ATTR_INTERPRET))
          {
            DEBUG_CRASH();
            break;
          }

          treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
          Uint32 len2 = * tree.ptr++;
          Uint32 len_prg = len2 & 0xFFFF; // Length of interpret program
          Uint32 len_pattern = len2 >> 16;// Length of attr param pattern
          err = DbspjErr::OutOfSectionMemory;
          if (unlikely(!appendToSection(attrInfoPtrI, tree.ptr, len_prg)))
          {
            DEBUG_CRASH();
            break;
          }

          tree.ptr += len_prg;
          sectionptrs[1] = len_prg; // size of interpret program

          Uint32 tmp = * tree.ptr ++; // attr-pattern header
          Uint32 cnt = tmp & 0xFFFF;

          if (treeBits & DABits::NI_ATTR_LINKED)
          {
            jam();
            /**
             * Expand pattern into a new pattern (with linked values)
             */
            LocalArenaPoolImpl pool(requestPtr.p->m_arena,
                                    m_dependency_map_pool);
            Local_pattern_store pattern(pool,treeNodePtr.p->m_attrParamPattern);
            err = expand(pattern, treeNodePtr, tree, len_pattern, param, cnt);
            if (unlikely(err))
            {
              DEBUG_CRASH();
              break;
            }
            /**
             * This node constructs a new attr-info for each send
             */
            treeNodePtr.p->m_bits |= TreeNode::T_ATTRINFO_CONSTRUCTED;
          }
          else
          {
            jam();
            /**
             * Expand pattern directly into attr-info param
             *   This means a "fixed" attr-info param from here on
             */
            bool hasNull;
            err = expand(attrParamPtrI, tree, len_pattern, param, cnt, hasNull);
            if (unlikely(err))
            {
              DEBUG_CRASH();
              break;
            }
//          ndbrequire(!hasNull);
          }
        }
        else // if (treeBits & DABits::NI_ATTR_INTERPRET)
        {
          jam();
          /**
           * Only relevant for interpreted stuff
           */
          ndbrequire((treeBits & DABits::NI_ATTR_PARAMS) == 0);
          ndbrequire((paramBits & DABits::PI_ATTR_PARAMS) == 0);
          ndbrequire((treeBits & DABits::NI_ATTR_LINKED) == 0);

          treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;

          if (! (paramBits & DABits::PI_ATTR_INTERPRET))
          {
            jam();

            /**
             * Tree node has interpreted execution,
             *   but no interpreted program specified
             *   auto-add Exit_ok (i.e return each row)
             */
            Uint32 tmp = Interpreter::ExitOK();
            err = DbspjErr::OutOfSectionMemory;
            if (unlikely(!appendToSection(attrInfoPtrI, &tmp, 1)))
            {
              DEBUG_CRASH();
              break;
            }
            sectionptrs[1] = 1;
          }
        } // if (treeBits & DABits::NI_ATTR_INTERPRET)
      } // if (interpreted)

      if (paramBits & DABits::PI_ATTR_INTERPRET)
      {
        jam();

        /**
         * Add the interpreted code that represents the scan filter.
         */
        const Uint32 len2 = * param.ptr++;
        Uint32 program_len = len2 & 0xFFFF;
        Uint32 subroutine_len = len2 >> 16;
        err = DbspjErr::OutOfSectionMemory;
        if (unlikely(!appendToSection(attrInfoPtrI, param.ptr, program_len)))
        {
          DEBUG_CRASH();
          break;
        }
        /**
         * The interpreted code is added is in the "Interpreted execute region"
         * of the attrinfo (see Dbtup::interpreterStartLab() for details).
         * It will thus execute before reading the attributes that constitutes
         * the projections.
         */
        sectionptrs[1] = program_len;
        param.ptr += program_len;

        if (subroutine_len)
        {
          if (unlikely(!appendToSection(attrParamPtrI,
                                        param.ptr, subroutine_len)))
          {
            DEBUG_CRASH();
            break;
          }
          sectionptrs[4] = subroutine_len;
          param.ptr += subroutine_len;
        }
        treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
      }

      Uint32 sum_read = 0;
      Uint32 dst[MAX_ATTRIBUTES_IN_TABLE + 2];

      if (paramBits & DABits::PI_ATTR_LIST)
      {
        jam();
        Uint32 len = * param.ptr++;
        DEBUG("PI_ATTR_LIST");

        treeNodePtr.p->m_bits |= TreeNode::T_USER_PROJECTION;
        err = DbspjErr::OutOfSectionMemory;
        if (!appendToSection(attrInfoPtrI, param.ptr, len))
        {
          DEBUG_CRASH();
          break;
        }

        param.ptr += len;

        /**
         * Insert a flush of this partial result set
         */
        Uint32 flush[4];
        flush[0] = AttributeHeader::FLUSH_AI << 16;
        flush[1] = ctx.m_resultRef;
        flush[2] = ctx.m_resultData;
        flush[3] = ctx.m_senderRef; // RouteRef
        if (!appendToSection(attrInfoPtrI, flush, 4))
        {
          DEBUG_CRASH();
          break;
        }

        sum_read += len + 4;
      }

      if (treeBits & DABits::NI_LINKED_ATTR)
      {
        jam();
        DEBUG("NI_LINKED_ATTR");
        err = DbspjErr::InvalidTreeNodeSpecification;
        Uint32 cnt = unpackList(MAX_ATTRIBUTES_IN_TABLE, dst, tree);
        if (unlikely(cnt > MAX_ATTRIBUTES_IN_TABLE))
        {
          DEBUG_CRASH();
          break;
        }

        /**
         * AttributeHeader contains attrId in 16-higher bits
         */
        for (Uint32 i = 0; i<cnt; i++)
          dst[i] <<= 16;

        /**
         * Read correlation factor
         */
        dst[cnt++] = AttributeHeader::CORR_FACTOR32 << 16;

        err = DbspjErr::OutOfSectionMemory;
        if (!appendToSection(attrInfoPtrI, dst, cnt))
        {
          DEBUG_CRASH();
          break;
        }

        sum_read += cnt;
      }

      if (interpreted)
      {
        jam();
        /**
         * Let reads be performed *after* interpreted program
         *   i.e in "final read"-section
         */
        sectionptrs[3] = sum_read;

        if (attrParamPtrI != RNIL)
        {
          jam();
          ndbrequire(!(treeNodePtr.p->m_bits&TreeNode::T_ATTRINFO_CONSTRUCTED));

          // Append the fixed attr-params as the subroutine section and
          // release the temporary holding section.
          SegmentedSectionPtr ptr;
          getSection(ptr, attrParamPtrI);
          {
            SectionReader r0(ptr, getSectionSegmentPool());
            err = appendTreeToSection(attrInfoPtrI, r0, ptr.sz);
            sectionptrs[4] = ptr.sz;
            if (unlikely(err != 0))
            {
              DEBUG_CRASH();
              break;
            }
          }
          releaseSection(attrParamPtrI);
        }
      }

      treeNodePtr.p->m_send.m_attrInfoPtrI = attrInfoPtrI;
    } // if (((treeBits & mask) | (paramBits & DABits::PI_ATTR_LIST)) != 0)

    return 0;
  } while (0);

  return err;
}
7306 
7307 /**
7308  * END - MODULE COMMON PARSE/UNPACK
7309  */
7310 
7311 /**
7312  * Process a scan request for an ndb$info table. (These are used for monitoring
7313  * purposes and do not contain application data.)
7314  */
execDBINFO_SCANREQ(Signal * signal)7315 void Dbspj::execDBINFO_SCANREQ(Signal *signal)
7316 {
7317   DbinfoScanReq req= * CAST_PTR(DbinfoScanReq, &signal->theData[0]);
7318   const Ndbinfo::ScanCursor* cursor =
7319     CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
7320   Ndbinfo::Ratelimit rl;
7321 
7322   jamEntry();
7323 
7324   switch(req.tableId){
7325 
7326     // The SPJ block only implements the ndbinfo.counters table.
7327   case Ndbinfo::COUNTERS_TABLEID:
7328   {
7329     Ndbinfo::counter_entry counters[] = {
7330       { Ndbinfo::SPJ_READS_RECEIVED_COUNTER,
7331         c_Counters.get_counter(CI_READS_RECEIVED) },
7332       { Ndbinfo::SPJ_LOCAL_READS_SENT_COUNTER,
7333         c_Counters.get_counter(CI_LOCAL_READS_SENT) },
7334       { Ndbinfo::SPJ_REMOTE_READS_SENT_COUNTER,
7335         c_Counters.get_counter(CI_REMOTE_READS_SENT) },
7336       { Ndbinfo::SPJ_READS_NOT_FOUND_COUNTER,
7337         c_Counters.get_counter(CI_READS_NOT_FOUND) },
7338       { Ndbinfo::SPJ_TABLE_SCANS_RECEIVED_COUNTER,
7339         c_Counters.get_counter(CI_TABLE_SCANS_RECEIVED) },
7340       { Ndbinfo::SPJ_LOCAL_TABLE_SCANS_SENT_COUNTER,
7341         c_Counters.get_counter(CI_LOCAL_TABLE_SCANS_SENT) },
7342       { Ndbinfo::SPJ_RANGE_SCANS_RECEIVED_COUNTER,
7343         c_Counters.get_counter(CI_RANGE_SCANS_RECEIVED) },
7344       { Ndbinfo::SPJ_LOCAL_RANGE_SCANS_SENT_COUNTER,
7345         c_Counters.get_counter(CI_LOCAL_RANGE_SCANS_SENT) },
7346       { Ndbinfo::SPJ_REMOTE_RANGE_SCANS_SENT_COUNTER,
7347         c_Counters.get_counter(CI_REMOTE_RANGE_SCANS_SENT) },
7348       { Ndbinfo::SPJ_SCAN_BATCHES_RETURNED_COUNTER,
7349         c_Counters.get_counter(CI_SCAN_BATCHES_RETURNED) },
7350       { Ndbinfo::SPJ_SCAN_ROWS_RETURNED_COUNTER,
7351         c_Counters.get_counter(CI_SCAN_ROWS_RETURNED) },
7352       { Ndbinfo::SPJ_PRUNED_RANGE_SCANS_RECEIVED_COUNTER,
7353         c_Counters.get_counter(CI_PRUNED_RANGE_SCANS_RECEIVED) },
7354       { Ndbinfo::SPJ_CONST_PRUNED_RANGE_SCANS_RECEIVED_COUNTER,
7355         c_Counters.get_counter(CI_CONST_PRUNED_RANGE_SCANS_RECEIVED) }
7356     };
7357     const size_t num_counters = sizeof(counters) / sizeof(counters[0]);
7358 
7359     Uint32 i = cursor->data[0];
7360     const BlockNumber bn = blockToMain(number());
7361     while(i < num_counters)
7362     {
7363       jam();
7364       Ndbinfo::Row row(signal, req);
7365       row.write_uint32(getOwnNodeId());
7366       row.write_uint32(bn);           // block number
7367       row.write_uint32(instance());   // block instance
7368       row.write_uint32(counters[i].id);
7369 
7370       row.write_uint64(counters[i].val);
7371       ndbinfo_send_row(signal, req, row, rl);
7372       i++;
7373       if (rl.need_break(req))
7374       {
7375         jam();
7376         ndbinfo_send_scan_break(signal, req, rl, i);
7377         return;
7378       }
7379     }
7380     break;
7381   }
7382 
7383   default:
7384     break;
7385   }
7386 
7387   ndbinfo_send_scan_conf(signal, req, rl);
7388 } // Dbspj::execDBINFO_SCANREQ(Signal *signal)
7389 
update(double sample)7390 void Dbspj::IncrementalStatistics::update(double sample)
7391 {
7392   // Prevent wrap-around
7393   if(m_noOfSamples < 0xffffffff)
7394   {
7395     m_noOfSamples++;
7396     const double delta = sample - m_mean;
7397     m_mean += delta/m_noOfSamples;
7398     m_sumSquare +=  delta * (sample - m_mean);
7399   }
7400 }
7401