1 /*
2 Copyright (c) 2004, 2011, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #define DBSPJ_C
26 #include "Dbspj.hpp"
27
28 #include <SectionReader.hpp>
29 #include <signaldata/LqhKey.hpp>
30 #include <signaldata/QueryTree.hpp>
31 #include <signaldata/TcKeyRef.hpp>
32 #include <signaldata/RouteOrd.hpp>
33 #include <signaldata/TransIdAI.hpp>
34 #include <signaldata/DiGetNodes.hpp>
35 #include <signaldata/DihScanTab.hpp>
36 #include <signaldata/AttrInfo.hpp>
37 #include <Interpreter.hpp>
38 #include <AttributeHeader.hpp>
39 #include <AttributeDescriptor.hpp>
40 #include <KeyDescriptor.hpp>
41 #include <md5_hash.hpp>
42 #include <signaldata/TcKeyConf.hpp>
43
44 #include <signaldata/NodeFailRep.hpp>
45 #include <signaldata/ReadNodesConf.hpp>
46
47 // Use DEBUG to print messages that should be
48 // seen only when we debug the product
49
50 #ifdef VM_TRACE
51
52 #define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
53 #define DEBUG_LQHKEYREQ
54 #define DEBUG_SCAN_FRAGREQ
55
56 #else
57
58 #define DEBUG(x)
59
60 #endif
61
62 #if 1
63 #define DEBUG_CRASH() ndbrequire(false)
64 #else
65 #define DEBUG_CRASH()
66 #endif
67
68 #if 1
69 #undef DEBUG
70 #define DEBUG(x)
71 #undef DEBUG_LQHKEYREQ
72 #undef DEBUG_SCAN_FRAGREQ
73 #endif
74
/* Sentinel 'null' values: an unset TreeNode pointer and an unset row
 * reference (RNIL marks 'no index' throughout the NDB kernel). */
const Ptr<Dbspj::TreeNode> Dbspj::NullTreeNodePtr = { 0, RNIL };
const Dbspj::RowRef Dbspj::NullRowRef = { RNIL, GLOBAL_PAGE_SIZE_WORDS, { 0 } };
77
/**
 * execREAD_CONFIG_REQ - initialize the block's memory pools.
 *
 * Sets up the arena allocator and the arena-backed pools for Request,
 * TreeNode and ScanFragHandle records, sizes the lookup-/scan-request
 * hash tables, wires the row-page pool to the global memory root, and
 * initializes the dependency-map buffer pool.
 * Replies with READ_CONFIG_CONF to the sender.
 */
void Dbspj::execREAD_CONFIG_REQ(Signal* signal)
{
  jamEntry();
  /* Copy the request out: theData is reused for the CONF reply below. */
  const ReadConfigReq req =
    *reinterpret_cast<const ReadConfigReq*>(signal->getDataPtr());

  Pool_context pc;
  pc.m_block = this;

  DEBUG("execREAD_CONFIG_REQ");
  DEBUG("sizeof(Request): " << sizeof(Request) <<
        " sizeof(TreeNode): " << sizeof(TreeNode));

  m_arenaAllocator.init(1024, RT_SPJ_ARENA_BLOCK, pc);
  m_request_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_REQUEST, pc);
  m_treenode_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_TREENODE, pc);
  m_scanfraghandle_pool.arena_pool_init(&m_arenaAllocator, RT_SPJ_SCANFRAG, pc);
  m_lookup_request_hash.setSize(16);
  m_scan_request_hash.setSize(16);
  /* Row-page pool spans the whole memory root; ~0 = 'unlimited' count. */
  void* ptr = m_ctx.m_mm.get_memroot();
  m_page_pool.set((RowPage*)ptr, (Uint32)~0);

  Record_info ri;
  Dependency_map::createRecordInfo(ri, RT_SPJ_DATABUFFER);
  m_dependency_map_pool.init(&m_arenaAllocator, ri, pc);

  ReadConfigConf* const conf =
    reinterpret_cast<ReadConfigConf*>(signal->getDataPtrSend());
  conf->senderRef = reference();
  conf->senderData = req.senderData;

  sendSignal(req.senderRef, GSN_READ_CONFIG_CONF, signal,
             ReadConfigConf::SignalLength, JBB);
}//Dbspj::execREAD_CONFIG_REQ()
113
/* Block reference of the most recent STTOR sender; the reply target
 * used by sendSTTORRY(). */
static Uint32 f_STTOR_REF = 0;
115
/**
 * execSTTOR - per-start-phase initialization.
 *
 * Phase 1: starts the periodic CONTINUEB(0) self-signal (1000ms delay)
 *          which drives releaseGlobal().
 * Phase 4: requests the node list from NDBCNTR; STTORRY is deferred
 *          until READ_NODESCONF arrives (see execREAD_NODESCONF()).
 * Other phases: reply STTORRY immediately.
 */
void Dbspj::execSTTOR(Signal* signal)
{
//#define UNIT_TEST_DATABUFFER2

  jamEntry();
  /* START CASE */
  const Uint16 tphase = signal->theData[1];
  f_STTOR_REF = signal->getSendersBlockRef();  // remember reply target

  ndbout << "Dbspj::execSTTOR() inst:" << instance()
         << " phase=" << tphase << endl;

  if (tphase == 1)
  {
    jam();
    /* CONTINUEB(0) == releaseGlobal(); re-armed from there. */
    signal->theData[0] = 0;
    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 1000, 1);
  }

  if (tphase == 4)
  {
    jam();

    signal->theData[0] = reference();
    sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
    return;  // STTORRY sent from execREAD_NODESCONF()
  }

  sendSTTORRY(signal);

#ifdef UNIT_TEST_DATABUFFER2
  /* Compiled-in self test of ArenaPool / DataBuffer2, requested to run
   * in phase 120 (see sendSTTORRY()). */
  if (tphase == 120)
  {
    ndbout_c("basic test of ArenaPool / DataBuffer2");

    for (Uint32 i = 0; i<100; i++)
    {
      ArenaHead ah;
      if (!m_arenaAllocator.seize(ah))
      {
        ndbout_c("Failed to allocate arena");
        break;
      }

      ndbout_c("*** LOOP %u", i);
      Uint32 sum = 0;
      Dependency_map::Head head;
      LocalArenaPoolImpl pool(ah, m_dependency_map_pool);
      /* Append 100 chunks of random size; the concatenated content is
       * the sequence 0, 1, 2, ... sum-1. */
      for (Uint32 j = 0; j<100; j++)
      {
        Uint32 sz = rand() % 1000;
        if (0)
          ndbout_c("adding %u", sz);
        Local_dependency_map list(pool, head);
        for (Uint32 i = 0; i<sz; i++)
          signal->theData[i] = sum + i;
        list.append(signal->theData, sz);
        sum += sz;
      }

      {
        /* Verify size and that content reads back as 0..sum-1. */
        ndbrequire(head.getSize() == sum);
        Local_dependency_map list(pool, head);
        Dependency_map::ConstDataBufferIterator it;
        Uint32 cnt = 0;
        for (list.first(it); !it.isNull(); list.next(it))
        {
          ndbrequire(* it.data == cnt);
          cnt++;
        }

        ndbrequire(cnt == sum);
      }

      Resource_limit rl;
      if (m_ctx.m_mm.get_resource_limit(7, rl))
      {
        ndbout_c("Resource %d min: %d max: %d curr: %d",
                 7, rl.m_min, rl.m_max, rl.m_curr);
      }

      {
        ndbout_c("release map");
        Local_dependency_map list(pool, head);
        list.release();
      }

      ndbout_c("release all");
      m_arenaAllocator.release(ah);
      ndbout_c("*** LOOP %u sum: %u", i, sum);
    }
  }
#endif
}//Dbspj::execSTTOR()
210
/**
 * Reply STTORRY to the sender of the last STTOR.
 * theData[3..] lists the start phases in which this block wants to be
 * signalled again; the list is terminated by 255.
 */
void
Dbspj::sendSTTORRY(Signal* signal)
{
  signal->theData[0] = 0;
  signal->theData[1] = 0;    /* BLOCK CATEGORY */
  signal->theData[2] = 0;    /* SIGNAL VERSION NUMBER */
  signal->theData[3] = 4;    /* Request to be started in phase 4 */
#ifdef UNIT_TEST_DATABUFFER2
  signal->theData[4] = 120;  /* Start phase end*/
#else
  signal->theData[4] = 255;  /* End of phase list */
#endif
  signal->theData[5] = 255;
  sendSignal(f_STTOR_REF, GSN_STTORRY, signal, 6, JBB);
}
226
/**
 * execREAD_NODESCONF - initialize c_alive_nodes from NDBCNTR's view.
 *
 * During our own node restart only the already-started nodes (plus
 * ourself) are treated as alive; otherwise the starting and started
 * node sets are merged.  Completes the phase-4 start with STTORRY.
 */
void
Dbspj::execREAD_NODESCONF(Signal* signal)
{
  jamEntry();

  ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();

  if (getNodeState().getNodeRestartInProgress())
  {
    jam();
    c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
    /* We are restarting, so not yet in 'startedNodes' - add ourself. */
    c_alive_nodes.set(getOwnNodeId());
  }
  else
  {
    jam();
    c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
    NdbNodeBitmask tmp;
    tmp.assign(NdbNodeBitmask::Size, conf->startedNodes);
    c_alive_nodes.bitOR(tmp);
  }

  sendSTTORRY(signal);
}
251
252 void
execINCL_NODEREQ(Signal * signal)253 Dbspj::execINCL_NODEREQ(Signal* signal)
254 {
255 jamEntry();
256 const Uint32 senderRef = signal->theData[0];
257 const Uint32 nodeId = signal->theData[1];
258
259 ndbrequire(!c_alive_nodes.get(nodeId));
260 c_alive_nodes.set(nodeId);
261
262 signal->theData[0] = nodeId;
263 signal->theData[1] = reference();
264 sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB);
265 }
266
/**
 * execNODE_FAILREP - a set of data nodes has failed.
 *
 * Removes the failed nodes from c_alive_nodes and kicks off the
 * time-sliced abort of affected requests by sending itself
 * CONTINUEB(1, bucket=0, failed-bitmask) - handled by
 * nodeFail_checkRequests(), starting with the lookup-request hash.
 */
void
Dbspj::execNODE_FAILREP(Signal* signal)
{
  jamEntry();

  const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
  NdbNodeBitmask failed;
  failed.assign(NdbNodeBitmask::Size, rep->theNodes);

  c_alive_nodes.bitANDC(failed);

  /* theData: [0]=type(1=lookup hash), [1]=start bucket, [2..]=bitmask */
  signal->theData[0] = 1;
  signal->theData[1] = 0;
  failed.copyto(NdbNodeBitmask::Size, signal->theData + 2);
  sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
             JBB);
}
284
/**
 * execAPI_FAILREQ - an API node has failed.
 *
 * Immediately confirms with API_FAILCONF to QMGR.
 * NOTE(review): the comment below claims only lookups need handling
 * (scans are aborted via DBTC), but no lookup cleanup is performed
 * here - presumably outstanding lookups terminate on their own;
 * verify against the lookup abort path.
 */
void
Dbspj::execAPI_FAILREQ(Signal* signal)
{
  jamEntry();
  Uint32 failedApiNode = signal->theData[0];
  ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR

  /**
   * We only need to care about lookups
   *   as SCAN's are aborted by DBTC
   */

  signal->theData[0] = failedApiNode;
  signal->theData[1] = reference();
  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
}
301
302 void
execCONTINUEB(Signal * signal)303 Dbspj::execCONTINUEB(Signal* signal)
304 {
305 jamEntry();
306 switch(signal->theData[0]) {
307 case 0:
308 releaseGlobal(signal);
309 return;
310 case 1:
311 nodeFail_checkRequests(signal);
312 return;
313 case 2:
314 nodeFail_checkRequests(signal);
315 return;
316 }
317
318 ndbrequire(false);
319 }
320
321 void
nodeFail_checkRequests(Signal * signal)322 Dbspj::nodeFail_checkRequests(Signal* signal)
323 {
324 jam();
325 const Uint32 type = signal->theData[0];
326 const Uint32 bucket = signal->theData[1];
327
328 NdbNodeBitmask failed;
329 failed.assign(NdbNodeBitmask::Size, signal->theData+2);
330
331 Request_iterator iter;
332 Request_hash * hash;
333 switch(type){
334 case 1:
335 hash = &m_lookup_request_hash;
336 break;
337 case 2:
338 hash = &m_scan_request_hash;
339 break;
340 }
341 hash->next(bucket, iter);
342
343 const Uint32 RT_BREAK = 64;
344 for(Uint32 i = 0; (i<RT_BREAK || iter.bucket == bucket) &&
345 !iter.curr.isNull(); i++)
346 {
347 jam();
348
349 Ptr<Request> requestPtr = iter.curr;
350 hash->next(iter);
351 i += nodeFail(signal, requestPtr, failed);
352 }
353
354 if (!iter.curr.isNull())
355 {
356 jam();
357 signal->theData[0] = type;
358 signal->theData[1] = bucket;
359 failed.copyto(NdbNodeBitmask::Size, signal->theData+2);
360 sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
361 JBB);
362 }
363 else if (type == 1)
364 {
365 jam();
366 signal->theData[0] = 2;
367 signal->theData[1] = 0;
368 failed.copyto(NdbNodeBitmask::Size, signal->theData+2);
369 sendSignal(reference(), GSN_CONTINUEB, signal, 2 + NdbNodeBitmask::Size,
370 JBB);
371 }
372 else if (type == 2)
373 {
374 jam();
375 ndbout_c("Finished with handling node-failure");
376 }
377 }
378
379 /**
380 * MODULE LQHKEYREQ
381 */
/**
 * execLQHKEYREQ - receive a linked (multi-)lookup request.
 *
 * Sections:
 *   #0 - KEYINFO contains key for first operation (used for hash in TC)
 *   #1 - ATTRINFO contains tree + parameters
 *        (unless StoredProcId is set, when only paramters are sent,
 *        but this is not yet implemented)
 *
 * Builds a Request (with its TreeNodes) from the serialized query tree
 * in ATTRINFO and starts execution.  On failure the partially-built
 * Request is released and the error is reported through
 * handle_early_lqhkey_ref().
 */
void Dbspj::execLQHKEYREQ(Signal* signal)
{
  jamEntry();
  c_Counters.incr_counter(CI_READS_RECEIVED, 1);

  const LqhKeyReq* req = reinterpret_cast<const LqhKeyReq*>(signal->getDataPtr());

  SectionHandle handle = SectionHandle(this, signal);
  SegmentedSectionPtr ssPtr;
  handle.getSection(ssPtr, LqhKeyReq::AttrInfoSectionNum);

  Uint32 err;
  Ptr<Request> requestPtr = { 0, RNIL };
  do
  {
    /* Each request gets its own arena; all request memory lives there. */
    ArenaHead ah;
    err = DbspjErr::OutOfQueryMemory;
    if (unlikely(!m_arenaAllocator.seize(ah)))
      break;


    m_request_pool.seize(ah, requestPtr);

    new (requestPtr.p) Request(ah);
    do_init(requestPtr.p, req, signal->getSendersBlockRef());

    Uint32 len_cnt;

    {
      /* First word of ATTRINFO: packed tree length + node count. */
      SectionReader r0(ssPtr, getSectionSegmentPool());

      err = DbspjErr::ZeroLengthQueryTree;
      if (unlikely(!r0.getWord(&len_cnt)))
        break;
    }

    Uint32 len = QueryTree::getLength(len_cnt);
    Uint32 cnt = QueryTree::getNodeCnt(len_cnt);

    {
      /* Two readers over the same section: one for the tree, one
       * positioned past it at the parameter area. */
      SectionReader treeReader(ssPtr, getSectionSegmentPool());
      SectionReader paramReader(ssPtr, getSectionSegmentPool());
      paramReader.step(len); // skip over tree to parameters

      Build_context ctx;
      ctx.m_resultRef = req->variableData[0];
      ctx.m_savepointId = req->savePointId;
      ctx.m_scanPrio = 1;
      ctx.m_start_signal = signal;
      ctx.m_keyPtr.i = handle.m_ptr[LqhKeyReq::KeyInfoSectionNum].i;
      ctx.m_senderRef = signal->getSendersBlockRef();

      err = build(ctx, requestPtr, treeReader, paramReader);
      if (unlikely(err != 0))
        break;
    }

    /**
     * a query being shipped as a LQHKEYREQ may only return finite rows
     *   i.e be a (multi-)lookup
     */
    ndbassert(requestPtr.p->isLookup());
    ndbassert(requestPtr.p->m_node_cnt == cnt);
    err = DbspjErr::InvalidRequest;
    if (unlikely(!requestPtr.p->isLookup() || requestPtr.p->m_node_cnt != cnt))
      break;

    /**
     * Store request in list(s)/hash(es)
     */
    store_lookup(requestPtr);

    /* Sections are now owned/consumed by the request. */
    release(ssPtr);
    handle.clear();

    start(signal, requestPtr);
    return;
  } while (0);

  /**
   * Error handling below,
   *  'err' may contain error code.
   */
  if (!requestPtr.isNull())
  {
    jam();
    m_request_pool.release(requestPtr);
  }
  releaseSections(handle);
  handle_early_lqhkey_ref(signal, req, err);
}
479
/**
 * do_init - initialize a freshly-constructed lookup Request from a
 * LQHKEYREQ.
 *
 * Besides clearing counters/state, resolves where the final result /
 * completion should be reported:
 *  - dirty read: reply goes directly to the API (variableData[1] holds
 *    the API operation record);
 *  - otherwise: reply goes to the requesting TC, using its connect ptr
 *    (or the TC op record from variableData when client == TC).
 */
void
Dbspj::do_init(Request* requestP, const LqhKeyReq* req, Uint32 senderRef)
{
  requestP->m_bits = 0;
  requestP->m_errCode = 0;
  requestP->m_state = Request::RS_BUILDING;
  requestP->m_node_cnt = 0;
  requestP->m_cnt_active = 0;
  requestP->m_rows = 0;
  requestP->m_active_nodes.clear();
  requestP->m_outstanding = 0;
  requestP->m_transId[0] = req->transId1;
  requestP->m_transId[1] = req->transId2;
  bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
#ifdef SPJ_TRACE_TIME
  requestP->m_cnt_batches = 0;
  requestP->m_sum_rows = 0;
  requestP->m_sum_running = 0;
  requestP->m_sum_waiting = 0;
  requestP->m_save_time = spj_now();
#endif
  const Uint32 reqInfo = req->requestInfo;
  Uint32 tmp = req->clientConnectPtr;
  if (LqhKeyReq::getDirtyFlag(reqInfo) &&
      LqhKeyReq::getOperation(reqInfo) == ZREAD)
  {
    jam();

    /* Dirty read: result is sent directly to the API client. */
    ndbrequire(LqhKeyReq::getApplicationAddressFlag(reqInfo));
    //const Uint32 apiRef   = lqhKeyReq->variableData[0];
    //const Uint32 apiOpRec = lqhKeyReq->variableData[1];
    tmp = req->variableData[1];
    requestP->m_senderData = tmp;
    requestP->m_senderRef = senderRef;
  }
  else
  {
    /* Locked read: reply to TC; pick the correct TC op record
     * depending on whether client and TC are the same block. */
    if (LqhKeyReq::getSameClientAndTcFlag(reqInfo) == 1)
    {
      if (LqhKeyReq::getApplicationAddressFlag(reqInfo))
        tmp = req->variableData[2];
      else
        tmp = req->variableData[0];
    }
    requestP->m_senderData = tmp;
    requestP->m_senderRef = senderRef;
  }
  requestP->m_rootResultData = tmp;
}
529
530 void
store_lookup(Ptr<Request> requestPtr)531 Dbspj::store_lookup(Ptr<Request> requestPtr)
532 {
533 ndbassert(requestPtr.p->isLookup());
534 Ptr<Request> tmp;
535 bool found = m_lookup_request_hash.find(tmp, *requestPtr.p);
536 ndbrequire(found == false);
537 m_lookup_request_hash.add(requestPtr);
538 }
539
/**
 * handle_early_lqhkey_ref - report a LQHKEYREQ that failed before a
 * Request could be started.
 *
 * Dirty reads REF directly to the API client (TCKEYREF); everything
 * else REFs back to the requesting TC (LQHKEYREF), resolving the TC
 * operation record the same way do_init() does.
 */
void
Dbspj::handle_early_lqhkey_ref(Signal* signal,
                               const LqhKeyReq * lqhKeyReq,
                               Uint32 err)
{
  /**
   * Error path...
   */
  ndbrequire(err);
  const Uint32 reqInfo = lqhKeyReq->requestInfo;
  const Uint32 transid[2] = { lqhKeyReq->transId1, lqhKeyReq->transId2 };

  if (LqhKeyReq::getDirtyFlag(reqInfo) &&
      LqhKeyReq::getOperation(reqInfo) == ZREAD)
  {
    jam();
    /* Dirty read sends TCKEYREF direct to client, and nothing to TC */
    ndbrequire(LqhKeyReq::getApplicationAddressFlag(reqInfo));
    const Uint32 apiRef   = lqhKeyReq->variableData[0];
    const Uint32 apiOpRec = lqhKeyReq->variableData[1];

    TcKeyRef* const tcKeyRef = reinterpret_cast<TcKeyRef*>(signal->getDataPtrSend());

    tcKeyRef->connectPtr = apiOpRec;
    tcKeyRef->transId[0] = transid[0];
    tcKeyRef->transId[1] = transid[1];
    tcKeyRef->errorCode = err;
    /* Route via the sender if the API node is not directly connected. */
    sendTCKEYREF(signal, apiRef, signal->getSendersBlockRef());
  }
  else
  {
    jam();
    const Uint32 returnref = signal->getSendersBlockRef();
    const Uint32 clientPtr = lqhKeyReq->clientConnectPtr;

    /* Resolve the TC operation record (mirrors do_init()). */
    Uint32 TcOprec = clientPtr;
    if (LqhKeyReq::getSameClientAndTcFlag(reqInfo) == 1)
    {
      if (LqhKeyReq::getApplicationAddressFlag(reqInfo))
        TcOprec = lqhKeyReq->variableData[2];
      else
        TcOprec = lqhKeyReq->variableData[0];
    }

    LqhKeyRef* const ref = reinterpret_cast<LqhKeyRef*>(signal->getDataPtrSend());
    ref->userRef = clientPtr;
    ref->connectPtr = TcOprec;
    ref->errorCode = err;
    ref->transId1 = transid[0];
    ref->transId2 = transid[1];
    sendSignal(returnref, GSN_LQHKEYREF, signal,
               LqhKeyRef::SignalLength, JBB);
  }
}
594
/**
 * sendTCKEYREF - deliver a TCKEYREF to 'ref', directly if the target
 * node is connected, otherwise wrapped in a ROUTE_ORD via 'routeRef'.
 *
 * The TcKeyRef payload currently sits at theData[0]; for routing it is
 * moved to theData+25 (as a linear section) so RouteOrd can be built
 * in the same signal buffer without overwriting it.
 */
void
Dbspj::sendTCKEYREF(Signal* signal, Uint32 ref, Uint32 routeRef)
{
  const Uint32 nodeId = refToNode(ref);
  const bool connectedToNode = getNodeInfo(nodeId).m_connected;

  if (likely(connectedToNode))
  {
    jam();
    sendSignal(ref, GSN_TCKEYREF, signal, TcKeyRef::SignalLength, JBB);
  }
  else
  {
    jam();
    /* memmove: RouteOrd below aliases theData - save the REF first. */
    memmove(signal->theData+25, signal->theData, 4*TcKeyRef::SignalLength);
    RouteOrd* ord = (RouteOrd*)signal->getDataPtrSend();
    ord->dstRef = ref;
    ord->srcRef = reference();
    ord->gsn = GSN_TCKEYREF;
    ord->cnt = 0;
    LinearSectionPtr ptr[3];
    ptr[0].p = signal->theData+25;
    ptr[0].sz = TcKeyRef::SignalLength;
    sendSignal(routeRef, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBB,
               ptr, 1);
  }
}
622
/**
 * sendTCKEYCONF - deliver a TCKEYCONF of 'len' words to 'ref',
 * directly if the target node is connected, otherwise wrapped in a
 * ROUTE_ORD via 'routeRef' (same buffer-aliasing scheme as
 * sendTCKEYREF()).
 */
void
Dbspj::sendTCKEYCONF(Signal* signal, Uint32 len, Uint32 ref, Uint32 routeRef)
{
  const Uint32 nodeId = refToNode(ref);
  const bool connectedToNode = getNodeInfo(nodeId).m_connected;

  if (likely(connectedToNode))
  {
    jam();
    sendSignal(ref, GSN_TCKEYCONF, signal, len, JBB);
  }
  else
  {
    jam();
    /* memmove: RouteOrd below aliases theData - save the CONF first. */
    memmove(signal->theData+25, signal->theData, 4*len);
    RouteOrd* ord = (RouteOrd*)signal->getDataPtrSend();
    ord->dstRef = ref;
    ord->srcRef = reference();
    ord->gsn = GSN_TCKEYCONF;
    ord->cnt = 0;
    LinearSectionPtr ptr[3];
    ptr[0].p = signal->theData+25;
    ptr[0].sz = len;
    sendSignal(routeRef, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBB,
               ptr, 1);
  }
}
650
651 /**
652 * END - MODULE LQHKEYREQ
653 */
654
655
656 /**
657 * MODULE SCAN_FRAGREQ
658 */
/**
 * execSCAN_FRAGREQ - receive a linked scan request.
 *
 * Sections:
 *   #0 - ATTRINFO contains tree + parameters
 *        (unless StoredProcId is set, when only paramters are sent,
 *        but this is not yet implemented)
 *   #1 - KEYINFO if first op is index scan - contains bounds for first scan
 *        if first op is lookup - contains keyinfo for lookup
 *
 * Mirrors execLQHKEYREQ(): builds a (scan) Request from the serialized
 * query tree and starts it; errors go through
 * handle_early_scanfrag_ref().
 */
void
Dbspj::execSCAN_FRAGREQ(Signal* signal)
{
  jamEntry();

  /* Reassemble if the request was fragmented */
  if (!assembleFragments(signal))
  {
    jam();
    return;
  }

  const ScanFragReq * req = (ScanFragReq *)&signal->theData[0];

#ifdef DEBUG_SCAN_FRAGREQ
  ndbout_c("Incomming SCAN_FRAGREQ ");
  printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
                    ScanFragReq::SignalLength + 2,
                    DBLQH);
#endif

  SectionHandle handle = SectionHandle(this, signal);
  SegmentedSectionPtr ssPtr;
  handle.getSection(ssPtr, ScanFragReq::AttrInfoSectionNum);

  Uint32 err;
  Ptr<Request> requestPtr = { 0, RNIL };
  do
  {
    /* Each request gets its own arena; all request memory lives there. */
    ArenaHead ah;
    err = DbspjErr::OutOfQueryMemory;
    if (unlikely(!m_arenaAllocator.seize(ah)))
      break;

    m_request_pool.seize(ah, requestPtr);

    new (requestPtr.p) Request(ah);
    do_init(requestPtr.p, req, signal->getSendersBlockRef());

    Uint32 len_cnt;
    {
      /* First word of ATTRINFO: packed tree length + node count. */
      SectionReader r0(ssPtr, getSectionSegmentPool());
      err = DbspjErr::ZeroLengthQueryTree;
      if (unlikely(!r0.getWord(&len_cnt)))
        break;
    }

    Uint32 len = QueryTree::getLength(len_cnt);
    Uint32 cnt = QueryTree::getNodeCnt(len_cnt);

    {
      /* Two readers over the same section: one for the tree, one
       * positioned past it at the parameter area. */
      SectionReader treeReader(ssPtr, getSectionSegmentPool());
      SectionReader paramReader(ssPtr, getSectionSegmentPool());
      paramReader.step(len); // skip over tree to parameters

      Build_context ctx;
      ctx.m_resultRef = req->resultRef;
      ctx.m_scanPrio = ScanFragReq::getScanPrio(req->requestInfo);
      ctx.m_savepointId = req->savePointId;
      ctx.m_batch_size_rows = req->batch_size_rows;
      ctx.m_start_signal = signal;
      ctx.m_senderRef = signal->getSendersBlockRef();

      /* KEYINFO section is optional for scans. */
      if (handle.m_cnt > 1)
      {
        jam();
        ctx.m_keyPtr.i = handle.m_ptr[ScanFragReq::KeyInfoSectionNum].i;
      }
      else
      {
        jam();
        ctx.m_keyPtr.i = RNIL;
      }

      err = build(ctx, requestPtr, treeReader, paramReader);
      if (unlikely(err != 0))
        break;
    }

    ndbassert(requestPtr.p->isScan());
    ndbassert(requestPtr.p->m_node_cnt == cnt);
    err = DbspjErr::InvalidRequest;
    if (unlikely(!requestPtr.p->isScan() || requestPtr.p->m_node_cnt != cnt))
      break;

    /**
     * Store request in list(s)/hash(es)
     */
    store_scan(requestPtr);

    /* Sections are now owned/consumed by the request. */
    release(ssPtr);
    handle.clear();

    start(signal, requestPtr);
    return;
  } while (0);

  /* Error path: release the partially-built request and REF. */
  if (!requestPtr.isNull())
  {
    jam();
    m_request_pool.release(requestPtr);
  }
  releaseSections(handle);
  handle_early_scanfrag_ref(signal, req, err);
}
771
/**
 * do_init - initialize a freshly-constructed scan Request from a
 * SCAN_FRAGREQ.  Simpler than the lookup variant: sender/result
 * identities come straight from the request.
 */
void
Dbspj::do_init(Request* requestP, const ScanFragReq* req, Uint32 senderRef)
{
  requestP->m_bits = 0;
  requestP->m_errCode = 0;
  requestP->m_state = Request::RS_BUILDING;
  requestP->m_node_cnt = 0;
  requestP->m_cnt_active = 0;
  requestP->m_rows = 0;
  requestP->m_active_nodes.clear();
  requestP->m_outstanding = 0;
  requestP->m_senderRef = senderRef;
  requestP->m_senderData = req->senderData;
  requestP->m_transId[0] = req->transId1;
  requestP->m_transId[1] = req->transId2;
  requestP->m_rootResultData = req->resultData;
  bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
#ifdef SPJ_TRACE_TIME
  requestP->m_cnt_batches = 0;
  requestP->m_sum_rows = 0;
  requestP->m_sum_running = 0;
  requestP->m_sum_waiting = 0;
  requestP->m_save_time = spj_now();
#endif
}
797
798 void
store_scan(Ptr<Request> requestPtr)799 Dbspj::store_scan(Ptr<Request> requestPtr)
800 {
801 ndbassert(requestPtr.p->isScan());
802 Ptr<Request> tmp;
803 bool found = m_scan_request_hash.find(tmp, *requestPtr.p);
804 ndbrequire(found == false);
805 m_scan_request_hash.add(requestPtr);
806 }
807
808 void
handle_early_scanfrag_ref(Signal * signal,const ScanFragReq * _req,Uint32 err)809 Dbspj::handle_early_scanfrag_ref(Signal* signal,
810 const ScanFragReq * _req,
811 Uint32 err)
812 {
813 ScanFragReq req = *_req;
814 Uint32 senderRef = signal->getSendersBlockRef();
815
816 ScanFragRef * ref = (ScanFragRef*)&signal->theData[0];
817 ref->senderData = req.senderData;
818 ref->transId1 = req.transId1;
819 ref->transId2 = req.transId2;
820 ref->errorCode = err;
821 sendSignal(senderRef, GSN_SCAN_FRAGREF, signal,
822 ScanFragRef::SignalLength, JBB);
823 }
824
825 /**
826 * END - MODULE SCAN_FRAGREQ
827 */
828
829 /**
830 * MODULE GENERIC
831 */
/**
 * build - construct the Request's TreeNodes from a serialized query
 * tree and its parameter stream.
 *
 * Walks the tree and parameter readers in lockstep: for each node,
 * copies the node/parameter words into m_buffer0/m_buffer1, looks up
 * the operation's OpInfo and invokes its m_build callback.  Afterwards
 * initializes row buffers (map or list) for TreeNodes that requested
 * them, and marks multi-scan requests (forcing RT_VAR_ALLOC when row
 * buffering is also in use).
 *
 * @return 0 on success, else a DbspjErr error code.
 */
