1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #define DBTUX_SCAN_CPP
26 #include "Dbtux.hpp"
27 #include <my_sys.h>
28 
29 /*
30  * Error handling:  Any seized scan op is released.  ACC_SCANREF is sent
31  * to LQH.  LQH sets error code, and treats this like ZEMPTY_FRAGMENT.
32  * Therefore scan is now closed on both sides.
33  */
34 void
execACC_SCANREQ(Signal * signal)35 Dbtux::execACC_SCANREQ(Signal* signal)
36 {
37   jamEntry();
38   const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
39   const AccScanReq* const req = &reqCopy;
40   Uint32 errorCode = 0;
41   ScanOpPtr scanPtr;
42   scanPtr.i = RNIL;
43   do {
44     // get the index
45     IndexPtr indexPtr;
46     c_indexPool.getPtr(indexPtr, req->tableId);
47     // get the fragment
48     FragPtr fragPtr;
49     findFrag(*indexPtr.p, req->fragmentNo, fragPtr);
50     ndbrequire(fragPtr.i != RNIL);
51     Frag& frag = *fragPtr.p;
52     // check for index not Online (i.e. Dropping)
53     if (unlikely(indexPtr.p->m_state != Index::Online)) {
54       jam();
55 #ifdef VM_TRACE
56       if (debugFlags & (DebugMeta | DebugScan)) {
57         debugOut << "Index dropping at ACC_SCANREQ " << indexPtr.i << " " << *indexPtr.p << endl;
58       }
59 #endif
60       errorCode = AccScanRef::TuxIndexNotOnline;
61       break;
62     }
63     // must be normal DIH/TC fragment
64     TreeHead& tree = frag.m_tree;
65     // check for empty fragment
66     if (tree.m_root == NullTupLoc) {
67       jam();
68       AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
69       conf->scanPtr = req->senderData;
70       conf->accPtr = RNIL;
71       conf->flag = AccScanConf::ZEMPTY_FRAGMENT;
72       sendSignal(req->senderRef, GSN_ACC_SCANCONF,
73           signal, AccScanConf::SignalLength, JBB);
74       return;
75     }
76     // seize from pool and link to per-fragment list
77     if (ERROR_INSERTED(12008) ||
78         ! frag.m_scanList.seize(scanPtr)) {
79       CLEAR_ERROR_INSERT_VALUE;
80       jam();
81       // should never happen but can be used to test error handling
82       errorCode = AccScanRef::TuxNoFreeScanOp;
83       break;
84     }
85     new (scanPtr.p) ScanOp;
86     scanPtr.p->m_state = ScanOp::First;
87     scanPtr.p->m_userPtr = req->senderData;
88     scanPtr.p->m_userRef = req->senderRef;
89     scanPtr.p->m_tableId = indexPtr.p->m_tableId;
90     scanPtr.p->m_indexId = indexPtr.i;
91     scanPtr.p->m_fragId = fragPtr.p->m_fragId;
92     scanPtr.p->m_fragPtrI = fragPtr.i;
93     scanPtr.p->m_transId1 = req->transId1;
94     scanPtr.p->m_transId2 = req->transId2;
95     scanPtr.p->m_savePointId = req->savePointId;
96     scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo);
97     scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo);
98     scanPtr.p->m_descending = AccScanReq::getDescendingFlag(req->requestInfo);
99     /*
100      * readCommitted lockMode keyInfo
101      * 1 0 0 - read committed (no lock)
102      * 0 0 0 - read latest (read lock)
103      * 0 1 1 - read exclusive (write lock)
104      */
105     const bool isStatScan = AccScanReq::getStatScanFlag(req->requestInfo);
106     if (unlikely(isStatScan)) {
107       jam();
108       if (!scanPtr.p->m_readCommitted) {
109         jam();
110         errorCode = AccScanRef::TuxInvalidLockMode;
111         break;
112       }
113       StatOpPtr statPtr;
114       if (!c_statOpPool.seize(statPtr)) {
115         jam();
116         errorCode = AccScanRef::TuxNoFreeStatOp;
117         break;
118       }
119       scanPtr.p->m_statOpPtrI = statPtr.i;
120       new (statPtr.p) StatOp(*indexPtr.p);
121       statPtr.p->m_scanOpPtrI = scanPtr.i;
122       // rest of StatOp is initialized in execTUX_BOUND_INFO
123 #ifdef VM_TRACE
124       if (debugFlags & DebugStat) {
125         debugOut << "Seize stat op" << endl;
126       }
127 #endif
128     }
129 #ifdef VM_TRACE
130     if (debugFlags & DebugScan) {
131       debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
132     }
133 #endif
134     // conf
135     AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
136     conf->scanPtr = req->senderData;
137     conf->accPtr = scanPtr.i;
138     conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
139     sendSignal(req->senderRef, GSN_ACC_SCANCONF,
140         signal, AccScanConf::SignalLength, JBB);
141     return;
142   } while (0);
143   if (scanPtr.i != RNIL) {
144     jam();
145     releaseScanOp(scanPtr);
146   }
147   // ref
148   ndbrequire(errorCode != 0);
149   AccScanRef* ref = (AccScanRef*)signal->getDataPtrSend();
150   ref->scanPtr = req->senderData;
151   ref->accPtr = RNIL;
152   ref->errorCode = errorCode;
153   sendSignal(req->senderRef, GSN_ACC_SCANREF,
154       signal, AccScanRef::SignalLength, JBB);
155 }
156 
157 /*
158  * Receive bounds for scan in single direct call.  The bounds can arrive
159  * in any order.  Attribute ids are those of index table.
160  *
161  * Replace EQ by equivalent LE + GE.  Check for conflicting bounds.
162  * Check that sets of lower and upper bounds are on initial sequences of
163  * keys and that all but possibly last bound is non-strict.
164  *
165  * Finally convert the sets of lower and upper bounds (i.e. start key
166  * and end key) to NdbPack format.  The data is saved in segmented
167  * memory.  The bound is reconstructed at use time via unpackBound().
168  *
169  * Error handling:  Error code is set in the scan and also returned in
170  * EXECUTE_DIRECT (the old way).
171  */
172 void
execTUX_BOUND_INFO(Signal * signal)173 Dbtux::execTUX_BOUND_INFO(Signal* signal)
174 {
175   jamEntry();
176   // get records
177   TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
178   ScanOpPtr scanPtr;
179   scanPtr.i = req->tuxScanPtrI;
180   c_scanOpPool.getPtr(scanPtr);
181   ScanOp& scan = *scanPtr.p;
182   const Index& index = *c_indexPool.getPtr(scan.m_indexId);
183   const DescHead& descHead = getDescHead(index);
184   const KeyType* keyTypes = getKeyTypes(descHead);
185   // data passed in Signal
186   const Uint32* const boundData = &req->data[0];
187   Uint32 boundLen = req->boundAiLength;
188   Uint32 boundOffset = 0;
189   // initialize stats scan
190   if (unlikely(scan.m_statOpPtrI != RNIL)) {
191     // stats options before bounds
192     StatOpPtr statPtr;
193     statPtr.i = scan.m_statOpPtrI;
194     c_statOpPool.getPtr(statPtr);
195     Uint32 usedLen = 0;
196     if (statScanInit(statPtr, boundData, boundLen, &usedLen) == -1) {
197       jam();
198       ndbrequire(scan.m_errorCode != 0);
199       req->errorCode = scan.m_errorCode;
200       return;
201     }
202     ndbrequire(usedLen <= boundLen);
203     boundLen -= usedLen;
204     boundOffset += usedLen;
205   }
206   // extract lower and upper bound in separate passes
207   for (unsigned idir = 0; idir <= 1; idir++) {
208     jam();
209     struct BoundInfo {
210       int type2;      // with EQ -> LE/GE
211       Uint32 offset;  // word offset in signal data
212       Uint32 bytes;
213     };
214     BoundInfo boundInfo[MaxIndexAttributes];
215     // largest attrId seen plus one
216     Uint32 maxAttrId = 0;
217     const Uint32* const data = &boundData[boundOffset];
218     Uint32 offset = 0;
219     while (offset + 2 <= boundLen) {
220       jam();
221       const Uint32 type = data[offset];
222       const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
223       const Uint32 attrId = ah->getAttributeId();
224       const Uint32 byteSize = ah->getByteSize();
225       const Uint32 dataSize = ah->getDataSize();
226       // check type
227       if (unlikely(type > 4)) {
228         jam();
229         scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
230         req->errorCode = scan.m_errorCode;
231         return;
232       }
233       Uint32 type2 = type;
234       if (type2 == 4) {
235         jam();
236         type2 = (idir << 1); // LE=0 GE=2
237       }
238       // check if attribute belongs to this bound
239       if ((type2 & 0x2) == (idir << 1)) {
240         if (unlikely(attrId >= index.m_numAttrs)) {
241           jam();
242           scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
243           req->errorCode = scan.m_errorCode;
244           return;
245         }
246         // mark entries in any gap as undefined
247         while (maxAttrId <= attrId) {
248           jam();
249           BoundInfo& b = boundInfo[maxAttrId];
250           b.type2 = -1;
251           maxAttrId++;
252         }
253         BoundInfo& b = boundInfo[attrId];
254         // duplicate no longer allowed (wl#4163)
255         if (unlikely(b.type2 != -1)) {
256           jam();
257           scan.m_errorCode = TuxBoundInfo::InvalidBounds;
258           req->errorCode = scan.m_errorCode;
259           return;
260         }
261         b.type2 = (int)type2;
262         b.offset = offset + 1; // poai
263         b.bytes = byteSize;
264       }
265       // jump to next
266       offset += 2 + dataSize;
267     }
268     if (unlikely(offset != boundLen)) {
269       jam();
270       scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
271       req->errorCode = scan.m_errorCode;
272       return;
273     }
274     // check and pack the bound data
275     KeyData searchBoundData(index.m_keySpec, true, 0);
276     KeyBound searchBound(searchBoundData);
277     searchBoundData.set_buf(c_ctx.c_searchKey, MaxAttrDataSize << 2);
278     int strict = 0; // 0 or 1
279     Uint32 i;
280     for (i = 0; i < maxAttrId; i++) {
281       jam();
282       const BoundInfo& b = boundInfo[i];
283        // check for gap or strict bound before last
284        strict = (b.type2 & 0x1);
285        if (unlikely(b.type2 == -1 || (i + 1 < maxAttrId && strict))) {
286          jam();
287          scan.m_errorCode = TuxBoundInfo::InvalidBounds;
288          req->errorCode = scan.m_errorCode;
289          return;
290        }
291        Uint32 len;
292        if (unlikely(searchBoundData.add_poai(&data[b.offset], &len) == -1 ||
293            b.bytes != len)) {
294          jam();
295          scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
296          req->errorCode = scan.m_errorCode;
297          return;
298        }
299     }
300     int side = 0;
301     if (maxAttrId != 0) {
302       // arithmetic is faster
303       // side = (idir == 0 ? (strict ? +1 : -1) : (strict ? -1 : +1));
304       side = (-1) * (1 - 2 * strict) * (1 - 2 * int(idir));
305     }
306     if (unlikely(searchBound.finalize(side) == -1)) {
307       jam();
308       scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
309       req->errorCode = scan.m_errorCode;
310       return;
311     }
312     ScanBound& scanBound = scan.m_scanBound[idir];
313     scanBound.m_cnt = maxAttrId;
314     scanBound.m_side = side;
315     // save data words in segmented memory
316     {
317       DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
318       LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
319       const Uint32* data = (const Uint32*)searchBoundData.get_data_buf();
320       Uint32 size = (searchBoundData.get_data_len() + 3) / 4;
321       bool ok = b.append(data, size);
322       if (unlikely(!ok)) {
323         jam();
324         scan.m_errorCode = TuxBoundInfo::OutOfBuffers;
325         req->errorCode = scan.m_errorCode;
326         return;
327       }
328     }
329   }
330   if (ERROR_INSERTED(12009)) {
331     jam();
332     CLEAR_ERROR_INSERT_VALUE;
333     scan.m_errorCode = TuxBoundInfo::InvalidBounds;
334     req->errorCode = scan.m_errorCode;
335     return;
336   }
337   // no error
338   req->errorCode = 0;
339 }
340 
341 void
execNEXT_SCANREQ(Signal * signal)342 Dbtux::execNEXT_SCANREQ(Signal* signal)
343 {
344   jamEntry();
345   const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
346   const NextScanReq* const req = &reqCopy;
347   ScanOpPtr scanPtr;
348   scanPtr.i = req->accPtr;
349   c_scanOpPool.getPtr(scanPtr);
350   ScanOp& scan = *scanPtr.p;
351   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
352 #ifdef VM_TRACE
353   if (debugFlags & DebugScan) {
354     debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl;
355   }
356 #endif
357   // handle unlock previous and close scan
358   switch (req->scanFlag) {
359   case NextScanReq::ZSCAN_NEXT:
360     jam();
361     break;
362   case NextScanReq::ZSCAN_NEXT_COMMIT:
363     jam();
364   case NextScanReq::ZSCAN_COMMIT:
365     jam();
366     if (! scan.m_readCommitted) {
367       jam();
368       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
369       lockReq->returnCode = RNIL;
370       lockReq->requestInfo = AccLockReq::Unlock;
371       lockReq->accOpPtr = req->accOperationPtr;
372       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
373       jamEntry();
374       ndbrequire(lockReq->returnCode == AccLockReq::Success);
375       removeAccLockOp(scanPtr, req->accOperationPtr);
376     }
377     if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
378       jam();
379       NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
380       conf->scanPtr = scan.m_userPtr;
381       unsigned signalLength = 1;
382       sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
383 		 signal, signalLength, JBB);
384       return;
385     }
386     break;
387   case NextScanReq::ZSCAN_CLOSE:
388     jam();
389     // unlink from tree node first to avoid state changes
390     if (scan.m_scanPos.m_loc != NullTupLoc) {
391       jam();
392       const TupLoc loc = scan.m_scanPos.m_loc;
393       NodeHandle node(frag);
394       selectNode(node, loc);
395       unlinkScan(node, scanPtr);
396       scan.m_scanPos.m_loc = NullTupLoc;
397     }
398     if (scan.m_lockwait) {
399       jam();
400       ndbrequire(scan.m_accLockOp != RNIL);
401       // use ACC_ABORTCONF to flush out any reply in job buffer
402       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
403       lockReq->returnCode = RNIL;
404       lockReq->requestInfo = AccLockReq::AbortWithConf;
405       lockReq->accOpPtr = scan.m_accLockOp;
406       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
407 		     AccLockReq::UndoSignalLength);
408       jamEntry();
409       ndbrequire(lockReq->returnCode == AccLockReq::Success);
410       scan.m_state = ScanOp::Aborting;
411       return;
412     }
413     if (scan.m_state == ScanOp::Locked) {
414       jam();
415       ndbrequire(scan.m_accLockOp != RNIL);
416       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
417       lockReq->returnCode = RNIL;
418       lockReq->requestInfo = AccLockReq::Abort;
419       lockReq->accOpPtr = scan.m_accLockOp;
420       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
421 		     AccLockReq::UndoSignalLength);
422       jamEntry();
423       ndbrequire(lockReq->returnCode == AccLockReq::Success);
424       scan.m_accLockOp = RNIL;
425     }
426     scan.m_state = ScanOp::Aborting;
427     scanClose(signal, scanPtr);
428     return;
429   case NextScanReq::ZSCAN_NEXT_ABORT:
430     jam();
431   default:
432     jam();
433     ndbrequire(false);
434     break;
435   }
436   // start looking for next scan result
437   AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
438   checkReq->accPtr = scanPtr.i;
439   checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
440   EXECUTE_DIRECT(DBTUX, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
441   jamEntry();
442 }
443 
444 void
execACC_CHECK_SCAN(Signal * signal)445 Dbtux::execACC_CHECK_SCAN(Signal* signal)
446 {
447   jamEntry();
448   const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
449   const AccCheckScan* const req = &reqCopy;
450   ScanOpPtr scanPtr;
451   scanPtr.i = req->accPtr;
452   c_scanOpPool.getPtr(scanPtr);
453   ScanOp& scan = *scanPtr.p;
454   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
455 #ifdef VM_TRACE
456   if (debugFlags & DebugScan) {
457     debugOut << "ACC_CHECK_SCAN scan " << scanPtr.i << " " << scan << endl;
458   }
459 #endif
460   if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
461     jam();
462     signal->theData[0] = scan.m_userPtr;
463     signal->theData[1] = true;
464     EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
465     jamEntry();
466     return;   // stop
467   }
468   if (scan.m_lockwait) {
469     jam();
470     // LQH asks if we are waiting for lock and we tell it to ask again
471     const TreeEnt ent = scan.m_scanEnt;
472     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
473     conf->scanPtr = scan.m_userPtr;
474     conf->accOperationPtr = RNIL;       // no tuple returned
475     conf->fragId = frag.m_fragId;
476     unsigned signalLength = 3;
477     // if TC has ordered scan close, it will be detected here
478     sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
479         signal, signalLength, JBB);
480     return;     // stop
481   }
482   // check index online
483   const Index& index = *c_indexPool.getPtr(frag.m_indexId);
484   if (unlikely(index.m_state != Index::Online) &&
485       scanPtr.p->m_errorCode == 0) {
486     jam();
487 #ifdef VM_TRACE
488     if (debugFlags & (DebugMeta | DebugScan)) {
489       debugOut << "Index dropping at execACC_CHECK_SCAN " << scanPtr.i << " " << *scanPtr.p << endl;
490     }
491 #endif
492     scanPtr.p->m_errorCode = AccScanRef::TuxIndexNotOnline;
493   }
494   if (scan.m_state == ScanOp::First) {
495     jam();
496     // search is done only once in single range scan
497     scanFirst(scanPtr);
498   }
499   if (scan.m_state == ScanOp::Current ||
500       scan.m_state == ScanOp::Next) {
501     jam();
502     // look for next
503     scanFind(scanPtr);
504   }
505   // for reading tuple key in Found or Locked state
506   Uint32* pkData = c_ctx.c_dataBuffer;
507   unsigned pkSize = 0; // indicates not yet done
508   if (scan.m_state == ScanOp::Found) {
509     // found an entry to return
510     jam();
511     ndbrequire(scan.m_accLockOp == RNIL);
512     if (! scan.m_readCommitted) {
513       jam();
514       const TreeEnt ent = scan.m_scanEnt;
515       // read tuple key
516       readTablePk(frag, ent, pkData, pkSize);
517       // get read lock or exclusive lock
518       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
519       lockReq->returnCode = RNIL;
520       lockReq->requestInfo =
521         scan.m_lockMode == 0 ? AccLockReq::LockShared : AccLockReq::LockExclusive;
522       lockReq->accOpPtr = RNIL;
523       lockReq->userPtr = scanPtr.i;
524       lockReq->userRef = reference();
525       lockReq->tableId = scan.m_tableId;
526       lockReq->fragId = frag.m_fragId;
527       lockReq->fragPtrI = frag.m_accTableFragPtrI;
528       const Uint32* const buf32 = static_cast<Uint32*>(pkData);
529       const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32);
530       lockReq->hashValue = md5_hash(buf64, pkSize);
531       Uint32 lkey1, lkey2;
532       getTupAddr(frag, ent, lkey1, lkey2);
533       lockReq->page_id = lkey1;
534       lockReq->page_idx = lkey2;
535       lockReq->transId1 = scan.m_transId1;
536       lockReq->transId2 = scan.m_transId2;
537       // execute
538       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::LockSignalLength);
539       jamEntry();
540       switch (lockReq->returnCode) {
541       case AccLockReq::Success:
542         jam();
543         scan.m_state = ScanOp::Locked;
544         scan.m_accLockOp = lockReq->accOpPtr;
545 #ifdef VM_TRACE
546         if (debugFlags & (DebugScan | DebugLock)) {
547           debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl;
548         }
549 #endif
550         break;
551       case AccLockReq::IsBlocked:
552         jam();
553         // normal lock wait
554         scan.m_state = ScanOp::Blocked;
555         scan.m_lockwait = true;
556         scan.m_accLockOp = lockReq->accOpPtr;
557 #ifdef VM_TRACE
558         if (debugFlags & (DebugScan | DebugLock)) {
559           debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl;
560         }
561 #endif
562         // LQH will wake us up
563         signal->theData[0] = scan.m_userPtr;
564         signal->theData[1] = true;
565         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
566         jamEntry();
567         return;  // stop
568         break;
569       case AccLockReq::Refused:
570         jam();
571         // we cannot see deleted tuple (assert only)
572         ndbassert(false);
573         // skip it
574         scan.m_state = ScanOp::Next;
575         signal->theData[0] = scan.m_userPtr;
576         signal->theData[1] = true;
577         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
578         jamEntry();
579         return;  // stop
580         break;
581       case AccLockReq::NoFreeOp:
582         jam();
583         // max ops should depend on max scans (assert only)
584         ndbassert(false);
585         // stay in Found state
586         scan.m_state = ScanOp::Found;
587         signal->theData[0] = scan.m_userPtr;
588         signal->theData[1] = true;
589         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
590         jamEntry();
591         return;  // stop
592         break;
593       default:
594         ndbrequire(false);
595         break;
596       }
597     } else {
598       scan.m_state = ScanOp::Locked;
599     }
600   }
601   if (scan.m_state == ScanOp::Locked) {
602     // we have lock or do not need one
603     jam();
604     // read keys if not already done (uses signal)
605     const TreeEnt ent = scan.m_scanEnt;
606     // conf signal
607     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
608     conf->scanPtr = scan.m_userPtr;
609     // the lock is passed to LQH
610     Uint32 accLockOp = scan.m_accLockOp;
611     if (accLockOp != RNIL) {
612       scan.m_accLockOp = RNIL;
613       // remember it until LQH unlocks it
614       addAccLockOp(scanPtr, accLockOp);
615     } else {
616       ndbrequire(scan.m_readCommitted);
617       // operation RNIL in LQH would signal no tuple returned
618       accLockOp = (Uint32)-1;
619     }
620     conf->accOperationPtr = accLockOp;
621     conf->fragId = frag.m_fragId;
622     Uint32 lkey1, lkey2;
623     getTupAddr(frag, ent, lkey1, lkey2);
624     conf->localKey[0] = lkey1;
625     conf->localKey[1] = lkey2;
626     unsigned signalLength = 5;
627     // add key info
628     if (! scan.m_readCommitted) {
629       sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
630           signal, signalLength, JBB);
631     } else {
632       Uint32 blockNo = refToMain(scan.m_userRef);
633       EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
634     }
635     // next time look for next entry
636     scan.m_state = ScanOp::Next;
637     return;
638   }
639   // XXX in ACC this is checked before req->checkLcpStop
640   if (scan.m_state == ScanOp::Last) {
641     jam();
642     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
643     conf->scanPtr = scan.m_userPtr;
644     conf->accOperationPtr = RNIL;
645     conf->fragId = RNIL;
646     unsigned signalLength = 3;
647     sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
648         signal, signalLength, JBB);
649     return;
650   }
651   ndbrequire(false);
652 }
653 
654 /*
655  * Lock succeeded (after delay) in ACC.  If the lock is for current
656  * entry, set state to Locked.  If the lock is for an entry we were
657  * moved away from, simply unlock it.  Finally, if we are closing the
658  * scan, do nothing since we have already sent an abort request.
659  */
660 void
execACCKEYCONF(Signal * signal)661 Dbtux::execACCKEYCONF(Signal* signal)
662 {
663   jamEntry();
664   ScanOpPtr scanPtr;
665   scanPtr.i = signal->theData[0];
666   c_scanOpPool.getPtr(scanPtr);
667   ScanOp& scan = *scanPtr.p;
668 #ifdef VM_TRACE
669   if (debugFlags & (DebugScan | DebugLock)) {
670     debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl;
671   }
672 #endif
673   ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
674   scan.m_lockwait = false;
675   if (scan.m_state == ScanOp::Blocked) {
676     // the lock wait was for current entry
677     jam();
678     scan.m_state = ScanOp::Locked;
679     // LQH has the ball
680     return;
681   }
682   if (scan.m_state != ScanOp::Aborting) {
683     // we were moved, release lock
684     jam();
685     AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
686     lockReq->returnCode = RNIL;
687     lockReq->requestInfo = AccLockReq::Abort;
688     lockReq->accOpPtr = scan.m_accLockOp;
689     EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
690     jamEntry();
691     ndbrequire(lockReq->returnCode == AccLockReq::Success);
692     scan.m_accLockOp = RNIL;
693     // LQH has the ball
694     return;
695   }
696   // lose the lock
697   scan.m_accLockOp = RNIL;
698   // continue at ACC_ABORTCONF
699 }
700 
701 /*
702  * Lock failed (after delay) in ACC.  Probably means somebody ahead of
703  * us in lock queue deleted the tuple.
704  */
705 void
execACCKEYREF(Signal * signal)706 Dbtux::execACCKEYREF(Signal* signal)
707 {
708   jamEntry();
709   ScanOpPtr scanPtr;
710   scanPtr.i = signal->theData[0];
711   c_scanOpPool.getPtr(scanPtr);
712   ScanOp& scan = *scanPtr.p;
713 #ifdef VM_TRACE
714   if (debugFlags & (DebugScan | DebugLock)) {
715     debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl;
716   }
717 #endif
718   ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
719   scan.m_lockwait = false;
720   if (scan.m_state != ScanOp::Aborting) {
721     jam();
722     // release the operation
723     AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
724     lockReq->returnCode = RNIL;
725     lockReq->requestInfo = AccLockReq::Abort;
726     lockReq->accOpPtr = scan.m_accLockOp;
727     EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
728     jamEntry();
729     ndbrequire(lockReq->returnCode == AccLockReq::Success);
730     scan.m_accLockOp = RNIL;
731     // scan position should already have been moved (assert only)
732     if (scan.m_state == ScanOp::Blocked) {
733       jam();
734       // can happen when Dropping
735 #ifdef VM_TRACE
736       const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
737       const Index& index = *c_indexPool.getPtr(frag.m_indexId);
738       ndbassert(index.m_state != Index::Online);
739 #endif
740       scan.m_state = ScanOp::Next;
741     }
742     // LQH has the ball
743     return;
744   }
745   // lose the lock
746   scan.m_accLockOp = RNIL;
747   // continue at ACC_ABORTCONF
748 }
749 
750 /*
751  * Received when scan is closing.  This signal arrives after any
752  * ACCKEYCON or ACCKEYREF which may have been in job buffer.
753  */
754 void
execACC_ABORTCONF(Signal * signal)755 Dbtux::execACC_ABORTCONF(Signal* signal)
756 {
757   jamEntry();
758   ScanOpPtr scanPtr;
759   scanPtr.i = signal->theData[0];
760   c_scanOpPool.getPtr(scanPtr);
761   ScanOp& scan = *scanPtr.p;
762 #ifdef VM_TRACE
763   if (debugFlags & (DebugScan | DebugLock)) {
764     debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl;
765   }
766 #endif
767   ndbrequire(scan.m_state == ScanOp::Aborting);
768   // most likely we are still in lock wait
769   if (scan.m_lockwait) {
770     jam();
771     scan.m_lockwait = false;
772     scan.m_accLockOp = RNIL;
773   }
774   scanClose(signal, scanPtr);
775 }
776 
777 /*
778  * Find start position for single range scan.
779  */
780 void
scanFirst(ScanOpPtr scanPtr)781 Dbtux::scanFirst(ScanOpPtr scanPtr)
782 {
783   ScanOp& scan = *scanPtr.p;
784   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
785   const Index& index = *c_indexPool.getPtr(frag.m_indexId);
786 #ifdef VM_TRACE
787   if (debugFlags & DebugScan) {
788     debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
789   }
790 #endif
791   // scan direction 0, 1
792   const unsigned idir = scan.m_descending;
793   // set up bound from segmented memory
794   const ScanBound& scanBound = scan.m_scanBound[idir];
795   KeyDataC searchBoundData(index.m_keySpec, true);
796   KeyBoundC searchBound(searchBoundData);
797   unpackBound(c_ctx, scanBound, searchBound);
798   TreePos treePos;
799   searchToScan(frag, idir, searchBound, treePos);
800   if (treePos.m_loc != NullTupLoc) {
801     scan.m_scanPos = treePos;
802     // link the scan to node found
803     NodeHandle node(frag);
804     selectNode(node, treePos.m_loc);
805     linkScan(node, scanPtr);
806     if (treePos.m_dir == 3) {
807       jam();
808       // check upper bound
809       TreeEnt ent = node.getEnt(treePos.m_pos);
810       if (scanCheck(scanPtr, ent)) {
811         jam();
812         scan.m_state = ScanOp::Current;
813       } else {
814         jam();
815         scan.m_state = ScanOp::Last;
816       }
817     } else {
818       jam();
819       scan.m_state = ScanOp::Next;
820     }
821   } else {
822     jam();
823     scan.m_state = ScanOp::Last;
824   }
825 #ifdef VM_TRACE
826   if (debugFlags & DebugScan) {
827     debugOut << "Leave first scan " << scanPtr.i << " " << scan << endl;
828   }
829 #endif
830 }
831 
832 /*
833  * Look for entry to return as scan result.
834  */
835 void
scanFind(ScanOpPtr scanPtr)836 Dbtux::scanFind(ScanOpPtr scanPtr)
837 {
838   ScanOp& scan = *scanPtr.p;
839   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
840 #ifdef VM_TRACE
841   if (debugFlags & DebugScan) {
842     debugOut << "Enter find scan " << scanPtr.i << " " << scan << endl;
843   }
844 #endif
845   ndbrequire(scan.m_state == ScanOp::Current || scan.m_state == ScanOp::Next);
846   while (1) {
847     jam();
848     if (scan.m_state == ScanOp::Next)
849       scanNext(scanPtr, false);
850     if (scan.m_state == ScanOp::Current) {
851       jam();
852       const TreePos pos = scan.m_scanPos;
853       NodeHandle node(frag);
854       selectNode(node, pos.m_loc);
855       const TreeEnt ent = node.getEnt(pos.m_pos);
856       if (unlikely(scan.m_statOpPtrI != RNIL)) {
857         StatOpPtr statPtr;
858         statPtr.i = scan.m_statOpPtrI;
859         c_statOpPool.getPtr(statPtr);
860         // report row to stats, returns true if a sample is available
861         int ret = statScanAddRow(statPtr, ent);
862         if (ret == 1) {
863           jam();
864           scan.m_state = ScanOp::Found;
865           // may not access non-pseudo cols but must return valid ent
866           scan.m_scanEnt = ent;
867           break;
868         }
869       } else if (scanVisible(scanPtr, ent)) {
870         jam();
871         scan.m_state = ScanOp::Found;
872         scan.m_scanEnt = ent;
873         break;
874       }
875     } else {
876       jam();
877       break;
878     }
879     scan.m_state = ScanOp::Next;
880   }
881 #ifdef VM_TRACE
882   if (debugFlags & DebugScan) {
883     debugOut << "Leave find scan " << scanPtr.i << " " << scan << endl;
884   }
885 #endif
886 }
887 
888 /*
889  * Move to next entry.  The scan is already linked to some node.  When
890  * we leave, if an entry was found, it will be linked to a possibly
891  * different node.  The scan has a position, and a direction which tells
892  * from where we came to this position.  This is one of (all comments
893  * are in terms of ascending scan):
894  *
895  * 0 - up from left child (scan this node next)
896  * 1 - up from right child (proceed to parent)
897  * 2 - up from root (the scan ends)
898  * 3 - left to right within node (at end set state 5)
899  * 4 - down from parent (proceed to left child)
900  * 5 - at node end proceed to right child (state becomes 4)
901  *
902  * If an entry was found, scan direction is 3.  Therefore tree
903  * re-organizations need not worry about scan direction.
904  *
905  * This method is also used to move a scan when its entry is removed
906  * (see moveScanList).  If the scan is Blocked, we check if it remains
907  * Blocked on a different version of the tuple.  Otherwise the tuple is
908  * lost and state becomes Current.
909  */
910 void
scanNext(ScanOpPtr scanPtr,bool fromMaintReq)911 Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
912 {
913   ScanOp& scan = *scanPtr.p;
914   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
915 #ifdef VM_TRACE
916   if (debugFlags & (DebugMaint | DebugScan)) {
917     debugOut << "Enter next scan " << scanPtr.i << " " << scan << endl;
918   }
919 #endif
920   // cannot be moved away from tuple we have locked
921   ndbrequire(scan.m_state != ScanOp::Locked);
922   // scan direction
923   const unsigned idir = scan.m_descending; // 0, 1
924   const int jdir = 1 - 2 * (int)idir;      // 1, -1
925   // use copy of position
926   TreePos pos = scan.m_scanPos;
927   // get and remember original node
928   NodeHandle origNode(frag);
929   selectNode(origNode, pos.m_loc);
930   ndbrequire(islinkScan(origNode, scanPtr));
931   // current node in loop
932   NodeHandle node = origNode;
933   // copy of entry found
934   TreeEnt ent;
935   while (true) {
936     jam();
937 #ifdef VM_TRACE
938     if (debugFlags & (DebugMaint | DebugScan)) {
939       debugOut << "Current scan " << scanPtr.i << " pos " << pos << " node " << node << endl;
940     }
941 #endif
942     if (pos.m_dir == 2) {
943       // coming up from root ends the scan
944       jam();
945       pos.m_loc = NullTupLoc;
946       break;
947     }
948     if (node.m_loc != pos.m_loc) {
949       jam();
950       selectNode(node, pos.m_loc);
951     }
952     if (pos.m_dir == 4) {
953       // coming down from parent proceed to left child
954       jam();
955       TupLoc loc = node.getLink(idir);
956       if (loc != NullTupLoc) {
957         jam();
958         pos.m_loc = loc;
959         pos.m_dir = 4;  // unchanged
960         continue;
961       }
962       // pretend we came from left child
963       pos.m_dir = idir;
964     }
965     if (pos.m_dir == 5) {
966       // at node end proceed to right child
967       jam();
968       TupLoc loc = node.getLink(1 - idir);
969       if (loc != NullTupLoc) {
970         jam();
971         pos.m_loc = loc;
972         pos.m_dir = 4;  // down from parent as usual
973         continue;
974       }
975       // pretend we came from right child
976       pos.m_dir = 1 - idir;
977     }
978     const unsigned occup = node.getOccup();
979     if (occup == 0) {
980       jam();
981       ndbrequire(fromMaintReq);
982       // move back to parent - see comment in treeRemoveInner
983       pos.m_loc = node.getLink(2);
984       pos.m_dir = node.getSide();
985       continue;
986     }
987     if (pos.m_dir == idir) {
988       // coming up from left child scan current node
989       jam();
990       pos.m_pos = idir == 0 ? (Uint16)-1 : occup;
991       pos.m_dir = 3;
992     }
993     if (pos.m_dir == 3) {
994       // before or within node
995       jam();
996       // advance position - becomes ZNIL (> occup) if 0 and descending
997       pos.m_pos += jdir;
998       if (pos.m_pos < occup) {
999         jam();
1000         pos.m_dir = 3;  // unchanged
1001         ent = node.getEnt(pos.m_pos);
1002         if (! scanCheck(scanPtr, ent)) {
1003           jam();
1004           pos.m_loc = NullTupLoc;
1005         }
1006         break;
1007       }
1008       // after node proceed to right child
1009       pos.m_dir = 5;
1010       continue;
1011     }
1012     if (pos.m_dir == 1 - idir) {
1013       // coming up from right child proceed to parent
1014       jam();
1015       pos.m_loc = node.getLink(2);
1016       pos.m_dir = node.getSide();
1017       continue;
1018     }
1019     ndbrequire(false);
1020   }
1021   // copy back position
1022   scan.m_scanPos = pos;
1023   // relink
1024   if (pos.m_loc != NullTupLoc) {
1025     ndbrequire(pos.m_dir == 3);
1026     ndbrequire(pos.m_loc == node.m_loc);
1027     if (origNode.m_loc != node.m_loc) {
1028       jam();
1029       unlinkScan(origNode, scanPtr);
1030       linkScan(node, scanPtr);
1031     }
1032     if (scan.m_state != ScanOp::Blocked) {
1033       scan.m_state = ScanOp::Current;
1034     } else {
1035       jam();
1036       ndbrequire(fromMaintReq);
1037       TreeEnt& scanEnt = scan.m_scanEnt;
1038       ndbrequire(scanEnt.m_tupLoc != NullTupLoc);
1039       if (scanEnt.eqtuple(ent)) {
1040         // remains blocked on another version
1041         scanEnt = ent;
1042       } else {
1043         jam();
1044         scanEnt.m_tupLoc = NullTupLoc;
1045         scan.m_state = ScanOp::Current;
1046       }
1047     }
1048   } else {
1049     jam();
1050     unlinkScan(origNode, scanPtr);
1051     scan.m_state = ScanOp::Last;
1052   }
1053 #ifdef VM_TRACE
1054   if (debugFlags & (DebugMaint | DebugScan)) {
1055     debugOut << "Leave next scan " << scanPtr.i << " " << scan << endl;
1056   }
1057 #endif
1058 }
1059 
1060 /*
1061  * Check end key.  Return true if scan is still within range.
1062  *
1063  * Error handling:  If scan error code has been set, return false at
1064  * once.  This terminates the scan and also avoids kernel crash on
1065  * invalid data.
1066  */
1067 bool
scanCheck(ScanOpPtr scanPtr,TreeEnt ent)1068 Dbtux::scanCheck(ScanOpPtr scanPtr, TreeEnt ent)
1069 {
1070   ScanOp& scan = *scanPtr.p;
1071   if (unlikely(scan.m_errorCode != 0)) {
1072     jam();
1073     return false;
1074   }
1075   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
1076   const Index& index = *c_indexPool.getPtr(frag.m_indexId);
1077   const unsigned idir = scan.m_descending;
1078   const int jdir = 1 - 2 * (int)idir;
1079   const ScanBound& scanBound = scan.m_scanBound[1 - idir];
1080   int ret = 0;
1081   if (scanBound.m_cnt != 0) {
1082     jam();
1083     // set up bound from segmented memory
1084     KeyDataC searchBoundData(index.m_keySpec, true);
1085     KeyBoundC searchBound(searchBoundData);
1086     unpackBound(c_ctx, scanBound, searchBound);
1087     // key data for the entry
1088     KeyData entryKey(index.m_keySpec, true, 0);
1089     entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
1090     readKeyAttrs(c_ctx, frag, ent, entryKey, index.m_numAttrs);
1091     // compare bound to key
1092     const Uint32 boundCount = searchBound.get_data().get_cnt();
1093     ret = cmpSearchBound(c_ctx, searchBound, entryKey, boundCount);
1094     ndbrequire(ret != 0);
1095     ret = (-1) * ret; // reverse for key vs bound
1096     ret = jdir * ret; // reverse for descending scan
1097   }
1098 #ifdef VM_TRACE
1099   if (debugFlags & DebugScan) {
1100     debugOut << "Check scan " << scanPtr.i << " " << scan << " ret:" << dec << ret << endl;
1101   }
1102 #endif
1103   return (ret <= 0);
1104 }
1105 
1106 /*
1107  * Check if an entry is visible to the scan.
1108  *
1109  * There is a special check to never accept same tuple twice in a row.
1110  * This is faster than asking TUP.  It also fixes some special cases
1111  * which are not analyzed or handled yet.
1112  *
1113  * Error handling:  If scan error code has been set, return false since
1114  * no new result can be returned to LQH.  The scan will then look for
1115  * next result and terminate via scanCheck():
1116  */
1117 bool
scanVisible(ScanOpPtr scanPtr,TreeEnt ent)1118 Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
1119 {
1120   const ScanOp& scan = *scanPtr.p;
1121   if (unlikely(scan.m_errorCode != 0)) {
1122     jam();
1123     return false;
1124   }
1125   const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
1126   Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
1127   Uint32 pageId = ent.m_tupLoc.getPageId();
1128   Uint32 pageOffset = ent.m_tupLoc.getPageOffset();
1129   Uint32 tupVersion = ent.m_tupVersion;
1130   // check for same tuple twice in row
1131   if (scan.m_scanEnt.m_tupLoc == ent.m_tupLoc)
1132   {
1133     jam();
1134     return false;
1135   }
1136   Uint32 transId1 = scan.m_transId1;
1137   Uint32 transId2 = scan.m_transId2;
1138   bool dirty = scan.m_readCommitted;
1139   Uint32 savePointId = scan.m_savePointId;
1140   bool ret = c_tup->tuxQueryTh(tableFragPtrI, pageId, pageOffset, tupVersion, transId1, transId2, dirty, savePointId);
1141   jamEntry();
1142   return ret;
1143 }
1144 
1145 /*
1146  * Finish closing of scan and send conf.  Any lock wait has been done
1147  * already.
1148  *
1149  * Error handling:  Every scan ends here.  If error code has been set,
1150  * send a REF.
1151  */
1152 void
scanClose(Signal * signal,ScanOpPtr scanPtr)1153 Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
1154 {
1155   ScanOp& scan = *scanPtr.p;
1156   ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
1157   // unlock all not unlocked by LQH
1158   if (! scan.m_accLockOps.isEmpty()) {
1159     jam();
1160     abortAccLockOps(signal, scanPtr);
1161   }
1162   if (scanPtr.p->m_errorCode == 0) {
1163     jam();
1164     // send conf
1165     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
1166     conf->scanPtr = scanPtr.p->m_userPtr;
1167     conf->accOperationPtr = RNIL;
1168     conf->fragId = RNIL;
1169     unsigned signalLength = 3;
1170     sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
1171         signal, signalLength, JBB);
1172   } else {
1173     // send ref
1174     NextScanRef* ref = (NextScanRef*)signal->getDataPtr();
1175     ref->scanPtr = scanPtr.p->m_userPtr;
1176     ref->accOperationPtr = RNIL;
1177     ref->fragId = RNIL;
1178     ref->errorCode = scanPtr.p->m_errorCode;
1179     sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANREF,
1180                signal, NextScanRef::SignalLength, JBB);
1181   }
1182   releaseScanOp(scanPtr);
1183 }
1184 
1185 void
abortAccLockOps(Signal * signal,ScanOpPtr scanPtr)1186 Dbtux::abortAccLockOps(Signal* signal, ScanOpPtr scanPtr)
1187 {
1188   ScanOp& scan = *scanPtr.p;
1189 #ifdef VM_TRACE
1190   if (debugFlags & (DebugScan | DebugLock)) {
1191     debugOut << "Abort locks in scan " << scanPtr.i << " " << scan << endl;
1192   }
1193 #endif
1194   LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1195   ScanLockPtr lockPtr;
1196   while (list.first(lockPtr)) {
1197     jam();
1198     AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
1199     lockReq->returnCode = RNIL;
1200     lockReq->requestInfo = AccLockReq::Abort;
1201     lockReq->accOpPtr = lockPtr.p->m_accLockOp;
1202     EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
1203     jamEntry();
1204     ndbrequire(lockReq->returnCode == AccLockReq::Success);
1205     list.release(lockPtr);
1206   }
1207 }
1208 
1209 void
addAccLockOp(ScanOpPtr scanPtr,Uint32 accLockOp)1210 Dbtux::addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
1211 {
1212   ScanOp& scan = *scanPtr.p;
1213 #ifdef VM_TRACE
1214   if (debugFlags & (DebugScan | DebugLock)) {
1215     debugOut << "Add lock " << hex << accLockOp << dec
1216              << " to scan " << scanPtr.i << " " << scan << endl;
1217   }
1218 #endif
1219   LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1220   ScanLockPtr lockPtr;
1221 #ifdef VM_TRACE
1222   list.first(lockPtr);
1223   while (lockPtr.i != RNIL) {
1224     ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
1225     list.next(lockPtr);
1226   }
1227 #endif
1228   bool ok = list.seize(lockPtr);
1229   ndbrequire(ok);
1230   ndbrequire(accLockOp != RNIL);
1231   lockPtr.p->m_accLockOp = accLockOp;
1232 }
1233 
1234 void
removeAccLockOp(ScanOpPtr scanPtr,Uint32 accLockOp)1235 Dbtux::removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
1236 {
1237   ScanOp& scan = *scanPtr.p;
1238 #ifdef VM_TRACE
1239   if (debugFlags & (DebugScan | DebugLock)) {
1240     debugOut << "Remove lock " << hex << accLockOp << dec
1241              << " from scan " << scanPtr.i << " " << scan << endl;
1242   }
1243 #endif
1244   LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1245   ScanLockPtr lockPtr;
1246   list.first(lockPtr);
1247   while (lockPtr.i != RNIL) {
1248     if (lockPtr.p->m_accLockOp == accLockOp) {
1249       jam();
1250       break;
1251     }
1252     list.next(lockPtr);
1253   }
1254   ndbrequire(lockPtr.i != RNIL);
1255   list.release(lockPtr);
1256 }
1257 
1258 /*
1259  * Release allocated records.
1260  */
1261 void
releaseScanOp(ScanOpPtr & scanPtr)1262 Dbtux::releaseScanOp(ScanOpPtr& scanPtr)
1263 {
1264 #ifdef VM_TRACE
1265   if (debugFlags & DebugScan) {
1266     debugOut << "Release scan " << scanPtr.i << " " << *scanPtr.p << endl;
1267   }
1268 #endif
1269   Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
1270   for (unsigned i = 0; i <= 1; i++) {
1271     ScanBound& scanBound = scanPtr.p->m_scanBound[i];
1272     DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
1273     LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
1274     b.release();
1275   }
1276   if (unlikely(scanPtr.p->m_statOpPtrI != RNIL)) {
1277     jam();
1278     StatOpPtr statPtr;
1279     statPtr.i = scanPtr.p->m_statOpPtrI;
1280     c_statOpPool.getPtr(statPtr);
1281     c_statOpPool.release(statPtr);
1282   }
1283   // unlink from per-fragment list and release from pool
1284   frag.m_scanList.release(scanPtr);
1285 }
1286