1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #define DBTUX_SCAN_CPP
26 #include "Dbtux.hpp"
27 #include <my_sys.h>
28 
29 #define JAM_FILE_ID 371
30 
31 
32 /*
33  * Error handling:  Any seized scan op is released.  ACC_SCANREF is sent
34  * to LQH.  LQH sets error code, and treats this like ZEMPTY_FRAGMENT.
35  * Therefore scan is now closed on both sides.
36  */
37 void
execACC_SCANREQ(Signal * signal)38 Dbtux::execACC_SCANREQ(Signal* signal)
39 {
40   jamEntry();
41   const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
42   const AccScanReq* const req = &reqCopy;
43   Uint32 errorCode = 0;
44   ScanOpPtr scanPtr;
45   scanPtr.i = RNIL;
46   do {
47     // get the index
48     IndexPtr indexPtr;
49     c_indexPool.getPtr(indexPtr, req->tableId);
50     // get the fragment
51     FragPtr fragPtr;
52     findFrag(jamBuffer(), *indexPtr.p, req->fragmentNo, fragPtr);
53     ndbrequire(fragPtr.i != RNIL);
54     Frag& frag = *fragPtr.p;
55     // check for index not Online (i.e. Dropping)
56     if (unlikely(indexPtr.p->m_state != Index::Online)) {
57       jam();
58 #ifdef VM_TRACE
59       if (debugFlags & (DebugMeta | DebugScan)) {
60         debugOut << "Index dropping at ACC_SCANREQ " << indexPtr.i << " " << *indexPtr.p << endl;
61       }
62 #endif
63       errorCode = AccScanRef::TuxIndexNotOnline;
64       break;
65     }
66     // must be normal DIH/TC fragment
67     TreeHead& tree = frag.m_tree;
68     // check for empty fragment
69     if (tree.m_root == NullTupLoc) {
70       jam();
71       AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
72       conf->scanPtr = req->senderData;
73       conf->accPtr = RNIL;
74       conf->flag = AccScanConf::ZEMPTY_FRAGMENT;
75       signal->theData[8] = 0;
76       /* Return ACC_SCANCONF */
77       return;
78     }
79     // seize from pool and link to per-fragment list
80     if (ERROR_INSERTED(12008) ||
81         ! frag.m_scanList.seizeFirst(scanPtr)) {
82       CLEAR_ERROR_INSERT_VALUE;
83       jam();
84       // should never happen but can be used to test error handling
85       errorCode = AccScanRef::TuxNoFreeScanOp;
86       break;
87     }
88     new (scanPtr.p) ScanOp;
89     scanPtr.p->m_state = ScanOp::First;
90     scanPtr.p->m_userPtr = req->senderData;
91     scanPtr.p->m_userRef = req->senderRef;
92     scanPtr.p->m_tableId = indexPtr.p->m_tableId;
93     scanPtr.p->m_indexId = indexPtr.i;
94     scanPtr.p->m_fragId = fragPtr.p->m_fragId;
95     scanPtr.p->m_fragPtrI = fragPtr.i;
96     scanPtr.p->m_transId1 = req->transId1;
97     scanPtr.p->m_transId2 = req->transId2;
98     scanPtr.p->m_savePointId = req->savePointId;
99     scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo);
100     scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo);
101     scanPtr.p->m_descending = AccScanReq::getDescendingFlag(req->requestInfo);
102     /*
103      * readCommitted lockMode keyInfo
104      * 1 0 0 - read committed (no lock)
105      * 0 0 0 - read latest (read lock)
106      * 0 1 1 - read exclusive (write lock)
107      */
108     const bool isStatScan = AccScanReq::getStatScanFlag(req->requestInfo);
109     if (unlikely(isStatScan)) {
110       jam();
111       if (!scanPtr.p->m_readCommitted) {
112         jam();
113         errorCode = AccScanRef::TuxInvalidLockMode;
114         break;
115       }
116       StatOpPtr statPtr;
117       if (!c_statOpPool.seize(statPtr)) {
118         jam();
119         errorCode = AccScanRef::TuxNoFreeStatOp;
120         break;
121       }
122       scanPtr.p->m_statOpPtrI = statPtr.i;
123       new (statPtr.p) StatOp(*indexPtr.p);
124       statPtr.p->m_scanOpPtrI = scanPtr.i;
125       // rest of StatOp is initialized in execTUX_BOUND_INFO
126 #ifdef VM_TRACE
127       if (debugFlags & DebugStat) {
128         debugOut << "Seize stat op" << endl;
129       }
130 #endif
131     }
132 #ifdef VM_TRACE
133     if (debugFlags & DebugScan) {
134       debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
135     }
136 #endif
137     // conf
138     AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
139     conf->scanPtr = req->senderData;
140     conf->accPtr = scanPtr.i;
141     conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
142     signal->theData[8] = 0;
143     /* Return ACC_SCANCONF */
144     return;
145   } while (0);
146   if (scanPtr.i != RNIL) {
147     jam();
148     releaseScanOp(scanPtr);
149   }
150   // ref
151   ndbrequire(errorCode != 0);
152   AccScanRef* ref = (AccScanRef*)signal->getDataPtrSend();
153   ref->scanPtr = req->senderData;
154   ref->accPtr = RNIL;
155   ref->errorCode = errorCode;
156   signal->theData[8] = 1;
157   /* Return ACC_SCANREF */
158 }
159 
160 /*
161  * Receive bounds for scan in single direct call.  The bounds can arrive
162  * in any order.  Attribute ids are those of index table.
163  *
164  * Replace EQ by equivalent LE + GE.  Check for conflicting bounds.
165  * Check that sets of lower and upper bounds are on initial sequences of
166  * keys and that all but possibly last bound is non-strict.
167  *
168  * Finally convert the sets of lower and upper bounds (i.e. start key
169  * and end key) to NdbPack format.  The data is saved in segmented
170  * memory.  The bound is reconstructed at use time via unpackBound().
171  *
172  * Error handling:  Error code is set in the scan and also returned in
173  * EXECUTE_DIRECT (the old way).
174  */
175 void
execTUX_BOUND_INFO(Signal * signal)176 Dbtux::execTUX_BOUND_INFO(Signal* signal)
177 {
178   jamEntry();
179   // get records
180   TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
181   ScanOpPtr scanPtr;
182   scanPtr.i = req->tuxScanPtrI;
183   c_scanOpPool.getPtr(scanPtr);
184   ScanOp& scan = *scanPtr.p;
185   const Index& index = *c_indexPool.getPtr(scan.m_indexId);
186   // compiler warning unused: const DescHead& descHead = getDescHead(index);
187   // compiler warning unused: const KeyType* keyTypes = getKeyTypes(descHead);
188   // data passed in Signal
189   const Uint32* const boundData = &req->data[0];
190   Uint32 boundLen = req->boundAiLength;
191   Uint32 boundOffset = 0;
192   // initialize stats scan
193   if (unlikely(scan.m_statOpPtrI != RNIL)) {
194     // stats options before bounds
195     StatOpPtr statPtr;
196     statPtr.i = scan.m_statOpPtrI;
197     c_statOpPool.getPtr(statPtr);
198     Uint32 usedLen = 0;
199     if (statScanInit(statPtr, boundData, boundLen, &usedLen) == -1) {
200       jam();
201       ndbrequire(scan.m_errorCode != 0);
202       req->errorCode = scan.m_errorCode;
203       return;
204     }
205     ndbrequire(usedLen <= boundLen);
206     boundLen -= usedLen;
207     boundOffset += usedLen;
208   }
209   // extract lower and upper bound in separate passes
210   for (unsigned idir = 0; idir <= 1; idir++) {
211     jam();
212     struct BoundInfo {
213       int type2;      // with EQ -> LE/GE
214       Uint32 offset;  // word offset in signal data
215       Uint32 bytes;
216     };
217     BoundInfo boundInfo[MaxIndexAttributes];
218     // largest attrId seen plus one
219     Uint32 maxAttrId = 0;
220     const Uint32* const data = &boundData[boundOffset];
221     Uint32 offset = 0;
222     while (offset + 2 <= boundLen) {
223       jam();
224       const Uint32 type = data[offset];
225       const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
226       const Uint32 attrId = ah->getAttributeId();
227       const Uint32 byteSize = ah->getByteSize();
228       const Uint32 dataSize = ah->getDataSize();
229       // check type
230       if (unlikely(type > 4)) {
231         jam();
232         scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
233         req->errorCode = scan.m_errorCode;
234         return;
235       }
236       Uint32 type2 = type;
237       if (type2 == 4) {
238         jam();
239         type2 = (idir << 1); // LE=0 GE=2
240       }
241       // check if attribute belongs to this bound
242       if ((type2 & 0x2) == (idir << 1)) {
243         if (unlikely(attrId >= index.m_numAttrs)) {
244           jam();
245           scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
246           req->errorCode = scan.m_errorCode;
247           return;
248         }
249         // mark entries in any gap as undefined
250         while (maxAttrId <= attrId) {
251           jam();
252           BoundInfo& b = boundInfo[maxAttrId];
253           b.type2 = -1;
254           maxAttrId++;
255         }
256         BoundInfo& b = boundInfo[attrId];
257         // duplicate no longer allowed (wl#4163)
258         if (unlikely(b.type2 != -1)) {
259           jam();
260           scan.m_errorCode = TuxBoundInfo::InvalidBounds;
261           req->errorCode = scan.m_errorCode;
262           return;
263         }
264         b.type2 = (int)type2;
265         b.offset = offset + 1; // poai
266         b.bytes = byteSize;
267       }
268       // jump to next
269       offset += 2 + dataSize;
270     }
271     if (unlikely(offset != boundLen)) {
272       jam();
273       scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
274       req->errorCode = scan.m_errorCode;
275       return;
276     }
277     // check and pack the bound data
278     KeyData searchBoundData(index.m_keySpec, true, 0);
279     KeyBound searchBound(searchBoundData);
280     searchBoundData.set_buf(c_ctx.c_searchKey, MaxAttrDataSize << 2);
281     int strict = 0; // 0 or 1
282     Uint32 i;
283     for (i = 0; i < maxAttrId; i++) {
284       jam();
285       const BoundInfo& b = boundInfo[i];
286        // check for gap or strict bound before last
287        strict = (b.type2 & 0x1);
288        if (unlikely(b.type2 == -1 || (i + 1 < maxAttrId && strict))) {
289          jam();
290          scan.m_errorCode = TuxBoundInfo::InvalidBounds;
291          req->errorCode = scan.m_errorCode;
292          return;
293        }
294        Uint32 len;
295        if (unlikely(searchBoundData.add_poai(&data[b.offset], &len) == -1 ||
296            b.bytes != len)) {
297          jam();
298          scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
299          req->errorCode = scan.m_errorCode;
300          return;
301        }
302     }
303     int side = 0;
304     if (maxAttrId != 0) {
305       // arithmetic is faster
306       // side = (idir == 0 ? (strict ? +1 : -1) : (strict ? -1 : +1));
307       side = (-1) * (1 - 2 * strict) * (1 - 2 * int(idir));
308     }
309     if (unlikely(searchBound.finalize(side) == -1)) {
310       jam();
311       scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
312       req->errorCode = scan.m_errorCode;
313       return;
314     }
315     ScanBound& scanBound = scan.m_scanBound[idir];
316     scanBound.m_cnt = maxAttrId;
317     scanBound.m_side = side;
318     // save data words in segmented memory
319     {
320       DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
321       LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
322       const Uint32* data = (const Uint32*)searchBoundData.get_data_buf();
323       Uint32 size = (searchBoundData.get_data_len() + 3) / 4;
324       bool ok = b.append(data, size);
325       if (unlikely(!ok)) {
326         jam();
327         scan.m_errorCode = TuxBoundInfo::OutOfBuffers;
328         req->errorCode = scan.m_errorCode;
329         return;
330       }
331     }
332   }
333   if (ERROR_INSERTED(12009)) {
334     jam();
335     CLEAR_ERROR_INSERT_VALUE;
336     scan.m_errorCode = TuxBoundInfo::InvalidBounds;
337     req->errorCode = scan.m_errorCode;
338     return;
339   }
340   // no error
341   req->errorCode = 0;
342 }
343 
344 void
execNEXT_SCANREQ(Signal * signal)345 Dbtux::execNEXT_SCANREQ(Signal* signal)
346 {
347   jamEntry();
348   const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
349   const NextScanReq* const req = &reqCopy;
350   ScanOpPtr scanPtr;
351   scanPtr.i = req->accPtr;
352   c_scanOpPool.getPtr(scanPtr);
353   ScanOp& scan = *scanPtr.p;
354   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
355 #ifdef VM_TRACE
356   if (debugFlags & DebugScan) {
357     debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl;
358   }
359 #endif
360   // handle unlock previous and close scan
361   switch (req->scanFlag) {
362   case NextScanReq::ZSCAN_NEXT:
363     jam();
364     break;
365   case NextScanReq::ZSCAN_NEXT_COMMIT:
366     jam();
367   case NextScanReq::ZSCAN_COMMIT:
368     jam();
369     if (! scan.m_readCommitted) {
370       jam();
371       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
372       lockReq->returnCode = RNIL;
373       lockReq->requestInfo = AccLockReq::Unlock;
374       lockReq->accOpPtr = req->accOperationPtr;
375       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
376       jamEntry();
377       ndbrequire(lockReq->returnCode == AccLockReq::Success);
378       removeAccLockOp(scanPtr, req->accOperationPtr);
379     }
380     if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
381       jam();
382       signal->theData[0] = 0; /* Success */
383       /**
384        * Return with signal->theData[0] = 0 means a return
385        * signal NEXT_SCANCONF for NextScanReq::ZSCAN_COMMIT
386        */
387       return;
388     }
389     break;
390   case NextScanReq::ZSCAN_CLOSE:
391     jam();
392     // unlink from tree node first to avoid state changes
393     if (scan.m_scanPos.m_loc != NullTupLoc) {
394       jam();
395       const TupLoc loc = scan.m_scanPos.m_loc;
396       NodeHandle node(frag);
397       selectNode(node, loc);
398       unlinkScan(node, scanPtr);
399       scan.m_scanPos.m_loc = NullTupLoc;
400     }
401     if (scan.m_lockwait) {
402       jam();
403       ndbrequire(scan.m_accLockOp != RNIL);
404       // use ACC_ABORTCONF to flush out any reply in job buffer
405       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
406       lockReq->returnCode = RNIL;
407       lockReq->requestInfo = AccLockReq::AbortWithConf;
408       lockReq->accOpPtr = scan.m_accLockOp;
409       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
410 		     AccLockReq::UndoSignalLength);
411       jamEntry();
412       ndbrequire(lockReq->returnCode == AccLockReq::Success);
413       scan.m_state = ScanOp::Aborting;
414       return;
415     }
416     if (scan.m_state == ScanOp::Locked) {
417       jam();
418       ndbrequire(scan.m_accLockOp != RNIL);
419       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
420       lockReq->returnCode = RNIL;
421       lockReq->requestInfo = AccLockReq::Abort;
422       lockReq->accOpPtr = scan.m_accLockOp;
423       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
424 		     AccLockReq::UndoSignalLength);
425       jamEntry();
426       ndbrequire(lockReq->returnCode == AccLockReq::Success);
427       scan.m_accLockOp = RNIL;
428     }
429     scan.m_state = ScanOp::Aborting;
430     scanClose(signal, scanPtr);
431     return;
432   case NextScanReq::ZSCAN_NEXT_ABORT:
433     jam();
434   default:
435     jam();
436     ndbrequire(false);
437     break;
438   }
439   // start looking for next scan result
440   AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
441   checkReq->accPtr = scanPtr.i;
442   checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
443   EXECUTE_DIRECT(DBTUX, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
444   jamEntry();
445 }
446 
447 void
execACC_CHECK_SCAN(Signal * signal)448 Dbtux::execACC_CHECK_SCAN(Signal* signal)
449 {
450   jamEntry();
451   const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
452   const AccCheckScan* const req = &reqCopy;
453   ScanOpPtr scanPtr;
454   scanPtr.i = req->accPtr;
455   c_scanOpPool.getPtr(scanPtr);
456   ScanOp& scan = *scanPtr.p;
457   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
458 #ifdef VM_TRACE
459   if (debugFlags & DebugScan) {
460     debugOut << "ACC_CHECK_SCAN scan " << scanPtr.i << " " << scan << endl;
461   }
462 #endif
463   if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
464     jam();
465     signal->theData[0] = scan.m_userPtr;
466     signal->theData[1] = true;
467     EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
468     jamEntry();
469     return;   // stop
470   }
471   if (scan.m_lockwait) {
472     jam();
473     // LQH asks if we are waiting for lock and we tell it to ask again
474     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
475     conf->scanPtr = scan.m_userPtr;
476     conf->accOperationPtr = RNIL;       // no tuple returned
477     conf->fragId = frag.m_fragId;
478     unsigned signalLength = 3;
479     // if TC has ordered scan close, it will be detected here
480     sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
481                signal, signalLength, JBB);
482     return;     // stop
483   }
484   // check index online
485   const Index& index = *c_indexPool.getPtr(frag.m_indexId);
486   if (unlikely(index.m_state != Index::Online) &&
487       scanPtr.p->m_errorCode == 0) {
488     jam();
489 #ifdef VM_TRACE
490     if (debugFlags & (DebugMeta | DebugScan)) {
491       debugOut << "Index dropping at execACC_CHECK_SCAN " << scanPtr.i << " " << *scanPtr.p << endl;
492     }
493 #endif
494     scanPtr.p->m_errorCode = AccScanRef::TuxIndexNotOnline;
495   }
496   if (scan.m_state == ScanOp::First) {
497     jam();
498     // search is done only once in single range scan
499     scanFirst(scanPtr);
500   }
501   if (scan.m_state == ScanOp::Current ||
502       scan.m_state == ScanOp::Next) {
503     jam();
504     // look for next
505     scanFind(scanPtr);
506   }
507   // for reading tuple key in Found or Locked state
508   Uint32* pkData = c_ctx.c_dataBuffer;
509   unsigned pkSize = 0; // indicates not yet done
510   if (scan.m_state == ScanOp::Found) {
511     // found an entry to return
512     jam();
513     ndbrequire(scan.m_accLockOp == RNIL);
514     if (! scan.m_readCommitted) {
515       jam();
516       const TreeEnt ent = scan.m_scanEnt;
517       // read tuple key
518       readTablePk(frag, ent, pkData, pkSize);
519       // get read lock or exclusive lock
520       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
521       lockReq->returnCode = RNIL;
522       lockReq->requestInfo =
523         scan.m_lockMode == 0 ? AccLockReq::LockShared : AccLockReq::LockExclusive;
524       lockReq->accOpPtr = RNIL;
525       lockReq->userPtr = scanPtr.i;
526       lockReq->userRef = reference();
527       lockReq->tableId = scan.m_tableId;
528       lockReq->fragId = frag.m_fragId;
529       lockReq->fragPtrI = frag.m_accTableFragPtrI;
530       const Uint32* const buf32 = static_cast<Uint32*>(pkData);
531       const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32);
532       lockReq->hashValue = md5_hash(buf64, pkSize);
533       Uint32 lkey1, lkey2;
534       getTupAddr(frag, ent, lkey1, lkey2);
535       lockReq->page_id = lkey1;
536       lockReq->page_idx = lkey2;
537       lockReq->transId1 = scan.m_transId1;
538       lockReq->transId2 = scan.m_transId2;
539       // execute
540       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::LockSignalLength);
541       jamEntry();
542       switch (lockReq->returnCode) {
543       case AccLockReq::Success:
544         jam();
545         scan.m_state = ScanOp::Locked;
546         scan.m_accLockOp = lockReq->accOpPtr;
547 #ifdef VM_TRACE
548         if (debugFlags & (DebugScan | DebugLock)) {
549           debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl;
550         }
551 #endif
552         break;
553       case AccLockReq::IsBlocked:
554         jam();
555         // normal lock wait
556         scan.m_state = ScanOp::Blocked;
557         scan.m_lockwait = true;
558         scan.m_accLockOp = lockReq->accOpPtr;
559 #ifdef VM_TRACE
560         if (debugFlags & (DebugScan | DebugLock)) {
561           debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl;
562         }
563 #endif
564         // LQH will wake us up
565         signal->theData[0] = scan.m_userPtr;
566         signal->theData[1] = true;
567         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
568         jamEntry();
569         return;  // stop
570         break;
571       case AccLockReq::Refused:
572         jam();
573         // we cannot see deleted tuple (assert only)
574         ndbassert(false);
575         // skip it
576         scan.m_state = ScanOp::Next;
577         signal->theData[0] = scan.m_userPtr;
578         signal->theData[1] = true;
579         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
580         jamEntry();
581         return;  // stop
582         break;
583       case AccLockReq::NoFreeOp:
584         jam();
585         // max ops should depend on max scans (assert only)
586         ndbassert(false);
587         // stay in Found state
588         scan.m_state = ScanOp::Found;
589         signal->theData[0] = scan.m_userPtr;
590         signal->theData[1] = true;
591         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
592         jamEntry();
593         return;  // stop
594         break;
595       default:
596         ndbrequire(false);
597         break;
598       }
599     } else {
600       scan.m_state = ScanOp::Locked;
601     }
602   }
603   if (scan.m_state == ScanOp::Locked) {
604     // we have lock or do not need one
605     jam();
606     // read keys if not already done (uses signal)
607     const TreeEnt ent = scan.m_scanEnt;
608     // conf signal
609     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
610     conf->scanPtr = scan.m_userPtr;
611     // the lock is passed to LQH
612     Uint32 accLockOp = scan.m_accLockOp;
613     if (accLockOp != RNIL) {
614       scan.m_accLockOp = RNIL;
615       // remember it until LQH unlocks it
616       addAccLockOp(scanPtr, accLockOp);
617     } else {
618       ndbrequire(scan.m_readCommitted);
619       // operation RNIL in LQH would signal no tuple returned
620       accLockOp = (Uint32)-1;
621     }
622     conf->accOperationPtr = accLockOp;
623     conf->fragId = frag.m_fragId;
624     Uint32 lkey1, lkey2;
625     getTupAddr(frag, ent, lkey1, lkey2);
626     conf->localKey[0] = lkey1;
627     conf->localKey[1] = lkey2;
628     unsigned signalLength = 5;
629     // add key info
630     // next time look for next entry
631     scan.m_state = ScanOp::Next;
632     /* We need primary table fragment id here, not index fragment id */
633     c_tup->prepareTUPKEYREQ(lkey1, lkey2, frag.m_tupTableFragPtrI);
634     const Uint32 blockNo = refToMain(scan.m_userRef);
635     EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
636     return;
637   }
638   // XXX in ACC this is checked before req->checkLcpStop
639   if (scan.m_state == ScanOp::Last) {
640     jam();
641     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
642     conf->scanPtr = scan.m_userPtr;
643     conf->accOperationPtr = RNIL;
644     conf->fragId = RNIL;
645     unsigned signalLength = 3;
646     Uint32 blockNo = refToMain(scan.m_userRef);
647     EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
648     return;
649   }
650   ndbrequire(false);
651 }
652 
653 /*
654  * Lock succeeded (after delay) in ACC.  If the lock is for current
655  * entry, set state to Locked.  If the lock is for an entry we were
656  * moved away from, simply unlock it.  Finally, if we are closing the
657  * scan, do nothing since we have already sent an abort request.
658  */
659 void
execACCKEYCONF(Signal * signal)660 Dbtux::execACCKEYCONF(Signal* signal)
661 {
662   jamEntry();
663   ScanOpPtr scanPtr;
664   scanPtr.i = signal->theData[0];
665   c_scanOpPool.getPtr(scanPtr);
666   ScanOp& scan = *scanPtr.p;
667 #ifdef VM_TRACE
668   if (debugFlags & (DebugScan | DebugLock)) {
669     debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl;
670   }
671 #endif
672   ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
673   scan.m_lockwait = false;
674   if (scan.m_state == ScanOp::Blocked) {
675     // the lock wait was for current entry
676     jam();
677     scan.m_state = ScanOp::Locked;
678     // LQH has the ball
679     return;
680   }
681   if (scan.m_state != ScanOp::Aborting) {
682     // we were moved, release lock
683     jam();
684     AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
685     lockReq->returnCode = RNIL;
686     lockReq->requestInfo = AccLockReq::Abort;
687     lockReq->accOpPtr = scan.m_accLockOp;
688     EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
689     jamEntry();
690     ndbrequire(lockReq->returnCode == AccLockReq::Success);
691     scan.m_accLockOp = RNIL;
692     // LQH has the ball
693     return;
694   }
695   // lose the lock
696   scan.m_accLockOp = RNIL;
697   // continue at ACC_ABORTCONF
698 }
699 
700 /*
701  * Lock failed (after delay) in ACC.  Probably means somebody ahead of
702  * us in lock queue deleted the tuple.
703  */
704 void
execACCKEYREF(Signal * signal)705 Dbtux::execACCKEYREF(Signal* signal)
706 {
707   jamEntry();
708   ScanOpPtr scanPtr;
709   scanPtr.i = signal->theData[0];
710   c_scanOpPool.getPtr(scanPtr);
711   ScanOp& scan = *scanPtr.p;
712 #ifdef VM_TRACE
713   if (debugFlags & (DebugScan | DebugLock)) {
714     debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl;
715   }
716 #endif
717   ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
718   scan.m_lockwait = false;
719   if (scan.m_state != ScanOp::Aborting) {
720     jam();
721     // release the operation
722     AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
723     lockReq->returnCode = RNIL;
724     lockReq->requestInfo = AccLockReq::Abort;
725     lockReq->accOpPtr = scan.m_accLockOp;
726     EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
727     jamEntry();
728     ndbrequire(lockReq->returnCode == AccLockReq::Success);
729     scan.m_accLockOp = RNIL;
730     // scan position should already have been moved (assert only)
731     if (scan.m_state == ScanOp::Blocked) {
732       jam();
733       // can happen when Dropping
734 #ifdef VM_TRACE
735       const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
736       const Index& index = *c_indexPool.getPtr(frag.m_indexId);
737       ndbassert(index.m_state != Index::Online);
738 #endif
739       scan.m_state = ScanOp::Next;
740     }
741     // LQH has the ball
742     return;
743   }
744   // lose the lock
745   scan.m_accLockOp = RNIL;
746   // continue at ACC_ABORTCONF
747 }
748 
749 /*
750  * Received when scan is closing.  This signal arrives after any
751  * ACCKEYCON or ACCKEYREF which may have been in job buffer.
752  */
753 void
execACC_ABORTCONF(Signal * signal)754 Dbtux::execACC_ABORTCONF(Signal* signal)
755 {
756   jamEntry();
757   ScanOpPtr scanPtr;
758   scanPtr.i = signal->theData[0];
759   c_scanOpPool.getPtr(scanPtr);
760   ScanOp& scan = *scanPtr.p;
761 #ifdef VM_TRACE
762   if (debugFlags & (DebugScan | DebugLock)) {
763     debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl;
764   }
765 #endif
766   ndbrequire(scan.m_state == ScanOp::Aborting);
767   // most likely we are still in lock wait
768   if (scan.m_lockwait) {
769     jam();
770     scan.m_lockwait = false;
771     scan.m_accLockOp = RNIL;
772   }
773   scanClose(signal, scanPtr);
774 }
775 
776 /*
777  * Find start position for single range scan.
778  */
779 void
scanFirst(ScanOpPtr scanPtr)780 Dbtux::scanFirst(ScanOpPtr scanPtr)
781 {
782   ScanOp& scan = *scanPtr.p;
783   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
784   const Index& index = *c_indexPool.getPtr(frag.m_indexId);
785 #ifdef VM_TRACE
786   if (debugFlags & DebugScan) {
787     debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
788   }
789 #endif
790   // scan direction 0, 1
791   const unsigned idir = scan.m_descending;
792   // set up bound from segmented memory
793   const ScanBound& scanBound = scan.m_scanBound[idir];
794   KeyDataC searchBoundData(index.m_keySpec, true);
795   KeyBoundC searchBound(searchBoundData);
796   unpackBound(c_ctx, scanBound, searchBound);
797   TreePos treePos;
798   searchToScan(frag, idir, searchBound, treePos);
799   if (treePos.m_loc != NullTupLoc) {
800     scan.m_scanPos = treePos;
801     // link the scan to node found
802     NodeHandle node(frag);
803     selectNode(node, treePos.m_loc);
804     linkScan(node, scanPtr);
805     if (treePos.m_dir == 3) {
806       jam();
807       // check upper bound
808       TreeEnt ent = node.getEnt(treePos.m_pos);
809       if (scanCheck(scanPtr, ent)) {
810         jam();
811         scan.m_state = ScanOp::Current;
812       } else {
813         jam();
814         scan.m_state = ScanOp::Last;
815       }
816     } else {
817       jam();
818       scan.m_state = ScanOp::Next;
819     }
820   } else {
821     jam();
822     scan.m_state = ScanOp::Last;
823   }
824 #ifdef VM_TRACE
825   if (debugFlags & DebugScan) {
826     debugOut << "Leave first scan " << scanPtr.i << " " << scan << endl;
827   }
828 #endif
829 }
830 
831 /*
832  * Look for entry to return as scan result.
833  */
834 void
scanFind(ScanOpPtr scanPtr)835 Dbtux::scanFind(ScanOpPtr scanPtr)
836 {
837   ScanOp& scan = *scanPtr.p;
838   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
839 #ifdef VM_TRACE
840   if (debugFlags & DebugScan) {
841     debugOut << "Enter find scan " << scanPtr.i << " " << scan << endl;
842   }
843 #endif
844   ndbrequire(scan.m_state == ScanOp::Current || scan.m_state == ScanOp::Next);
845   while (1) {
846     jam();
847     if (scan.m_state == ScanOp::Next)
848       scanNext(scanPtr, false);
849     if (scan.m_state == ScanOp::Current) {
850       jam();
851       const TreePos pos = scan.m_scanPos;
852       NodeHandle node(frag);
853       selectNode(node, pos.m_loc);
854       const TreeEnt ent = node.getEnt(pos.m_pos);
855       if (unlikely(scan.m_statOpPtrI != RNIL)) {
856         StatOpPtr statPtr;
857         statPtr.i = scan.m_statOpPtrI;
858         c_statOpPool.getPtr(statPtr);
859         // report row to stats, returns true if a sample is available
860         int ret = statScanAddRow(statPtr, ent);
861         if (ret == 1) {
862           jam();
863           scan.m_state = ScanOp::Found;
864           // may not access non-pseudo cols but must return valid ent
865           scan.m_scanEnt = ent;
866           break;
867         }
868       } else if (scanVisible(scanPtr, ent)) {
869         jam();
870         scan.m_state = ScanOp::Found;
871         scan.m_scanEnt = ent;
872         break;
873       }
874     } else {
875       jam();
876       break;
877     }
878     scan.m_state = ScanOp::Next;
879   }
880 #ifdef VM_TRACE
881   if (debugFlags & DebugScan) {
882     debugOut << "Leave find scan " << scanPtr.i << " " << scan << endl;
883   }
884 #endif
885 }
886 
887 /*
888  * Move to next entry.  The scan is already linked to some node.  When
889  * we leave, if an entry was found, it will be linked to a possibly
890  * different node.  The scan has a position, and a direction which tells
891  * from where we came to this position.  This is one of (all comments
892  * are in terms of ascending scan):
893  *
894  * 0 - up from left child (scan this node next)
895  * 1 - up from right child (proceed to parent)
896  * 2 - up from root (the scan ends)
897  * 3 - left to right within node (at end set state 5)
898  * 4 - down from parent (proceed to left child)
899  * 5 - at node end proceed to right child (state becomes 4)
900  *
901  * If an entry was found, scan direction is 3.  Therefore tree
902  * re-organizations need not worry about scan direction.
903  *
904  * This method is also used to move a scan when its entry is removed
905  * (see moveScanList).  If the scan is Blocked, we check if it remains
906  * Blocked on a different version of the tuple.  Otherwise the tuple is
907  * lost and state becomes Current.
908  */
909 void
scanNext(ScanOpPtr scanPtr,bool fromMaintReq)910 Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
911 {
912   ScanOp& scan = *scanPtr.p;
913   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
914 #ifdef VM_TRACE
915   if (debugFlags & (DebugMaint | DebugScan)) {
916     debugOut << "Enter next scan " << scanPtr.i << " " << scan << endl;
917   }
918 #endif
919   // cannot be moved away from tuple we have locked
920 #if defined VM_TRACE || defined ERROR_INSERT
921   ndbrequire(scan.m_state != ScanOp::Locked);
922 #else
923   ndbrequire(fromMaintReq || scan.m_state != ScanOp::Locked);
924 #endif
925   // scan direction
926   const unsigned idir = scan.m_descending; // 0, 1
927   const int jdir = 1 - 2 * (int)idir;      // 1, -1
928   // use copy of position
929   TreePos pos = scan.m_scanPos;
930   // get and remember original node
931   NodeHandle origNode(frag);
932   selectNode(origNode, pos.m_loc);
933   ndbrequire(islinkScan(origNode, scanPtr));
934   if (unlikely(scan.m_state == ScanOp::Locked)) {
935     // bug#32040 - no fix, just unlock and continue
936     jam();
937     if (scan.m_accLockOp != RNIL) {
938       jam();
939       Signal* signal = c_signal_bug32040;
940       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
941       lockReq->returnCode = RNIL;
942       lockReq->requestInfo = AccLockReq::Abort;
943       lockReq->accOpPtr = scan.m_accLockOp;
944       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
945       jamEntry();
946       ndbrequire(lockReq->returnCode == AccLockReq::Success);
947       scan.m_accLockOp = RNIL;
948       scan.m_lockwait = false;
949     }
950     scan.m_state = ScanOp::Next;
951   }
952   // current node in loop
953   NodeHandle node = origNode;
954   // copy of entry found
955   TreeEnt ent;
956   while (true) {
957     jam();
958 #ifdef VM_TRACE
959     if (debugFlags & (DebugMaint | DebugScan)) {
960       debugOut << "Current scan " << scanPtr.i << " pos " << pos << " node " << node << endl;
961     }
962 #endif
963     if (pos.m_dir == 2) {
964       // coming up from root ends the scan
965       jam();
966       pos.m_loc = NullTupLoc;
967       break;
968     }
969     if (node.m_loc != pos.m_loc) {
970       jam();
971       selectNode(node, pos.m_loc);
972     }
973     if (pos.m_dir == 4) {
974       // coming down from parent proceed to left child
975       jam();
976       TupLoc loc = node.getLink(idir);
977       if (loc != NullTupLoc) {
978         jam();
979         pos.m_loc = loc;
980         pos.m_dir = 4;  // unchanged
981         continue;
982       }
983       // pretend we came from left child
984       pos.m_dir = idir;
985     }
986     if (pos.m_dir == 5) {
987       // at node end proceed to right child
988       jam();
989       TupLoc loc = node.getLink(1 - idir);
990       if (loc != NullTupLoc) {
991         jam();
992         pos.m_loc = loc;
993         pos.m_dir = 4;  // down from parent as usual
994         continue;
995       }
996       // pretend we came from right child
997       pos.m_dir = 1 - idir;
998     }
999     const unsigned occup = node.getOccup();
1000     if (occup == 0) {
1001       jam();
1002       ndbrequire(fromMaintReq);
1003       // move back to parent - see comment in treeRemoveInner
1004       pos.m_loc = node.getLink(2);
1005       pos.m_dir = node.getSide();
1006       continue;
1007     }
1008     if (pos.m_dir == idir) {
1009       // coming up from left child scan current node
1010       jam();
1011       pos.m_pos = idir == 0 ? (Uint16)-1 : occup;
1012       pos.m_dir = 3;
1013     }
1014     if (pos.m_dir == 3) {
1015       // before or within node
1016       jam();
1017       // advance position - becomes ZNIL (> occup) if 0 and descending
1018       pos.m_pos += jdir;
1019       if (pos.m_pos < occup) {
1020         jam();
1021         pos.m_dir = 3;  // unchanged
1022         ent = node.getEnt(pos.m_pos);
1023         if (! scanCheck(scanPtr, ent)) {
1024           jam();
1025           pos.m_loc = NullTupLoc;
1026         }
1027         break;
1028       }
1029       // after node proceed to right child
1030       pos.m_dir = 5;
1031       continue;
1032     }
1033     if (pos.m_dir == 1 - idir) {
1034       // coming up from right child proceed to parent
1035       jam();
1036       pos.m_loc = node.getLink(2);
1037       pos.m_dir = node.getSide();
1038       continue;
1039     }
1040     ndbrequire(false);
1041   }
1042   // copy back position
1043   scan.m_scanPos = pos;
1044   // relink
1045   if (pos.m_loc != NullTupLoc) {
1046     ndbrequire(pos.m_dir == 3);
1047     ndbrequire(pos.m_loc == node.m_loc);
1048     if (origNode.m_loc != node.m_loc) {
1049       jam();
1050       unlinkScan(origNode, scanPtr);
1051       linkScan(node, scanPtr);
1052     }
1053     if (scan.m_state != ScanOp::Blocked) {
1054       scan.m_state = ScanOp::Current;
1055     } else {
1056       jam();
1057       ndbrequire(fromMaintReq);
1058       TreeEnt& scanEnt = scan.m_scanEnt;
1059       ndbrequire(scanEnt.m_tupLoc != NullTupLoc);
1060       if (scanEnt.eqtuple(ent)) {
1061         // remains blocked on another version
1062         scanEnt = ent;
1063       } else {
1064         jam();
1065         scanEnt.m_tupLoc = NullTupLoc;
1066         scan.m_state = ScanOp::Current;
1067       }
1068     }
1069   } else {
1070     jam();
1071     unlinkScan(origNode, scanPtr);
1072     scan.m_state = ScanOp::Last;
1073   }
1074 #ifdef VM_TRACE
1075   if (debugFlags & (DebugMaint | DebugScan)) {
1076     debugOut << "Leave next scan " << scanPtr.i << " " << scan << endl;
1077   }
1078 #endif
1079 }
1080 
1081 /*
1082  * Check end key.  Return true if scan is still within range.
1083  *
1084  * Error handling:  If scan error code has been set, return false at
1085  * once.  This terminates the scan and also avoids kernel crash on
1086  * invalid data.
1087  */
1088 bool
scanCheck(ScanOpPtr scanPtr,TreeEnt ent)1089 Dbtux::scanCheck(ScanOpPtr scanPtr, TreeEnt ent)
1090 {
1091   ScanOp& scan = *scanPtr.p;
1092   if (unlikely(scan.m_errorCode != 0)) {
1093     jam();
1094     return false;
1095   }
1096   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
1097   const Index& index = *c_indexPool.getPtr(frag.m_indexId);
1098   const unsigned idir = scan.m_descending;
1099   const int jdir = 1 - 2 * (int)idir;
1100   const ScanBound& scanBound = scan.m_scanBound[1 - idir];
1101   int ret = 0;
1102   if (scanBound.m_cnt != 0) {
1103     jam();
1104     // set up bound from segmented memory
1105     KeyDataC searchBoundData(index.m_keySpec, true);
1106     KeyBoundC searchBound(searchBoundData);
1107     unpackBound(c_ctx, scanBound, searchBound);
1108     // key data for the entry
1109     KeyData entryKey(index.m_keySpec, true, 0);
1110     entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
1111     readKeyAttrs(c_ctx, frag, ent, entryKey, index.m_numAttrs);
1112     // compare bound to key
1113     const Uint32 boundCount = searchBound.get_data().get_cnt();
1114     ret = cmpSearchBound(c_ctx, searchBound, entryKey, boundCount);
1115     ndbrequire(ret != 0);
1116     ret = (-1) * ret; // reverse for key vs bound
1117     ret = jdir * ret; // reverse for descending scan
1118   }
1119 #ifdef VM_TRACE
1120   if (debugFlags & DebugScan) {
1121     debugOut << "Check scan " << scanPtr.i << " " << scan << " ret:" << dec << ret << endl;
1122   }
1123 #endif
1124   return (ret <= 0);
1125 }
1126 
1127 /*
1128  * Check if an entry is visible to the scan.
1129  *
1130  * There is a special check to never accept same tuple twice in a row.
1131  * This is faster than asking TUP.  It also fixes some special cases
1132  * which are not analyzed or handled yet.
1133  *
1134  * Error handling:  If scan error code has been set, return false since
1135  * no new result can be returned to LQH.  The scan will then look for
1136  * next result and terminate via scanCheck():
1137  */
1138 bool
scanVisible(ScanOpPtr scanPtr,TreeEnt ent)1139 Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
1140 {
1141   const ScanOp& scan = *scanPtr.p;
1142   if (unlikely(scan.m_errorCode != 0)) {
1143     jam();
1144     return false;
1145   }
1146   const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
1147   Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
1148   Uint32 pageId = ent.m_tupLoc.getPageId();
1149   Uint32 pageOffset = ent.m_tupLoc.getPageOffset();
1150   Uint32 tupVersion = ent.m_tupVersion;
1151   // check for same tuple twice in row
1152   if (scan.m_scanEnt.m_tupLoc == ent.m_tupLoc)
1153   {
1154     jam();
1155     return false;
1156   }
1157   Uint32 transId1 = scan.m_transId1;
1158   Uint32 transId2 = scan.m_transId2;
1159   bool dirty = scan.m_readCommitted;
1160   Uint32 savePointId = scan.m_savePointId;
1161   bool ret = c_tup->tuxQueryTh(tableFragPtrI, pageId, pageOffset, tupVersion, transId1, transId2, dirty, savePointId);
1162   jamEntry();
1163   return ret;
1164 }
1165 
1166 /*
1167  * Finish closing of scan and send conf.  Any lock wait has been done
1168  * already.
1169  *
1170  * Error handling:  Every scan ends here.  If error code has been set,
1171  * send a REF.
1172  */
1173 void
scanClose(Signal * signal,ScanOpPtr scanPtr)1174 Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
1175 {
1176   ScanOp& scan = *scanPtr.p;
1177   ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
1178   // unlock all not unlocked by LQH
1179   if (! scan.m_accLockOps.isEmpty()) {
1180     jam();
1181     abortAccLockOps(signal, scanPtr);
1182   }
1183   Uint32 blockNo = refToMain(scanPtr.p->m_userRef);
1184   if (scanPtr.p->m_errorCode == 0) {
1185     jam();
1186     // send conf
1187     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
1188     conf->scanPtr = scanPtr.p->m_userPtr;
1189     conf->accOperationPtr = RNIL;
1190     conf->fragId = RNIL;
1191     unsigned signalLength = 3;
1192     releaseScanOp(scanPtr);
1193     EXECUTE_DIRECT(blockNo,
1194                    GSN_NEXT_SCANCONF,
1195                    signal,
1196                    signalLength);
1197   } else {
1198     // send ref
1199     NextScanRef* ref = (NextScanRef*)signal->getDataPtr();
1200     ref->scanPtr = scanPtr.p->m_userPtr;
1201     ref->accOperationPtr = RNIL;
1202     ref->fragId = RNIL;
1203     ref->errorCode = scanPtr.p->m_errorCode;
1204     releaseScanOp(scanPtr);
1205     EXECUTE_DIRECT(blockNo,
1206                    GSN_NEXT_SCANREF,
1207                    signal,
1208                    NextScanRef::SignalLength);
1209   }
1210 }
1211 
1212 void
abortAccLockOps(Signal * signal,ScanOpPtr scanPtr)1213 Dbtux::abortAccLockOps(Signal* signal, ScanOpPtr scanPtr)
1214 {
1215   ScanOp& scan = *scanPtr.p;
1216 #ifdef VM_TRACE
1217   if (debugFlags & (DebugScan | DebugLock)) {
1218     debugOut << "Abort locks in scan " << scanPtr.i << " " << scan << endl;
1219   }
1220 #endif
1221   LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1222   ScanLockPtr lockPtr;
1223   while (list.first(lockPtr)) {
1224     jam();
1225     AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
1226     lockReq->returnCode = RNIL;
1227     lockReq->requestInfo = AccLockReq::Abort;
1228     lockReq->accOpPtr = lockPtr.p->m_accLockOp;
1229     EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
1230     jamEntry();
1231     ndbrequire(lockReq->returnCode == AccLockReq::Success);
1232     list.release(lockPtr);
1233   }
1234 }
1235 
1236 void
addAccLockOp(ScanOpPtr scanPtr,Uint32 accLockOp)1237 Dbtux::addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
1238 {
1239   ScanOp& scan = *scanPtr.p;
1240 #ifdef VM_TRACE
1241   if (debugFlags & (DebugScan | DebugLock)) {
1242     debugOut << "Add lock " << hex << accLockOp << dec
1243              << " to scan " << scanPtr.i << " " << scan << endl;
1244   }
1245 #endif
1246   LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1247   ScanLockPtr lockPtr;
1248 #ifdef VM_TRACE
1249   list.first(lockPtr);
1250   while (lockPtr.i != RNIL) {
1251     ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
1252     list.next(lockPtr);
1253   }
1254 #endif
1255   bool ok = list.seizeLast(lockPtr);
1256   ndbrequire(ok);
1257   ndbrequire(accLockOp != RNIL);
1258   lockPtr.p->m_accLockOp = accLockOp;
1259 }
1260 
1261 void
removeAccLockOp(ScanOpPtr scanPtr,Uint32 accLockOp)1262 Dbtux::removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
1263 {
1264   ScanOp& scan = *scanPtr.p;
1265 #ifdef VM_TRACE
1266   if (debugFlags & (DebugScan | DebugLock)) {
1267     debugOut << "Remove lock " << hex << accLockOp << dec
1268              << " from scan " << scanPtr.i << " " << scan << endl;
1269   }
1270 #endif
1271   LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
1272   ScanLockPtr lockPtr;
1273   list.first(lockPtr);
1274   while (lockPtr.i != RNIL) {
1275     if (lockPtr.p->m_accLockOp == accLockOp) {
1276       jam();
1277       break;
1278     }
1279     list.next(lockPtr);
1280   }
1281   ndbrequire(lockPtr.i != RNIL);
1282   list.release(lockPtr);
1283 }
1284 
1285 /*
1286  * Release allocated records.
1287  */
1288 void
releaseScanOp(ScanOpPtr & scanPtr)1289 Dbtux::releaseScanOp(ScanOpPtr& scanPtr)
1290 {
1291 #ifdef VM_TRACE
1292   if (debugFlags & DebugScan) {
1293     debugOut << "Release scan " << scanPtr.i << " " << *scanPtr.p << endl;
1294   }
1295 #endif
1296   Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
1297   for (unsigned i = 0; i <= 1; i++) {
1298     ScanBound& scanBound = scanPtr.p->m_scanBound[i];
1299     DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
1300     LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
1301     b.release();
1302   }
1303   if (unlikely(scanPtr.p->m_statOpPtrI != RNIL)) {
1304     jam();
1305     StatOpPtr statPtr;
1306     statPtr.i = scanPtr.p->m_statOpPtrI;
1307     c_statOpPool.getPtr(statPtr);
1308     c_statOpPool.release(statPtr);
1309   }
1310   // unlink from per-fragment list and release from pool
1311   frag.m_scanList.release(scanPtr);
1312 }
1313