Uint32
Dbspj::build(Build_context& ctx,
             Ptr<Request> requestPtr,
             SectionReader & tree,
             SectionReader & param)
{
  Uint32 tmp0, tmp1;
  Uint32 err = DbspjErr::ZeroLengthQueryTree;
  ctx.m_cnt = 0;
  ctx.m_scan_cnt = 0;

  /* Header word: total tree length + number of tree nodes. */
  tree.getWord(&tmp0);
  Uint32 loop = QueryTree::getNodeCnt(tmp0);

  DEBUG("::build()");
  err = DbspjErr::InvalidTreeNodeCount;
  if (loop == 0 || loop > NDB_SPJ_MAX_TREE_NODES)
  {
    DEBUG_CRASH();
    goto error;
  }

  while (ctx.m_cnt < loop)
  {
    DEBUG(" - loop " << ctx.m_cnt << " pos: " << tree.getPos().currPos);
    /* Peek the headers to learn op type + lengths before copying. */
    tree.peekWord(&tmp0);
    param.peekWord(&tmp1);
    Uint32 node_op = QueryNode::getOpType(tmp0);
    Uint32 node_len = QueryNode::getLength(tmp0);
    Uint32 param_op = QueryNodeParameters::getOpType(tmp1);
    Uint32 param_len = QueryNodeParameters::getLength(tmp1);

    err = DbspjErr::QueryNodeTooBig;
    if (unlikely(node_len >= NDB_ARRAY_SIZE(m_buffer0)))
    {
      DEBUG_CRASH();
      goto error;
    }

    err = DbspjErr::QueryNodeParametersTooBig;
    if (unlikely(param_len >= NDB_ARRAY_SIZE(m_buffer1)))
    {
      DEBUG_CRASH();
      goto error;
    }

    err = DbspjErr::InvalidTreeNodeSpecification;
    if (unlikely(tree.getWords(m_buffer0, node_len) == false))
    {
      DEBUG_CRASH();
      goto error;
    }

    err = DbspjErr::InvalidTreeParametersSpecification;
    if (unlikely(param.getWords(m_buffer1, param_len) == false))
    {
      DEBUG_CRASH();
      goto error;
    }

#if defined(DEBUG_LQHKEYREQ) || defined(DEBUG_SCAN_FRAGREQ)
    printf("node: ");
    for (Uint32 i = 0; i<node_len; i++)
      printf("0x%.8x ", m_buffer0[i]);
    printf("\n");

    printf("param: ");
    for (Uint32 i = 0; i<param_len; i++)
      printf("0x%.8x ", m_buffer1[i]);
    printf("\n");
#endif

    /* Node and its parameters must describe the same operation. */
    err = DbspjErr::UnknowQueryOperation;
    if (unlikely(node_op != param_op))
    {
      DEBUG_CRASH();
      goto error;
    }

    const OpInfo* info = getOpInfo(node_op);
    if (unlikely(info == 0))
    {
      DEBUG_CRASH();
      goto error;
    }

    QueryNode* qn = (QueryNode*)m_buffer0;
    QueryNodeParameters * qp = (QueryNodeParameters*)m_buffer1;
    qn->len = node_len;
    qp->len = param_len;
    /* Dispatch to the operation-specific build (creates the TreeNode). */
    err = (this->*(info->m_build))(ctx, requestPtr, qn, qp);
    if (unlikely(err != 0))
    {
      DEBUG_CRASH();
      goto error;
    }

    /**
     * only first node gets access to signal
     */
    ctx.m_start_signal = 0;

    /**
     * TODO handle error, by aborting request
     */
    ndbrequire(ctx.m_cnt < NDB_ARRAY_SIZE(ctx.m_node_list));
    ctx.m_cnt++;
  }
  requestPtr.p->m_node_cnt = ctx.m_cnt;

  /**
   * Init ROW_BUFFERS for those TreeNodes requiring either
   * T_ROW_BUFFER or T_ROW_BUFFER_MAP.
   */
  if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
  {
    Ptr<TreeNode> treeNodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
    {
      if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)
      {
        jam();
        treeNodePtr.p->m_row_map.init();
      }
      else if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
      {
        jam();
        treeNodePtr.p->m_row_list.init();
      }
    }
  }

  if (ctx.m_scan_cnt > 1)
  {
    jam();
    requestPtr.p->m_bits |= Request::RT_MULTI_SCAN;

    /**
     * Iff, multi-scan is non-bushy (normal case)
     *   we don't strictly need RT_VAR_ALLOC for RT_ROW_BUFFERS
     *   but could instead pop-row stack frame,
     *     however this is not implemented...
     *
     * so, use RT_VAR_ALLOC
     */
    if (requestPtr.p->m_bits & Request::RT_ROW_BUFFERS)
    {
      jam();
      requestPtr.p->m_bits |= Request::RT_VAR_ALLOC;
    }
  }

  return 0;

error:
  jam();
  return err;
}
991
992 Uint32
createNode(Build_context & ctx,Ptr<Request> requestPtr,Ptr<TreeNode> & treeNodePtr)993 Dbspj::createNode(Build_context& ctx, Ptr<Request> requestPtr,
994 Ptr<TreeNode> & treeNodePtr)
995 {
996 /**
997 * In the future, we can have different TreeNode-allocation strategies
998 * that can be setup using the Build_context
999 *
1000 */
1001 if (m_treenode_pool.seize(requestPtr.p->m_arena, treeNodePtr))
1002 {
1003 DEBUG("createNode - seize -> ptrI: " << treeNodePtr.i);
1004 new (treeNodePtr.p) TreeNode(requestPtr.i);
1005 ctx.m_node_list[ctx.m_cnt] = treeNodePtr;
1006 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1007 list.addLast(treeNodePtr);
1008 treeNodePtr.p->m_node_no = ctx.m_cnt;
1009 return 0;
1010 }
1011 return DbspjErr::OutOfOperations;
1012 }
1013
/**
 * start - begin executing a freshly-built Request.
 *
 * If any TreeNode requested a prepare phase (RT_NEED_PREPARE), all
 * m_prepare callbacks are invoked first; they are expected to bump
 * m_outstanding for any async work.  Execution proper begins when
 * checkPrepareComplete() sees m_outstanding reach zero.
 */
void
Dbspj::start(Signal* signal,
             Ptr<Request> requestPtr)
{
  if (requestPtr.p->m_bits & Request::RT_NEED_PREPARE)
  {
    jam();
    requestPtr.p->m_outstanding = 0;
    requestPtr.p->m_state = Request::RS_PREPARING;

    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_prepare != 0)
      {
        jam();
        (this->*(nodePtr.p->m_info->m_prepare))(signal, requestPtr, nodePtr);
      }
    }

    /**
     * preferably RT_NEED_PREPARE should only be set if blocking
     * calls are used, in which case m_outstanding should have been increased
     */
    ndbassert(requestPtr.p->m_outstanding);
  }

  checkPrepareComplete(signal, requestPtr, 0);
}
1046
/**
 * checkPrepareComplete - account for 'cnt' finished prepare operations
 * and, once none remain outstanding, transition the request to
 * RS_RUNNING and start its first (root) TreeNode.
 *
 * If the request was aborted during prepare, the batch is completed
 * instead (which leads to cleanup).
 */
void
Dbspj::checkPrepareComplete(Signal * signal, Ptr<Request> requestPtr,
                            Uint32 cnt)
{
  ndbrequire(requestPtr.p->m_outstanding >= cnt);
  requestPtr.p->m_outstanding -= cnt;

  if (requestPtr.p->m_outstanding == 0)
  {
    jam();

    if (unlikely((requestPtr.p->m_state & Request::RS_ABORTING) != 0))
    {
      jam();
      batchComplete(signal, requestPtr);
      return;
    }

    requestPtr.p->m_state = Request::RS_RUNNING;
    /* Execution starts at the first TreeNode (the query root). */
    Ptr<TreeNode> nodePtr;
    {
      Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
      ndbrequire(list.first(nodePtr));
    }
    ndbrequire(nodePtr.p->m_info != 0 && nodePtr.p->m_info->m_start != 0);
    (this->*(nodePtr.p->m_info->m_start))(signal, requestPtr, nodePtr);
  }
}
1075
1076 void
checkBatchComplete(Signal * signal,Ptr<Request> requestPtr,Uint32 cnt)1077 Dbspj::checkBatchComplete(Signal * signal, Ptr<Request> requestPtr,
1078 Uint32 cnt)
1079 {
1080 ndbrequire(requestPtr.p->m_outstanding >= cnt);
1081 requestPtr.p->m_outstanding -= cnt;
1082
1083 if (requestPtr.p->m_outstanding == 0)
1084 {
1085 jam();
1086 batchComplete(signal, requestPtr);
1087 }
1088 }
1089
/**
 * batchComplete - all outstanding operations of the current batch have
 * finished.
 *
 * Depending on request state this either:
 *  - reports the batch (prepareNextBatch + sendConf), possibly keeping
 *    the request alive for SCAN_NEXTREQ;
 *  - enters the complete-phase (RT_NEED_COMPLETE) when the request is
 *    done but needs it;
 *  - cleans up a fully-finished request;
 *  - or trims/releases row buffers between batches.
 */
void
Dbspj::batchComplete(Signal* signal, Ptr<Request> requestPtr)
{
  ndbrequire(requestPtr.p->m_outstanding == 0); // "definition" of batchComplete

  bool is_complete = requestPtr.p->m_cnt_active == 0;
  bool need_complete_phase = requestPtr.p->m_bits & Request::RT_NEED_COMPLETE;

  if (requestPtr.p->isLookup())
  {
    /* Lookups have no cursor: every batch is the final one. */
    ndbassert(requestPtr.p->m_cnt_active == 0);
  }

  if (!is_complete || (is_complete && need_complete_phase == false))
  {
    /**
     * one batch complete, and either
     *   - request not complete
     *   - or not complete_phase needed
     */
    jam();

    if ((requestPtr.p->m_state & Request::RS_ABORTING) != 0)
    {
      ndbassert(is_complete);
    }

    prepareNextBatch(signal, requestPtr);
    sendConf(signal, requestPtr, is_complete);
  }
  else if (is_complete && need_complete_phase)
  {
    jam();
    /**
     * run complete-phase
     */
    complete(signal, requestPtr);
    return;
  }

  if (requestPtr.p->m_cnt_active == 0)
  {
    jam();
    /**
     * request completed
     */
    cleanup(requestPtr);
  }
  else if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
  {
    jam();
    /**
     * release unneeded buffers and position cursor for SCAN_NEXTREQ
     */
    releaseScanBuffers(requestPtr);
  }
  else if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
  {
    jam();
    /**
     * if not multiple scans in request, simply release all pages allocated
     * for row buffers (all rows will be released anyway)
     */
    releaseRequestBuffers(requestPtr, true);
  }
}
1156
1157 /**
1158 * Locate next TreeNode(s) to retrieve more rows from.
1159 *
1160 * Calculate set of the 'm_active_nodes' we will receive from in NEXTREQ.
1161 * Add these TreeNodes to the cursor list to be iterated.
1162 */
void
Dbspj::prepareNextBatch(Signal* signal, Ptr<Request> requestPtr)
{
  /**
   * Locate next TreeNode(s) to retrieve more rows from: compute the
   * 'm_active_nodes' bitmask sent back to the client and add the chosen
   * TreeNodes to the cursor list which execSCAN_NEXTREQ will iterate.
   */
  requestPtr.p->m_cursor_nodes.init();
  requestPtr.p->m_active_nodes.clear();

  if (requestPtr.p->m_cnt_active == 0)
  {
    jam();
    // Nothing more to fetch - leave cursor list / active set empty
    return;
  }

  if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT)
  {
    /**
     * If REPEAT_SCAN_RESULT we handle bushy scans by returning more *new* rows
     * from only one of the active child scans. If there are multiple
     * bushy scans not being able to return their current result set in
     * a single batch, result sets from the other child scans are repeated
     * until all rows have been returned to the API client.
     *
     * Hence, the cross joined results from the bushy scans are partly
     * produced within the SPJ block on a 'batchsize granularity',
     * and partly is the responsibility of the API-client by iterating
     * the result rows within the current result batches.
     * (Opposed to non-REPEAT_SCAN_RESULT, the client only has to care about
     * the current batched rows - no buffering is required)
     */
    jam();
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);

    /**
     * Locate last 'TN_ACTIVE' TreeNode which is the only one chosen
     * to return more *new* rows.
     */
    for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
    {
      if (nodePtr.p->m_state == TreeNode::TN_ACTIVE)
      {
        jam();
        DEBUG("Will fetch more from 'active' m_node_no: " << nodePtr.p->m_node_no);
        /**
         * A later NEXTREQ will request a *new* batch of rows from this TreeNode.
         */
        registerActiveCursor(requestPtr, nodePtr);
        break;
      }
    }

    /**
     * Restart/repeat other (index scan) child batches which:
     * - Being 'after' nodePtr located above.
     * - Not being an ancestor of (depends on) any 'active' TreeNode.
     *   (As these scans are started when rows from these parent nodes
     *   arrives.)
     */
    if (!nodePtr.isNull())
    {
      jam();
      DEBUG("Calculate 'active', w/ cursor on m_node_no: " << nodePtr.p->m_node_no);

      /* Restart any partial index-scans after this 'TN_ACTIVE' TreeNode */
      for (list.next(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
      {
        jam();
        if (!nodePtr.p->m_ancestors.overlaps (requestPtr.p->m_active_nodes))
        {
          jam();
          ndbrequire(nodePtr.p->m_state != TreeNode::TN_ACTIVE);
          ndbrequire(nodePtr.p->m_info != 0);
          if (nodePtr.p->m_info->m_parent_batch_repeat != 0)
          {
            jam();
            (this->*(nodePtr.p->m_info->m_parent_batch_repeat))(signal,
                                                                requestPtr,
                                                                nodePtr);
          }
        }
      }
    } // if (!nodePtr.isNull()
  }
  else // not 'RT_REPEAT_SCAN_RESULT'
  {
    /**
     * If not REPEAT_SCAN_RESULT multiple active TreeNodes may return their
     * remaining result simultaneously. In case of bushy-scans, these
     * concurrent result streams are cross joins of each other
     * in SQL terms. In order to produce the cross joined result, it is
     * the responsibility of the API-client to buffer these streams and
     * iterate them to produce the cross join.
     */
    jam();
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    TreeNodeBitMask ancestors_of_active;

    // Walk backwards so children are seen before their ancestors
    for (list.last(nodePtr); !nodePtr.isNull(); list.prev(nodePtr))
    {
      /**
       * If we are active (i.e not consumed all rows originating
       * from parent rows) and we are not in the set of parents
       * for any active child:
       *
       * Then, this is a position that execSCAN_NEXTREQ should continue
       */
      if (nodePtr.p->m_state == TreeNode::TN_ACTIVE &&
          !ancestors_of_active.get (nodePtr.p->m_node_no))
      {
        jam();
        DEBUG("Add 'active' m_node_no: " << nodePtr.p->m_node_no);
        registerActiveCursor(requestPtr, nodePtr);
        ancestors_of_active.bitOR(nodePtr.p->m_ancestors);
      }
    }
  } // if (RT_REPEAT_SCAN_RESULT)

  DEBUG("Calculated 'm_active_nodes': " << requestPtr.p->m_active_nodes.rep.data[0]);
}
1282
void
Dbspj::sendConf(Signal* signal, Ptr<Request> requestPtr, bool is_complete)
{
  /**
   * Reply to the requestor for the completed batch:
   *  - scans: SCAN_FRAGCONF on success, SCAN_FRAGREF on error
   *  - lookups: TCKEYREF on error only (success replies are sent elsewhere)
   */
  if (requestPtr.p->isScan())
  {
    if (unlikely((requestPtr.p->m_state & Request::RS_WAITING) != 0))
    {
      jam();
      /**
       * We aborted request ourselves (due to node-failure ?)
       * but TC haven't contacted us...so we can't reply yet...
       */
      ndbrequire(is_complete);
      ndbrequire((requestPtr.p->m_state & Request::RS_ABORTING) != 0);
      return;
    }

    if (requestPtr.p->m_errCode == 0)
    {
      jam();
      ScanFragConf * conf=
        reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
      conf->senderData = requestPtr.p->m_senderData;
      conf->transId1 = requestPtr.p->m_transId[0];
      conf->transId2 = requestPtr.p->m_transId[1];
      conf->completedOps = requestPtr.p->m_rows;
      conf->fragmentCompleted = is_complete ? 1 : 0;
      // NOTE(review): total_len carries the first word of the active-node
      // bitmask rather than a length - presumably consumed as such by the
      // API; confirm against the receiver before changing.
      conf->total_len = requestPtr.p->m_active_nodes.rep.data[0];

      c_Counters.incr_counter(CI_SCAN_BATCHES_RETURNED, 1);
      c_Counters.incr_counter(CI_SCAN_ROWS_RETURNED, requestPtr.p->m_rows);

#ifdef SPJ_TRACE_TIME
      // Per-request timing statistics, only compiled in for tracing builds
      Uint64 now = spj_now();
      Uint64 then = requestPtr.p->m_save_time;

      requestPtr.p->m_sum_rows += requestPtr.p->m_rows;
      requestPtr.p->m_sum_running += Uint32(now - then);
      requestPtr.p->m_cnt_batches++;
      requestPtr.p->m_save_time = now;

      if (is_complete)
      {
        Uint32 cnt = requestPtr.p->m_cnt_batches;
        ndbout_c("batches: %u avg_rows: %u avg_running: %u avg_wait: %u",
                 cnt,
                 (requestPtr.p->m_sum_rows / cnt),
                 (requestPtr.p->m_sum_running / cnt),
                 cnt == 1 ? 0 : requestPtr.p->m_sum_waiting / (cnt - 1));
      }
#endif

      /**
       * reset for next batch
       */
      requestPtr.p->m_rows = 0;
      if (!is_complete)
      {
        jam();
        // More batches to come: wait for SCAN_NEXTREQ from the requestor
        requestPtr.p->m_state |= Request::RS_WAITING;
      }
#ifdef DEBUG_SCAN_FRAGREQ
      ndbout_c("Dbspj::sendConf() sending SCAN_FRAGCONF ");
      printSCAN_FRAGCONF(stdout, signal->getDataPtrSend(),
                         conf->total_len,
                         DBLQH);
#endif
      sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
                 ScanFragConf::SignalLength, JBB);
    }
    else
    {
      jam();
      ndbrequire(is_complete);
      ScanFragRef * ref=
        reinterpret_cast<ScanFragRef*>(signal->getDataPtrSend());
      ref->senderData = requestPtr.p->m_senderData;
      ref->transId1 = requestPtr.p->m_transId[0];
      ref->transId2 = requestPtr.p->m_transId[1];
      ref->errorCode = requestPtr.p->m_errCode;

      sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGREF, signal,
                 ScanFragRef::SignalLength, JBB);
    }
  }
  else
  {
    // Lookup request: only errors are reported from here
    ndbassert(is_complete);
    if (requestPtr.p->m_errCode)
    {
      jam();
      Uint32 resultRef = getResultRef(requestPtr);
      TcKeyRef* ref = (TcKeyRef*)signal->getDataPtr();
      ref->connectPtr = requestPtr.p->m_senderData;
      ref->transId[0] = requestPtr.p->m_transId[0];
      ref->transId[1] = requestPtr.p->m_transId[1];
      ref->errorCode = requestPtr.p->m_errCode;
      ref->errorData = 0;

      sendTCKEYREF(signal, resultRef, requestPtr.p->m_senderRef);
    }
  }
}
1386
1387 Uint32
getResultRef(Ptr<Request> requestPtr)1388 Dbspj::getResultRef(Ptr<Request> requestPtr)
1389 {
1390 Ptr<TreeNode> nodePtr;
1391 Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
1392 for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
1393 {
1394 if (nodePtr.p->m_info == &g_LookupOpInfo)
1395 {
1396 jam();
1397 return nodePtr.p->m_lookup_data.m_api_resultRef;
1398 }
1399 }
1400 ndbrequire(false);
1401 return 0;
1402 }
1403
void
Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
{
  /**
   * Walk the tree bottom-up, releasing buffered rows and cleaning up
   * child branches of nodes that are (or will become) active, while
   * keeping buffers that ancestors of active nodes still need.
   */
  Ptr<TreeNode> treeNodePtr;
  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
  TreeNodeBitMask ancestors_of_active;

  for (list.last(treeNodePtr); !treeNodePtr.isNull(); list.prev(treeNodePtr))
  {
    /**
     * If there are no active children,
     * then we can cleanup in our sub-branch
     */
    if (!ancestors_of_active.get(treeNodePtr.p->m_node_no))
    {
      if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
      {
        jam();
        releaseNodeRows(requestPtr, treeNodePtr);
      }

      /**
       * Cleanup ACTIVE nodes fetching more rows in a NEXTREQ,
       * or nodes being in 'm_active_nodes' as they will 'repeat'.
       * (and then become active)
       */
      if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE ||
          requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no))
      {
        jam();
        cleanupChildBranch(requestPtr, treeNodePtr);
      }
    }

    /**
     * Collect ancestors of all nodes which are, or will
     * become active in NEXTREQ (possibly repeated)
     */
    if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE ||
        requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no))
    {
      ancestors_of_active.bitOR(treeNodePtr.p->m_ancestors);
    }
  }
  /**
   * Needs to be atleast 1 active otherwise we should have
   * taken the cleanup "path" in batchComplete
   */
  ndbrequire(requestPtr.p->m_cnt_active >= 1);
}
1454
void
Dbspj::registerActiveCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
  /**
   * Mark treeNodePtr in 'm_active_nodes' and append it to the cursor
   * list which execSCAN_NEXTREQ will iterate. A node must not be
   * registered twice.
   */
  Uint32 bit = treeNodePtr.p->m_node_no;
  ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
  requestPtr.p->m_active_nodes.set(bit);

  Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
#ifdef VM_TRACE
  {
    // Debug-only duplicate check of the cursor list itself
    Ptr<TreeNode> nodePtr;
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      ndbrequire(nodePtr.i != treeNodePtr.i);
    }
  }
#endif
  list.add(treeNodePtr);
}
1474
void
Dbspj::cleanupChildBranch(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
  /**
   * Recursively invoke the (optional) 'm_parent_batch_cleanup' callback
   * on every node in the subtree below treeNodePtr.
   */
  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
  Dependency_map::ConstDataBufferIterator it;
  for (list.first(it); !it.isNull(); list.next(it))
  {
    jam();
    Ptr<TreeNode> childPtr;
    m_treenode_pool.getPtr(childPtr, *it.data);
    if (childPtr.p->m_info->m_parent_batch_cleanup != 0)
    {
      jam();
      (this->*(childPtr.p->m_info->m_parent_batch_cleanup))(requestPtr,
                                                            childPtr);
    }
    cleanupChildBranch(requestPtr,childPtr);
  }
}
1495
void
Dbspj::releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
  /**
   * Release all rows associated with tree node
   */

  // only when var-alloc, or else stack will be popped wo/ consideration
  // to individual rows
  ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER);

  /**
   * Two ways to iterate... rows are kept either in a linked list
   * or in a correlation-id map (T_ROW_BUFFER_MAP).
   */
  if ((treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) == 0)
  {
    jam();
    Uint32 cnt = 0;
    SLFifoRowListIterator iter;
    for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
    {
      jam();
      // Save the ref and advance BEFORE releasing: release invalidates pos
      RowRef pos = iter.m_ref;
      next(iter);
      releaseRow(requestPtr, pos);
      cnt ++;
    }
    treeNodePtr.p->m_row_list.init();
    DEBUG("SLFifoRowListIterator: released " << cnt << " rows!");
  }
  else
  {
    jam();
    Uint32 cnt = 0;
    RowMapIterator iter;
    for (first(requestPtr, treeNodePtr, iter); !iter.isNull(); )
    {
      jam();
      RowRef pos = iter.m_ref;
      // this could be made more efficient by not actually setting up m_row_ptr
      next(iter);
      releaseRow(requestPtr, pos);
      cnt++;
    }
    treeNodePtr.p->m_row_map.init();
    DEBUG("RowMapIterator: released " << cnt << " rows!");
  }
}
1545
void
Dbspj::releaseRow(Ptr<Request> requestPtr, RowRef pos)
{
  /**
   * Free a single var-allocated row and maintain the request's page list:
   * fully drained pages are returned to the page pool, otherwise the page
   * with most free space is kept last in the list for reuse.
   */
  ndbassert(requestPtr.p->m_bits & Request::RT_VAR_ALLOC);
  ndbassert(pos.m_allocator == 1);
  Ptr<RowPage> ptr;
  m_page_pool.getPtr(ptr, pos.m_page_id);
  ((Var_page*)ptr.p)->free_record(pos.m_page_pos, Var_page::CHAIN);
  Uint32 free_space = ((Var_page*)ptr.p)->free_space;
  // NOTE(review): releasing the page when free_space == 0 looks suspicious
  // right after freeing a record (0 free space would mean a full page) -
  // confirm Var_page::free_space semantics / the intended empty-page test.
  if (free_space == 0)
  {
    jam();
    LocalDLFifoList<RowPage> list(m_page_pool,
                                  requestPtr.p->m_rowBuffer.m_page_list);
    list.remove(ptr);
    releasePage(ptr);
  }
  else if (free_space > requestPtr.p->m_rowBuffer.m_var.m_free)
  {
    // Move page last and remember its free space as the current maximum
    LocalDLFifoList<RowPage> list(m_page_pool,
                                  requestPtr.p->m_rowBuffer.m_page_list);
    list.remove(ptr);
    list.addLast(ptr);
    requestPtr.p->m_rowBuffer.m_var.m_free = free_space;
  }
}
1572
void
Dbspj::releaseRequestBuffers(Ptr<Request> requestPtr, bool reset)
{
  /**
   * Release all row-buffer pages for the request; when 'reset' is set,
   * also re-initialize each buffering tree node's row list/map so they
   * can be refilled.
   */
  {
    {
      LocalDLFifoList<RowPage> list(m_page_pool,
                                    requestPtr.p->m_rowBuffer.m_page_list);
      if (!list.isEmpty())
      {
        jam();
        // Release the whole page chain in one call, then empty the list
        Ptr<RowPage> first, last;
        list.first(first);
        list.last(last);
        releasePages(first.i, last);
        list.remove();
      }
    }
    requestPtr.p->m_rowBuffer.stack_init();
  }

  if (reset)
  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
      {
        jam();
        if (nodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)
        {
          jam();
          nodePtr.p->m_row_map.init();
        }
        else
        {
          nodePtr.p->m_row_list.init();
        }
      }
    }
  }
}
1619
void
Dbspj::reportBatchComplete(Signal * signal, Ptr<Request> requestPtr,
                           Ptr<TreeNode> treeNodePtr)
{
  /**
   * Notify every dependent (child) node flagged with
   * T_NEED_REPORT_BATCH_COMPLETED that its parent's batch has completed,
   * via the node's 'm_parent_batch_complete' callback.
   */
  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
  Dependency_map::ConstDataBufferIterator it;
  for (list.first(it); !it.isNull(); list.next(it))
  {
    jam();
    Ptr<TreeNode> childPtr;
    m_treenode_pool.getPtr(childPtr, * it.data);
    if (childPtr.p->m_bits & TreeNode::T_NEED_REPORT_BATCH_COMPLETED)
    {
      jam();
      // A node that asks for the report must provide the callback
      ndbrequire(childPtr.p->m_info != 0 &&
                 childPtr.p->m_info->m_parent_batch_complete !=0 );
      (this->*(childPtr.p->m_info->m_parent_batch_complete))(signal,
                                                             requestPtr,
                                                             childPtr);
    }
  }
}
1643
void
Dbspj::abort(Signal* signal, Ptr<Request> requestPtr, Uint32 errCode)
{
  /**
   * Abort the request: record the error, flag RS_ABORTING, run each
   * node's abort callback, then fall through to checkBatchComplete()
   * which finishes the request once nothing is outstanding.
   * Idempotent: a request already aborting skips straight to the check.
   */
  jam();

  if ((requestPtr.p->m_state & Request::RS_ABORTING) != 0)
  {
    jam();
    goto checkcomplete;
  }

  requestPtr.p->m_state |= Request::RS_ABORTING;
  requestPtr.p->m_errCode = errCode;

  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      /**
       * clear T_REPORT_BATCH_COMPLETE so that child nodes don't get confused
       * during abort
       * NOTE(review): reportBatchComplete() tests the different bit
       * T_NEED_REPORT_BATCH_COMPLETED - confirm that clearing only
       * T_REPORT_BATCH_COMPLETE here is intended.
       */
      nodePtr.p->m_bits &= ~Uint32(TreeNode::T_REPORT_BATCH_COMPLETE);

      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_abort != 0)
      {
        jam();
        (this->*(nodePtr.p->m_info->m_abort))(signal, requestPtr, nodePtr);
      }
    }
  }

checkcomplete:
  checkBatchComplete(signal, requestPtr, 0);
}
1682
Uint32
Dbspj::nodeFail(Signal* signal, Ptr<Request> requestPtr,
                NdbNodeBitmask nodes)
{
  /**
   * Propagate a node-failure report to every tree node that installed an
   * m_execNODE_FAILREP handler.
   *
   * iter = number of nodes with a handler, cnt = sum of handler results.
   * Returns cnt + iter; the request is aborted if any handler reported
   * involvement (cnt != 0), or if our own TC died (scan case only).
   */
  Uint32 cnt = 0;
  Uint32 iter = 0;

  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_execNODE_FAILREP != 0)
      {
        jam();
        iter ++;
        cnt += (this->*(nodePtr.p->m_info->m_execNODE_FAILREP))(signal,
                                                                requestPtr,
                                                                nodePtr, nodes);
      }
    }
  }

  if (cnt == 0)
  {
    jam();
    /**
     * None of the operations needed NodeFailRep "action"
     * check if our TC has died...but...only needed in
     * scan case...for lookup...not so...
     */
    if (requestPtr.p->isScan() &&
        nodes.get(refToNode(requestPtr.p->m_senderRef)))
    {
      jam();
      abort(signal, requestPtr, DbspjErr::NodeFailure);
    }
  }
  else
  {
    jam();
    abort(signal, requestPtr, DbspjErr::NodeFailure);
  }

  return cnt + iter;
}
1731
void
Dbspj::complete(Signal* signal, Ptr<Request> requestPtr)
{
  /**
   * we need to run complete-phase before sending last SCAN_FRAGCONF:
   * switch state to RS_COMPLETING (preserving ABORTING/WAITING flags),
   * run each node's m_complete callback, then re-enter batchComplete()
   * via checkBatchComplete().
   */
  Uint32 flags = requestPtr.p->m_state &
    (Request::RS_ABORTING | Request::RS_WAITING);

  requestPtr.p->m_state = Request::RS_COMPLETING | flags;

  // clear bit so that next batchComplete()
  // will continue to cleanup
  ndbassert((requestPtr.p->m_bits & Request::RT_NEED_COMPLETE) != 0);
  requestPtr.p->m_bits &= ~(Uint32)Request::RT_NEED_COMPLETE;
  requestPtr.p->m_outstanding = 0;
  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0);
      if (nodePtr.p->m_info->m_complete != 0)
      {
        jam();
        (this->*(nodePtr.p->m_info->m_complete))(signal, requestPtr, nodePtr);
      }
    }

    /**
     * preferably RT_NEED_COMPLETE should only be set if blocking
     * calls are used, in which case m_outstanding should have been increased
     *
     * BUT: scanIndex does DIH_SCAN_TAB_COMPLETE_REP which does not send reply
     *      so it not really "blocking"
     *      i.e remove assert
     */
    //ndbassert(requestPtr.p->m_outstanding);
  }
  checkBatchComplete(signal, requestPtr, 0);
}
1774
void
Dbspj::cleanup(Ptr<Request> requestPtr)
{
  /**
   * Tear down a finished request: release every tree node, remove the
   * request from its hash table (scan or lookup), release row buffers
   * and finally the request record plus its arena.
   */
  ndbrequire(requestPtr.p->m_cnt_active == 0);
  {
    Ptr<TreeNode> nodePtr;
    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
    for (list.first(nodePtr); !nodePtr.isNull(); )
    {
      jam();
      ndbrequire(nodePtr.p->m_info != 0 && nodePtr.p->m_info->m_cleanup != 0);
      (this->*(nodePtr.p->m_info->m_cleanup))(requestPtr, nodePtr);

      // Advance before releasing the current node back to the pool
      Ptr<TreeNode> tmp = nodePtr;
      list.next(nodePtr);
      m_treenode_pool.release(tmp);
    }
    list.remove();
  }
  if (requestPtr.p->isScan())
  {
    jam();

    /**
     * If TC hasn't sent the awaited SCAN_NEXTREQ yet, just mark the
     * request RS_ABORTED; execSCAN_NEXTREQ will finish the teardown.
     */
    if (unlikely((requestPtr.p->m_state & Request::RS_WAITING) != 0))
    {
      jam();
      requestPtr.p->m_state = Request::RS_ABORTED;
      return;
    }

#ifdef VM_TRACE
    {
      // Debug-only: the request must still be present in the hash
      Request key;
      key.m_transId[0] = requestPtr.p->m_transId[0];
      key.m_transId[1] = requestPtr.p->m_transId[1];
      key.m_senderData = requestPtr.p->m_senderData;
      Ptr<Request> tmp;
      ndbrequire(m_scan_request_hash.find(tmp, key));
    }
#endif
    m_scan_request_hash.remove(requestPtr);
  }
  else
  {
    jam();
#ifdef VM_TRACE
    {
      Request key;
      key.m_transId[0] = requestPtr.p->m_transId[0];
      key.m_transId[1] = requestPtr.p->m_transId[1];
      key.m_senderData = requestPtr.p->m_senderData;
      Ptr<Request> tmp;
      ndbrequire(m_lookup_request_hash.find(tmp, key));
    }
#endif
    m_lookup_request_hash.remove(requestPtr);
  }
  releaseRequestBuffers(requestPtr, false);
  // Save the arena handle: releasing the request invalidates requestPtr.p
  ArenaHead ah = requestPtr.p->m_arena;
  m_request_pool.release(requestPtr);
  m_arenaAllocator.release(ah);
}
1837
void
Dbspj::cleanup_common(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
  /**
   * Release the per-treenode resources common to all node types:
   * dependency map, key/attr-param patterns, and any still-held
   * keyInfo/attrInfo signal sections.
   */
  jam();

  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
  {
    Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
    list.release();
  }

  {
    Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
    pattern.release();
  }

  {
    Local_pattern_store pattern(pool, treeNodePtr.p->m_attrParamPattern);
    pattern.release();
  }

  if (treeNodePtr.p->m_send.m_keyInfoPtrI != RNIL)
  {
    jam();
    releaseSection(treeNodePtr.p->m_send.m_keyInfoPtrI);
  }

  if (treeNodePtr.p->m_send.m_attrInfoPtrI != RNIL)
  {
    jam();
    releaseSection(treeNodePtr.p->m_send.m_attrInfoPtrI);
  }
}
1871
1872 /**
1873 * Processing of signals from LQH
1874 */
void
Dbspj::execLQHKEYREF(Signal* signal)
{
  /**
   * LQHKEYREF from LQH: resolve the tree node from connectPtr and its
   * owning request, then dispatch to the node-type's handler.
   */
  jamEntry();

  const LqhKeyRef* ref = reinterpret_cast<const LqhKeyRef*>(signal->getDataPtr());

  DEBUG("execLQHKEYREF, errorCode:" << ref->errorCode);
  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, ref->connectPtr);

  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execLQHKEYREF);
  (this->*(treeNodePtr.p->m_info->m_execLQHKEYREF))(signal,
                                                    requestPtr,
                                                    treeNodePtr);
}
1894
void
Dbspj::execLQHKEYCONF(Signal* signal)
{
  /**
   * LQHKEYCONF from LQH: resolve the tree node from opPtr and its
   * owning request, then dispatch to the node-type's handler.
   */
  jamEntry();

  DEBUG("execLQHKEYCONF");

  const LqhKeyConf* conf = reinterpret_cast<const LqhKeyConf*>(signal->getDataPtr());
  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, conf->opPtr);

  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(treeNodePtr.p->m_info && treeNodePtr.p->m_info->m_execLQHKEYCONF);
  (this->*(treeNodePtr.p->m_info->m_execLQHKEYCONF))(signal,
                                                     requestPtr,
                                                     treeNodePtr);
}
1914
void
Dbspj::execSCAN_FRAGREF(Signal* signal)
{
  /**
   * SCAN_FRAGREF: resolve fragment-scan handle -> tree node -> request,
   * then dispatch to the node-type's handler.
   */
  jamEntry();
  const ScanFragRef* ref = reinterpret_cast<const ScanFragRef*>(signal->getDataPtr());

  DEBUG("execSCAN_FRAGREF, errorCode:" << ref->errorCode);

  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, ref->senderData);
  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->m_execSCAN_FRAGREF);
  (this->*(treeNodePtr.p->m_info->m_execSCAN_FRAGREF))(signal,
                                                       requestPtr,
                                                       treeNodePtr,
                                                       scanFragHandlePtr);
}
1936
void
Dbspj::execSCAN_HBREP(Signal* signal)
{
  /**
   * Scan heartbeat from LQH: forward it to the request's sender,
   * substituting our senderData with the one the sender expects.
   */
  jamEntry();

  Uint32 senderData = signal->theData[0];
  //Uint32 transId[2] = { signal->theData[1], signal->theData[2] };

  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, senderData);
  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  Uint32 ref = requestPtr.p->m_senderRef;
  signal->theData[0] = requestPtr.p->m_senderData;
  sendSignal(ref, GSN_SCAN_HBREP, signal, 3, JBB);
}
1956
1957 void
execSCAN_FRAGCONF(Signal * signal)1958 Dbspj::execSCAN_FRAGCONF(Signal* signal)
1959 {
1960 jamEntry();
1961 DEBUG("execSCAN_FRAGCONF");
1962
1963 const ScanFragConf* conf = reinterpret_cast<const ScanFragConf*>(signal->getDataPtr());
1964
1965 #ifdef DEBUG_SCAN_FRAGREQ
1966 ndbout_c("Dbspj::execSCAN_FRAGCONF() receiveing SCAN_FRAGCONF ");
1967 printSCAN_FRAGCONF(stdout, signal->getDataPtrSend(),
1968 conf->total_len,
1969 DBLQH);
1970 #endif
1971
1972 Ptr<ScanFragHandle> scanFragHandlePtr;
1973 m_scanfraghandle_pool.getPtr(scanFragHandlePtr, conf->senderData);
1974 Ptr<TreeNode> treeNodePtr;
1975 m_treenode_pool.getPtr(treeNodePtr, scanFragHandlePtr.p->m_treeNodePtrI);
1976 Ptr<Request> requestPtr;
1977 m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
1978
1979 ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->m_execSCAN_FRAGCONF);
1980 (this->*(treeNodePtr.p->m_info->m_execSCAN_FRAGCONF))(signal,
1981 requestPtr,
1982 treeNodePtr,
1983 scanFragHandlePtr);
1984 }
1985
1986 void
execSCAN_NEXTREQ(Signal * signal)1987 Dbspj::execSCAN_NEXTREQ(Signal* signal)
1988 {
1989 jamEntry();
1990 const ScanFragNextReq * req = (ScanFragNextReq*)&signal->theData[0];
1991
1992 DEBUG("Incomming SCAN_NEXTREQ");
1993 #ifdef DEBUG_SCAN_FRAGREQ
1994 printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
1995 ScanFragNextReq::SignalLength, DBLQH);
1996 #endif
1997
1998 Request key;
1999 key.m_transId[0] = req->transId1;
2000 key.m_transId[1] = req->transId2;
2001 key.m_senderData = req->senderData;
2002
2003 Ptr<Request> requestPtr;
2004 if (unlikely(!m_scan_request_hash.find(requestPtr, key)))
2005 {
2006 jam();
2007 ndbrequire(req->requestInfo == ScanFragNextReq::ZCLOSE);
2008 return;
2009 }
2010
2011 #ifdef SPJ_TRACE_TIME
2012 Uint64 now = spj_now();
2013 Uint64 then = requestPtr.p->m_save_time;
2014 requestPtr.p->m_sum_waiting += Uint32(now - then);
2015 requestPtr.p->m_save_time = now;
2016 #endif
2017
2018 Uint32 state = requestPtr.p->m_state;
2019 requestPtr.p->m_state = state & ~Uint32(Request::RS_WAITING);
2020
2021 if (unlikely(state == Request::RS_ABORTED))
2022 {
2023 jam();
2024 batchComplete(signal, requestPtr);
2025 return;
2026 }
2027
2028 if (unlikely((state & Request::RS_ABORTING) != 0))
2029 {
2030 jam();
2031 /**
2032 * abort is already in progress...
2033 * since RS_WAITING is cleared...it will end this request
2034 */
2035 return;
2036 }
2037
2038 if (req->requestInfo == ScanFragNextReq::ZCLOSE) // Requested close scan
2039 {
2040 jam();
2041 abort(signal, requestPtr, 0);
2042 return;
2043 }
2044
2045 ndbrequire((state & Request::RS_WAITING) != 0);
2046 ndbrequire(requestPtr.p->m_outstanding == 0);
2047
2048 {
2049 /**
2050 * Scroll all relevant cursors...
2051 */
2052 Ptr<TreeNode> treeNodePtr;
2053 Local_TreeNodeCursor_list list(m_treenode_pool,
2054 requestPtr.p->m_cursor_nodes);
2055 Uint32 cnt_active = 0;
2056
2057 for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
2058 {
2059 if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE)
2060 {
2061 jam();
2062 DEBUG("SCAN_NEXTREQ on TreeNode: " << treeNodePtr.i
2063 << ", m_node_no: " << treeNodePtr.p->m_node_no
2064 << ", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);
2065
2066 ndbrequire(treeNodePtr.p->m_info != 0 &&
2067 treeNodePtr.p->m_info->m_execSCAN_NEXTREQ != 0);
2068 (this->*(treeNodePtr.p->m_info->m_execSCAN_NEXTREQ))(signal,
2069 requestPtr,
2070 treeNodePtr);
2071 cnt_active++;
2072 }
2073 else
2074 {
2075 /**
2076 * Restart any other scans not being 'TN_ACTIVE'
2077 * (Only effective if 'RT_REPEAT_SCAN_RESULT')
2078 */
2079 jam();
2080 ndbrequire(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT);
2081 DEBUG(" Restart TreeNode: " << treeNodePtr.i
2082 << ", m_node_no: " << treeNodePtr.p->m_node_no
2083 << ", w/ m_parentPtrI: " << treeNodePtr.p->m_parentPtrI);
2084
2085 ndbrequire(treeNodePtr.p->m_info != 0 &&
2086 treeNodePtr.p->m_info->m_parent_batch_complete !=0 );
2087 (this->*(treeNodePtr.p->m_info->m_parent_batch_complete))(signal,
2088 requestPtr,
2089 treeNodePtr);
2090 }
2091 }
2092 /* Expected only a single ACTIVE TreeNode among the cursors */
2093 ndbrequire(cnt_active == 1 ||
2094 !(requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT));
2095 }
2096 }
2097
void
Dbspj::execTRANSID_AI(Signal* signal)
{
  /**
   * TRANSID_AI carries a result row: build an attribute-offset header
   * for the row, extract its correlation data, optionally buffer it
   * (T_ROW_BUFFER), then dispatch to the node-type's handler.
   */
  jamEntry();
  DEBUG("execTRANSID_AI");
  TransIdAI * req = (TransIdAI *)signal->getDataPtr();
  Uint32 ptrI = req->connectPtr;
  //Uint32 transId[2] = { req->transId[0], req->transId[1] };

  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, ptrI);
  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  ndbrequire(signal->getNoOfSections() != 0); // TODO check if this can happen

  // Detach the row data section from the signal; released at the end
  SegmentedSectionPtr dataPtr;
  {
    SectionHandle handle(this, signal);
    handle.getSection(dataPtr, 0);
    handle.clear();
  }

#if defined(DEBUG_LQHKEYREQ) || defined(DEBUG_SCAN_FRAGREQ)
  printf("execTRANSID_AI: ");
  print(dataPtr, stdout);
#endif

  /**
   * build easy-access-array for row
   */
  Uint32 tmp[2+MAX_ATTRIBUTES_IN_TABLE];
  RowPtr::Header* header = CAST_PTR(RowPtr::Header, &tmp[0]);

  Uint32 cnt = buildRowHeader(header, dataPtr);
  ndbassert(header->m_len < NDB_ARRAY_SIZE(tmp));

  struct RowPtr row;
  row.m_type = RowPtr::RT_SECTION;
  row.m_src_node_ptrI = treeNodePtr.i;
  row.m_row_data.m_section.m_header = header;
  row.m_row_data.m_section.m_dataPtr.assign(dataPtr);

  // Correlation id is carried in the row's last attribute (cnt - 1)
  getCorrelationData(row.m_row_data.m_section,
                     cnt - 1,
                     row.m_src_correlation);

  if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
  {
    jam();
    Uint32 err = storeRow(requestPtr, treeNodePtr, row);
    ndbrequire(err == 0);
  }

  ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->m_execTRANSID_AI);

  (this->*(treeNodePtr.p->m_info->m_execTRANSID_AI))(signal,
                                                     requestPtr,
                                                     treeNodePtr,
                                                     row);
  release(dataPtr);
}
2160
Uint32
Dbspj::storeRow(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr, RowPtr &row)
{
  /**
   * Copy a section-based row into the request's row buffer (stack or var
   * allocation) and link it into the node's row list or correlation map.
   * On success, 'row' is rewritten in-place to RT_LINEAR pointing at the
   * buffered copy. Returns 0 or DbspjErr::OutOfRowMemory.
   */
  ndbassert(row.m_type == RowPtr::RT_SECTION);
  SegmentedSectionPtr dataPtr = row.m_row_data.m_section.m_dataPtr;
  Uint32 * headptr = (Uint32*)row.m_row_data.m_section.m_header;
  Uint32 headlen = 1 + row.m_row_data.m_section.m_header->m_len;

  /**
   * If rows are not in map, then they are kept in linked list
   * (linklen words reserved in front of the row for the next-pointer)
   */
  Uint32 linklen = (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)?
    0 : 2;

  Uint32 totlen = 0;
  totlen += dataPtr.sz;
  totlen += headlen;
  totlen += linklen;

  RowRef ref;
  Uint32 * dstptr = 0;
  if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
  {
    jam();
    dstptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
  }
  else
  {
    jam();
    dstptr = varAlloc(requestPtr.p->m_rowBuffer, ref, totlen);
  }

  if (unlikely(dstptr == 0))
  {
    jam();
    return DbspjErr::OutOfRowMemory;
  }

  // Rewrite 'row' to reference the buffered (linear) copy
  row.m_type = RowPtr::RT_LINEAR;
  row.m_row_data.m_linear.m_row_ref = ref;
  row.m_row_data.m_linear.m_header = (RowPtr::Header*)(dstptr + linklen);
  row.m_row_data.m_linear.m_data = dstptr + linklen + headlen;

  memcpy(dstptr + linklen, headptr, 4 * headlen);
  copy(dstptr + linklen + headlen, dataPtr);

  if (linklen)
  {
    jam();
    NullRowRef.copyto_link(dstptr); // Null terminate list...
    add_to_list(treeNodePtr.p->m_row_list, ref);
  }
  else
  {
    jam();
    // Map variant may itself fail to allocate
    return add_to_map(requestPtr, treeNodePtr, row.m_src_correlation, ref);
  }

  return 0;
}
2221
2222 void
setupRowPtr(Ptr<TreeNode> treeNodePtr,RowPtr & row,RowRef ref,const Uint32 * src)2223 Dbspj::setupRowPtr(Ptr<TreeNode> treeNodePtr,
2224 RowPtr& row, RowRef ref, const Uint32 * src)
2225 {
2226 Uint32 linklen = (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP)?
2227 0 : 2;
2228 const RowPtr::Header * headptr = (RowPtr::Header*)(src + linklen);
2229 Uint32 headlen = 1 + headptr->m_len;
2230
2231 row.m_type = RowPtr::RT_LINEAR;
2232 row.m_row_data.m_linear.m_row_ref = ref;
2233 row.m_row_data.m_linear.m_header = headptr;
2234 row.m_row_data.m_linear.m_data = (Uint32*)headptr + headlen;
2235 }
2236
2237 void
add_to_list(SLFifoRowList & list,RowRef rowref)2238 Dbspj::add_to_list(SLFifoRowList & list, RowRef rowref)
2239 {
2240 if (list.isNull())
2241 {
2242 jam();
2243 list.m_first_row_page_id = rowref.m_page_id;
2244 list.m_first_row_page_pos = rowref.m_page_pos;
2245 }
2246 else
2247 {
2248 jam();
2249 /**
2250 * add last to list
2251 */
2252 RowRef last;
2253 last.m_allocator = rowref.m_allocator;
2254 last.m_page_id = list.m_last_row_page_id;
2255 last.m_page_pos = list.m_last_row_page_pos;
2256 Uint32 * rowptr;
2257 if (rowref.m_allocator == 0)
2258 {
2259 jam();
2260 rowptr = get_row_ptr_stack(last);
2261 }
2262 else
2263 {
2264 jam();
2265 rowptr = get_row_ptr_var(last);
2266 }
2267 rowref.copyto_link(rowptr);
2268 }
2269
2270 list.m_last_row_page_id = rowref.m_page_id;
2271 list.m_last_row_page_pos = rowref.m_page_pos;
2272 }
2273
2274 Uint32 *
get_row_ptr_stack(RowRef pos)2275 Dbspj::get_row_ptr_stack(RowRef pos)
2276 {
2277 ndbassert(pos.m_allocator == 0);
2278 Ptr<RowPage> ptr;
2279 m_page_pool.getPtr(ptr, pos.m_page_id);
2280 return ptr.p->m_data + pos.m_page_pos;
2281 }
2282
2283 Uint32 *
get_row_ptr_var(RowRef pos)2284 Dbspj::get_row_ptr_var(RowRef pos)
2285 {
2286 ndbassert(pos.m_allocator == 1);
2287 Ptr<RowPage> ptr;
2288 m_page_pool.getPtr(ptr, pos.m_page_id);
2289 return ((Var_page*)ptr.p)->get_ptr(pos.m_page_pos);
2290 }
2291
2292 bool
first(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,SLFifoRowListIterator & iter)2293 Dbspj::first(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2294 SLFifoRowListIterator& iter)
2295 {
2296 Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2297 SLFifoRowList & list = treeNodePtr.p->m_row_list;
2298 if (list.isNull())
2299 {
2300 jam();
2301 iter.setNull();
2302 return false;
2303 }
2304
2305 iter.m_ref.m_allocator = var;
2306 iter.m_ref.m_page_id = list.m_first_row_page_id;
2307 iter.m_ref.m_page_pos = list.m_first_row_page_pos;
2308 if (var == 0)
2309 {
2310 jam();
2311 iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2312 }
2313 else
2314 {
2315 jam();
2316 iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2317 }
2318
2319 return true;
2320 }
2321
2322 bool
next(SLFifoRowListIterator & iter)2323 Dbspj::next(SLFifoRowListIterator& iter)
2324 {
2325 iter.m_ref.assign_from_link(iter.m_row_ptr);
2326 if (iter.m_ref.isNull())
2327 {
2328 jam();
2329 return false;
2330 }
2331
2332 if (iter.m_ref.m_allocator == 0)
2333 {
2334 jam();
2335 iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2336 }
2337 else
2338 {
2339 jam();
2340 iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2341 }
2342 return true;
2343 }
2344
2345 bool
next(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,SLFifoRowListIterator & iter,SLFifoRowListIteratorPtr start)2346 Dbspj::next(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2347 SLFifoRowListIterator& iter, SLFifoRowListIteratorPtr start)
2348 {
2349 Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2350 (void)var;
2351 ndbassert(var == iter.m_ref.m_allocator);
2352 if (iter.m_ref.m_allocator == 0)
2353 {
2354 jam();
2355 iter.m_row_ptr = get_row_ptr_stack(start.m_ref);
2356 }
2357 else
2358 {
2359 jam();
2360 iter.m_row_ptr = get_row_ptr_var(start.m_ref);
2361 }
2362 return next(iter);
2363 }
2364
2365 Uint32
add_to_map(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,Uint32 corrVal,RowRef rowref)2366 Dbspj::add_to_map(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2367 Uint32 corrVal, RowRef rowref)
2368 {
2369 Uint32 * mapptr;
2370 RowMap& map = treeNodePtr.p->m_row_map;
2371 if (map.isNull())
2372 {
2373 jam();
2374 Uint16 batchsize = treeNodePtr.p->m_batch_size;
2375 Uint32 sz16 = RowMap::MAP_SIZE_PER_REF_16 * batchsize;
2376 Uint32 sz32 = (sz16 + 1) / 2;
2377 RowRef ref;
2378 if ((requestPtr.p->m_bits & Request::RT_VAR_ALLOC) == 0)
2379 {
2380 jam();
2381 mapptr = stackAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
2382 }
2383 else
2384 {
2385 jam();
2386 mapptr = varAlloc(requestPtr.p->m_rowBuffer, ref, sz32);
2387 }
2388 if (unlikely(mapptr == 0))
2389 {
2390 jam();
2391 return DbspjErr::OutOfRowMemory;
2392 }
2393 map.assign(ref);
2394 map.m_elements = 0;
2395 map.m_size = batchsize;
2396 map.clear(mapptr);
2397 }
2398 else
2399 {
2400 jam();
2401 RowRef ref;
2402 map.copyto(ref);
2403 if (ref.m_allocator == 0)
2404 {
2405 jam();
2406 mapptr = get_row_ptr_stack(ref);
2407 }
2408 else
2409 {
2410 jam();
2411 mapptr = get_row_ptr_var(ref);
2412 }
2413 }
2414
2415 Uint32 pos = corrVal & 0xFFFF;
2416 ndbrequire(pos < map.m_size);
2417 ndbrequire(map.m_elements < map.m_size);
2418
2419 if (1)
2420 {
2421 /**
2422 * Check that *pos* is empty
2423 */
2424 RowRef check;
2425 map.load(mapptr, pos, check);
2426 ndbrequire(check.m_page_pos == 0xFFFF);
2427 }
2428
2429 map.store(mapptr, pos, rowref);
2430
2431 return 0;
2432 }
2433
2434 bool
first(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,RowMapIterator & iter)2435 Dbspj::first(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2436 RowMapIterator & iter)
2437 {
2438 Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2439 RowMap& map = treeNodePtr.p->m_row_map;
2440 if (map.isNull())
2441 {
2442 jam();
2443 iter.setNull();
2444 return false;
2445 }
2446
2447 if (var == 0)
2448 {
2449 jam();
2450 iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
2451 }
2452 else
2453 {
2454 jam();
2455 iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
2456 }
2457 iter.m_size = map.m_size;
2458 iter.m_ref.m_allocator = var;
2459
2460 Uint32 pos = 0;
2461 while (RowMap::isNull(iter.m_map_ptr, pos) && pos < iter.m_size)
2462 pos++;
2463
2464 if (pos == iter.m_size)
2465 {
2466 jam();
2467 iter.setNull();
2468 return false;
2469 }
2470 else
2471 {
2472 jam();
2473 RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
2474 iter.m_element_no = pos;
2475 if (var == 0)
2476 {
2477 jam();
2478 iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2479 }
2480 else
2481 {
2482 jam();
2483 iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2484 }
2485 return true;
2486 }
2487 }
2488
2489 bool
next(RowMapIterator & iter)2490 Dbspj::next(RowMapIterator & iter)
2491 {
2492 Uint32 pos = iter.m_element_no + 1;
2493 while (RowMap::isNull(iter.m_map_ptr, pos) && pos < iter.m_size)
2494 pos++;
2495
2496 if (pos == iter.m_size)
2497 {
2498 jam();
2499 iter.setNull();
2500 return false;
2501 }
2502 else
2503 {
2504 jam();
2505 RowMap::load(iter.m_map_ptr, pos, iter.m_ref);
2506 iter.m_element_no = pos;
2507 if (iter.m_ref.m_allocator == 0)
2508 {
2509 jam();
2510 iter.m_row_ptr = get_row_ptr_stack(iter.m_ref);
2511 }
2512 else
2513 {
2514 jam();
2515 iter.m_row_ptr = get_row_ptr_var(iter.m_ref);
2516 }
2517 return true;
2518 }
2519 }
2520
2521 bool
next(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,RowMapIterator & iter,RowMapIteratorPtr start)2522 Dbspj::next(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr,
2523 RowMapIterator & iter, RowMapIteratorPtr start)
2524 {
2525 Uint32 var = (requestPtr.p->m_bits & Request::RT_VAR_ALLOC) != 0;
2526 RowMap& map = treeNodePtr.p->m_row_map;
2527 ndbrequire(!map.isNull());
2528
2529 if (var == 0)
2530 {
2531 jam();
2532 iter.m_map_ptr = get_row_ptr_stack(map.m_map_ref);
2533 }
2534 else
2535 {
2536 jam();
2537 iter.m_map_ptr = get_row_ptr_var(map.m_map_ref);
2538 }
2539 iter.m_size = map.m_size;
2540
2541 RowMap::load(iter.m_map_ptr, start.m_element_no, iter.m_ref);
2542 iter.m_element_no = start.m_element_no;
2543 return next(iter);
2544 }
2545
2546 Uint32 *
stackAlloc(RowBuffer & buffer,RowRef & dst,Uint32 sz)2547 Dbspj::stackAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
2548 {
2549 Ptr<RowPage> ptr;
2550 LocalDLFifoList<RowPage> list(m_page_pool, buffer.m_page_list);
2551
2552 Uint32 pos = buffer.m_stack.m_pos;
2553 const Uint32 SIZE = RowPage::SIZE;
2554 if (list.isEmpty() || (pos + sz) > SIZE)
2555 {
2556 jam();
2557 bool ret = allocPage(ptr);
2558 if (unlikely(ret == false))
2559 {
2560 jam();
2561 return 0;
2562 }
2563
2564 pos = 0;
2565 list.addLast(ptr);
2566 }
2567 else
2568 {
2569 list.last(ptr);
2570 }
2571
2572 dst.m_page_id = ptr.i;
2573 dst.m_page_pos = pos;
2574 dst.m_allocator = 0;
2575 buffer.m_stack.m_pos = pos + sz;
2576 return ptr.p->m_data + pos;
2577 }
2578
2579 Uint32 *
varAlloc(RowBuffer & buffer,RowRef & dst,Uint32 sz)2580 Dbspj::varAlloc(RowBuffer & buffer, RowRef& dst, Uint32 sz)
2581 {
2582 Ptr<RowPage> ptr;
2583 LocalDLFifoList<RowPage> list(m_page_pool, buffer.m_page_list);
2584
2585 Uint32 free_space = buffer.m_var.m_free;
2586 if (list.isEmpty() || free_space < (sz + 1))
2587 {
2588 jam();
2589 bool ret = allocPage(ptr);
2590 if (unlikely(ret == false))
2591 {
2592 jam();
2593 return 0;
2594 }
2595
2596 list.addLast(ptr);
2597 ((Var_page*)ptr.p)->init();
2598 }
2599 else
2600 {
2601 jam();
2602 list.last(ptr);
2603 }
2604
2605 Var_page * vp = (Var_page*)ptr.p;
2606 Uint32 pos = vp->alloc_record(sz, (Var_page*)m_buffer0, Var_page::CHAIN);
2607
2608 dst.m_page_id = ptr.i;
2609 dst.m_page_pos = pos;
2610 dst.m_allocator = 1;
2611 buffer.m_var.m_free = vp->free_space;
2612 return vp->get_ptr(pos);
2613 }
2614
2615 bool
allocPage(Ptr<RowPage> & ptr)2616 Dbspj::allocPage(Ptr<RowPage> & ptr)
2617 {
2618 if (m_free_page_list.firstItem == RNIL)
2619 {
2620 jam();
2621 ptr.p = (RowPage*)m_ctx.m_mm.alloc_page(RT_SPJ_DATABUFFER,
2622 &ptr.i,
2623 Ndbd_mem_manager::NDB_ZONE_ANY);
2624 if (ptr.p == 0)
2625 {
2626 return false;
2627 }
2628 return true;
2629 }
2630 else
2631 {
2632 jam();
2633 LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2634 bool ret = list.remove_front(ptr);
2635 ndbrequire(ret);
2636 return ret;
2637 }
2638 }
2639
2640 void
releasePage(Ptr<RowPage> ptr)2641 Dbspj::releasePage(Ptr<RowPage> ptr)
2642 {
2643 LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2644 list.add(ptr);
2645 }
2646
2647 void
releasePages(Uint32 first,Ptr<RowPage> last)2648 Dbspj::releasePages(Uint32 first, Ptr<RowPage> last)
2649 {
2650 LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2651 list.add(first, last);
2652 }
2653
2654 void
releaseGlobal(Signal * signal)2655 Dbspj::releaseGlobal(Signal * signal)
2656 {
2657 Uint32 delay = 100;
2658 LocalSLList<RowPage> list(m_page_pool, m_free_page_list);
2659 if (list.empty())
2660 {
2661 jam();
2662 delay = 300;
2663 }
2664 else
2665 {
2666 Ptr<RowPage> ptr;
2667 list.remove_front(ptr);
2668 m_ctx.m_mm.release_page(RT_SPJ_DATABUFFER, ptr.i);
2669 }
2670
2671 signal->theData[0] = 0;
2672 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, delay, 1);
2673 }
2674
2675 /**
2676 * END - MODULE GENERIC
2677 */
2678
2679 /**
2680 * MODULE LOOKUP
2681 */
/**
 * Dispatch table for lookup (primary-key / unique-index read) tree
 * nodes.  Entries left as 0 are operations that do not apply to
 * lookups (e.g. the SCAN_FRAG signal handlers).
 */
const Dbspj::OpInfo
Dbspj::g_LookupOpInfo =
{
  &Dbspj::lookup_build,
  0, // prepare
  &Dbspj::lookup_start,
  &Dbspj::lookup_execTRANSID_AI,
  &Dbspj::lookup_execLQHKEYREF,
  &Dbspj::lookup_execLQHKEYCONF,
  0, // execSCAN_FRAGREF
  0, // execSCAN_FRAGCONF
  &Dbspj::lookup_parent_row,
  &Dbspj::lookup_parent_batch_complete,
  0, // Dbspj::lookup_parent_batch_repeat,
  0, // Dbspj::lookup_parent_batch_cleanup,
  0, // Dbspj::lookup_execSCAN_NEXTREQ
  0, // Dbspj::lookup_complete
  &Dbspj::lookup_abort,
  &Dbspj::lookup_execNODE_FAILREP,
  &Dbspj::lookup_cleanup
};
2703
/**
 * Build a lookup tree node from the serialized query-tree node 'qn'
 * and its per-request parameters 'qp':
 *  - allocate the TreeNode and attach g_LookupOpInfo,
 *  - prefill the constant part of the node's LQHKEYREQ signal,
 *  - parse the DA (key/attrinfo) sections common to lookup/scan-frag,
 *  - inherit batch size from the parent node,
 *  - when built from an incoming start signal, copy fragment routing
 *    info from it and mark the node T_ONE_SHOT.
 *
 * @return 0 on success, else a DbspjErr error code.
 */
Uint32
Dbspj::lookup_build(Build_context& ctx,
                    Ptr<Request> requestPtr,
                    const QueryNode* qn,
                    const QueryNodeParameters* qp)
{
  Uint32 err = 0;
  Ptr<TreeNode> treeNodePtr;
  const QN_LookupNode * node = (const QN_LookupNode*)qn;
  const QN_LookupParameters * param = (const QN_LookupParameters*)qp;
  do
  {
    err = createNode(ctx, requestPtr, treeNodePtr);
    if (unlikely(err != 0))
    {
      DEBUG_CRASH();
      break;
    }

    treeNodePtr.p->m_info = &g_LookupOpInfo;
    Uint32 transId1 = requestPtr.p->m_transId[0];
    Uint32 transId2 = requestPtr.p->m_transId[1];
    Uint32 savePointId = ctx.m_savepointId;

    Uint32 treeBits = node->requestInfo;
    Uint32 paramBits = param->requestInfo;
    //ndbout_c("Dbspj::lookup_build() treeBits=%.8x paramBits=%.8x",
    //         treeBits, paramBits);
    // Prefill the LQHKEYREQ template kept inside the tree node;
    // lookup_send() copies it into the signal area for each send.
    LqhKeyReq* dst = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
    {
      /**
       * static variables
       */
      dst->tcBlockref = reference();
      dst->clientConnectPtr = treeNodePtr.i;

      /**
       * TODO reference()+treeNodePtr.i is passed twice
       *      this can likely be optimized using the requestInfo-bits
       * UPDATE: This can be accomplished by *not* setApplicationAddressFlag
       *         and patch LQH to then instead use tcBlockref/clientConnectPtr
       */
      dst->transId1 = transId1;
      dst->transId2 = transId2;
      dst->savePointId = savePointId;
      dst->scanInfo = 0;
      dst->attrLen = 0;
      /** Initialy set reply ref to client, do_send will set SPJ refs if non-LEAF */
      dst->variableData[0] = ctx.m_resultRef;
      dst->variableData[1] = param->resultData;
      Uint32 requestInfo = 0;
      LqhKeyReq::setOperation(requestInfo, ZREAD);
      LqhKeyReq::setApplicationAddressFlag(requestInfo, 1);
      LqhKeyReq::setDirtyFlag(requestInfo, 1);
      LqhKeyReq::setSimpleFlag(requestInfo, 1);
      LqhKeyReq::setNormalProtocolFlag(requestInfo, 0); // Assume T_LEAF
      LqhKeyReq::setCorrFactorFlag(requestInfo, 1);
      // Disk access only needed if either the tree node or its
      // parameters reference disk attributes.
      LqhKeyReq::setNoDiskFlag(requestInfo,
                               (treeBits & DABits::NI_LINKED_DISK) == 0 &&
                               (paramBits & DABits::PI_DISK_ATTR) == 0);
      dst->requestInfo = requestInfo;
    }

    err = DbspjErr::InvalidTreeNodeSpecification;
    if (unlikely(node->len < QN_LookupNode::NodeSize))
    {
      DEBUG_CRASH();
      break;
    }

    if (treeBits & QN_LookupNode::L_UNIQUE_INDEX)
    {
      jam();
      treeNodePtr.p->m_bits |= TreeNode::T_UNIQUE_INDEX_LOOKUP;
    }

    Uint32 tableId = node->tableId;
    Uint32 schemaVersion = node->tableVersion;

    // Pack tableId (low 16 bits) and schema version (high 16 bits).
    Uint32 tableSchemaVersion = tableId + ((schemaVersion << 16) & 0xFFFF0000);
    dst->tableSchemaVersion = tableSchemaVersion;

    err = DbspjErr::InvalidTreeParametersSpecification;
    DEBUG("param len: " << param->len);
    if (unlikely(param->len < QN_LookupParameters::NodeSize))
    {
      DEBUG_CRASH();
      break;
    }

    ctx.m_resultData = param->resultData;
    treeNodePtr.p->m_lookup_data.m_api_resultRef = ctx.m_resultRef;
    treeNodePtr.p->m_lookup_data.m_api_resultData = param->resultData;
    treeNodePtr.p->m_lookup_data.m_outstanding = 0;
    treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;

    /**
     * Parse stuff common lookup/scan-frag
     */
    struct DABuffer nodeDA, paramDA;
    nodeDA.ptr = node->optional;
    nodeDA.end = nodeDA.ptr + (node->len - QN_LookupNode::NodeSize);
    paramDA.ptr = param->optional;
    paramDA.end = paramDA.ptr + (param->len - QN_LookupParameters::NodeSize);
    err = parseDA(ctx, requestPtr, treeNodePtr,
                  nodeDA, treeBits, paramDA, paramBits);
    if (unlikely(err != 0))
    {
      DEBUG_CRASH();
      break;
    }

    if (treeNodePtr.p->m_bits & TreeNode::T_ATTR_INTERPRETED)
    {
      jam();
      LqhKeyReq::setInterpretedFlag(dst->requestInfo, 1);
    }

    /**
     * Inherit batch size from parent
     */
    treeNodePtr.p->m_batch_size = 1;
    if (treeNodePtr.p->m_parentPtrI != RNIL)
    {
      jam();
      Ptr<TreeNode> parentPtr;
      m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
      treeNodePtr.p->m_batch_size = parentPtr.p->m_batch_size;
    }

    if (ctx.m_start_signal)
    {
      jam();
      // Root node built from an incoming LQHKEYREQ: copy the routing
      // and fragment info directly from that signal.
      Signal * signal = ctx.m_start_signal;
      const LqhKeyReq* src = (const LqhKeyReq*)signal->getDataPtr();
#if NOT_YET
      Uint32 instanceNo =
        blockToInstance(signal->header.theReceiversBlockNumber);
      treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH,
                                                instanceNo, getOwnNodeId());
#else
      treeNodePtr.p->m_send.m_ref =
        numberToRef(DBLQH, getInstanceKey(src->tableSchemaVersion & 0xFFFF,
                                          src->fragmentData & 0xFFFF),
                    getOwnNodeId());
#endif

      Uint32 hashValue = src->hashValue;
      Uint32 fragId = src->fragmentData;
      Uint32 requestInfo = src->requestInfo;
      Uint32 attrLen = src->attrLen; // fragdist-key is in here

      /**
       * assertions
       */
      ndbassert(LqhKeyReq::getAttrLen(attrLen) == 0);         // Only long
      ndbassert(LqhKeyReq::getScanTakeOverFlag(attrLen) == 0);// Not supported
      ndbassert(LqhKeyReq::getReorgFlag(attrLen) == 0);       // Not supported
      ndbassert(LqhKeyReq::getOperation(requestInfo) == ZREAD);
      ndbassert(LqhKeyReq::getKeyLen(requestInfo) == 0);      // Only long
      ndbassert(LqhKeyReq::getMarkerFlag(requestInfo) == 0);  // Only read
      ndbassert(LqhKeyReq::getAIInLqhKeyReq(requestInfo) == 0);
      ndbassert(LqhKeyReq::getSeqNoReplica(requestInfo) == 0);
      ndbassert(LqhKeyReq::getLastReplicaNo(requestInfo) == 0);
      ndbassert(LqhKeyReq::getApplicationAddressFlag(requestInfo) != 0);
      ndbassert(LqhKeyReq::getSameClientAndTcFlag(requestInfo) == 0);

#if TODO
      /**
       * Handle various lock-modes
       */
      static Uint8 getDirtyFlag(const UintR & requestInfo);
      static Uint8 getSimpleFlag(const UintR & requestInfo);
#endif

      Uint32 dst_requestInfo = dst->requestInfo;
      ndbassert(LqhKeyReq::getInterpretedFlag(requestInfo) ==
                LqhKeyReq::getInterpretedFlag(dst_requestInfo));
      ndbassert(LqhKeyReq::getNoDiskFlag(requestInfo) ==
                LqhKeyReq::getNoDiskFlag(dst_requestInfo));

      dst->hashValue = hashValue;
      dst->fragmentData = fragId;
      dst->attrLen = attrLen; // fragdist is in here

      // Key section is taken over from the start signal; the node can
      // be sent exactly once without rebuilding key/attr sections.
      treeNodePtr.p->m_send.m_keyInfoPtrI = ctx.m_keyPtr.i;
      treeNodePtr.p->m_bits |= TreeNode::T_ONE_SHOT;
    }
    return 0;
  } while (0);

  return err;
}
2897
/**
 * Start executing the lookup tree node: simply send its LQHKEYREQ.
 */
void
Dbspj::lookup_start(Signal* signal,
                    Ptr<Request> requestPtr,
                    Ptr<TreeNode> treeNodePtr)
{
  lookup_send(signal, requestPtr, treeNodePtr);
}
2905
/**
 * Send the (prebuilt) LQHKEYREQ for this lookup tree node.
 *
 * 'cnt' is the number of replies expected back to SPJ, used for the
 * outstanding-operation book-keeping:
 *   2 - non-leaf: both TRANSID_AI (row data) and LQHKEYCONF/REF
 *   1 - leaf of a scan request: only LQHKEYCONF/REF
 *   0 - leaf of a lookup request: LQH replies directly to the API
 *
 * For the leaf-of-lookup case a TCKEYCONF carrying DirtyReadBit and
 * the data node id is sent to the API so it can detect a node failure
 * while waiting for the result.
 */
void
Dbspj::lookup_send(Signal* signal,
                   Ptr<Request> requestPtr,
                   Ptr<TreeNode> treeNodePtr)
{
  jam();

  Uint32 cnt = 2;
  if (treeNodePtr.p->isLeaf())
  {
    jam();
    if (requestPtr.p->isLookup())
    {
      jam();
      cnt = 0;
    }
    else
    {
      jam();
      cnt = 1;
    }
  }

  LqhKeyReq* req = reinterpret_cast<LqhKeyReq*>(signal->getDataPtrSend());

  // Copy the template built by lookup_build, then fill in the
  // per-send correlation and result data.
  memcpy(req, treeNodePtr.p->m_lookup_data.m_lqhKeyReq,
         sizeof(treeNodePtr.p->m_lookup_data.m_lqhKeyReq));
  req->variableData[2] = treeNodePtr.p->m_send.m_correlation;
  req->variableData[3] = requestPtr.p->m_rootResultData;

  if (!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()))
  {
    // Non-LEAF want reply to SPJ instead of ApiClient.
    LqhKeyReq::setNormalProtocolFlag(req->requestInfo, 1);
    req->variableData[0] = reference();
    req->variableData[1] = treeNodePtr.i;
  }
  else
  {
    jam();
    /**
     * Fake that TC sent this request,
     * so that it can route a maybe TCKEYREF
     */
    req->tcBlockref = requestPtr.p->m_senderRef;
  }

  SectionHandle handle(this);

  Uint32 ref = treeNodePtr.p->m_send.m_ref;
  Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI;
  Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;

  if (treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT)
  {
    jam();
    /**
     * Pass sections to send: sendSignal takes ownership, so the
     * tree node must no longer reference them.
     */
    treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
    treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
  }
  else
  {
    // Non-constructed sections may be sent again later; send a
    // duplicate and keep the original.  Constructed sections are
    // rebuilt per row, so ownership passes to the send.
    if ((treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED) == 0)
    {
      jam();
      Uint32 tmp = RNIL;
      ndbrequire(dupSection(tmp, keyInfoPtrI)); // TODO handle error
      keyInfoPtrI = tmp;
    }
    else
    {
      jam();
      treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
    }

    if ((treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED) == 0)
    {
      jam();
      Uint32 tmp = RNIL;
      ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
      attrInfoPtrI = tmp;
    }
    else
    {
      jam();
      treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
    }
  }

  getSection(handle.m_ptr[0], keyInfoPtrI);
  getSection(handle.m_ptr[1], attrInfoPtrI);
  handle.m_cnt = 2;

#if defined DEBUG_LQHKEYREQ
  ndbout_c("LQHKEYREQ to %x", ref);
  printLQHKEYREQ(stdout, signal->getDataPtrSend(),
                 NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq),
                 DBLQH);
  printf("KEYINFO: ");
  print(handle.m_ptr[0], stdout);
  printf("ATTRINFO: ");
  print(handle.m_ptr[1], stdout);
#endif

  Uint32 Tnode = refToNode(ref);
  if (Tnode == getOwnNodeId())
  {
    c_Counters.incr_counter(CI_LOCAL_READS_SENT, 1);
  }
  else
  {
    c_Counters.incr_counter(CI_REMOTE_READS_SENT, 1);
  }

  if (unlikely(!c_alive_nodes.get(Tnode)))
  {
    jam();
    // Target data node is down: release the sections and abort.
    releaseSections(handle);
    abort(signal, requestPtr, DbspjErr::NodeFailure);
    return;
  }
  else if (! (treeNodePtr.p->isLeaf() && requestPtr.p->isLookup()))
  {
    jam();
    ndbassert(Tnode < NDB_ARRAY_SIZE(requestPtr.p->m_lookup_node_data));
    requestPtr.p->m_outstanding += cnt;
    requestPtr.p->m_lookup_node_data[Tnode] += cnt;
    // number wrapped
    ndbrequire(! (requestPtr.p->m_lookup_node_data[Tnode] == 0));
  }

  sendSignal(ref, GSN_LQHKEYREQ, signal,
             NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq),
             JBB, &handle);

  treeNodePtr.p->m_lookup_data.m_outstanding += cnt;
  if (requestPtr.p->isLookup() && treeNodePtr.p->isLeaf())
  {
    jam();
    /**
     * Send TCKEYCONF with DirtyReadBit + Tnode,
     * so that API can discover if Tnode while waiting for result
     */
    Uint32 resultRef = req->variableData[0];
    Uint32 resultData = req->variableData[1];

    TcKeyConf* conf = (TcKeyConf*)signal->getDataPtrSend();
    conf->apiConnectPtr = RNIL; // lookup transaction from operations...
    conf->confInfo = 0;
    TcKeyConf::setNoOfOperations(conf->confInfo, 1);
    conf->transId1 = requestPtr.p->m_transId[0];
    conf->transId2 = requestPtr.p->m_transId[1];
    conf->operations[0].apiOperationPtr = resultData;
    conf->operations[0].attrInfoLen = TcKeyConf::DirtyReadBit | Tnode;
    Uint32 sigLen = TcKeyConf::StaticLength + TcKeyConf::OperationLength;
    sendTCKEYCONF(signal, sigLen, resultRef, requestPtr.p->m_senderRef);
  }
}
3066
/**
 * Handle a TRANSID_AI (row of attribute data) returned for a lookup:
 * forward the row to every dependent (child) tree node, then update
 * the outstanding-reply book-keeping for batch/request completion.
 */
void
Dbspj::lookup_execTRANSID_AI(Signal* signal,
                             Ptr<Request> requestPtr,
                             Ptr<TreeNode> treeNodePtr,
                             const RowPtr & rowRef)
{
  jam();

  Uint32 Tnode = refToNode(signal->getSendersBlockRef());

  {
    // Feed the row to each child node's m_parent_row handler.
    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
    Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
    Dependency_map::ConstDataBufferIterator it;
    for (list.first(it); !it.isNull(); list.next(it))
    {
      jam();
      Ptr<TreeNode> childPtr;
      m_treenode_pool.getPtr(childPtr, * it.data);
      ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0);
      (this->*(childPtr.p->m_info->m_parent_row))(signal,
                                                  requestPtr, childPtr,rowRef);
    }
  }
  // A leaf of a lookup request has its reply routed to the API
  // (see lookup_send), so SPJ should never see its TRANSID_AI.
  ndbrequire(!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()));

  // One fewer reply outstanding from this data node / this operation.
  ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1);
  requestPtr.p->m_lookup_node_data[Tnode] -= 1;

  treeNodePtr.p->m_lookup_data.m_outstanding--;

  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
      && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
      && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
  {
    jam();
    // We have received all rows for this operation in this batch.
    reportBatchComplete(signal, requestPtr, treeNodePtr);

    // Prepare for next batch.
    treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
    treeNodePtr.p->m_lookup_data.m_outstanding = 0;
  }

  checkBatchComplete(signal, requestPtr, 1);
}
3113
/**
 * Handle LQHKEYREF (key not found / interpreter reject / error) for a
 * lookup tree node.
 *
 * For a lookup request the REF is translated to a TCKEYREF back to
 * the API (and, for a unique-index lookup whose child is a leaf, a
 * compensating TCKEYCONF for the base-table operation the API still
 * expects).  For a scan request 'row not found' type errors are
 * simply ignored; other errors abort the request.  Finally the
 * outstanding-reply counters are adjusted: a REF terminates both the
 * expected CONF and the expected TRANSID_AI.
 */
void
Dbspj::lookup_execLQHKEYREF(Signal* signal,
                            Ptr<Request> requestPtr,
                            Ptr<TreeNode> treeNodePtr)
{
  const LqhKeyRef * rep = (LqhKeyRef*)signal->getDataPtr();
  Uint32 errCode = rep->errorCode;
  Uint32 Tnode = refToNode(signal->getSendersBlockRef());

  c_Counters.incr_counter(CI_READS_NOT_FOUND, 1);

  if (requestPtr.p->isLookup())
  {
    jam();

    /* CONF/REF not requested for lookup-Leaf: */
    ndbrequire(!treeNodePtr.p->isLeaf());

    /**
     * Scan-request does not need to
     *  send TCKEYREF...
     */
    /**
     * Return back to api...
     *   NOTE: assume that signal is tampered with
     */
    Uint32 resultRef = treeNodePtr.p->m_lookup_data.m_api_resultRef;
    Uint32 resultData = treeNodePtr.p->m_lookup_data.m_api_resultData;
    TcKeyRef* ref = (TcKeyRef*)signal->getDataPtr();
    ref->connectPtr = resultData;
    ref->transId[0] = requestPtr.p->m_transId[0];
    ref->transId[1] = requestPtr.p->m_transId[1];
    ref->errorCode = errCode;
    ref->errorData = 0;

    DEBUG("lookup_execLQHKEYREF, errorCode:" << errCode);

    sendTCKEYREF(signal, resultRef, requestPtr.p->m_senderRef);

    if (treeNodePtr.p->m_bits & TreeNode::T_UNIQUE_INDEX_LOOKUP)
    {
      /**
       * If this is a "leaf" unique index lookup
       *   emit extra TCKEYCONF as would have been done with ordinary
       *   operation
       */
      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
      Dependency_map::ConstDataBufferIterator it;
      ndbrequire(list.first(it));
      ndbrequire(list.getSize() == 1); // should only be 1 child
      Ptr<TreeNode> childPtr;
      m_treenode_pool.getPtr(childPtr, * it.data);
      if (childPtr.p->m_bits & TreeNode::T_LEAF)
      {
        jam();
        Uint32 resultRef = childPtr.p->m_lookup_data.m_api_resultRef;
        Uint32 resultData = childPtr.p->m_lookup_data.m_api_resultData;
        TcKeyConf* conf = (TcKeyConf*)signal->getDataPtr();
        conf->apiConnectPtr = RNIL;
        conf->confInfo = 0;
        conf->gci_hi = 0;
        TcKeyConf::setNoOfOperations(conf->confInfo, 1);
        conf->transId1 = requestPtr.p->m_transId[0];
        conf->transId2 = requestPtr.p->m_transId[1];
        conf->operations[0].apiOperationPtr = resultData;
        conf->operations[0].attrInfoLen =
          TcKeyConf::DirtyReadBit |getOwnNodeId();
        sendTCKEYCONF(signal, TcKeyConf::StaticLength + 2, resultRef, requestPtr.p->m_senderRef);
      }
    }
  }
  else
  {
    jam();
    // Scan request: benign "no match" errors are ignored;
    // anything else aborts the whole request.
    switch(errCode){
    case 626: // Row not found
    case 899: // Interpreter_exit_nok
      jam();
      break;
    default:
      jam();
      abort(signal, requestPtr, errCode);
    }
  }

  // A REF replaces both expected replies (CONF + TRANSID_AI) for a
  // non-leaf, and the single expected CONF for a (scan) leaf.
  Uint32 cnt = 2;
  if (treeNodePtr.p->isLeaf())  // Can't be a lookup-Leaf, asserted above
    cnt = 1;

  ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= cnt);
  requestPtr.p->m_lookup_node_data[Tnode] -= cnt;

  treeNodePtr.p->m_lookup_data.m_outstanding -= cnt;

  if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
      && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
      && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
  {
    jam();
    // We have received all rows for this operation in this batch.
    reportBatchComplete(signal, requestPtr, treeNodePtr);

    // Prepare for next batch.
    treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
    treeNodePtr.p->m_lookup_data.m_outstanding = 0;
  }

  checkBatchComplete(signal, requestPtr, cnt);
}
3224
3225 void
lookup_execLQHKEYCONF(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)3226 Dbspj::lookup_execLQHKEYCONF(Signal* signal,
3227 Ptr<Request> requestPtr,
3228 Ptr<TreeNode> treeNodePtr)
3229 {
3230 ndbrequire(!(requestPtr.p->isLookup() && treeNodePtr.p->isLeaf()));
3231
3232 Uint32 Tnode = refToNode(signal->getSendersBlockRef());
3233
3234 if (treeNodePtr.p->m_bits & TreeNode::T_USER_PROJECTION)
3235 {
3236 jam();
3237 requestPtr.p->m_rows++;
3238 }
3239
3240 ndbassert(requestPtr.p->m_lookup_node_data[Tnode] >= 1);
3241 requestPtr.p->m_lookup_node_data[Tnode] -= 1;
3242
3243 treeNodePtr.p->m_lookup_data.m_outstanding--;
3244
3245 if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
3246 && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
3247 && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
3248 {
3249 jam();
3250 // We have received all rows for this operation in this batch.
3251 reportBatchComplete(signal, requestPtr, treeNodePtr);
3252
3253 // Prepare for next batch.
3254 treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
3255 treeNodePtr.p->m_lookup_data.m_outstanding = 0;
3256 }
3257
3258 checkBatchComplete(signal, requestPtr, 1);
3259 }
3260
/**
 * Issue one child LQHKEYREQ for a row received from the parent operation.
 * Mirrors what TC normally does for a key operation:
 *  1) construct the key from the parent row (when T_KEYINFO_CONSTRUCTED)
 *  2) compute the key/distribution hash
 *  3) resolve the node/fragment owning the row
 * On any error the function currently crashes (ndbrequire) - see bottom.
 */
void
Dbspj::lookup_parent_row(Signal* signal,
                         Ptr<Request> requestPtr,
                         Ptr<TreeNode> treeNodePtr,
                         const RowPtr & rowRef)
{
  /**
   * Here we need to...
   * 1) construct a key
   * 2) compute hash (normally TC)
   * 3) get node for row (normally TC)
   */
  Uint32 err;
  const LqhKeyReq* src = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
  const Uint32 tableId = LqhKeyReq::getTableId(src->tableSchemaVersion);
  const Uint32 corrVal = rowRef.m_src_correlation;

  DEBUG("::lookup_parent_row");

  do
  {
    Uint32 ptrI = RNIL;
    if (treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED)
    {
      jam();
      DEBUG("parent_row w/ T_KEYINFO_CONSTRUCTED");
      /**
       * Get key-pattern
       */
      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);

      // Expand the stored key pattern with values from the parent row;
      // ptrI receives the section holding the constructed key.
      bool keyIsNull;
      err = expand(ptrI, pattern, rowRef, keyIsNull);
      if (unlikely(err != 0))
        break;

      if (keyIsNull)
      {
        jam();
        DEBUG("Key contain NULL values");
        /**
         * When the key contains NULL values, an EQ-match is impossible!
         * Entire lookup request can therefore be eliminate as it is known
         * to be REFused with errorCode = 626 (Row not found).
         * Different handling is required depening of request being a
         * scan or lookup:
         */
        if (requestPtr.p->isScan())
        {
          /**
           * Scan request: We can simply ignore lookup operation:
           * As rowCount in SCANCONF will not include this KEYREQ,
           * we dont have to send a KEYREF either.
           */
          jam();
          DEBUG("..Ignore impossible KEYREQ");
          if (ptrI != RNIL)
          {
            // Don't leak the (partially built) key section.
            releaseSection(ptrI);
          }
          return;  // Bailout, KEYREQ would have returned KEYREF(626) anyway
        }
        else // isLookup()
        {
          /**
           * Ignored lookup request need a faked KEYREF for the lookup operation.
           * Furthermore, if this is a leaf treeNode, a KEYCONF is also
           * expected by the API.
           *
           * TODO: Not implemented yet as we believe
           *       elimination of NULL key access for scan request
           *       will have the most performance impact.
           */
          jam();
        }
      } // keyIsNull

      /**
       * NOTE:
       *    The logic below contradicts 'keyIsNull' logic above and should
       *    be removed.
       *    However, it's likely that scanIndex should have similar
       *    logic as 'Null as wildcard' may make sense for a range bound.
       * NOTE2:
       *    Until 'keyIsNull' also cause bailout for request->isLookup()
       *    createEmptySection *is* require to avoid crash due to empty keys.
       */
      if (ptrI == RNIL)  // TODO: remove when keyIsNull is completely handled
      {
        jam();
        /**
         * We constructed a null-key...construct a zero-length key (even if we don't support it *now*)
         *
         * (we actually did prior to joining mysql where null was treated as any other
         * value in a key). But mysql treats null in unique key as *wildcard*
         * which we don't support so well...and do nasty tricks in handler
         *
         * NOTE: should be *after* check for error
         */
        err = createEmptySection(ptrI);
        if (unlikely(err != 0))
          break;
      }

      treeNodePtr.p->m_send.m_keyInfoPtrI = ptrI;
    }

    // Step 2+3: hash the key and locate the owning node/fragment.
    BuildKeyReq tmp;
    err = computeHash(signal, tmp, tableId, treeNodePtr.p->m_send.m_keyInfoPtrI);
    if (unlikely(err != 0))
      break;

    err = getNodes(signal, tmp, tableId);
    if (unlikely(err != 0))
      break;

    // If the attrInfo must be parameterized per row, work on a duplicate
    // so the original template section can be reused for following rows.
    Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
    if (treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED)
    {
      jam();
      Uint32 tmp = RNIL;
      ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error

      Uint32 org_size;
      {
        SegmentedSectionPtr ptr;
        getSection(ptr, tmp);
        org_size = ptr.sz;
      }

      bool hasNull;
      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      Local_pattern_store pattern(pool, treeNodePtr.p->m_attrParamPattern);
      err = expand(tmp, pattern, rowRef, hasNull);
      if (unlikely(err != 0))
        break;
//    ndbrequire(!hasNull);

      /**
       * Update size of subsrouting section, which contains arguments
       */
      SegmentedSectionPtr ptr;
      getSection(ptr, tmp);
      Uint32 new_size = ptr.sz;
      Uint32 * sectionptrs = ptr.p->theData;
      sectionptrs[4] = new_size - org_size;

      treeNodePtr.p->m_send.m_attrInfoPtrI = tmp;
    }

    /**
     * Now send...
     */

    /**
     * TODO merge better with lookup_start (refactor)
     */
    {
      /* We set the upper half word of m_correlation to the tuple ID
       * of the parent, such that the API can match this tuple with its
       * parent.
       * Then we re-use the tuple ID of the parent as the
       * tuple ID for this tuple also. Since the tuple ID
       * is unique within this batch and SPJ block for the parent operation,
       * it must also be unique for this operation.
       * This ensures that lookup operations with no user projection will
       * work, since such operations will have the same tuple ID as their
       * parents. The API will then be able to match a tuple with its
       * grandparent, even if it gets no tuple for the parent operation.*/
      treeNodePtr.p->m_send.m_correlation =
        (corrVal << 16) + (corrVal & 0xffff);

      treeNodePtr.p->m_send.m_ref = tmp.receiverRef;
      LqhKeyReq * dst = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
      dst->hashValue = tmp.hashInfo[0];
      dst->fragmentData = tmp.fragId;
      Uint32 attrLen = 0;
      LqhKeyReq::setDistributionKey(attrLen, tmp.fragDistKey);
      dst->attrLen = attrLen;
      lookup_send(signal, requestPtr, treeNodePtr);

      if (treeNodePtr.p->m_bits & TreeNode::T_ATTRINFO_CONSTRUCTED)
      {
        jam();
        // restore (the template section pointer consumed by lookup_send)
        treeNodePtr.p->m_send.m_attrInfoPtrI = attrInfoPtrI;
      }
    }
    return;
  } while (0);

  // Error exit from the do-block above - error handling is still a TODO.
  ndbrequire(false);
}
3455
3456 void
lookup_parent_batch_complete(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)3457 Dbspj::lookup_parent_batch_complete(Signal* signal,
3458 Ptr<Request> requestPtr,
3459 Ptr<TreeNode> treeNodePtr)
3460 {
3461 jam();
3462
3463 /**
3464 * lookups are performed directly...so we're not really interested in
3465 * parent_batch_complete...we only pass-through
3466 */
3467
3468 /**
3469 * but this method should only be called if we have T_REPORT_BATCH_COMPLETE
3470 */
3471 ndbassert(treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE);
3472
3473 ndbassert(!treeNodePtr.p->m_lookup_data.m_parent_batch_complete);
3474 treeNodePtr.p->m_lookup_data.m_parent_batch_complete = true;
3475 if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
3476 && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
3477 {
3478 jam();
3479 // We have received all rows for this operation in this batch.
3480 reportBatchComplete(signal, requestPtr, treeNodePtr);
3481
3482 // Prepare for next batch.
3483 treeNodePtr.p->m_lookup_data.m_parent_batch_complete = false;
3484 treeNodePtr.p->m_lookup_data.m_outstanding = 0;
3485 }
3486 }
3487
/**
 * Abort hook for a lookup TreeNode.
 * Intentionally empty: there is no close protocol to run for a lookup
 * here - presumably any outstanding LQHKEYREQs complete via their normal
 * CONF/REF paths (NOTE(review): confirm against the request abort logic).
 */
void
Dbspj::lookup_abort(Signal* signal,
                    Ptr<Request> requestPtr,
                    Ptr<TreeNode> treeNodePtr)
{
  jam();
}
3495
3496 Uint32
lookup_execNODE_FAILREP(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,NdbNodeBitmask mask)3497 Dbspj::lookup_execNODE_FAILREP(Signal* signal,
3498 Ptr<Request> requestPtr,
3499 Ptr<TreeNode> treeNodePtr,
3500 NdbNodeBitmask mask)
3501 {
3502 jam();
3503 Uint32 node = 0;
3504 Uint32 sum = 0;
3505 while (requestPtr.p->m_outstanding &&
3506 ((node = mask.find(node + 1)) != NdbNodeBitmask::NotFound))
3507 {
3508 Uint32 cnt = requestPtr.p->m_lookup_node_data[node];
3509 sum += cnt;
3510 requestPtr.p->m_lookup_node_data[node] = 0;
3511 }
3512
3513 if (sum)
3514 {
3515 jam();
3516 ndbrequire(requestPtr.p->m_outstanding >= sum);
3517 requestPtr.p->m_outstanding -= sum;
3518 }
3519
3520 return sum;
3521 }
3522
/**
 * Release resources held by a lookup TreeNode.
 * A lookup node owns nothing beyond what the shared cleanup handles.
 */
void
Dbspj::lookup_cleanup(Ptr<Request> requestPtr,
                      Ptr<TreeNode> treeNodePtr)
{
  cleanup_common(requestPtr, treeNodePtr);
}
3529
3530
/**
 * Compute the md5 key hash for keys requiring special treatment:
 * character attributes that must be normalised (xfrm) and/or a
 * distribution key differing from the primary key.
 *
 * @param tableId  table whose key layout applies
 * @param dstHash  out: 4-word hash; when a separate distribution key
 *                 exists, dstHash[1] is overwritten with its hash word
 * @param src      64-bit aligned key data
 * @param srcLen   key length in 32-bit words
 * @param desc     key descriptor for tableId
 * @return 0 on success, 290 if the key could not be xfrm'ed
 */
Uint32
Dbspj::handle_special_hash(Uint32 tableId, Uint32 dstHash[4],
                           const Uint64* src,
                           Uint32 srcLen,       // Len in #32bit words
                           const KeyDescriptor* desc)
{
  const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
    (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
  // Workspace sized for worst-case xfrm expansion; 64-bit aligned for md5_hash.
  Uint64 alignedWorkspace[MAX_KEY_SIZE_IN_LONG_WORDS * MAX_XFRM_MULTIPLY];
  const bool hasVarKeys = desc->noOfVarKeys > 0;
  const bool hasCharAttr = desc->hasCharAttr;
  const bool compute_distkey = desc->noOfDistrKeys > 0;

  const Uint64 *hashInput = 0;
  Uint32 inputLen = 0;
  Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
  Uint32 * keyPartLenPtr;

  /* Normalise KeyInfo into workspace if necessary */
  if (hasCharAttr || (compute_distkey && hasVarKeys))
  {
    hashInput = alignedWorkspace;
    keyPartLenPtr = keyPartLen;
    inputLen = xfrm_key(tableId,
                        (Uint32*)src,
                        (Uint32*)alignedWorkspace,
                        sizeof(alignedWorkspace) >> 2,
                        keyPartLenPtr);
    if (unlikely(inputLen == 0))
    {
      return 290;  // 'Corrupt key in TC, unable to xfrm'
    }
  }
  else
  {
    /* Keyinfo already suitable for hash */
    hashInput = src;
    inputLen = srcLen;
    keyPartLenPtr = 0;
  }

  /* Calculate primary key hash */
  md5_hash(dstHash, hashInput, inputLen);

  /* If the distribution key != primary key then we have to
   * form a distribution key from the primary key and calculate
   * a separate distribution hash based on this
   */
  if (compute_distkey)
  {
    jam();

    Uint32 distrKeyHash[4];
    /* Reshuffle primary key columns to get just distribution key.
     * NOTE: alignedWorkspace is reused as the destination here; safe
     * because create_distr_key reads from hashInput. */
    Uint32 len = create_distr_key(tableId, (Uint32*)hashInput, (Uint32*)alignedWorkspace, keyPartLenPtr);
    /* Calculate distribution key hash */
    md5_hash(distrKeyHash, alignedWorkspace, len);

    /* Just one word used for distribution */
    dstHash[1] = distrKeyHash[1];
  }
  return 0;
}
3594
3595 Uint32
computeHash(Signal * signal,BuildKeyReq & dst,Uint32 tableId,Uint32 ptrI)3596 Dbspj::computeHash(Signal* signal,
3597 BuildKeyReq& dst, Uint32 tableId, Uint32 ptrI)
3598 {
3599 /**
3600 * Essentially the same code as in Dbtc::hash().
3601 * The code for user defined partitioning has been removed though.
3602 */
3603 SegmentedSectionPtr ptr;
3604 getSection(ptr, ptrI);
3605
3606 /* NOTE: md5_hash below require 64-bit alignment
3607 */
3608 const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
3609 (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
3610 Uint64 tmp64[MAX_KEY_SIZE_IN_LONG_WORDS];
3611 Uint32 *tmp32 = (Uint32*)tmp64;
3612 copy(tmp32, ptr);
3613
3614 const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3615 ndbrequire(desc != NULL);
3616
3617 bool need_special_hash = desc->hasCharAttr | (desc->noOfDistrKeys > 0);
3618 if (need_special_hash)
3619 {
3620 jam();
3621 return handle_special_hash(tableId, dst.hashInfo, tmp64, ptr.sz, desc);
3622 }
3623 else
3624 {
3625 jam();
3626 md5_hash(dst.hashInfo, tmp64, ptr.sz);
3627 return 0;
3628 }
3629 }
3630
3631 /**
3632 * This function differs from computeHash in that *ptrI*
3633 * only contains partition key (packed) and not full primary key
3634 */
/**
 * Compute the distribution hash for a (packed) partition key held in
 * section ptrI, storing the result in dst.hashInfo.
 * Unlike computeHash(), the section contains only the partition key,
 * not the full primary key.
 *
 * @return always 0 (success).
 */
Uint32
Dbspj::computePartitionHash(Signal* signal,
                            BuildKeyReq& dst, Uint32 tableId, Uint32 ptrI)
{
  SegmentedSectionPtr ptr;
  getSection(ptr, ptrI);

  /* NOTE:  md5_hash below require 64-bit alignment
   */
  const Uint32 MAX_KEY_SIZE_IN_LONG_WORDS=
    (MAX_KEY_SIZE_IN_WORDS + 1) / 2;
  Uint64 _space[MAX_KEY_SIZE_IN_LONG_WORDS];
  Uint64 *tmp64 = _space;
  Uint32 *tmp32 = (Uint32*)tmp64;
  Uint32 sz = ptr.sz;
  copy(tmp32, ptr);

  const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
  ndbrequire(desc != NULL);

  // Charset or variable-length key attributes must be normalised
  // (xfrm'ed) before hashing.
  bool need_xfrm = desc->hasCharAttr || desc->noOfVarKeys;
  if (need_xfrm)
  {
    jam();
    /**
     * xfrm distribution key
     * The normalised key is built in signal->theData+24, which is
     * 64-bit aligned, and then hashed from there instead of _space.
     */
    Uint32 srcPos = 0;
    Uint32 dstPos = 0;
    Uint32 * src = tmp32;
    Uint32 * dst = signal->theData+24;
    for (Uint32 i = 0; i < desc->noOfKeyAttr; i++)
    {
      const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
      // Only distribution-key attributes participate.
      if (AttributeDescriptor::getDKey(keyAttr.attributeDescriptor))
      {
        xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
                  src, srcPos, dst, dstPos,
                  NDB_ARRAY_SIZE(signal->theData) - 24);
      }
    }
    tmp64 = (Uint64*)dst;
    sz = dstPos;
  }

  md5_hash(dst.hashInfo, tmp64, sz);
  return 0;
}
3683
/**
 * Resolve which node/fragment owns the row whose distribution hash is
 * already stored in dst.hashInfo[1], by asking DBDIH directly.
 * On success fills in dst.fragId, dst.fragDistKey and dst.receiverRef
 * (DBLQH instance on the owning node).
 *
 * @return 0 on success; errors currently crash (see 'error:' below).
 */
Uint32
Dbspj::getNodes(Signal* signal, BuildKeyReq& dst, Uint32 tableId)
{
  Uint32 err;
  DiGetNodesReq * req = (DiGetNodesReq *)&signal->theData[0];
  req->tableId = tableId;
  req->hashValue = dst.hashInfo[1];
  req->distr_key_indicator = 0; // userDefinedPartitioning not supported!
  * (EmulatedJamBuffer**)req->jamBuffer = jamBuffer();

#if 1
  // Synchronous in-thread call; DBDIH answers in-place in signal->theData.
  EXECUTE_DIRECT(DBDIH, GSN_DIGETNODESREQ, signal,
                 DiGetNodesReq::SignalLength, 0);
#else
  sendSignal(DBDIH_REF, GSN_DIGETNODESREQ, signal,
             DiGetNodesReq::SignalLength, JBB);
  jamEntry();

#endif

  DiGetNodesConf * conf = (DiGetNodesConf *)&signal->theData[0];
  err = signal->theData[0];  // theData[0] carries the error code (0 = OK)
  Uint32 Tdata2 = conf->reqinfo;
  Uint32 nodeId = conf->nodes[0];  // primary replica node
  Uint32 instanceKey = (Tdata2 >> 24) & 127;  // LQH instance, bits 24..30 of reqinfo

  DEBUG("HASH to nodeId:" << nodeId << ", instanceKey:" << instanceKey);

  jamEntry();
  if (unlikely(err != 0))
    goto error;

  dst.fragId = conf->fragId;
  dst.fragDistKey = (Tdata2 >> 16) & 255;  // bits 16..23 of reqinfo
  dst.receiverRef = numberToRef(DBLQH, instanceKey, nodeId);

  return 0;

error:
  /**
   * TODO handle error
   */
  ndbrequire(false);
  return err;
}
3729
3730 /**
3731 * END - MODULE LOOKUP
3732 */
3733
3734 /**
3735 * MODULE SCAN FRAG
3736 *
3737 * NOTE: This may only be root node
3738 */
/**
 * Dispatch table for the SCAN FRAG operation type (a scan that may only
 * appear as the root node of a pushed query). Zero slots mark events
 * this operation type does not handle.
 */
const Dbspj::OpInfo
Dbspj::g_ScanFragOpInfo =
{
  &Dbspj::scanFrag_build,
  0, // prepare
  &Dbspj::scanFrag_start,
  &Dbspj::scanFrag_execTRANSID_AI,
  0, // execLQHKEYREF
  0, // execLQHKEYCONF
  &Dbspj::scanFrag_execSCAN_FRAGREF,
  &Dbspj::scanFrag_execSCAN_FRAGCONF,
  0, // parent row
  0, // parent batch complete
  0, // parent batch repeat
  0, // Dbspj::scanFrag_parent_batch_cleanup,
  &Dbspj::scanFrag_execSCAN_NEXTREQ,
  0, // Dbspj::scanFrag_complete
  &Dbspj::scanFrag_abort,
  0, // execNODE_FAILREP,
  &Dbspj::scanFrag_cleanup
};
3760
/**
 * Build a scanFrag TreeNode from its serialized QueryNode/parameters.
 * Sets up the ScanFragHandle, pre-fills the stored SCAN_FRAGREQ template
 * and copies/validates flags from the start signal (a scanFrag node is
 * always the root, so ctx.m_start_signal must be present).
 *
 * @return 0 on success, otherwise a DbspjErr error code.
 */
Uint32
Dbspj::scanFrag_build(Build_context& ctx,
                      Ptr<Request> requestPtr,
                      const QueryNode* qn,
                      const QueryNodeParameters* qp)
{
  Uint32 err = 0;
  Ptr<TreeNode> treeNodePtr;
  const QN_ScanFragNode * node = (const QN_ScanFragNode*)qn;
  const QN_ScanFragParameters * param = (const QN_ScanFragParameters*)qp;

  do
  {
    err = createNode(ctx, requestPtr, treeNodePtr);
    if (unlikely(err != 0))
      break;

    // Allocate the handle tracking the single fragment scan.
    treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI = RNIL;
    Ptr<ScanFragHandle> scanFragHandlePtr;
    if (unlikely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena,
                                             scanFragHandlePtr) != true))
    {
      err = DbspjErr::OutOfQueryMemory;
      break;
    }

    scanFragHandlePtr.p->m_treeNodePtrI = treeNodePtr.i;
    scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
    treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI = scanFragHandlePtr.i;

    requestPtr.p->m_bits |= Request::RT_SCAN;
    treeNodePtr.p->m_info = &g_ScanFragOpInfo;
    treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
    treeNodePtr.p->m_batch_size = ctx.m_batch_size_rows;

    // Pre-fill the SCAN_FRAGREQ template stored in the TreeNode.
    ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
    dst->senderData = scanFragHandlePtr.i;
    dst->resultRef = reference();
    dst->resultData = treeNodePtr.i;
    dst->savePointId = ctx.m_savepointId;

    Uint32 transId1 = requestPtr.p->m_transId[0];
    Uint32 transId2 = requestPtr.p->m_transId[1];
    dst->transId1 = transId1;
    dst->transId2 = transId2;

    Uint32 treeBits = node->requestInfo;
    Uint32 paramBits = param->requestInfo;
    //ndbout_c("Dbspj::scanFrag_build() treeBits=%.8x paramBits=%.8x",
    //         treeBits, paramBits);
    Uint32 requestInfo = 0;
    ScanFragReq::setReadCommittedFlag(requestInfo, 1);
    ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio);
    ScanFragReq::setCorrFactorFlag(requestInfo, 1);
    // Disk access is needed only if either the tree or the parameters say so.
    ScanFragReq::setNoDiskFlag(requestInfo,
                               (treeBits & DABits::NI_LINKED_DISK) == 0 &&
                               (paramBits & DABits::PI_DISK_ATTR) == 0);
    dst->requestInfo = requestInfo;

    err = DbspjErr::InvalidTreeNodeSpecification;
    DEBUG("scanFrag_build: len=" << node->len);
    if (unlikely(node->len < QN_ScanFragNode::NodeSize))
      break;

    dst->tableId = node->tableId;
    dst->schemaVersion = node->tableVersion;

    err = DbspjErr::InvalidTreeParametersSpecification;
    DEBUG("param len: " << param->len);
    if (unlikely(param->len < QN_ScanFragParameters::NodeSize))
    {
      jam();
      DEBUG_CRASH();
      break;
    }

    ctx.m_resultData = param->resultData;

    /**
     * Parse stuff common lookup/scan-frag
     */
    struct DABuffer nodeDA, paramDA;
    nodeDA.ptr = node->optional;
    nodeDA.end = nodeDA.ptr + (node->len - QN_ScanFragNode::NodeSize);
    paramDA.ptr = param->optional;
    paramDA.end = paramDA.ptr + (param->len - QN_ScanFragParameters::NodeSize);
    err = parseDA(ctx, requestPtr, treeNodePtr,
                  nodeDA, treeBits, paramDA, paramBits);
    if (unlikely(err != 0))
    {
      jam();
      DEBUG_CRASH();
      break;
    }

    ctx.m_scan_cnt++;
    ctx.m_scans.set(treeNodePtr.p->m_node_no);

    if (ctx.m_start_signal)
    {
      jam();
      Signal* signal = ctx.m_start_signal;
      const ScanFragReq* src = (const ScanFragReq*)(signal->getDataPtr());

#if NOT_YET
      Uint32 instanceNo =
        blockToInstance(signal->header.theReceiversBlockNumber);
      treeNodePtr.p->m_send.m_ref = numberToRef(DBLQH,
                                                instanceNo, getOwnNodeId());
#else
      treeNodePtr.p->m_send.m_ref =
        numberToRef(DBLQH, getInstanceKey(src->tableId,
                                          src->fragmentNoKeyLen),
                    getOwnNodeId());
#endif

      Uint32 fragId = src->fragmentNoKeyLen;
      Uint32 requestInfo = src->requestInfo;
      Uint32 batch_size_bytes = src->batch_size_bytes;
      Uint32 batch_size_rows = src->batch_size_rows;

#ifdef VM_TRACE
      Uint32 savePointId = src->savePointId;
      Uint32 tableId = src->tableId;
      Uint32 schemaVersion = src->schemaVersion;
      Uint32 transId1 = src->transId1;
      Uint32 transId2 = src->transId2;
#endif
      // SPJ only supports read-committed scans without locks/keyinfo.
      ndbassert(ScanFragReq::getLockMode(requestInfo) == 0);
      ndbassert(ScanFragReq::getHoldLockFlag(requestInfo) == 0);
      ndbassert(ScanFragReq::getKeyinfoFlag(requestInfo) == 0);
      ndbassert(ScanFragReq::getReadCommittedFlag(requestInfo) == 1);
      ndbassert(ScanFragReq::getLcpScanFlag(requestInfo) == 0);
      //ScanFragReq::getAttrLen(requestInfo); // ignore
      ndbassert(ScanFragReq::getReorgFlag(requestInfo) == 0);

      // Carry over the caller's scan-mode flags into the stored template.
      Uint32 tupScanFlag = ScanFragReq::getTupScanFlag(requestInfo);
      Uint32 rangeScanFlag = ScanFragReq::getRangeScanFlag(requestInfo);
      Uint32 descendingFlag = ScanFragReq::getDescendingFlag(requestInfo);
      Uint32 scanPrio = ScanFragReq::getScanPrio(requestInfo);

      Uint32 dst_requestInfo = dst->requestInfo;

      ScanFragReq::setTupScanFlag(dst_requestInfo,tupScanFlag);
      ScanFragReq::setRangeScanFlag(dst_requestInfo,rangeScanFlag);
      ScanFragReq::setDescendingFlag(dst_requestInfo,descendingFlag);
      ScanFragReq::setScanPrio(dst_requestInfo,scanPrio);

      /**
       * 'NoDiskFlag' should agree with information in treeNode
       */
      ndbassert(ScanFragReq::getNoDiskFlag(requestInfo) ==
                ScanFragReq::getNoDiskFlag(dst_requestInfo));

      dst->fragmentNoKeyLen = fragId;
      dst->requestInfo = dst_requestInfo;
      dst->batch_size_bytes = batch_size_bytes;
      dst->batch_size_rows = batch_size_rows;

#ifdef VM_TRACE
      ndbassert(dst->savePointId == savePointId);
      ndbassert(dst->tableId == tableId);
      ndbassert(dst->schemaVersion == schemaVersion);
      ndbassert(dst->transId1 == transId1);
      ndbassert(dst->transId2 == transId2);
#endif

      treeNodePtr.p->m_send.m_keyInfoPtrI = ctx.m_keyPtr.i;
      treeNodePtr.p->m_bits |= TreeNode::T_ONE_SHOT;

      if (rangeScanFlag)
      {
        c_Counters.incr_counter(CI_RANGE_SCANS_RECEIVED, 1);
      }
      else
      {
        c_Counters.incr_counter(CI_TABLE_SCANS_RECEIVED, 1);
      }
    }
    else
    {
      // scanFrag must be the root node and is always built from a
      // start signal.
      ndbrequire(false);
    }

    return 0;
  } while (0);

  return err;
}
3950
/**
 * Start hook for a scanFrag root node: immediately sends the
 * SCAN_FRAGREQ built by scanFrag_build().
 */
void
Dbspj::scanFrag_start(Signal* signal,
                      Ptr<Request> requestPtr,
                      Ptr<TreeNode> treeNodePtr)
{
  scanFrag_send(signal, requestPtr, treeNodePtr);
}
3958
/**
 * Send the SCAN_FRAGREQ for a scanFrag node to the local DBLQH.
 * Marks the node/request active and transfers ownership of the
 * attrInfo/keyInfo sections to the signal (T_ONE_SHOT: the sections are
 * consumed, so the TreeNode's section pointers are cleared first).
 */
void
Dbspj::scanFrag_send(Signal* signal,
                     Ptr<Request> requestPtr,
                     Ptr<TreeNode> treeNodePtr)
{
  jam();

  requestPtr.p->m_outstanding++;
  requestPtr.p->m_cnt_active++;
  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p->
                               m_scanfrag_data.m_scanFragHandlePtrI);

  ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());

  // Copy the pre-built request template into the signal payload.
  memcpy(req, treeNodePtr.p->m_scanfrag_data.m_scanFragReq,
         sizeof(treeNodePtr.p->m_scanfrag_data.m_scanFragReq));
  req->variableData[0] = treeNodePtr.p->m_send.m_correlation;
  req->variableData[1] = requestPtr.p->m_rootResultData;

  SectionHandle handle(this);

  Uint32 ref = treeNodePtr.p->m_send.m_ref;
  Uint32 keyInfoPtrI = treeNodePtr.p->m_send.m_keyInfoPtrI;
  Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;

  /**
   * ScanFrag may only be used as root-node, i.e T_ONE_SHOT
   */
  ndbrequire(treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT);

  /**
   * Pass sections to send (ownership moves to the signal, so the
   * TreeNode must forget them before sendSignal consumes the handle)
   */
  treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
  treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;

  getSection(handle.m_ptr[0], attrInfoPtrI);
  handle.m_cnt = 1;

  if (keyInfoPtrI != RNIL)
  {
    jam();
    getSection(handle.m_ptr[1], keyInfoPtrI);
    handle.m_cnt = 2;
  }

#ifdef DEBUG_SCAN_FRAGREQ
  ndbout_c("SCAN_FRAGREQ to %x", ref);
  printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
                    NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
                    DBLQH);
  printf("ATTRINFO: ");
  print(handle.m_ptr[0], stdout);
  if (handle.m_cnt > 1)
  {
    printf("KEYINFO: ");
    print(handle.m_ptr[1], stdout);
  }
#endif

  if (ScanFragReq::getRangeScanFlag(req->requestInfo))
  {
    c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1);
  }
  else
  {
    c_Counters.incr_counter(CI_LOCAL_TABLE_SCANS_SENT, 1);
  }

  // A scanFrag is always executed on the local node's DBLQH.
  ndbrequire(refToNode(ref) == getOwnNodeId());
  sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
             NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
             JBB, &handle);

  scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_SCANNING;
  treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;
  // Row count unknown until SCAN_FRAGCONF arrives.
  treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
}
4039
/**
 * Handle one result row (TRANSID_AI) from the fragment scan:
 * count it, fan it out to all dependent child operations, and if the
 * expected row count (set by SCAN_FRAGCONF) has now been reached,
 * report batch completion.
 */
void
Dbspj::scanFrag_execTRANSID_AI(Signal* signal,
                               Ptr<Request> requestPtr,
                               Ptr<TreeNode> treeNodePtr,
                               const RowPtr & rowRef)
{
  jam();
  treeNodePtr.p->m_scanfrag_data.m_rows_received++;

  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
  Dependency_map::ConstDataBufferIterator it;

  {
    // Forward this row to every dependent child via its m_parent_row hook.
    for (list.first(it); !it.isNull(); list.next(it))
    {
      jam();
      Ptr<TreeNode> childPtr;
      m_treenode_pool.getPtr(childPtr, * it.data);
      ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0);
      (this->*(childPtr.p->m_info->m_parent_row))(signal,
                                                  requestPtr, childPtr,rowRef);
    }
  }

  // m_rows_expecting stays ~0 until SCAN_FRAGCONF sets the real count,
  // so this only fires once both the CONF and all rows have arrived.
  if (treeNodePtr.p->m_scanfrag_data.m_rows_received ==
      treeNodePtr.p->m_scanfrag_data.m_rows_expecting)
  {
    jam();

    if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
    {
      jam();
      reportBatchComplete(signal, requestPtr, treeNodePtr);
    }

    checkBatchComplete(signal, requestPtr, 1);
    return;
  }
}
4080
4081 void
scanFrag_execSCAN_FRAGREF(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr,Ptr<ScanFragHandle> scanFragHandlePtr)4082 Dbspj::scanFrag_execSCAN_FRAGREF(Signal* signal,
4083 Ptr<Request> requestPtr,
4084 Ptr<TreeNode> treeNodePtr,
4085 Ptr<ScanFragHandle> scanFragHandlePtr)
4086 {
4087 const ScanFragRef* rep =
4088 reinterpret_cast<const ScanFragRef*>(signal->getDataPtr());
4089 Uint32 errCode = rep->errorCode;
4090
4091 DEBUG("scanFrag_execSCAN_FRAGREF, rep->senderData:" << rep->senderData
4092 << ", requestPtr.p->m_senderData:" << requestPtr.p->m_senderData);
4093 scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
4094 ndbrequire(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE);
4095 ndbrequire(requestPtr.p->m_cnt_active);
4096 requestPtr.p->m_cnt_active--;
4097 ndbrequire(requestPtr.p->m_outstanding);
4098 requestPtr.p->m_outstanding--;
4099 treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
4100
4101 abort(signal, requestPtr, errCode);
4102 }
4103
4104
/**
 * Handle SCAN_FRAGCONF from DBLQH: learn how many rows this batch
 * contains (m_rows_expecting), update scan/handle state depending on
 * whether the fragment is complete, and report batch completion once
 * all expected rows have been received.
 */
void
Dbspj::scanFrag_execSCAN_FRAGCONF(Signal* signal,
                                  Ptr<Request> requestPtr,
                                  Ptr<TreeNode> treeNodePtr,
                                  Ptr<ScanFragHandle> scanFragHandlePtr)
{
  const ScanFragConf * conf =
    reinterpret_cast<const ScanFragConf*>(signal->getDataPtr());
  Uint32 rows = conf->completedOps;
  Uint32 done = conf->fragmentCompleted;

  Uint32 state = scanFragHandlePtr.p->m_state;
  if (state == ScanFragHandle::SFH_WAIT_CLOSE && done == 0)
  {
    jam();
    /**
     * We sent an explicit close request...ignore this...a close will come later
     */
    return;
  }

  ndbrequire(done <= 2); // 0, 1, 2 (=ZSCAN_FRAG_CLOSED)

  // Expecting-count must still be the 'unknown' sentinel set at send time.
  ndbassert(treeNodePtr.p->m_scanfrag_data.m_rows_expecting == ~Uint32(0));
  treeNodePtr.p->m_scanfrag_data.m_rows_expecting = rows;
  if (treeNodePtr.p->isLeaf())
  {
    /**
     * If this is a leaf node, then no rows will be sent to the SPJ block,
     * as there are no child operations to instantiate.
     */
    treeNodePtr.p->m_scanfrag_data.m_rows_received = rows;
  }

  requestPtr.p->m_rows += rows;
  if (done)
  {
    jam();

    // Fragment fully scanned: this node becomes inactive.
    ndbrequire(requestPtr.p->m_cnt_active);
    requestPtr.p->m_cnt_active--;
    treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
    scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
  }
  else
  {
    jam();
    // More batches available; wait for SCAN_NEXTREQ from the API.
    scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_WAIT_NEXTREQ;
  }

  // Batch is complete when all rows have arrived, or when this CONF
  // acknowledges an explicit close.
  if (treeNodePtr.p->m_scanfrag_data.m_rows_expecting ==
      treeNodePtr.p->m_scanfrag_data.m_rows_received ||
      (state == ScanFragHandle::SFH_WAIT_CLOSE))
  {
    jam();

    if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
    {
      jam();
      reportBatchComplete(signal, requestPtr, treeNodePtr);
    }

    checkBatchComplete(signal, requestPtr, 1);
    return;
  }
}
4171
4172 void
scanFrag_execSCAN_NEXTREQ(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)4173 Dbspj::scanFrag_execSCAN_NEXTREQ(Signal* signal,
4174 Ptr<Request> requestPtr,
4175 Ptr<TreeNode> treeNodePtr)
4176 {
4177 jamEntry();
4178
4179 Ptr<ScanFragHandle> scanFragHandlePtr;
4180 m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p->
4181 m_scanfrag_data.m_scanFragHandlePtrI);
4182
4183 const ScanFragReq * org =
4184 (ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
4185
4186 ScanFragNextReq* req =
4187 reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
4188 req->senderData = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
4189 req->requestInfo = 0;
4190 req->transId1 = requestPtr.p->m_transId[0];
4191 req->transId2 = requestPtr.p->m_transId[1];
4192 req->batch_size_rows = org->batch_size_rows;
4193 req->batch_size_bytes = org->batch_size_bytes;
4194
4195 DEBUG("scanFrag_execSCAN_NEXTREQ to: " << hex << treeNodePtr.p->m_send.m_ref
4196 << ", senderData: " << req->senderData);
4197 #ifdef DEBUG_SCAN_FRAGREQ
4198 printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
4199 ScanFragNextReq::SignalLength, DBLQH);
4200 #endif
4201
4202 sendSignal(treeNodePtr.p->m_send.m_ref,
4203 GSN_SCAN_NEXTREQ,
4204 signal,
4205 ScanFragNextReq::SignalLength,
4206 JBB);
4207
4208 treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;
4209 treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
4210 requestPtr.p->m_outstanding++;
4211 scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_SCANNING;
4212 }//Dbspj::scanFrag_execSCAN_NEXTREQ()
4213
/**
 * Abort an active fragment scan by sending an explicit CLOSE
 * (SCAN_NEXTREQ with ZCLOSE) to DBLQH. A node that is not TN_ACTIVE,
 * or whose close is already in flight, needs no action.
 */
void
Dbspj::scanFrag_abort(Signal* signal,
                      Ptr<Request> requestPtr,
                      Ptr<TreeNode> treeNodePtr)
{
  jam();

  Ptr<ScanFragHandle> scanFragHandlePtr;
  m_scanfraghandle_pool.getPtr(scanFragHandlePtr, treeNodePtr.p->
                               m_scanfrag_data.m_scanFragHandlePtrI);
  if (treeNodePtr.p->m_state == TreeNode::TN_ACTIVE)
  {
    jam();

    switch(scanFragHandlePtr.p->m_state){
    case ScanFragHandle::SFH_NOT_STARTED:
    case ScanFragHandle::SFH_COMPLETE:
      // Inconsistent: these states cannot coexist with TN_ACTIVE.
      // (ndbrequire(false) crashes, so the missing break is unreachable.)
      ndbrequire(false); // we shouldnt be TN_ACTIVE then...

    case ScanFragHandle::SFH_WAIT_CLOSE:
      jam();
      // close already sent
      return;
    case ScanFragHandle::SFH_WAIT_NEXTREQ:
      jam();
      // we were idle; the close below creates a new outstanding signal
      requestPtr.p->m_outstanding++;
      break;
    case ScanFragHandle::SFH_SCANNING:
      jam();
      break;
    }

    treeNodePtr.p->m_scanfrag_data.m_rows_expecting = ~Uint32(0);
    scanFragHandlePtr.p->m_state = ScanFragHandle::SFH_WAIT_CLOSE;

    // Build and send the explicit close request.
    ScanFragNextReq* req =
      reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
    req->senderData = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
    req->requestInfo = ScanFragNextReq::ZCLOSE;
    req->transId1 = requestPtr.p->m_transId[0];
    req->transId2 = requestPtr.p->m_transId[1];
    req->batch_size_rows = 0;
    req->batch_size_bytes = 0;

    sendSignal(treeNodePtr.p->m_send.m_ref,
               GSN_SCAN_NEXTREQ,
               signal,
               ScanFragNextReq::SignalLength,
               JBB);
  }
}
4266
4267
4268 void
scanFrag_cleanup(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)4269 Dbspj::scanFrag_cleanup(Ptr<Request> requestPtr,
4270 Ptr<TreeNode> treeNodePtr)
4271 {
4272 Uint32 ptrI = treeNodePtr.p->m_scanfrag_data.m_scanFragHandlePtrI;
4273 if (ptrI != RNIL)
4274 {
4275 m_scanfraghandle_pool.release(ptrI);
4276 }
4277 cleanup_common(requestPtr, treeNodePtr);
4278 }
4279
4280 /**
4281 * END - MODULE SCAN FRAG
4282 */
4283
4284 /**
4285 * MODULE SCAN INDEX
4286 *
4287 * NOTE: This may not be root-node
4288 */
/**
 * Dispatch table for the SCAN INDEX operation type (range scan driven
 * by parent rows; may not be the root node). Zero slots mark events
 * this operation type does not handle.
 */
const Dbspj::OpInfo
Dbspj::g_ScanIndexOpInfo =
{
  &Dbspj::scanIndex_build,
  &Dbspj::scanIndex_prepare,
  0, // start
  &Dbspj::scanIndex_execTRANSID_AI,
  0, // execLQHKEYREF
  0, // execLQHKEYCONF
  &Dbspj::scanIndex_execSCAN_FRAGREF,
  &Dbspj::scanIndex_execSCAN_FRAGCONF,
  &Dbspj::scanIndex_parent_row,
  &Dbspj::scanIndex_parent_batch_complete,
  &Dbspj::scanIndex_parent_batch_repeat,
  &Dbspj::scanIndex_parent_batch_cleanup,
  &Dbspj::scanIndex_execSCAN_NEXTREQ,
  &Dbspj::scanIndex_complete,
  &Dbspj::scanIndex_abort,
  &Dbspj::scanIndex_execNODE_FAILREP,
  &Dbspj::scanIndex_cleanup
};
4310
/**
 * Build a SCAN INDEX tree node from its serialized query-tree node
 * ('qn') and parameter section ('qp').
 *
 * Initializes the embedded ScanFragReq template, parses the optional
 * node/parameter data (prune patterns etc.) via parseScanIndex(), and
 * propagates T_REPORT_BATCH_COMPLETE up the ancestor chain.
 *
 * @return 0 on success, otherwise a DbspjErr error code.
 */
Uint32
Dbspj::scanIndex_build(Build_context& ctx,
                       Ptr<Request> requestPtr,
                       const QueryNode* qn,
                       const QueryNodeParameters* qp)
{
  Uint32 err = 0;
  Ptr<TreeNode> treeNodePtr;
  const QN_ScanIndexNode * node = (const QN_ScanIndexNode*)qn;
  const QN_ScanIndexParameters * param = (const QN_ScanIndexParameters*)qp;

  do
  {
    err = createNode(ctx, requestPtr, treeNodePtr);
    if (unlikely(err != 0))
      break;

    // batchSize packs rows (low BatchRowBits bits) and bytes (high bits)
    Uint32 batchSize = param->batchSize;

    // An index scan makes the whole request a scan request, and it
    // requires the explicit prepare (DIH lookup) and complete phases.
    requestPtr.p->m_bits |= Request::RT_SCAN;
    requestPtr.p->m_bits |= Request::RT_NEED_PREPARE;
    requestPtr.p->m_bits |= Request::RT_NEED_COMPLETE;
    treeNodePtr.p->m_info = &g_ScanIndexOpInfo;
    treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
    treeNodePtr.p->m_bits |= TreeNode::T_NEED_REPORT_BATCH_COMPLETED;
    treeNodePtr.p->m_batch_size =
      batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits);

    // Fill in the parts of the ScanFragReq template that are fixed for
    // the lifetime of this request; per-fragment fields are set at send.
    ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq;
    dst->senderData = treeNodePtr.i;
    dst->resultRef = reference();
    dst->resultData = treeNodePtr.i;
    dst->savePointId = ctx.m_savepointId;
    dst->batch_size_rows =
      batchSize & ~(0xFFFFFFFF << QN_ScanIndexParameters::BatchRowBits);
    dst->batch_size_bytes = batchSize >> QN_ScanIndexParameters::BatchRowBits;

    Uint32 transId1 = requestPtr.p->m_transId[0];
    Uint32 transId2 = requestPtr.p->m_transId[1];
    dst->transId1 = transId1;
    dst->transId2 = transId2;

    Uint32 treeBits = node->requestInfo;
    Uint32 paramBits = param->requestInfo;
    Uint32 requestInfo = 0;
    ScanFragReq::setRangeScanFlag(requestInfo, 1);
    ScanFragReq::setReadCommittedFlag(requestInfo, 1);
    ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio);
    // Disk access only needed if either tree or params reference disk attrs
    ScanFragReq::setNoDiskFlag(requestInfo,
                               (treeBits & DABits::NI_LINKED_DISK) == 0 &&
                               (paramBits & DABits::PI_DISK_ATTR) == 0);
    ScanFragReq::setCorrFactorFlag(requestInfo, 1);
    dst->requestInfo = requestInfo;

    err = DbspjErr::InvalidTreeNodeSpecification;
    DEBUG("scanIndex_build: len=" << node->len);
    if (unlikely(node->len < QN_ScanIndexNode::NodeSize))
      break;

    dst->tableId = node->tableId;
    dst->schemaVersion = node->tableVersion;

    err = DbspjErr::InvalidTreeParametersSpecification;
    DEBUG("param len: " << param->len);
    if (unlikely(param->len < QN_ScanIndexParameters::NodeSize))
    {
      jam();
      DEBUG_CRASH();
      break;
    }

    ctx.m_resultData = param->resultData;

    /**
     * Parse stuff
     */
    struct DABuffer nodeDA, paramDA;
    nodeDA.ptr = node->optional;
    nodeDA.end = nodeDA.ptr + (node->len - QN_ScanIndexNode::NodeSize);
    paramDA.ptr = param->optional;
    paramDA.end = paramDA.ptr + (param->len - QN_ScanIndexParameters::NodeSize);

    err = parseScanIndex(ctx, requestPtr, treeNodePtr,
                         nodeDA, treeBits, paramDA, paramBits);

    if (unlikely(err != 0))
    {
      jam();
      DEBUG_CRASH();
      break;
    }

    /**
     * Since we T_NEED_REPORT_BATCH_COMPLETED, we set
     * this on all our parents...
     */
    Ptr<TreeNode> nodePtr;
    nodePtr.i = treeNodePtr.p->m_parentPtrI;
    while (nodePtr.i != RNIL)
    {
      jam();
      m_treenode_pool.getPtr(nodePtr);
      nodePtr.p->m_bits |= TreeNode::T_REPORT_BATCH_COMPLETE;
      nodePtr.p->m_bits |= TreeNode::T_NEED_REPORT_BATCH_COMPLETED;
      nodePtr.i = nodePtr.p->m_parentPtrI;
    }

    /**
     * If there exists other scan TreeNodes not being among
     * my ancestors, results from this scanIndex may be repeated
     * as part of an X-scan.
     *
     * NOTE: The scan nodes being along the left deep ancestor chain
     *       are not 'repeatable' as they are driving the
     *       repeated X-scan and are thus not repeated themself.
     */
    if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT &&
        !treeNodePtr.p->m_ancestors.contains(ctx.m_scans))
    {
      treeNodePtr.p->m_bits |= TreeNode::T_SCAN_REPEATABLE;
    }

    ctx.m_scan_cnt++;
    ctx.m_scans.set(treeNodePtr.p->m_node_no);

    return 0;
  } while (0);

  return err;
}
4441
/**
 * Parse the optional sections of a SCAN INDEX node: common DA data
 * (via parseDA) plus any prune pattern.
 *
 * A prune pattern is either 'linked' (contains values from parent rows
 * and is stored for later per-row expansion, T_PRUNE_PATTERN) or fixed
 * (expanded immediately into a section, giving a constant single
 * partition, T_CONST_PRUNE).
 *
 * @return 0 on success, otherwise a DbspjErr error code.
 */
Uint32
Dbspj::parseScanIndex(Build_context& ctx,
                      Ptr<Request> requestPtr,
                      Ptr<TreeNode> treeNodePtr,
                      DABuffer tree, Uint32 treeBits,
                      DABuffer param, Uint32 paramBits)
{
  Uint32 err = 0;

  typedef QN_ScanIndexNode Node;
  typedef QN_ScanIndexParameters Params;

  do
  {
    jam();

    // Reset all per-scan bookkeeping before parsing
    ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
    data.m_fragments.init();
    data.m_frags_outstanding = 0;
    data.m_frags_complete = 0;
    data.m_frags_not_started = 0;
    data.m_parallelismStat.init();
    data.m_firstExecution = true;
    data.m_batch_chunks = 0;

    err = parseDA(ctx, requestPtr, treeNodePtr,
                  tree, treeBits, param, paramBits);
    if (unlikely(err != 0))
      break;

    if (treeBits & Node::SI_PRUNE_PATTERN)
    {
      // First word packs pattern length (low 16) and parameter count (high 16)
      Uint32 len_cnt = * tree.ptr ++;
      Uint32 len = len_cnt & 0xFFFF; // length of pattern in words
      Uint32 cnt = len_cnt >> 16;    // no of parameters

      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      // Parameter count must be consistent with the PRUNE_PARAMS flags
      ndbrequire((cnt==0) == ((treeBits & Node::SI_PRUNE_PARAMS) ==0));
      ndbrequire((cnt==0) == ((paramBits & Params::SIP_PRUNE_PARAMS)==0));

      if (treeBits & Node::SI_PRUNE_LINKED)
      {
        jam();
        DEBUG("LINKED-PRUNE PATTERN w/ " << cnt << " PARAM values");

        data.m_prunePattern.init();
        Local_pattern_store pattern(pool, data.m_prunePattern);

        /**
         * Expand pattern into a new pattern (with linked values)
         */
        err = expand(pattern, treeNodePtr, tree, len, param, cnt);
        if (unlikely(err != 0))
          break;

        treeNodePtr.p->m_bits |= TreeNode::T_PRUNE_PATTERN;
        c_Counters.incr_counter(CI_PRUNED_RANGE_SCANS_RECEIVED, 1);
      }
      else
      {
        jam();
        DEBUG("FIXED-PRUNE w/ " << cnt << " PARAM values");

        /**
         * Expand pattern directly into
         *   This means a "fixed" pruning from here on
         *   i.e guaranteed single partition
         */
        Uint32 prunePtrI = RNIL;
        bool hasNull;
        err = expand(prunePtrI, tree, len, param, cnt, hasNull);
        if (unlikely(err != 0))
          break;

        if (unlikely(hasNull))
        {
          /* API should have elliminated requests w/ const-NULL keys */
          jam();
          DEBUG("BEWARE: T_CONST_PRUNE-key contain NULL values");
//        treeNodePtr.p->m_bits |= TreeNode::T_NULL_PRUNE;
//        break;
          ndbrequire(false);
        }
        ndbrequire(prunePtrI != RNIL);  /* todo: can we allow / take advantage of NULLs in range scan? */
        data.m_constPrunePtrI = prunePtrI;

        /**
         * We may not compute the partition for the hash-key here
         *   as we have not yet opened a read-view
         */
        treeNodePtr.p->m_bits |= TreeNode::T_CONST_PRUNE;
        c_Counters.incr_counter(CI_CONST_PRUNED_RANGE_SCANS_RECEIVED, 1);
      }
    } //SI_PRUNE_PATTERN

    // Parallel scan only makes sense when more than one fragment is read,
    // i.e. not for const-pruned scans.
    if ((treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE) == 0 &&
        ((treeBits & Node::SI_PARALLEL) ||
         ((paramBits & Params::SIP_PARALLEL))))
    {
      jam();
      treeNodePtr.p->m_bits |= TreeNode::T_SCAN_PARALLEL;
    }

    return 0;
  } while(0);

  DEBUG_CRASH();
  return err;
}
4551
/**
 * Prepare phase for a SCAN INDEX node: ask DBDIH for the table's
 * fragmentation info (DIH_SCAN_TAB_REQ).  The reply arrives as
 * DIH_SCAN_TAB_CONF/REF; one outstanding operation is registered on
 * the request so the prepare phase waits for it.
 */
void
Dbspj::scanIndex_prepare(Signal * signal,
                         Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
{
  jam();

  treeNodePtr.p->m_state = TreeNode::TN_PREPARING;
  ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq;

  DihScanTabReq * req = (DihScanTabReq*)signal->getDataPtrSend();
  req->senderRef = reference();
  req->senderData = treeNodePtr.i;   // echoed back in CONF to find the node
  req->tableId = dst->tableId;
  req->schemaTransId = 0;
  sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
             DihScanTabReq::SignalLength, JBB);

  requestPtr.p->m_outstanding++;
}
4571
/**
 * DIH_SCAN_TAB_REF handler.  No error path is implemented for a failed
 * table-fragmentation lookup, so receiving it is treated as fatal.
 */
void
Dbspj::execDIH_SCAN_TAB_REF(Signal* signal)
{
  jamEntry();
  ndbrequire(false);
}
4578
/**
 * DIH_SCAN_TAB_CONF handler: DBDIH has returned fragment count and a
 * scan cookie for the table scanned by a SCAN INDEX node.
 *
 * Seizes one ScanFragHandle per fragment to scan.  For const-pruned
 * scans the single target fragment is computed immediately; a
 * fragCount of 1 is converted into an equivalent const-prune.  Unless
 * pruned, DIH_SCAN_GET_NODES_REQ is then sent per fragment to learn
 * each fragment's LQH location.
 *
 * NOTE: pool seize / hash computation failures fall through to a
 * fatal ndbrequire (TODO: real error handling).
 */
void
Dbspj::execDIH_SCAN_TAB_CONF(Signal* signal)
{
  jamEntry();
  DihScanTabConf * conf = (DihScanTabConf*)signal->getDataPtr();

  // senderData carries the TreeNode i-value we sent in the REQ
  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, conf->senderData);
  ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo);

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;

  Uint32 cookie = conf->scanCookie;
  Uint32 fragCount = conf->fragmentCount;
  ScanFragReq * dst = (ScanFragReq*)data.m_scanFragReq;

  if (conf->reorgFlag)
  {
    jam();
    ScanFragReq::setReorgFlag(dst->requestInfo, 1);
  }

  data.m_fragCount = fragCount;
  data.m_scanCookie = cookie;

  const Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
  bool pruned = (treeNodePtr.p->m_bits & prunemask) != 0;

  Ptr<Request> requestPtr;
  m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);

  // Always seize at least one handle (fragment 0 / the pruned-to fragment)
  Ptr<ScanFragHandle> fragPtr;
  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
  if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr)))
  {
    jam();
    fragPtr.p->init(0);
    fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
    list.addLast(fragPtr);
  }
  else
  {
    jam();
    goto error1;
  }

  if (treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE)
  {
    jam();

    // TODO we need a different variant of computeHash here,
    // since m_constPrunePtrI does not contain full primary key
    // but only parts in distribution key

    BuildKeyReq tmp;
    Uint32 indexId = dst->tableId;
    Uint32 tableId = g_key_descriptor_pool.getPtr(indexId)->primaryTableId;
    Uint32 err = computePartitionHash(signal, tmp, tableId, data.m_constPrunePtrI);
    if (unlikely(err != 0))
      goto error;

    // The const prune key is consumed here; the scan targets one fragment
    releaseSection(data.m_constPrunePtrI);
    data.m_constPrunePtrI = RNIL;

    err = getNodes(signal, tmp, tableId);
    if (unlikely(err != 0))
      goto error;

    fragPtr.p->m_fragId = tmp.fragId;
    fragPtr.p->m_ref = tmp.receiverRef;
    data.m_fragCount = 1;
  }
  else if (fragCount == 1)
  {
    jam();
    /**
     * This is roughly equivalent to T_CONST_PRUNE
     *   pretend that it is const-pruned
     */
    if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
    {
      jam();
      // A prune pattern is pointless with a single fragment - drop it
      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      Local_pattern_store pattern(pool, data.m_prunePattern);
      pattern.release();
    }
    data.m_constPrunePtrI = RNIL;
    Uint32 clear = TreeNode::T_PRUNE_PATTERN | TreeNode::T_SCAN_PARALLEL;
    treeNodePtr.p->m_bits &= ~clear;
    treeNodePtr.p->m_bits |= TreeNode::T_CONST_PRUNE;

    /**
     * We must get fragPtr.p->m_ref...so set pruned=false
     */
    pruned = false;
  }
  else
  {
    // Unpruned multi-fragment scan: seize handles for fragments 1..N-1
    for (Uint32 i = 1; i<fragCount; i++)
    {
      jam();
      Ptr<ScanFragHandle> fragPtr;
      if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr)))
      {
        jam();
        fragPtr.p->init(i);
        fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
        list.addLast(fragPtr);
      }
      else
      {
        goto error1;
      }
    }
  }
  // Nothing started yet: all fragments count as complete until sent
  data.m_frags_complete = data.m_fragCount;

  if (!pruned)
  {
    jam();
    // Ask DBDIH for the LQH location of every fragment handle
    Uint32 tableId = ((ScanFragReq*)data.m_scanFragReq)->tableId;
    DihScanGetNodesReq * req = (DihScanGetNodesReq*)signal->getDataPtrSend();
    req->senderRef = reference();
    req->tableId = tableId;
    req->scanCookie = cookie;

    Uint32 cnt = 0;
    for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
    {
      jam();
      req->senderData = fragPtr.i;
      req->fragId = fragPtr.p->m_fragId;
      sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
                 DihScanGetNodesReq::SignalLength, JBB);
      cnt++;
    }
    data.m_frags_outstanding = cnt;
    requestPtr.p->m_outstanding++;
  }
  else
  {
    jam();
    // Fragment locations resolved per-row later; prepare is done
    treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
  }
  checkPrepareComplete(signal, requestPtr, 1);

  return;

error1:
error:
  ndbrequire(false);
}
4731
/**
 * DIH_SCAN_GET_NODES_REF handler.  No error path is implemented for a
 * failed fragment-location lookup, so receiving it is treated as fatal.
 */
void
Dbspj::execDIH_SCAN_GET_NODES_REF(Signal* signal)
{
  jamEntry();
  ndbrequire(false);
}
4738
4739 void
execDIH_SCAN_GET_NODES_CONF(Signal * signal)4740 Dbspj::execDIH_SCAN_GET_NODES_CONF(Signal* signal)
4741 {
4742 jamEntry();
4743
4744 DihScanGetNodesConf * conf = (DihScanGetNodesConf*)signal->getDataPtr();
4745
4746 Uint32 senderData = conf->senderData;
4747 Uint32 node = conf->nodes[0];
4748 Uint32 instanceKey = conf->instanceKey;
4749
4750 Ptr<ScanFragHandle> fragPtr;
4751 m_scanfraghandle_pool.getPtr(fragPtr, senderData);
4752 Ptr<TreeNode> treeNodePtr;
4753 m_treenode_pool.getPtr(treeNodePtr, fragPtr.p->m_treeNodePtrI);
4754 ndbrequire(treeNodePtr.p->m_info == &g_ScanIndexOpInfo);
4755 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
4756 ndbrequire(data.m_frags_outstanding > 0);
4757 data.m_frags_outstanding--;
4758
4759 fragPtr.p->m_ref = numberToRef(DBLQH, instanceKey, node);
4760
4761 if (data.m_frags_outstanding == 0)
4762 {
4763 jam();
4764
4765 treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
4766
4767 Ptr<Request> requestPtr;
4768 m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
4769 checkPrepareComplete(signal, requestPtr, 1);
4770 }
4771 }
4772
4773 Uint32
scanIndex_findFrag(Local_ScanFragHandle_list & list,Ptr<ScanFragHandle> & fragPtr,Uint32 fragId)4774 Dbspj::scanIndex_findFrag(Local_ScanFragHandle_list & list,
4775 Ptr<ScanFragHandle> & fragPtr, Uint32 fragId)
4776 {
4777 for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
4778 {
4779 jam();
4780 if (fragPtr.p->m_fragId == fragId)
4781 {
4782 jam();
4783 return 0;
4784 }
4785 }
4786
4787 return 99; // TODO
4788 }
4789
/**
 * A parent row arrived for this SCAN INDEX node.  Build the range
 * (index bound) for the row from the stored key pattern and append it
 * to the appropriate ScanFragHandle:
 *
 *  - T_PRUNE_PATTERN: expand the prune key from the row, hash it to
 *    find the single target fragment, and store the range there.
 *  - otherwise: store the range on the first handle (sent to one or
 *    all fragments at batch completion).
 *
 * Any failure in this path is currently fatal (ndbrequire at the end).
 */
void
Dbspj::scanIndex_parent_row(Signal* signal,
                            Ptr<Request> requestPtr,
                            Ptr<TreeNode> treeNodePtr,
                            const RowPtr & rowRef)
{
  jam();

  Uint32 err;
  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;

  /**
   * Construct range definition,
   *   and if prune pattern enabled
   *   stuff it onto correct scanindexFrag
   */
  do
  {
    Ptr<ScanFragHandle> fragPtr;
    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
    if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
    {
      jam();

      /**
       * TODO: Expand into linear memory instead
       *       of expanding into sections, and then copy
       *       section into linear
       */
      Local_pattern_store pattern(pool, data.m_prunePattern);
      Uint32 pruneKeyPtrI = RNIL;
      bool hasNull;
      err = expand(pruneKeyPtrI, pattern, rowRef, hasNull);
      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }

      if (unlikely(hasNull))
      {
        jam();
        DEBUG("T_PRUNE_PATTERN-key contain NULL values");

        // Ignore this request as 'NULL == <column>' will never give a match
        if (pruneKeyPtrI != RNIL)
        {
          releaseSection(pruneKeyPtrI);
        }
        return;  // Bailout, SCANREQ would have returned 0 rows anyway
      }

      // TODO we need a different variant of computeHash here,
      // since pruneKeyPtrI does not contain full primary key
      // but only parts in distribution key

      BuildKeyReq tmp;
      ScanFragReq * dst = (ScanFragReq*)data.m_scanFragReq;
      Uint32 indexId = dst->tableId;
      Uint32 tableId = g_key_descriptor_pool.getPtr(indexId)->primaryTableId;
      err = computePartitionHash(signal, tmp, tableId, pruneKeyPtrI);
      releaseSection(pruneKeyPtrI); // see ^ TODO
      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }

      err = getNodes(signal, tmp, tableId);
      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }

      err = scanIndex_findFrag(list, fragPtr, tmp.fragId);
      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }

      /**
       * NOTE: We can get different receiverRef's here
       *       for different keys. E.g during node-recovery where
       *       primary-fragment is switched.
       *
       *       Use latest that we receive
       *
       * TODO: Also double check table-reorg
       */
      fragPtr.p->m_ref = tmp.receiverRef;
    }
    else
    {
      jam();
      /**
       * If const prune, or no-prune, store on first fragment,
       * and send to 1 or all resp.
       */
      list.first(fragPtr);
    }

    // Append this row's bound to the fragment's range section
    Uint32 ptrI = fragPtr.p->m_rangePtrI;
    bool hasNull;
    if (treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED)
    {
      jam();
      Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);
      err = expand(ptrI, pattern, rowRef, hasNull);
      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }
    }
    else
    {
      jam();
      // Fixed key...fix later...
      ndbrequire(false);
    }
//  ndbrequire(!hasNull);  // FIXME, can't ignore request as we already added it to keyPattern
    fragPtr.p->m_rangePtrI = ptrI;
    // Tag the bound with its number/size and the parent row's correlation id
    scanIndex_fixupBound(fragPtr, ptrI, rowRef.m_src_correlation);

    if (treeNodePtr.p->m_bits & TreeNode::T_ONE_SHOT)
    {
      jam();
      /**
       * We being a T_ONE_SHOT means that we're only be called
       *   with parent_row once, i.e batch is complete
       */
      scanIndex_parent_batch_complete(signal, requestPtr, treeNodePtr);
    }

    return;
  } while (0);

  ndbrequire(false);
}
4932
4933
/**
 * Post-process the bound just appended to the fragment's range section:
 * patch the leading word with bound size and correlation id, and
 * renumber the attribute headers 0..n-1 as required for index bounds.
 *
 * @param fragPtr Fragment handle whose range section was extended.
 * @param ptrI    Section containing the ranges (fragPtr's m_rangePtrI).
 * @param corrVal Correlation value of the parent row (low 12 bits used).
 */
void
Dbspj::scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr,
                            Uint32 ptrI, Uint32 corrVal)
{
  /**
   * Index bounds...need special tender and care...
   *
   * 1) Set #bound no, bound-size, and renumber attributes
   */
  SectionReader r0(ptrI, getSectionSegmentPool());
  // Skip previously fixed-up bounds; the new bound starts at m_range_size
  ndbrequire(r0.step(fragPtr.p->m_range_builder.m_range_size));
  Uint32 boundsz = r0.getSize() - fragPtr.p->m_range_builder.m_range_size;
  Uint32 boundno = fragPtr.p->m_range_builder.m_range_cnt + 1;

  // Patch first word: bound length (high 16) and correlation id (bits 4..15)
  Uint32 tmp;
  ndbrequire(r0.peekWord(&tmp));
  tmp |= (boundsz << 16) | ((corrVal & 0xFFF) << 4);
  ndbrequire(r0.updateWord(tmp));
  ndbrequire(r0.step(1));    // Skip first BoundType

  // TODO: Renumbering below assume there are only EQ-bounds !!
  Uint32 id = 0;
  Uint32 len32;
  do
  {
    // Rewrite each AttributeHeader with sequential attribute ids
    ndbrequire(r0.peekWord(&tmp));
    AttributeHeader ah(tmp);
    Uint32 len = ah.getByteSize();
    AttributeHeader::init(&tmp, id++, len);
    ndbrequire(r0.updateWord(tmp));
    len32 = (len + 3) >> 2;
  } while (r0.step(2 + len32));  // Skip AttributeHeader(1) + Attribute(len32) + next BoundType(1)

  fragPtr.p->m_range_builder.m_range_cnt = boundno;
  fragPtr.p->m_range_builder.m_range_size = r0.getSize();
}
4970
/**
 * The parent node's batch is complete: decide the scan parallelism and
 * send the first round of SCAN_FRAGREQs for this SCAN INDEX node.
 *
 * Parallelism policy:
 *  - T_SCAN_PARALLEL: scan all remaining fragments at once (capped by
 *    batch_size_rows).
 *  - first execution: parallelism 1 (cheapest if the guess is wrong).
 *  - later executions: use mean - 2*stddev of observed parallelism,
 *    rounded so batches have similar size.
 *
 * If no ranges were produced for any fragment, the node has nothing to
 * scan and returns without activating.
 */
void
Dbspj::scanIndex_parent_batch_complete(Signal* signal,
                                       Ptr<Request> requestPtr,
                                       Ptr<TreeNode> treeNodePtr)
{
  jam();

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  data.m_rows_received = 0;
  data.m_rows_expecting = 0;
  ndbassert(data.m_frags_outstanding == 0);
  ndbassert(data.m_frags_complete == data.m_fragCount);
  data.m_frags_complete = 0;

  Ptr<ScanFragHandle> fragPtr;
  {
    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
    list.first(fragPtr);

    if ((treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN) == 0)
    {
      // Unpruned: all ranges live on the first handle
      if (fragPtr.p->m_rangePtrI == RNIL)
      {
        // No keys found
        jam();
        data.m_frags_complete = data.m_fragCount;
      }
    }
    else
    {
      // Pruned: only fragments some key hashed to have ranges to scan
      while(!fragPtr.isNull())
      {
        if (fragPtr.p->m_rangePtrI == RNIL)
        {
          jam();
          /**
           * This is a pruned scan, so we must scan those fragments that
           * some distribution key hashed to.
           */
          fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
          data.m_frags_complete++;
        }
        list.next(fragPtr);
      }
    }
  }
  data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;

  if (data.m_frags_complete == data.m_fragCount)
  {
    jam();
    /**
     * No keys was produced...
     */
    return;
  }

  /**
   * When parent's batch is complete, we send our batch
   */
  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
  ndbrequire(org->batch_size_rows > 0);

  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
  {
    jam();
    data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
                             org->batch_size_rows);
  }
  else if (data.m_firstExecution)
  {
    /**
     * Having a high parallelism would allow us to fetch data from many
     * fragments in parallel and thus reduce the number of round trips.
     * On the other hand, we should set parallelism so low that we can fetch
     * all data from a fragment in one batch if possible.
     * Since this is the first execution, we do not know how many rows or bytes
     * this operation is likely to return. Therefore we set parallelism to 1,
     * since this gives the lowest penalty if our guess is wrong.
     */
    jam();
    data.m_parallelism = 1;
  }
  else
  {
    jam();
    /**
     * Use statistics from earlier runs of this operation to estimate the
     * initial parallelism. We use the mean minus two times the standard
     * deviation to have a low risk of setting parallelism to high (as erring
     * in the other direction is more costly).
     */
    Int32 parallelism =
      static_cast<Int32>(MIN(data.m_parallelismStat.getMean()
                             - 2 * data.m_parallelismStat.getStdDev(),
                             org->batch_size_rows));

    if (parallelism < 1)
    {
      jam();
      parallelism = 1;
    }
    else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
    {
      jam();
      /**
       * Set parallelism such that we can expect to have similar
       * parallelism in each batch. For example if there are 8 remaining
       * fragments, then we should fecth 2 times 4 fragments rather than
       * 7+1.
       */
      const Int32 roundTrips =
        1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
      parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
    }

    data.m_parallelism = static_cast<Uint32>(parallelism);

#ifdef DEBUG_SCAN_FRAGREQ
    DEBUG("::scanIndex_send() starting index scan with parallelism="
          << data.m_parallelism);
#endif
  }
  ndbrequire(data.m_parallelism > 0);

  // Split the batch budget evenly among the fragments scanned in parallel
  const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism;
  const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
  ndbassert(bs_rows > 0);
  ndbassert(bs_bytes > 0);

  data.m_largestBatchRows = 0;
  data.m_largestBatchBytes = 0;
  data.m_totalRows = 0;
  data.m_totalBytes = 0;

  {
    // Reset every handle to NOT_STARTED before sending the new batch
    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
    Ptr<ScanFragHandle> fragPtr;
    list.first(fragPtr);

    while(!fragPtr.isNull())
    {
      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
      fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
      list.next(fragPtr);
    }
  }

  Uint32 batchRange = 0;
  scanIndex_send(signal,
                 requestPtr,
                 treeNodePtr,
                 data.m_parallelism,
                 bs_bytes,
                 bs_rows,
                 batchRange);

  data.m_firstExecution = false;

  ndbrequire(static_cast<Uint32>(data.m_frags_outstanding +
                                 data.m_frags_complete) <=
             data.m_fragCount);

  data.m_batch_chunks = 1;
  requestPtr.p->m_cnt_active++;
  requestPtr.p->m_outstanding++;
  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
}
5140
5141 void
scanIndex_parent_batch_repeat(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)5142 Dbspj::scanIndex_parent_batch_repeat(Signal* signal,
5143 Ptr<Request> requestPtr,
5144 Ptr<TreeNode> treeNodePtr)
5145 {
5146 jam();
5147 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5148
5149 DEBUG("scanIndex_parent_batch_repeat(), m_node_no: " << treeNodePtr.p->m_node_no
5150 << ", m_batch_chunks: " << data.m_batch_chunks);
5151
5152 ndbassert(treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE);
5153
5154 /**
5155 * Register index-scans to be restarted if we didn't get all
5156 * previously fetched parent related child rows in a single batch.
5157 */
5158 if (data.m_batch_chunks > 1)
5159 {
5160 jam();
5161 DEBUG("Register TreeNode for restart, m_node_no: " << treeNodePtr.p->m_node_no);
5162 ndbrequire(treeNodePtr.p->m_state != TreeNode::TN_ACTIVE);
5163 registerActiveCursor(requestPtr, treeNodePtr);
5164 data.m_batch_chunks = 0;
5165 }
5166 }
5167
/**
 * Ask for the first batch for a number of fragments.
 *
 * Sends SCAN_FRAGREQ to 'noOfFrags' fragments that are in state
 * SFH_NOT_STARTED, each with the given per-fragment batch budget.
 *
 * Section ownership: for a non-repeatable pruned scan the per-fragment
 * key section is released with the signal (attrInfo is duplicated);
 * otherwise sections are sent with sendSignalNoRelease() so they can be
 * reused for other fragments / repeats.
 *
 * @param batchRange [in/out] running correlation-id offset, advanced by
 *                   bs_rows per request sent.
 */
void
Dbspj::scanIndex_send(Signal* signal,
                      Ptr<Request> requestPtr,
                      Ptr<TreeNode> treeNodePtr,
                      Uint32 noOfFrags,
                      Uint32 bs_bytes,
                      Uint32 bs_rows,
                      Uint32& batchRange)
{
  /**
   * if (m_bits & prunemask):
   * - Range keys sliced out to each ScanFragHandle
   * - Else, range keys kept on first (and only) ScanFragHandle
   */
  const bool prune = treeNodePtr.p->m_bits &
    (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE);

  /**
   * If scan is repeatable, we must make sure not to release range keys so
   * that we canuse them again in the next repetition.
   */
  const bool repeatable =
    (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0;

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  ndbassert(noOfFrags > 0);
  ndbassert(data.m_frags_not_started >= noOfFrags);
  ScanFragReq* const req =
    reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
  const ScanFragReq * const org
    = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq);
  // Start from the prepared template; per-fragment fields are set below
  memcpy(req, org, sizeof(data.m_scanFragReq));
  // req->variableData[0] // set below
  req->variableData[1] = requestPtr.p->m_rootResultData;
  req->batch_size_bytes = bs_bytes;
  req->batch_size_rows = bs_rows;

  Uint32 requestsSent = 0;
  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
  Ptr<ScanFragHandle> fragPtr;
  list.first(fragPtr);
  Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
  ndbrequire(prune || keyInfoPtrI != RNIL);
  /**
   * Iterate over the list of fragments until we have sent as many
   * SCAN_FRAGREQs as we should.
   */
  while (requestsSent < noOfFrags)
  {
    jam();
    ndbassert(!fragPtr.isNull());

    if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
    {
      // Skip forward to the frags that we should send.
      jam();
      list.next(fragPtr);
      continue;
    }

    const Uint32 ref = fragPtr.p->m_ref;

    if (noOfFrags==1 && !prune &&
        data.m_frags_not_started == data.m_fragCount &&
        refToNode(ref) != getOwnNodeId() &&
        list.hasNext(fragPtr))
    {
      /**
       * If we are doing a scan with adaptive parallelism and start with
       * parallelism=1 then it makes sense to fetch a batch from a fragment on
       * the local data node. The reason for this is that if that fragment
       * contains few rows, we may be able to read from several fragments in
       * parallel. Then we minimize the total number of round trips (to remote
       * data nodes) if we fetch the first fragment batch locally.
       */
      jam();
      list.next(fragPtr);
      continue;
    }

    SectionHandle handle(this);

    Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;

    /**
     * Set data specific for this fragment
     */
    req->senderData = fragPtr.i;
    req->fragmentNoKeyLen = fragPtr.p->m_fragId;

    if (prune)
    {
      jam();
      keyInfoPtrI = fragPtr.p->m_rangePtrI;
      if (keyInfoPtrI == RNIL)
      {
        /**
         * Since we use pruning, we can see that no parent rows would hash
         * to this fragment.
         */
        jam();
        fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
        list.next(fragPtr);
        continue;
      }

      if (!repeatable)
      {
        /**
         * If we'll use sendSignal() and we need to send the attrInfo several
         * times, we need to copy them. (For repeatable or unpruned scans
         * we use sendSignalNoRelease(), so then we do not need to copy.)
         */
        jam();
        Uint32 tmp = RNIL;
        ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
        attrInfoPtrI = tmp;
      }
    }

    // Correlation-id base for rows from this fragment
    req->variableData[0] = batchRange;
    getSection(handle.m_ptr[0], attrInfoPtrI);
    getSection(handle.m_ptr[1], keyInfoPtrI);
    handle.m_cnt = 2;

#if defined DEBUG_SCAN_FRAGREQ
    ndbout_c("SCAN_FRAGREQ to %x", ref);
    printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
                      NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
                      DBLQH);
    printf("ATTRINFO: ");
    print(handle.m_ptr[0], stdout);
    printf("KEYINFO: ");
    print(handle.m_ptr[1], stdout);
#endif

    if (refToNode(ref) == getOwnNodeId())
    {
      c_Counters.incr_counter(CI_LOCAL_RANGE_SCANS_SENT, 1);
    }
    else
    {
      c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
    }

    if (prune && !repeatable)
    {
      /**
       * For a non-repeatable pruned scan, key info is unique for each
       * fragment and therefore cannot be reused, so we release key info
       * right away.
       */
      jam();
      sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
                 NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
      fragPtr.p->m_rangePtrI = RNIL;
      fragPtr.p->reset_ranges();
    }
    else
    {
      /**
       * Reuse key info for multiple fragments and/or multiple repetitions
       * of the scan.
       */
      jam();
      sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal,
                          NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
    }
    handle.clear();

    fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running
    data.m_frags_outstanding++;
    batchRange += bs_rows;
    requestsSent++;
    list.next(fragPtr);
  } // while (requestsSent < noOfFrags)

  data.m_frags_not_started -= requestsSent;
}
5350
/**
 * A result row for this SCAN INDEX node arrived (TRANSID_AI).
 * Forward the row to every dependent child node, count it, and if this
 * was the last expected row with no fragments outstanding, report and
 * check batch completion.
 */
void
Dbspj::scanIndex_execTRANSID_AI(Signal* signal,
                                Ptr<Request> requestPtr,
                                Ptr<TreeNode> treeNodePtr,
                                const RowPtr & rowRef)
{
  jam();

  LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
  Local_dependency_map list(pool, treeNodePtr.p->m_dependent_nodes);
  Dependency_map::ConstDataBufferIterator it;

  {
    // Dispatch the row to each child's m_parent_row handler
    for (list.first(it); !it.isNull(); list.next(it))
    {
      jam();
      Ptr<TreeNode> childPtr;
      m_treenode_pool.getPtr(childPtr, * it.data);
      ndbrequire(childPtr.p->m_info != 0&&childPtr.p->m_info->m_parent_row!=0);
      (this->*(childPtr.p->m_info->m_parent_row))(signal,
                                                  requestPtr, childPtr,rowRef);
    }
  }

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  data.m_rows_received++;

  if (data.m_frags_outstanding == 0 &&
      data.m_rows_received == data.m_rows_expecting)
  {
    jam();
    /**
     * Finished...
     */
    if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
    {
      jam();
      reportBatchComplete(signal, requestPtr, treeNodePtr);
    }

    checkBatchComplete(signal, requestPtr, 1);
    return;
  }
}
5395
/**
 * Handle SCAN_FRAGCONF for one fragment scan belonging to this
 * scan-index tree node. Accumulates row/byte statistics, tracks
 * per-fragment completion, and once the last outstanding fragment has
 * answered: updates the parallelism statistics (on full completion)
 * and, unless aborting or still waiting for rows, reports batch
 * completion.
 */
void
Dbspj::scanIndex_execSCAN_FRAGCONF(Signal* signal,
                                   Ptr<Request> requestPtr,
                                   Ptr<TreeNode> treeNodePtr,
                                   Ptr<ScanFragHandle> fragPtr)
{
  jam();

  const ScanFragConf * conf = (const ScanFragConf*)(signal->getDataPtr());

  Uint32 rows = conf->completedOps;
  Uint32 done = conf->fragmentCompleted;

  Uint32 state = fragPtr.p->m_state;
  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;

  if (state == ScanFragHandle::SFH_WAIT_CLOSE && done == 0)
  {
    jam();
    /**
     * We sent an explicit close request...ignore this...a close will come later
     */
    return;
  }

  // Accumulate per-request and per-node statistics for this batch.
  requestPtr.p->m_rows += rows;
  data.m_totalRows += rows;
  data.m_totalBytes += conf->total_len;
  data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
  data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);

  if (!treeNodePtr.p->isLeaf())
  {
    jam();
    // Non-leaf rows are delivered to us via TRANSID_AI; remember how
    // many to expect before the batch can be considered complete.
    data.m_rows_expecting += rows;
  }
  ndbrequire(data.m_frags_outstanding);
  ndbrequire(state == ScanFragHandle::SFH_SCANNING ||
             state == ScanFragHandle::SFH_WAIT_CLOSE);

  data.m_frags_outstanding--;
  fragPtr.p->m_state = ScanFragHandle::SFH_WAIT_NEXTREQ;

  if (done)
  {
    jam();
    fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
    ndbrequire(data.m_frags_complete < data.m_fragCount);
    data.m_frags_complete++;

    // Deactivate the node when all fragments are complete, or when
    // aborting and the only remaining fragments were never started.
    if (data.m_frags_complete == data.m_fragCount ||
        ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
         data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
    {
      jam();
      ndbrequire(requestPtr.p->m_cnt_active);
      requestPtr.p->m_cnt_active--;
      treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
    }
  }


  if (data.m_frags_outstanding == 0)
  {
    const ScanFragReq * const org
      = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq);

    if (data.m_frags_complete == data.m_fragCount)
    {
      jam();
      /**
       * Calculate what would have been the optimal parallelism for the
       * scan instance that we have just completed, and update
       * 'parallelismStat' with this value. We then use this statistics to set
       * the initial parallelism for the next instance of this operation.
       */
      double parallelism = data.m_fragCount;
      if (data.m_totalRows > 0)
      {
        parallelism = MIN(parallelism,
                          double(org->batch_size_rows) / data.m_totalRows);
      }
      if (data.m_totalBytes > 0)
      {
        parallelism = MIN(parallelism,
                          double(org->batch_size_bytes) / data.m_totalBytes);
      }
      data.m_parallelismStat.update(parallelism);
    }

    /**
     * Don't reportBatchComplete to children if we're aborting...
     */
    if (state == ScanFragHandle::SFH_WAIT_CLOSE)
    {
      jam();
      ndbrequire((requestPtr.p->m_state & Request::RS_ABORTING) != 0);
    }
    else if (! (data.m_rows_received == data.m_rows_expecting))
    {
      jam();
      // Still waiting for TRANSID_AI rows; completion is reported from
      // scanIndex_execTRANSID_AI when the last row arrives.
      return;
    }
    else
    {
      if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE)
      {
        jam();
        reportBatchComplete(signal, requestPtr, treeNodePtr);
      }
    }

    checkBatchComplete(signal, requestPtr, 1);
    return;
  }
}
5512
/**
 * Handle SCAN_FRAGREF (a failed fragment scan): mark the fragment
 * complete, adjust the outstanding/complete counters (deactivating the
 * node and decrementing the request's outstanding count when
 * appropriate), then abort the whole request with the reported error.
 */
void
Dbspj::scanIndex_execSCAN_FRAGREF(Signal* signal,
                                  Ptr<Request> requestPtr,
                                  Ptr<TreeNode> treeNodePtr,
                                  Ptr<ScanFragHandle> fragPtr)
{
  jam();

  const ScanFragRef * rep = CAST_CONSTPTR(ScanFragRef, signal->getDataPtr());
  const Uint32 errCode = rep->errorCode;

  Uint32 state = fragPtr.p->m_state;
  ndbrequire(state == ScanFragHandle::SFH_SCANNING ||
             state == ScanFragHandle::SFH_WAIT_CLOSE);

  fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  ndbrequire(data.m_frags_complete < data.m_fragCount);
  data.m_frags_complete++;
  ndbrequire(data.m_frags_outstanding > 0);
  data.m_frags_outstanding--;

  // All fragments are either complete or never started: the node is done.
  if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
  {
    jam();
    ndbrequire(requestPtr.p->m_cnt_active);
    requestPtr.p->m_cnt_active--;
    treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
  }

  if (data.m_frags_outstanding == 0)
  {
    jam();
    ndbrequire(requestPtr.p->m_outstanding);
    requestPtr.p->m_outstanding--;
  }

  abort(signal, requestPtr, errCode);
}
5553
/**
 * Fetch the next batch for this scan-index node. First (for adaptive,
 * non-parallel scans) re-tune data.m_parallelism from the statistics of
 * the previous batch, then send SCAN_NEXTREQ to fragments that are
 * already started and waiting, and finally start new fragments via
 * scanIndex_send() until 'parallelism' fragments are running.
 */
void
Dbspj::scanIndex_execSCAN_NEXTREQ(Signal* signal,
                                  Ptr<Request> requestPtr,
                                  Ptr<TreeNode> treeNodePtr)
{
  jam();

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;

  // Reset per-batch row accounting for the new batch.
  data.m_rows_received = 0;
  data.m_rows_expecting = 0;
  ndbassert(data.m_frags_outstanding == 0);

  ndbrequire(data.m_frags_complete < data.m_fragCount);
  if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0)
  {
    jam();
    /**
     * Since fetching few but large batches is more efficient, we
     * set parallelism to the lowest value where we can still expect each
     * batch to be full.
     */
    if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
        data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
    {
      jam();
      // Previous batches were not full: raise parallelism, bounded by
      // remaining fragments and by the total row budget.
      data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
                               org->batch_size_rows);
      if (data.m_largestBatchRows > 0)
      {
        jam();
        data.m_parallelism =
          MIN(org->batch_size_rows / data.m_largestBatchRows,
              data.m_parallelism);
      }
      if (data.m_largestBatchBytes > 0)
      {
        jam();
        data.m_parallelism =
          MIN(data.m_parallelism,
              org->batch_size_bytes/data.m_largestBatchBytes);
      }
      if (data.m_frags_complete == 0 &&
          data.m_frags_not_started % data.m_parallelism != 0)
      {
        jam();
        /**
         * Set parallelism such that we can expect to have similar
         * parallelism in each batch. For example if there are 8 remaining
         * fragments, then we should fetch 2 times 4 fragments rather than
         * 7+1.
         */
        const Uint32 roundTrips =
          1 + data.m_frags_not_started / data.m_parallelism;
        data.m_parallelism = data.m_frags_not_started / roundTrips;
      }
    }
    else
    {
      jam();
      // We get full batches, so we should lower parallelism.
      data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
                               MAX(1, data.m_parallelism/2));
    }
    ndbassert(data.m_parallelism > 0);
#ifdef DEBUG_SCAN_FRAGREQ
    DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
          data.m_parallelism <<
          " fragments with " << org->batch_size_rows/data.m_parallelism <<
          " rows and " << org->batch_size_bytes/data.m_parallelism <<
          " bytes.");
#endif
  }
  else
  {
    jam();
    // Fixed full-parallel scan: run all remaining fragments (bounded by
    // the row budget, since each running fragment gets >= 1 row).
    data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
                             org->batch_size_rows);
  }

  // Divide the total batch budget evenly across the running fragments.
  const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
  ndbassert(bs_rows > 0);
  ScanFragNextReq* req =
    reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
  req->requestInfo = 0;
  ScanFragNextReq::setCorrFactorFlag(req->requestInfo);
  req->transId1 = requestPtr.p->m_transId[0];
  req->transId2 = requestPtr.p->m_transId[1];
  req->batch_size_rows = bs_rows;
  req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;

  Uint32 batchRange = 0;
  Ptr<ScanFragHandle> fragPtr;
  Uint32 sentFragCount = 0;
  {
    /**
     * First, ask for more data from fragments that are already started.
     */
    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
    list.first(fragPtr);
    while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
    {
      jam();
      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
                fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
      if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
      {
        jam();

        data.m_frags_outstanding++;
        // Each fragment gets its own disjoint correlation-id range.
        req->variableData[0] = batchRange;
        fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
        batchRange += bs_rows;

        DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex
              << treeNodePtr.p->m_send.m_ref
              << ", m_node_no=" << treeNodePtr.p->m_node_no
              << ", senderData: " << req->senderData);

#ifdef DEBUG_SCAN_FRAGREQ
        printSCANFRAGNEXTREQ(stdout, &signal->theData[0],
                             ScanFragNextReq:: SignalLength + 1, DBLQH);
#endif

        req->senderData = fragPtr.i;
        sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
                   ScanFragNextReq::SignalLength + 1,
                   JBB);
        sentFragCount++;
      }
      list.next(fragPtr);
    }
  }

  if (sentFragCount < data.m_parallelism)
  {
    /**
     * Then start new fragments until we reach data.m_parallelism.
     */
    jam();
    ndbassert(data.m_frags_not_started != 0);
    scanIndex_send(signal,
                   requestPtr,
                   treeNodePtr,
                   data.m_parallelism - sentFragCount,
                   org->batch_size_bytes/data.m_parallelism,
                   bs_rows,
                   batchRange);
  }
  /**
   * cursor should not have been positioned here...
   * unless we actually had something more to send.
   * so require that we did actually send something
   */
  ndbrequire(data.m_frags_outstanding > 0);
  ndbrequire(data.m_batch_chunks > 0);
  data.m_batch_chunks++;

  requestPtr.p->m_outstanding++;
  ndbassert(treeNodePtr.p->m_state == TreeNode::TN_ACTIVE);
}
5717
5718 void
scanIndex_complete(Signal * signal,Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)5719 Dbspj::scanIndex_complete(Signal* signal,
5720 Ptr<Request> requestPtr,
5721 Ptr<TreeNode> treeNodePtr)
5722 {
5723 jam();
5724 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5725 ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanindex_data.m_scanFragReq;
5726 if (!data.m_fragments.isEmpty())
5727 {
5728 jam();
5729 DihScanTabCompleteRep* rep=(DihScanTabCompleteRep*)signal->getDataPtrSend();
5730 rep->tableId = dst->tableId;
5731 rep->scanCookie = data.m_scanCookie;
5732 sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_COMPLETE_REP,
5733 signal, DihScanTabCompleteRep::SignalLength, JBB);
5734 }
5735 }
5736
/**
 * Abort this scan-index node. For an ACTIVE node, send an explicit
 * close (ZCLOSE) SCAN_NEXTREQ to every fragment that is currently
 * scanning or idle between batches; fragments not yet started, already
 * complete, or already closing need nothing. If no fragment was
 * actively scanning, the node is deactivated (or m_outstanding bumped
 * for the pending closes) right here.
 */
void
Dbspj::scanIndex_abort(Signal* signal,
                       Ptr<Request> requestPtr,
                       Ptr<TreeNode> treeNodePtr)
{
  jam();

  switch(treeNodePtr.p->m_state){
  case TreeNode::TN_BUILDING:
  case TreeNode::TN_PREPARING:
  case TreeNode::TN_INACTIVE:
  case TreeNode::TN_COMPLETING:
  case TreeNode::TN_END:
    // Nothing is running for this node - just log the state and return.
    ndbout_c("H'%.8x H'%.8x scanIndex_abort state: %u",
             requestPtr.p->m_transId[0],
             requestPtr.p->m_transId[1],
             treeNodePtr.p->m_state);
    return;

  case TreeNode::TN_ACTIVE:
    jam();
    break;
  }

  // Build a close request; senderData is filled in per fragment below.
  ScanFragNextReq* req = CAST_PTR(ScanFragNextReq, signal->getDataPtrSend());
  req->requestInfo = ScanFragNextReq::ZCLOSE;
  req->transId1 = requestPtr.p->m_transId[0];
  req->transId2 = requestPtr.p->m_transId[1];
  req->batch_size_rows = 0;
  req->batch_size_bytes = 0;

  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
  Ptr<ScanFragHandle> fragPtr;

  Uint32 cnt_waiting = 0;
  Uint32 cnt_scanning = 0;
  for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
  {
    switch(fragPtr.p->m_state){
    case ScanFragHandle::SFH_NOT_STARTED:
    case ScanFragHandle::SFH_COMPLETE:
    case ScanFragHandle::SFH_WAIT_CLOSE:
      jam();
      break;
    case ScanFragHandle::SFH_WAIT_NEXTREQ:
      jam();
      cnt_waiting++; // was idle...
      data.m_frags_outstanding++; // is closing
      goto do_abort;
    case ScanFragHandle::SFH_SCANNING:
      jam();
      cnt_scanning++;
      goto do_abort;
    do_abort:
      // Send the close to this fragment and await its final FRAGCONF.
      req->senderData = fragPtr.i;
      sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
                 ScanFragNextReq::SignalLength, JBB);

      fragPtr.p->m_state = ScanFragHandle::SFH_WAIT_CLOSE;
      break;
    }
  }

  if (cnt_scanning == 0)
  {
    if (cnt_waiting > 0)
    {
      /**
       * If all were waiting...this should increase m_outstanding
       */
      jam();
      requestPtr.p->m_outstanding++;
    }
    else
    {
      /**
       * All fragments are either complete or not yet started, so there is
       * nothing to abort.
       */
      jam();
      ndbassert(data.m_frags_not_started > 0);
      ndbrequire(requestPtr.p->m_cnt_active);
      requestPtr.p->m_cnt_active--;
      treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
    }
  }
}
5825
/**
 * Handle data node failure(s) for this scan-index node. Every fragment
 * scan located on a failed node is forced to SFH_COMPLETE and the
 * outstanding / complete / not-started counters are adjusted
 * accordingly. Returns the number of affected fragments; a non-zero
 * return tells the caller the request must be aborted. (For the
 * non-active states the return value just reflects whether the node
 * still matters: 1 for PREPARING/INACTIVE, 0 otherwise.)
 */
Uint32
Dbspj::scanIndex_execNODE_FAILREP(Signal* signal,
                                  Ptr<Request> requestPtr,
                                  Ptr<TreeNode> treeNodePtr,
                                  NdbNodeBitmask nodes)
{
  jam();

  switch(treeNodePtr.p->m_state){
  case TreeNode::TN_PREPARING:
  case TreeNode::TN_INACTIVE:
    return 1;

  case TreeNode::TN_BUILDING:
  case TreeNode::TN_COMPLETING:
  case TreeNode::TN_END:
    return 0;

  case TreeNode::TN_ACTIVE:
    jam();
    break;
  }


  Uint32 sum = 0;
  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
  Ptr<ScanFragHandle> fragPtr;

  // Remember counters so we can detect transitions caused by this event.
  Uint32 save0 = data.m_frags_outstanding;
  Uint32 save1 = data.m_frags_complete;

  for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
  {
    if (nodes.get(refToNode(fragPtr.p->m_ref)) == false)
    {
      jam();
      /**
       * No action needed
       */
      continue;
    }

    switch(fragPtr.p->m_state){
    case ScanFragHandle::SFH_NOT_STARTED:
      jam();
      ndbrequire(data.m_frags_complete < data.m_fragCount);
      data.m_frags_complete++;
      ndbrequire(data.m_frags_not_started > 0);
      data.m_frags_not_started--;
      // fall through
    case ScanFragHandle::SFH_COMPLETE:
      jam();
      sum++; // indicate that we should abort
      /**
       * we could keep list of all fragments...
       * or execute DIGETNODES again...
       * but for now, we don't
       */
      break;
    case ScanFragHandle::SFH_WAIT_CLOSE:
    case ScanFragHandle::SFH_SCANNING:
      jam();
      // An answer from this fragment will never arrive now.
      ndbrequire(data.m_frags_outstanding > 0);
      data.m_frags_outstanding--;
      // fall through
    case ScanFragHandle::SFH_WAIT_NEXTREQ:
      jam();
      sum++;
      ndbrequire(data.m_frags_complete < data.m_fragCount);
      data.m_frags_complete++;
      break;
    }
    fragPtr.p->m_ref = 0;
    fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
  }

  // Last outstanding answer vanished with the failed node(s).
  if (save0 != 0 && data.m_frags_outstanding == 0)
  {
    jam();
    ndbrequire(requestPtr.p->m_outstanding);
    requestPtr.p->m_outstanding--;
  }

  // All remaining fragments are complete or never started: deactivate.
  if (save1 != 0 &&
      data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
  {
    jam();
    ndbrequire(requestPtr.p->m_cnt_active);
    requestPtr.p->m_cnt_active--;
    treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
  }

  return sum;
}
5921
5922 void
scanIndex_release_rangekeys(Ptr<Request> requestPtr,Ptr<TreeNode> treeNodePtr)5923 Dbspj::scanIndex_release_rangekeys(Ptr<Request> requestPtr,
5924 Ptr<TreeNode> treeNodePtr)
5925 {
5926 jam();
5927 DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
5928 << " m_node_no: " << treeNodePtr.p->m_node_no);
5929
5930 ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
5931 Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
5932 Ptr<ScanFragHandle> fragPtr;
5933
5934 if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
5935 {
5936 jam();
5937 for (list.first(fragPtr); !fragPtr.isNull(); list.next(fragPtr))
5938 {
5939 if (fragPtr.p->m_rangePtrI != RNIL)
5940 {
5941 releaseSection(fragPtr.p->m_rangePtrI);
5942 fragPtr.p->m_rangePtrI = RNIL;
5943 }
5944 fragPtr.p->reset_ranges();
5945 }
5946 }
5947 else
5948 {
5949 jam();
5950 list.first(fragPtr);
5951 if (fragPtr.p->m_rangePtrI != RNIL)
5952 {
5953 releaseSection(fragPtr.p->m_rangePtrI);
5954 fragPtr.p->m_rangePtrI = RNIL;
5955 }
5956 fragPtr.p->reset_ranges();
5957 }
5958 }
5959
/**
 * Parent batch has completed, and will not refetch (X-joined) results
 * from its children. Release & reset range keys which are unsent or we
 * have kept for possible resubmits.
 */
void
Dbspj::scanIndex_parent_batch_cleanup(Ptr<Request> requestPtr,
                                      Ptr<TreeNode> treeNodePtr)
{
  // The parent batch is finished and will not be refetched: any range
  // keys built from its rows (unsent, or kept for possible resubmits)
  // can be released now.
  DEBUG("scanIndex_parent_batch_cleanup");
  scanIndex_release_rangekeys(requestPtr,treeNodePtr);
}
5972
/**
 * Final cleanup of this scan-index node: release remaining range-key
 * sections, drop the fragment handle list, release the prune pattern
 * (or the const-pruned key section), then run the common tree-node
 * cleanup.
 */
void
Dbspj::scanIndex_cleanup(Ptr<Request> requestPtr,
                         Ptr<TreeNode> treeNodePtr)
{
  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
  DEBUG("scanIndex_cleanup");

  /**
   * Range keys has been collected wherever there are uncompleted
   * parent batches...release them to avoid memleak.
   */
  scanIndex_release_rangekeys(requestPtr,treeNodePtr);

  {
    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
    list.remove();
  }
  if (treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN)
  {
    jam();
    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
    Local_pattern_store pattern(pool, data.m_prunePattern);
    pattern.release();
  }
  else if (treeNodePtr.p->m_bits & TreeNode::T_CONST_PRUNE)
  {
    jam();
    if (data.m_constPrunePtrI != RNIL)
    {
      jam();
      releaseSection(data.m_constPrunePtrI);
      data.m_constPrunePtrI = RNIL;
    }
  }

  cleanup_common(requestPtr, treeNodePtr);
}
6010
6011 /**
6012 * END - MODULE SCAN INDEX
6013 */
6014
6015 /**
6016 * Static OpInfo handling
6017 */
6018 const Dbspj::OpInfo*
getOpInfo(Uint32 op)6019 Dbspj::getOpInfo(Uint32 op)
6020 {
6021 DEBUG("getOpInfo(" << op << ")");
6022 switch(op){
6023 case QueryNode::QN_LOOKUP:
6024 return &Dbspj::g_LookupOpInfo;
6025 case QueryNode::QN_SCAN_FRAG:
6026 return &Dbspj::g_ScanFragOpInfo;
6027 case QueryNode::QN_SCAN_INDEX:
6028 return &Dbspj::g_ScanIndexOpInfo;
6029 default:
6030 return 0;
6031 }
6032 }
6033
6034 /**
6035 * MODULE COMMON PARSE/UNPACK
6036 */
6037
6038 /**
6039 * @returns dstLen + 1 on error
6040 */
/**
 * Unpack a list of 16-bit items, packed two per 32-bit word, from
 * 'buffer' into 'dst'. The first word holds the item count in its low
 * 16 bits and the first item in its high 16 bits; the remaining items
 * follow packed low-half first. Advances buffer.ptr past the consumed
 * words. Returns the number of items unpacked (0 for an empty buffer),
 * or dstLen + 1 on error (dst overflow or truncated buffer).
 */
static
Uint32
unpackList(Uint32 dstLen, Uint32 * dst, Dbspj::DABuffer & buffer)
{
  const Uint32 * ptr = buffer.ptr;
  if (likely(ptr != buffer.end))
  {
    Uint32 tmp = * ptr++;
    Uint32 cnt = tmp & 0xFFFF;

    * dst ++ = (tmp >> 16); // Store first
    DEBUG("cnt: " << cnt << " first: " << (tmp >> 16));

    if (cnt > 1)
    {
      // cnt-1 items remain after the header word; they occupy
      // ceil((cnt-1)/2) == cnt/2 additional words.
      Uint32 len = cnt / 2;
      if (unlikely(cnt >= dstLen || (ptr + len > buffer.end)))
        goto error;

      cnt --; // subtract item stored in header

      for (Uint32 i = 0; i < cnt/2; i++)
      {
        * dst++ = (* ptr) & 0xFFFF;
        * dst++ = (* ptr) >> 16;
        ptr++;
      }

      if (cnt & 1)
      {
        // Odd remainder: the last word carries a single item.
        * dst ++ = * ptr & 0xFFFF;
        ptr++;
      }

      cnt ++; // readd item stored in header
    }
    buffer.ptr = ptr;
    return cnt;
  }
  return 0;

error:
  return dstLen + 1;
}
6085
/**
 * This function takes an array of attrinfo, and builds a "header"
 * which can be used to do random access inside the row
 */
Uint32
Dbspj::buildRowHeader(RowPtr::Header * header, SegmentedSectionPtr ptr)
{
  Uint32 tmp, len;
  Uint32 * dst = header->m_offset;
  const Uint32 * const save = dst;
  SectionReader r0(ptr, getSectionSegmentPool());
  Uint32 offset = 0;
  // Walk the attrinfo stream, recording the word offset of each
  // AttributeHeader so columns can be addressed randomly later.
  // NOTE(review): the do-while reads one word before testing - this
  // assumes the section holds at least one attribute; confirm callers.
  do
  {
    * dst++ = offset;
    r0.getWord(&tmp);
    len = AttributeHeader::getDataSize(tmp);
    offset += 1 + len;   // header word + its data words
  } while (r0.step(len));

  // m_len = number of attributes found
  return header->m_len = static_cast<Uint32>(dst - save);
}
6108
/**
 * This function takes an array of attrinfo, and builds a "header"
 * which can be used to do random access inside the row
 */
6113 Uint32
buildRowHeader(RowPtr::Header * header,const Uint32 * & src,Uint32 len)6114 Dbspj::buildRowHeader(RowPtr::Header * header, const Uint32 *& src, Uint32 len)
6115 {
6116 Uint32 * dst = header->m_offset;
6117 const Uint32 * save = dst;
6118 Uint32 offset = 0;
6119 for (Uint32 i = 0; i<len; i++)
6120 {
6121 * dst ++ = offset;
6122 Uint32 tmp = * src++;
6123 Uint32 tmp_len = AttributeHeader::getDataSize(tmp);
6124 offset += 1 + tmp_len;
6125 src += tmp_len;
6126 }
6127
6128 return header->m_len = static_cast<Uint32>(dst - save);
6129 }
6130
6131 Uint32
appendToPattern(Local_pattern_store & pattern,DABuffer & tree,Uint32 len)6132 Dbspj::appendToPattern(Local_pattern_store & pattern,
6133 DABuffer & tree, Uint32 len)
6134 {
6135 if (unlikely(tree.ptr + len > tree.end))
6136 return DbspjErr::InvalidTreeNodeSpecification;
6137
6138 if (unlikely(pattern.append(tree.ptr, len)==0))
6139 return DbspjErr::OutOfQueryMemory;
6140
6141 tree.ptr += len;
6142 return 0;
6143 }
6144
6145 Uint32
appendParamToPattern(Local_pattern_store & dst,const RowPtr::Linear & row,Uint32 col)6146 Dbspj::appendParamToPattern(Local_pattern_store& dst,
6147 const RowPtr::Linear & row, Uint32 col)
6148 {
6149 /**
6150 * TODO handle errors
6151 */
6152 Uint32 offset = row.m_header->m_offset[col];
6153 const Uint32 * ptr = row.m_data + offset;
6154 Uint32 len = AttributeHeader::getDataSize(* ptr ++);
6155 /* Param COL's converted to DATA when appended to pattern */
6156 Uint32 info = QueryPattern::data(len);
6157 return dst.append(&info,1) && dst.append(ptr,len) ? 0 : DbspjErr::OutOfQueryMemory;
6158 }
6159
6160 Uint32
appendParamHeadToPattern(Local_pattern_store & dst,const RowPtr::Linear & row,Uint32 col)6161 Dbspj::appendParamHeadToPattern(Local_pattern_store& dst,
6162 const RowPtr::Linear & row, Uint32 col)
6163 {
6164 /**
6165 * TODO handle errors
6166 */
6167 Uint32 offset = row.m_header->m_offset[col];
6168 const Uint32 * ptr = row.m_data + offset;
6169 Uint32 len = AttributeHeader::getDataSize(*ptr);
6170 /* Param COL's converted to DATA when appended to pattern */
6171 Uint32 info = QueryPattern::data(len+1);
6172 return dst.append(&info,1) && dst.append(ptr,len+1) ? 0 : DbspjErr::OutOfQueryMemory;
6173 }
6174
6175 Uint32
appendTreeToSection(Uint32 & ptrI,SectionReader & tree,Uint32 len)6176 Dbspj::appendTreeToSection(Uint32 & ptrI, SectionReader & tree, Uint32 len)
6177 {
6178 /**
6179 * TODO handle errors
6180 */
6181 Uint32 SZ = 16;
6182 Uint32 tmp[16];
6183 while (len > SZ)
6184 {
6185 jam();
6186 tree.getWords(tmp, SZ);
6187 ndbrequire(appendToSection(ptrI, tmp, SZ));
6188 len -= SZ;
6189 }
6190
6191 tree.getWords(tmp, len);
6192 return appendToSection(ptrI, tmp, len) ? 0 : /** todo error code */ 1;
6193 #if TODO
6194 err:
6195 return 1;
6196 #endif
6197 }
6198
/**
 * Extract the CORR_FACTOR32 correlation value from column 'col' of a
 * row stored in a segmented section. The column must be a one-word
 * attribute with id CORR_FACTOR32 (enforced via ndbrequire).
 */
void
Dbspj::getCorrelationData(const RowPtr::Section & row,
                          Uint32 col,
                          Uint32& correlationNumber)
{
  /**
   * TODO handle errors
   */
  SegmentedSectionPtr ptr(row.m_dataPtr);
  SectionReader reader(ptr, getSectionSegmentPool());
  Uint32 offset = row.m_header->m_offset[col];
  ndbrequire(reader.step(offset));
  Uint32 tmp;
  ndbrequire(reader.getWord(&tmp));
  Uint32 len = AttributeHeader::getDataSize(tmp);
  ndbrequire(len == 1);
  ndbrequire(AttributeHeader::getAttributeId(tmp) == AttributeHeader::CORR_FACTOR32);
  ndbrequire(reader.getWord(&correlationNumber));
}
6218
6219 void
getCorrelationData(const RowPtr::Linear & row,Uint32 col,Uint32 & correlationNumber)6220 Dbspj::getCorrelationData(const RowPtr::Linear & row,
6221 Uint32 col,
6222 Uint32& correlationNumber)
6223 {
6224 /**
6225 * TODO handle errors
6226 */
6227 Uint32 offset = row.m_header->m_offset[col];
6228 Uint32 tmp = row.m_data[offset];
6229 Uint32 len = AttributeHeader::getDataSize(tmp);
6230 ndbrequire(len == 1);
6231 ndbrequire(AttributeHeader::getAttributeId(tmp) == AttributeHeader::CORR_FACTOR32);
6232 correlationNumber = row.m_data[offset+1];
6233 }
6234
/**
 * Append the data words of column 'col' (header word excluded) from a
 * section-stored row to section 'dst'. A NULL value (data size 0) is
 * signalled through 'hasNull' and nothing is appended.
 */
Uint32
Dbspj::appendColToSection(Uint32 & dst, const RowPtr::Section & row,
                          Uint32 col, bool& hasNull)
{
  /**
   * TODO handle errors
   */
  SegmentedSectionPtr ptr(row.m_dataPtr);
  SectionReader reader(ptr, getSectionSegmentPool());
  Uint32 offset = row.m_header->m_offset[col];
  ndbrequire(reader.step(offset));
  Uint32 tmp;
  // getWord consumes the AttributeHeader, leaving the reader at the data.
  ndbrequire(reader.getWord(&tmp));
  Uint32 len = AttributeHeader::getDataSize(tmp);
  if (unlikely(len==0))
  {
    jam();
    hasNull = true;  // NULL-value in key
    return 0;
  }
  return appendTreeToSection(dst, reader, len);
}
6257
6258 Uint32
appendColToSection(Uint32 & dst,const RowPtr::Linear & row,Uint32 col,bool & hasNull)6259 Dbspj::appendColToSection(Uint32 & dst, const RowPtr::Linear & row,
6260 Uint32 col, bool& hasNull)
6261 {
6262 /**
6263 * TODO handle errors
6264 */
6265 Uint32 offset = row.m_header->m_offset[col];
6266 const Uint32 * ptr = row.m_data + offset;
6267 Uint32 len = AttributeHeader::getDataSize(* ptr ++);
6268 if (unlikely(len==0))
6269 {
6270 jam();
6271 hasNull = true; // NULL-value in key
6272 return 0;
6273 }
6274 return appendToSection(dst, ptr, len) ? 0 : DbspjErr::InvalidPattern;
6275 }
6276
6277 Uint32
appendAttrinfoToSection(Uint32 & dst,const RowPtr::Linear & row,Uint32 col,bool & hasNull)6278 Dbspj::appendAttrinfoToSection(Uint32 & dst, const RowPtr::Linear & row,
6279 Uint32 col, bool& hasNull)
6280 {
6281 /**
6282 * TODO handle errors
6283 */
6284 Uint32 offset = row.m_header->m_offset[col];
6285 const Uint32 * ptr = row.m_data + offset;
6286 Uint32 len = AttributeHeader::getDataSize(* ptr);
6287 if (unlikely(len==0))
6288 {
6289 jam();
6290 hasNull = true; // NULL-value in key
6291 }
6292 return appendToSection(dst, ptr, 1 + len) ? 0 : DbspjErr::InvalidPattern;
6293 }
6294
/**
 * Append column 'col' of a section-stored row INCLUDING its
 * AttributeHeader word to section 'dst'. A NULL value (data size 0)
 * is flagged via 'hasNull', but the header word is still appended.
 */
Uint32
Dbspj::appendAttrinfoToSection(Uint32 & dst, const RowPtr::Section & row,
                               Uint32 col, bool& hasNull)
{
  /**
   * TODO handle errors
   */
  SegmentedSectionPtr ptr(row.m_dataPtr);
  SectionReader reader(ptr, getSectionSegmentPool());
  Uint32 offset = row.m_header->m_offset[col];
  ndbrequire(reader.step(offset));
  Uint32 tmp;
  // peekWord (not getWord): the header word stays in the stream so
  // appendTreeToSection copies header + data (1 + len words).
  ndbrequire(reader.peekWord(&tmp));
  Uint32 len = AttributeHeader::getDataSize(tmp);
  if (unlikely(len==0))
  {
    jam();
    hasNull = true;  // NULL-value in key
  }
  return appendTreeToSection(dst, reader, 1 + len);
}
6316
6317 /**
6318 * 'PkCol' is the composite NDB$PK column in an unique index consisting of
6319 * a fragment id and the composite PK value (all PK columns concatenated)
6320 */
/**
 * Append the concatenated-PK part of the composite NDB$PK column
 * (column 'col' of a section-stored row) to section 'dst': the
 * AttributeHeader word and the leading fragid word are skipped, the
 * remaining len-1 words are appended. NULL is not a legal value here.
 */
Uint32
Dbspj::appendPkColToSection(Uint32 & dst, const RowPtr::Section & row, Uint32 col)
{
  /**
   * TODO handle errors
   */
  SegmentedSectionPtr ptr(row.m_dataPtr);
  SectionReader reader(ptr, getSectionSegmentPool());
  Uint32 offset = row.m_header->m_offset[col];
  ndbrequire(reader.step(offset));
  Uint32 tmp;
  ndbrequire(reader.getWord(&tmp));
  Uint32 len = AttributeHeader::getDataSize(tmp);
  ndbrequire(len>1);  // NULL-value in PkKey is an error
  ndbrequire(reader.step(1));  // Skip fragid
  return appendTreeToSection(dst, reader, len-1);
}
6338
6339 /**
6340 * 'PkCol' is the composite NDB$PK column in an unique index consisting of
6341 * a fragment id and the composite PK value (all PK columns concatenated)
6342 */
6343 Uint32
appendPkColToSection(Uint32 & dst,const RowPtr::Linear & row,Uint32 col)6344 Dbspj::appendPkColToSection(Uint32 & dst, const RowPtr::Linear & row, Uint32 col)
6345 {
6346 Uint32 offset = row.m_header->m_offset[col];
6347 Uint32 tmp = row.m_data[offset];
6348 Uint32 len = AttributeHeader::getDataSize(tmp);
6349 ndbrequire(len>1); // NULL-value in PkKey is an error
6350 return appendToSection(dst, row.m_data+offset+2, len - 1) ? 0 : /** todo error code */ 1;
6351 }
6352
/**
 * Resolve a P_PARENT pattern entry: walk 'levels' steps up the tree
 * from the node that produced 'rowptr', locating the ancestor row via
 * each node's buffered row map and the parent part of the correlation
 * value, then append the referenced column (P_COL / P_UNQ_PK /
 * P_ATTRINFO as given by the next pattern word) of that ancestor row
 * to section 'dst'. Returns 0 on success or InvalidPattern on any
 * malformed pattern / unresolvable ancestor.
 */
Uint32
Dbspj::appendFromParent(Uint32 & dst, Local_pattern_store& pattern,
                        Local_pattern_store::ConstDataBufferIterator& it,
                        Uint32 levels, const RowPtr & rowptr,
                        bool& hasNull)
{
  Ptr<TreeNode> treeNodePtr;
  m_treenode_pool.getPtr(treeNodePtr, rowptr.m_src_node_ptrI);
  Uint32 corrVal = rowptr.m_src_correlation;
  RowPtr targetRow;
  while (levels--)
  {
    jam();
    if (unlikely(treeNodePtr.p->m_parentPtrI == RNIL))
    {
      DEBUG_CRASH();
      return DbspjErr::InvalidPattern;
    }
    m_treenode_pool.getPtr(treeNodePtr, treeNodePtr.p->m_parentPtrI);
    // The ancestor must buffer its rows in a map for us to look them up.
    if (unlikely((treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER_MAP) == 0))
    {
      DEBUG_CRASH();
      return DbspjErr::InvalidPattern;
    }

    RowRef ref;
    treeNodePtr.p->m_row_map.copyto(ref);
    Uint32 allocator = ref.m_allocator;
    const Uint32 * mapptr;
    if (allocator == 0)
    {
      jam();
      mapptr = get_row_ptr_stack(ref);
    }
    else
    {
      jam();
      mapptr = get_row_ptr_var(ref);
    }

    Uint32 pos = corrVal >> 16; // parent corr-val
    if (unlikely(! (pos < treeNodePtr.p->m_row_map.m_size)))
    {
      DEBUG_CRASH();
      return DbspjErr::InvalidPattern;
    }

    // load ref to parent row
    treeNodePtr.p->m_row_map.load(mapptr, pos, ref);

    const Uint32 * rowptr;
    if (allocator == 0)
    {
      jam();
      rowptr = get_row_ptr_stack(ref);
    }
    else
    {
      jam();
      rowptr = get_row_ptr_var(ref);
    }
    setupRowPtr(treeNodePtr, targetRow, ref, rowptr);

    if (levels)
    {
      jam();
      // Not at the target ancestor yet: pick up this row's correlation
      // value (last column) to locate the next level up.
      getCorrelationData(targetRow.m_row_data.m_linear,
                         targetRow.m_row_data.m_linear.m_header->m_len - 1,
                         corrVal);
    }
  }

  if (unlikely(it.isNull()))
  {
    DEBUG_CRASH();
    return DbspjErr::InvalidPattern;
  }

  // The next pattern word says what to extract from the ancestor row.
  Uint32 info = *it.data;
  Uint32 type = QueryPattern::getType(info);
  Uint32 val = QueryPattern::getLength(info);
  pattern.next(it);
  switch(type){
  case QueryPattern::P_COL:
    jam();
    return appendColToSection(dst, targetRow.m_row_data.m_linear, val, hasNull);
    break;
  case QueryPattern::P_UNQ_PK:
    jam();
    return appendPkColToSection(dst, targetRow.m_row_data.m_linear, val);
    break;
  case QueryPattern::P_ATTRINFO:
    jam();
    return appendAttrinfoToSection(dst, targetRow.m_row_data.m_linear, val, hasNull);
    break;
  case QueryPattern::P_DATA:
    jam();
    // retreiving DATA from parent...is...an error
    break;
  case QueryPattern::P_PARENT:
    jam();
    // no point in nesting P_PARENT...an error
    break;
  case QueryPattern::P_PARAM:
  case QueryPattern::P_PARAM_HEADER:
    jam();
    // should have been expanded during build
    break;
  }

  DEBUG_CRASH();
  return DbspjErr::InvalidPattern;
}
6466
6467 Uint32
appendDataToSection(Uint32 & ptrI,Local_pattern_store & pattern,Local_pattern_store::ConstDataBufferIterator & it,Uint32 len,bool & hasNull)6468 Dbspj::appendDataToSection(Uint32 & ptrI,
6469 Local_pattern_store& pattern,
6470 Local_pattern_store::ConstDataBufferIterator& it,
6471 Uint32 len, bool& hasNull)
6472 {
6473 if (unlikely(len==0))
6474 {
6475 jam();
6476 hasNull = true;
6477 return 0;
6478 }
6479
6480 #if 0
6481 /**
6482 * TODO handle errors
6483 */
6484 Uint32 tmp[NDB_SECTION_SEGMENT_SZ];
6485 while (len > NDB_SECTION_SEGMENT_SZ)
6486 {
6487 pattern.copyout(tmp, NDB_SECTION_SEGMENT_SZ, it);
6488 appendToSection(ptrI, tmp, NDB_SECTION_SEGMENT_SZ);
6489 len -= NDB_SECTION_SEGMENT_SZ;
6490 }
6491
6492 pattern.copyout(tmp, len, it);
6493 appendToSection(ptrI, tmp, len);
6494 return 0;
6495 #else
6496 Uint32 remaining = len;
6497 Uint32 dstIdx = 0;
6498 Uint32 tmp[NDB_SECTION_SEGMENT_SZ];
6499
6500 while (remaining > 0 && !it.isNull())
6501 {
6502 tmp[dstIdx] = *it.data;
6503 remaining--;
6504 dstIdx++;
6505 pattern.next(it);
6506 if (dstIdx == NDB_SECTION_SEGMENT_SZ || remaining == 0)
6507 {
6508 if (!appendToSection(ptrI, tmp, dstIdx))
6509 {
6510 DEBUG_CRASH();
6511 return DbspjErr::InvalidPattern;
6512 }
6513 dstIdx = 0;
6514 }
6515 }
6516 if (remaining > 0)
6517 {
6518 DEBUG_CRASH();
6519 return DbspjErr::InvalidPattern;
6520 }
6521 else
6522 {
6523 return 0;
6524 }
6525 #endif
6526 }
6527
6528 Uint32
createEmptySection(Uint32 & dst)6529 Dbspj::createEmptySection(Uint32 & dst)
6530 {
6531 Uint32 tmp;
6532 SegmentedSectionPtr ptr;
6533 if (likely(import(ptr, &tmp, 0)))
6534 {
6535 jam();
6536 dst = ptr.i;
6537 return 0;
6538 }
6539
6540 jam();
6541 return DbspjErr::OutOfSectionMemory;
6542 }
6543
/**
 * This function takes a pattern and a row and expands it into a section.
 * Variant for rows whose data resides in a segmented section.
 */
Uint32
Dbspj::expandS(Uint32 & _dst, Local_pattern_store& pattern,
               const RowPtr & row, bool& hasNull)
{
  /**
   * Expand 'pattern' into section '_dst', taking column values from a row
   * whose data resides in a *segmented section* (row.m_row_data.m_section).
   * Counterpart of ::expandL() which reads linear-buffered rows.
   *
   * 'hasNull' is set if any appended value was NULL.
   * Returns 0 on success, else a DbspjErr error code; on success '_dst'
   * is updated with the (possibly newly created) section i-value.
   */
  Uint32 err;
  Uint32 dst = _dst;
  hasNull = false;
  Local_pattern_store::ConstDataBufferIterator it;
  pattern.first(it);
  while (!it.isNull())
  {
    // Each token word encodes type + length/value
    Uint32 info = *it.data;
    Uint32 type = QueryPattern::getType(info);
    Uint32 val = QueryPattern::getLength(info);
    pattern.next(it);
    switch(type){
    case QueryPattern::P_COL:
      jam();
      err = appendColToSection(dst, row.m_row_data.m_section, val, hasNull);
      break;
    case QueryPattern::P_UNQ_PK:
      jam();
      err = appendPkColToSection(dst, row.m_row_data.m_section, val);
      break;
    case QueryPattern::P_ATTRINFO:
      jam();
      err = appendAttrinfoToSection(dst, row.m_row_data.m_section, val, hasNull);
      break;
    case QueryPattern::P_DATA:
      jam();
      err = appendDataToSection(dst, pattern, it, val, hasNull);
      break;
    case QueryPattern::P_PARENT:
      jam();
      // P_PARENT is a prefix to another pattern token
      // that permits code to access rows from earlier than immediate parent.
      // val is no of levels to move up the tree
      err = appendFromParent(dst, pattern, it, val, row, hasNull);
      break;
    // PARAM's was converted to DATA by ::expand(pattern...)
    case QueryPattern::P_PARAM:
    case QueryPattern::P_PARAM_HEADER:
    default:
      jam();
      err = DbspjErr::InvalidPattern;
      DEBUG_CRASH();
    }
    if (unlikely(err != 0))
    {
      jam();
      DEBUG_CRASH();
      goto error;
    }
  }

  _dst = dst;
  return 0;
error:
  jam();
  return err;
}
6608
/**
 * This function takes a pattern and a row and expands it into a section.
 * Variant for rows whose data resides in linear (buffered) memory.
 */
Uint32
Dbspj::expandL(Uint32 & _dst, Local_pattern_store& pattern,
               const RowPtr & row, bool& hasNull)
{
  /**
   * Expand 'pattern' into section '_dst', taking column values from a row
   * whose data resides in *linear memory* (row.m_row_data.m_linear).
   * Counterpart of ::expandS() which reads segmented-section rows.
   *
   * 'hasNull' is set if any appended value was NULL.
   * Returns 0 on success, else a DbspjErr error code; on success '_dst'
   * is updated with the (possibly newly created) section i-value.
   */
  Uint32 err;
  Uint32 dst = _dst;
  hasNull = false;
  Local_pattern_store::ConstDataBufferIterator it;
  pattern.first(it);
  while (!it.isNull())
  {
    // Each token word encodes type + length/value
    Uint32 info = *it.data;
    Uint32 type = QueryPattern::getType(info);
    Uint32 val = QueryPattern::getLength(info);
    pattern.next(it);
    switch(type){
    case QueryPattern::P_COL:
      jam();
      err = appendColToSection(dst, row.m_row_data.m_linear, val, hasNull);
      break;
    case QueryPattern::P_UNQ_PK:
      jam();
      err = appendPkColToSection(dst, row.m_row_data.m_linear, val);
      break;
    case QueryPattern::P_ATTRINFO:
      jam();
      err = appendAttrinfoToSection(dst, row.m_row_data.m_linear, val, hasNull);
      break;
    case QueryPattern::P_DATA:
      jam();
      err = appendDataToSection(dst, pattern, it, val, hasNull);
      break;
    case QueryPattern::P_PARENT:
      jam();
      // P_PARENT is a prefix to another pattern token
      // that permits code to access rows from earlier than immediate parent
      // val is no of levels to move up the tree
      err = appendFromParent(dst, pattern, it, val, row, hasNull);
      break;
    // PARAM's was converted to DATA by ::expand(pattern...)
    case QueryPattern::P_PARAM:
    case QueryPattern::P_PARAM_HEADER:
    default:
      jam();
      err = DbspjErr::InvalidPattern;
      DEBUG_CRASH();
    }
    if (unlikely(err != 0))
    {
      jam();
      DEBUG_CRASH();
      goto error;
    }
  }

  _dst = dst;
  return 0;
error:
  jam();
  return err;
}
6673
6674 Uint32
expand(Uint32 & ptrI,DABuffer & pattern,Uint32 len,DABuffer & param,Uint32 paramCnt,bool & hasNull)6675 Dbspj::expand(Uint32 & ptrI, DABuffer& pattern, Uint32 len,
6676 DABuffer& param, Uint32 paramCnt, bool& hasNull)
6677 {
6678 /**
6679 * TODO handle error
6680 */
6681 Uint32 err;
6682 Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE];
6683 struct RowPtr::Linear row;
6684 row.m_data = param.ptr;
6685 row.m_header = CAST_PTR(RowPtr::Header, &tmp[0]);
6686 buildRowHeader(CAST_PTR(RowPtr::Header, &tmp[0]), param.ptr, paramCnt);
6687
6688 Uint32 dst = ptrI;
6689 const Uint32 * ptr = pattern.ptr;
6690 const Uint32 * end = ptr + len;
6691 hasNull = false;
6692
6693 for (; ptr < end; )
6694 {
6695 Uint32 info = * ptr++;
6696 Uint32 type = QueryPattern::getType(info);
6697 Uint32 val = QueryPattern::getLength(info);
6698 switch(type){
6699 case QueryPattern::P_PARAM:
6700 jam();
6701 ndbassert(val < paramCnt);
6702 err = appendColToSection(dst, row, val, hasNull);
6703 break;
6704 case QueryPattern::P_PARAM_HEADER:
6705 jam();
6706 ndbassert(val < paramCnt);
6707 err = appendAttrinfoToSection(dst, row, val, hasNull);
6708 break;
6709 case QueryPattern::P_DATA:
6710 if (unlikely(val==0))
6711 {
6712 jam();
6713 hasNull = true;
6714 err = 0;
6715 }
6716 else if (likely(appendToSection(dst, ptr, val)))
6717 {
6718 jam();
6719 err = 0;
6720 }
6721 else
6722 {
6723 jam();
6724 err = DbspjErr::InvalidPattern;
6725 }
6726 ptr += val;
6727 break;
6728 case QueryPattern::P_COL: // (linked) COL's not expected here
6729 case QueryPattern::P_PARENT: // Prefix to P_COL
6730 case QueryPattern::P_ATTRINFO:
6731 case QueryPattern::P_UNQ_PK:
6732 default:
6733 jam();
6734 jamLine(type);
6735 err = DbspjErr::InvalidPattern;
6736 DEBUG_CRASH();
6737 }
6738 if (unlikely(err != 0))
6739 {
6740 jam();
6741 DEBUG_CRASH();
6742 goto error;
6743 }
6744 }
6745
6746 /**
6747 * Iterate forward
6748 */
6749 pattern.ptr = end;
6750
6751 error:
6752 jam();
6753 ptrI = dst;
6754 return err;
6755 }
6756
Uint32
Dbspj::expand(Local_pattern_store& dst, Ptr<TreeNode> treeNodePtr,
              DABuffer& pattern, Uint32 len,
              DABuffer& param, Uint32 paramCnt)
{
  /**
   * Expand a 'len'-word *linked* pattern from 'pattern' into the pattern
   * store 'dst', to be re-evaluated per row later (by ::expandS/::expandL):
   *  - P_COL / P_UNQ_PK / P_ATTRINFO tokens are copied as-is,
   *  - P_PARAM / P_PARAM_HEADER tokens are resolved now against the
   *    'paramCnt' parameters in 'param' and stored as P_DATA,
   *  - P_PARENT tokens additionally request the referenced ancestor
   *    nodes to buffer their result rows (T_ROW_BUFFER[_MAP]).
   *
   * On return 'pattern.ptr' is advanced past the consumed pattern.
   * Returns 0 on success, else a DbspjErr error code.
   *
   * TODO handle error
   */
  Uint32 err;
  Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE];
  // Wrap the parameter words in a temporary "row" so the generic
  // append helpers can be used for parameter substitution
  struct RowPtr::Linear row;
  row.m_header = CAST_PTR(RowPtr::Header, &tmp[0]);
  row.m_data = param.ptr;
  buildRowHeader(CAST_PTR(RowPtr::Header, &tmp[0]), param.ptr, paramCnt);

  const Uint32 * end = pattern.ptr + len;
  for (; pattern.ptr < end; )
  {
    Uint32 info = *pattern.ptr;
    Uint32 type = QueryPattern::getType(info);
    Uint32 val = QueryPattern::getLength(info);
    switch(type){
    case QueryPattern::P_COL:
    case QueryPattern::P_UNQ_PK:
    case QueryPattern::P_ATTRINFO:
      jam();
      // Copy the single token word; value is fetched per row at expand time
      err = appendToPattern(dst, pattern, 1);
      break;
    case QueryPattern::P_DATA:
      jam();
      // Token word plus 'val' words of inline data
      err = appendToPattern(dst, pattern, val+1);
      break;
    case QueryPattern::P_PARAM:
      jam();
      // NOTE: Converted to P_DATA by appendParamToPattern
      ndbassert(val < paramCnt);
      err = appendParamToPattern(dst, row, val);
      pattern.ptr++;
      break;
    case QueryPattern::P_PARAM_HEADER:
      jam();
      // NOTE: Converted to P_DATA by appendParamHeadToPattern
      ndbassert(val < paramCnt);
      err = appendParamHeadToPattern(dst, row, val);
      pattern.ptr++;
      break;
    case QueryPattern::P_PARENT: // Prefix to P_COL
    {
      jam();
      err = appendToPattern(dst, pattern, 1);

      // Locate requested grandparent and request it to
      // T_ROW_BUFFER its result rows
      Ptr<TreeNode> parentPtr;
      m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
      while (val--)
      {
        jam();
        ndbassert(parentPtr.p->m_parentPtrI != RNIL);
        m_treenode_pool.getPtr(parentPtr, parentPtr.p->m_parentPtrI);
        parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER;
        parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER_MAP;
      }
      Ptr<Request> requestPtr;
      m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
      requestPtr.p->m_bits |= Request::RT_ROW_BUFFERS;
      break;
    }
    default:
      jam();
      err = DbspjErr::InvalidPattern;
      DEBUG_CRASH();
    }

    if (unlikely(err != 0))
    {
      DEBUG_CRASH();
      goto error;
    }
  }
  return 0;

error:
  jam();
  return err;
}
6843
Uint32
Dbspj::parseDA(Build_context& ctx,
               Ptr<Request> requestPtr,
               Ptr<TreeNode> treeNodePtr,
               DABuffer& tree, Uint32 treeBits,
               DABuffer& param, Uint32 paramBits)
{
  /**
   * Parse the "data access" part common to all tree node types:
   * parent linkage, key construction (fixed or per-row linked), and
   * attrinfo construction (interpreted program, parameters, projections).
   * 'treeBits' (DABits::NI_*) select which optional parts are present in
   * 'tree'; 'paramBits' (DABits::PI_*) likewise for 'param'.
   *
   * Side effects: sets bits and section/pattern state on treeNodePtr and
   * requestPtr; consumes the parsed words from 'tree' and 'param'.
   * Returns 0 on success, else a DbspjErr error code.
   */
  Uint32 err;
  Uint32 attrInfoPtrI = RNIL;
  Uint32 attrParamPtrI = RNIL;

  do
  {
    if (treeBits & DABits::NI_REPEAT_SCAN_RESULT)
    {
      jam();
      DEBUG("use REPEAT_SCAN_RESULT when returning results");
      requestPtr.p->m_bits |= Request::RT_REPEAT_SCAN_RESULT;
    } // DABits::NI_REPEAT_SCAN_RESULT

    if (treeBits & DABits::NI_HAS_PARENT)
    {
      jam();
      DEBUG("NI_HAS_PARENT");
      /**
       * OPTIONAL PART 1:
       *
       * Parent nodes are stored first in optional part
       * this is a list of 16-bit numbers referring to
       * *earlier* nodes in tree
       * the list stores length of list as first 16-bit
       */
      err = DbspjErr::InvalidTreeNodeSpecification;
      Uint32 dst[63];
      Uint32 cnt = unpackList(NDB_ARRAY_SIZE(dst), dst, tree);
      if (unlikely(cnt > NDB_ARRAY_SIZE(dst)))
      {
        DEBUG_CRASH();
        break;
      }

      err = 0;

      if (unlikely(cnt!=1))
      {
        /**
         * Only a single parent supported for now, i.e only trees
         */
        DEBUG_CRASH();
      }

      for (Uint32 i = 0; i<cnt; i++)
      {
        DEBUG("adding " << dst[i] << " as parent");
        Ptr<TreeNode> parentPtr = ctx.m_node_list[dst[i]];
        // Register this node as dependent on its parent
        LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
        Local_dependency_map map(pool, parentPtr.p->m_dependent_nodes);
        if (unlikely(!map.append(&treeNodePtr.i, 1)))
        {
          err = DbspjErr::OutOfQueryMemory;
          DEBUG_CRASH();
          break;
        }
        // Parent now has a child, so it is no longer a leaf
        parentPtr.p->m_bits &= ~(Uint32)TreeNode::T_LEAF;
        treeNodePtr.p->m_parentPtrI = parentPtr.i;

        // Build Bitmask of all ancestors to treeNode
        treeNodePtr.p->m_ancestors = parentPtr.p->m_ancestors;
        treeNodePtr.p->m_ancestors.set(parentPtr.p->m_node_no);
      }

      if (unlikely(err != 0))
        break;
    } // DABits::NI_HAS_PARENT

    // Tree and parameters must agree on whether the key is parameterized
    err = DbspjErr::InvalidTreeParametersSpecificationKeyParamBitsMissmatch;
    if (unlikely( ((treeBits & DABits::NI_KEY_PARAMS)==0) !=
                  ((paramBits & DABits::PI_KEY_PARAMS)==0)))
    {
      DEBUG_CRASH();
      break;
    }

    if (treeBits & (DABits::NI_KEY_PARAMS
                    | DABits::NI_KEY_LINKED
                    | DABits::NI_KEY_CONSTS))
    {
      jam();
      DEBUG("NI_KEY_PARAMS | NI_KEY_LINKED | NI_KEY_CONSTS");

      /**
       * OPTIONAL PART 2:
       *
       * If keys are parametrized or linked
       *   DATA0[LO/HI] - Length of key pattern/#parameters to key
       */
      Uint32 len_cnt = * tree.ptr ++;
      Uint32 len = len_cnt & 0xFFFF; // length of pattern in words
      Uint32 cnt = len_cnt >> 16;    // no of parameters

      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
      Local_pattern_store pattern(pool, treeNodePtr.p->m_keyPattern);

      // Parameter count must be consistent with both bit sets
      err = DbspjErr::InvalidTreeParametersSpecificationIncorrectKeyParamCount;
      if (unlikely( ((cnt==0) != ((treeBits & DABits::NI_KEY_PARAMS) == 0)) ||
                    ((cnt==0) != ((paramBits & DABits::PI_KEY_PARAMS) == 0))))
      {
        DEBUG_CRASH();
        break;
      }

      if (treeBits & DABits::NI_KEY_LINKED)
      {
        jam();
        DEBUG("LINKED-KEY PATTERN w/ " << cnt << " PARAM values");
        /**
         * Expand pattern into a new pattern (with linked values)
         */
        err = expand(pattern, treeNodePtr, tree, len, param, cnt);

        /**
         * This node constructs a new key for each send
         */
        treeNodePtr.p->m_bits |= TreeNode::T_KEYINFO_CONSTRUCTED;
      }
      else
      {
        jam();
        DEBUG("FIXED-KEY w/ " << cnt << " PARAM values");
        /**
         * Expand pattern directly into keyinfo
         *   This means a "fixed" key from here on
         */
        bool hasNull;
        Uint32 keyInfoPtrI = RNIL;
        err = expand(keyInfoPtrI, tree, len, param, cnt, hasNull);
        if (unlikely(hasNull))
        {
          /* API should have eliminated requests w/ const-NULL keys */
          jam();
          DEBUG("BEWARE: FIXED-key contain NULL values");
//        treeNodePtr.p->m_bits |= TreeNode::T_NULL_PRUNE;
//        break;
          ndbrequire(false);
        }
        treeNodePtr.p->m_send.m_keyInfoPtrI = keyInfoPtrI;
      }

      if (unlikely(err != 0))
      {
        DEBUG_CRASH();
        break;
      }
    } // DABits::NI_KEY_...

    const Uint32 mask =
      DABits::NI_LINKED_ATTR | DABits::NI_ATTR_INTERPRET |
      DABits::NI_ATTR_LINKED | DABits::NI_ATTR_PARAMS;

    if (((treeBits & mask) | (paramBits & DABits::PI_ATTR_LIST)) != 0)
    {
      jam();
      /**
       * OPTIONAL PART 3: attrinfo handling
       * - NI_LINKED_ATTR - these are attributes to be passed to children
       * - PI_ATTR_LIST   - this is "user-columns" (passed as parameters)
       *
       * - NI_ATTR_INTERPRET - tree contains interpreted program
       * - NI_ATTR_LINKED - means that the attr-info contains linked-values
       * - NI_ATTR_PARAMS - means that the attr-info is parameterized
       *   PI_ATTR_PARAMS - means that the parameters contains attr parameters
       *
       * IF NI_ATTR_INTERPRET
       *   DATA0[LO/HI] = Length of program / total #arguments to program
       *   DATA1..N     = Program
       *
       * IF NI_ATTR_PARAMS
       *   DATA0[LO/HI] = Length / #param
       *   DATA1..N     = PARAM-0...PARAM-M
       *
       * IF PI_ATTR_INTERPRET
       *   DATA0[LO/HI] = Length of program / Length of subroutine-part
       *   DATA1..N     = Program (scan filter)
       *
       * IF NI_ATTR_LINKED
       *   DATA0[LO/HI] = Length / #
       *
       *
       */
      Uint32 sections[5] = { 0, 0, 0, 0, 0 };
      Uint32 * sectionptrs = 0;

      bool interpreted =
        (treeBits & DABits::NI_ATTR_INTERPRET) ||
        (paramBits & DABits::PI_ATTR_INTERPRET) ||
        (treeNodePtr.p->m_bits & TreeNode::T_ATTR_INTERPRETED);

      if (interpreted)
      {
        /**
         * Add section headers for interpreted execution
         *   and create pointer so that they can be updated later
         */
        jam();
        err = DbspjErr::OutOfSectionMemory;
        if (unlikely(!appendToSection(attrInfoPtrI, sections, 5)))
        {
          DEBUG_CRASH();
          break;
        }

        SegmentedSectionPtr ptr;
        getSection(ptr, attrInfoPtrI);
        sectionptrs = ptr.p->theData;

        if (treeBits & DABits::NI_ATTR_INTERPRET)
        {
          jam();

          /**
           * Having two interpreter programs is an error.
           */
          err = DbspjErr::BothTreeAndParametersContainInterpretedProgram;
          if (unlikely(paramBits & DABits::PI_ATTR_INTERPRET))
          {
            DEBUG_CRASH();
            break;
          }

          treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
          Uint32 len2 = * tree.ptr++;
          Uint32 len_prg = len2 & 0xFFFF;  // Length of interpret program
          Uint32 len_pattern = len2 >> 16; // Length of attr param pattern
          err = DbspjErr::OutOfSectionMemory;
          if (unlikely(!appendToSection(attrInfoPtrI, tree.ptr, len_prg)))
          {
            DEBUG_CRASH();
            break;
          }

          tree.ptr += len_prg;
          sectionptrs[1] = len_prg; // size of interpret program

          Uint32 tmp = * tree.ptr ++; // attr-pattern header
          Uint32 cnt = tmp & 0xFFFF;

          if (treeBits & DABits::NI_ATTR_LINKED)
          {
            jam();
            /**
             * Expand pattern into a new pattern (with linked values)
             */
            LocalArenaPoolImpl pool(requestPtr.p->m_arena,
                                    m_dependency_map_pool);
            Local_pattern_store pattern(pool,treeNodePtr.p->m_attrParamPattern);
            err = expand(pattern, treeNodePtr, tree, len_pattern, param, cnt);
            if (unlikely(err))
            {
              DEBUG_CRASH();
              break;
            }
            /**
             * This node constructs a new attr-info for each send
             */
            treeNodePtr.p->m_bits |= TreeNode::T_ATTRINFO_CONSTRUCTED;
          }
          else
          {
            jam();
            /**
             * Expand pattern directly into attr-info param
             *   This means a "fixed" attr-info param from here on
             */
            bool hasNull;
            err = expand(attrParamPtrI, tree, len_pattern, param, cnt, hasNull);
            if (unlikely(err))
            {
              DEBUG_CRASH();
              break;
            }
//          ndbrequire(!hasNull);
          }
        }
        else // if (treeBits & DABits::NI_ATTR_INTERPRET)
        {
          jam();
          /**
           * Only relevant for interpreted stuff
           */
          ndbrequire((treeBits & DABits::NI_ATTR_PARAMS) == 0);
          ndbrequire((paramBits & DABits::PI_ATTR_PARAMS) == 0);
          ndbrequire((treeBits & DABits::NI_ATTR_LINKED) == 0);

          treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;

          if (! (paramBits & DABits::PI_ATTR_INTERPRET))
          {
            jam();

            /**
             * Tree node has interpreted execution,
             *   but no interpreted program specified
             *   auto-add Exit_ok (i.e return each row)
             */
            Uint32 tmp = Interpreter::ExitOK();
            err = DbspjErr::OutOfSectionMemory;
            if (unlikely(!appendToSection(attrInfoPtrI, &tmp, 1)))
            {
              DEBUG_CRASH();
              break;
            }
            sectionptrs[1] = 1;
          }
        } // if (treeBits & DABits::NI_ATTR_INTERPRET)
      } // if (interpreted)

      if (paramBits & DABits::PI_ATTR_INTERPRET)
      {
        jam();

        /**
         * Add the interpreted code that represents the scan filter.
         */
        const Uint32 len2 = * param.ptr++;
        Uint32 program_len = len2 & 0xFFFF;
        Uint32 subroutine_len = len2 >> 16;
        err = DbspjErr::OutOfSectionMemory;
        if (unlikely(!appendToSection(attrInfoPtrI, param.ptr, program_len)))
        {
          DEBUG_CRASH();
          break;
        }
        /**
         * The interpreted code is added is in the "Interpreted execute region"
         * of the attrinfo (see Dbtup::interpreterStartLab() for details).
         * It will thus execute before reading the attributes that constitutes
         * the projections.
         */
        sectionptrs[1] = program_len;
        param.ptr += program_len;

        if (subroutine_len)
        {
          if (unlikely(!appendToSection(attrParamPtrI,
                                        param.ptr, subroutine_len)))
          {
            DEBUG_CRASH();
            break;
          }
          sectionptrs[4] = subroutine_len;
          param.ptr += subroutine_len;
        }
        treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
      }

      Uint32 sum_read = 0;
      Uint32 dst[MAX_ATTRIBUTES_IN_TABLE + 2];

      if (paramBits & DABits::PI_ATTR_LIST)
      {
        jam();
        Uint32 len = * param.ptr++;
        DEBUG("PI_ATTR_LIST");

        treeNodePtr.p->m_bits |= TreeNode::T_USER_PROJECTION;
        err = DbspjErr::OutOfSectionMemory;
        if (!appendToSection(attrInfoPtrI, param.ptr, len))
        {
          DEBUG_CRASH();
          break;
        }

        param.ptr += len;

        /**
         * Insert a flush of this partial result set
         */
        Uint32 flush[4];
        flush[0] = AttributeHeader::FLUSH_AI << 16;
        flush[1] = ctx.m_resultRef;
        flush[2] = ctx.m_resultData;
        flush[3] = ctx.m_senderRef; // RouteRef
        if (!appendToSection(attrInfoPtrI, flush, 4))
        {
          DEBUG_CRASH();
          break;
        }

        sum_read += len + 4;
      }

      if (treeBits & DABits::NI_LINKED_ATTR)
      {
        jam();
        DEBUG("NI_LINKED_ATTR");
        err = DbspjErr::InvalidTreeNodeSpecification;
        Uint32 cnt = unpackList(MAX_ATTRIBUTES_IN_TABLE, dst, tree);
        if (unlikely(cnt > MAX_ATTRIBUTES_IN_TABLE))
        {
          DEBUG_CRASH();
          break;
        }

        /**
         * AttributeHeader contains attrId in 16-higher bits
         */
        for (Uint32 i = 0; i<cnt; i++)
          dst[i] <<= 16;

        /**
         * Read correlation factor
         */
        dst[cnt++] = AttributeHeader::CORR_FACTOR32 << 16;

        err = DbspjErr::OutOfSectionMemory;
        if (!appendToSection(attrInfoPtrI, dst, cnt))
        {
          DEBUG_CRASH();
          break;
        }

        sum_read += cnt;
      }

      if (interpreted)
      {
        jam();
        /**
         * Let reads be performed *after* interpreted program
         *   i.e in "final read"-section
         */
        sectionptrs[3] = sum_read;

        if (attrParamPtrI != RNIL)
        {
          jam();
          ndbrequire(!(treeNodePtr.p->m_bits&TreeNode::T_ATTRINFO_CONSTRUCTED));

          // Move the fixed attr-info params into the subroutine section
          SegmentedSectionPtr ptr;
          getSection(ptr, attrParamPtrI);
          {
            SectionReader r0(ptr, getSectionSegmentPool());
            err = appendTreeToSection(attrInfoPtrI, r0, ptr.sz);
            sectionptrs[4] = ptr.sz;
            if (unlikely(err != 0))
            {
              DEBUG_CRASH();
              break;
            }
          }
          releaseSection(attrParamPtrI);
        }
      }

      treeNodePtr.p->m_send.m_attrInfoPtrI = attrInfoPtrI;
    } // if (((treeBits & mask) | (paramBits & DABits::PI_ATTR_LIST)) != 0)

    return 0;
  } while (0);

  return err;
}
7306
7307 /**
7308 * END - MODULE COMMON PARSE/UNPACK
7309 */
7310
7311 /**
7312 * Process a scan request for an ndb$info table. (These are used for monitoring
7313 * purposes and do not contain application data.)
7314 */
execDBINFO_SCANREQ(Signal * signal)7315 void Dbspj::execDBINFO_SCANREQ(Signal *signal)
7316 {
7317 DbinfoScanReq req= * CAST_PTR(DbinfoScanReq, &signal->theData[0]);
7318 const Ndbinfo::ScanCursor* cursor =
7319 CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
7320 Ndbinfo::Ratelimit rl;
7321
7322 jamEntry();
7323
7324 switch(req.tableId){
7325
7326 // The SPJ block only implements the ndbinfo.counters table.
7327 case Ndbinfo::COUNTERS_TABLEID:
7328 {
7329 Ndbinfo::counter_entry counters[] = {
7330 { Ndbinfo::SPJ_READS_RECEIVED_COUNTER,
7331 c_Counters.get_counter(CI_READS_RECEIVED) },
7332 { Ndbinfo::SPJ_LOCAL_READS_SENT_COUNTER,
7333 c_Counters.get_counter(CI_LOCAL_READS_SENT) },
7334 { Ndbinfo::SPJ_REMOTE_READS_SENT_COUNTER,
7335 c_Counters.get_counter(CI_REMOTE_READS_SENT) },
7336 { Ndbinfo::SPJ_READS_NOT_FOUND_COUNTER,
7337 c_Counters.get_counter(CI_READS_NOT_FOUND) },
7338 { Ndbinfo::SPJ_TABLE_SCANS_RECEIVED_COUNTER,
7339 c_Counters.get_counter(CI_TABLE_SCANS_RECEIVED) },
7340 { Ndbinfo::SPJ_LOCAL_TABLE_SCANS_SENT_COUNTER,
7341 c_Counters.get_counter(CI_LOCAL_TABLE_SCANS_SENT) },
7342 { Ndbinfo::SPJ_RANGE_SCANS_RECEIVED_COUNTER,
7343 c_Counters.get_counter(CI_RANGE_SCANS_RECEIVED) },
7344 { Ndbinfo::SPJ_LOCAL_RANGE_SCANS_SENT_COUNTER,
7345 c_Counters.get_counter(CI_LOCAL_RANGE_SCANS_SENT) },
7346 { Ndbinfo::SPJ_REMOTE_RANGE_SCANS_SENT_COUNTER,
7347 c_Counters.get_counter(CI_REMOTE_RANGE_SCANS_SENT) },
7348 { Ndbinfo::SPJ_SCAN_BATCHES_RETURNED_COUNTER,
7349 c_Counters.get_counter(CI_SCAN_BATCHES_RETURNED) },
7350 { Ndbinfo::SPJ_SCAN_ROWS_RETURNED_COUNTER,
7351 c_Counters.get_counter(CI_SCAN_ROWS_RETURNED) },
7352 { Ndbinfo::SPJ_PRUNED_RANGE_SCANS_RECEIVED_COUNTER,
7353 c_Counters.get_counter(CI_PRUNED_RANGE_SCANS_RECEIVED) },
7354 { Ndbinfo::SPJ_CONST_PRUNED_RANGE_SCANS_RECEIVED_COUNTER,
7355 c_Counters.get_counter(CI_CONST_PRUNED_RANGE_SCANS_RECEIVED) }
7356 };
7357 const size_t num_counters = sizeof(counters) / sizeof(counters[0]);
7358
7359 Uint32 i = cursor->data[0];
7360 const BlockNumber bn = blockToMain(number());
7361 while(i < num_counters)
7362 {
7363 jam();
7364 Ndbinfo::Row row(signal, req);
7365 row.write_uint32(getOwnNodeId());
7366 row.write_uint32(bn); // block number
7367 row.write_uint32(instance()); // block instance
7368 row.write_uint32(counters[i].id);
7369
7370 row.write_uint64(counters[i].val);
7371 ndbinfo_send_row(signal, req, row, rl);
7372 i++;
7373 if (rl.need_break(req))
7374 {
7375 jam();
7376 ndbinfo_send_scan_break(signal, req, rl, i);
7377 return;
7378 }
7379 }
7380 break;
7381 }
7382
7383 default:
7384 break;
7385 }
7386
7387 ndbinfo_send_scan_conf(signal, req, rl);
7388 } // Dbspj::execDBINFO_SCANREQ(Signal *signal)
7389
update(double sample)7390 void Dbspj::IncrementalStatistics::update(double sample)
7391 {
7392 // Prevent wrap-around
7393 if(m_noOfSamples < 0xffffffff)
7394 {
7395 m_noOfSamples++;
7396 const double delta = sample - m_mean;
7397 m_mean += delta/m_noOfSamples;
7398 m_sumSquare += delta * (sample - m_mean);
7399 }
7400 }
7401