/*
   Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation. The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/

#define DBTUX_SCAN_CPP
#include "Dbtux.hpp"
#include <my_sys.h>

/*
 * Error handling: Any seized scan op is released. ACC_SCANREF is sent
 * to LQH. LQH sets error code, and treats this like ZEMPTY_FRAGMENT.
 * Therefore scan is now closed on both sides.
 */
void
Dbtux::execACC_SCANREQ(Signal* signal)
{
  jamEntry();
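  // take a private copy of the request; the same signal area is
  // reused below to build the outgoing CONF/REF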
  const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
  const AccScanReq* const req = &reqCopy;
  Uint32 errorCode = 0;
  ScanOpPtr scanPtr;
  scanPtr.i = RNIL;
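  // do/while(0) block: each "break" below jumps to the common
  // error exit after the loop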
  do {
    // get the index
    IndexPtr indexPtr;
    c_indexPool.getPtr(indexPtr, req->tableId);
    // get the fragment
    FragPtr fragPtr;
    findFrag(*indexPtr.p, req->fragmentNo, fragPtr);
    ndbrequire(fragPtr.i != RNIL);
    Frag& frag = *fragPtr.p;
    // check for index not Online (i.e. Dropping)
    if (unlikely(indexPtr.p->m_state != Index::Online)) {
      jam();
#ifdef VM_TRACE
      if (debugFlags & (DebugMeta | DebugScan)) {
        debugOut << "Index dropping at ACC_SCANREQ " << indexPtr.i << " " << *indexPtr.p << endl;
      }
#endif
      errorCode = AccScanRef::TuxIndexNotOnline;
      break;
    }
    // must be normal DIH/TC fragment
    TreeHead& tree = frag.m_tree;
    // check for empty fragment
    if (tree.m_root == NullTupLoc) {
      jam();
      AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
      conf->scanPtr = req->senderData;
      conf->accPtr = RNIL;
      conf->flag = AccScanConf::ZEMPTY_FRAGMENT;
      sendSignal(req->senderRef, GSN_ACC_SCANCONF,
                 signal, AccScanConf::SignalLength, JBB);
      return;
    }
    // seize from pool and link to per-fragment list
    if (ERROR_INSERTED(12008) ||
        ! frag.m_scanList.seize(scanPtr)) {
      CLEAR_ERROR_INSERT_VALUE;
      jam();
      // should never happen but can be used to test error handling
      errorCode = AccScanRef::TuxNoFreeScanOp;
      break;
    }
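    // placement new runs the ScanOp constructor on the pooled record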
    new (scanPtr.p) ScanOp;
    scanPtr.p->m_state = ScanOp::First;
    scanPtr.p->m_userPtr = req->senderData;
    scanPtr.p->m_userRef = req->senderRef;
    scanPtr.p->m_tableId = indexPtr.p->m_tableId;
    scanPtr.p->m_indexId = indexPtr.i;
    scanPtr.p->m_fragId = fragPtr.p->m_fragId;
    scanPtr.p->m_fragPtrI = fragPtr.i;
    scanPtr.p->m_transId1 = req->transId1;
    scanPtr.p->m_transId2 = req->transId2;
    scanPtr.p->m_savePointId = req->savePointId;
    scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo);
    scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo);
    scanPtr.p->m_descending = AccScanReq::getDescendingFlag(req->requestInfo);
    /*
     * readCommitted lockMode keyInfo
     * 1 0 0 - read committed (no lock)
     * 0 0 0 - read latest (read lock)
     * 0 1 1 - read exclusive (write lock)
     */
    const bool isStatScan = AccScanReq::getStatScanFlag(req->requestInfo);
    if (unlikely(isStatScan)) {
      jam();
      if (!scanPtr.p->m_readCommitted) {
        jam();
        errorCode = AccScanRef::TuxInvalidLockMode;
        break;
      }
      StatOpPtr statPtr;
      if (!c_statOpPool.seize(statPtr)) {
        jam();
        errorCode = AccScanRef::TuxNoFreeStatOp;
        break;
      }
      scanPtr.p->m_statOpPtrI = statPtr.i;
      new (statPtr.p) StatOp(*indexPtr.p);
      statPtr.p->m_scanOpPtrI = scanPtr.i;
      // rest of StatOp is initialized in execTUX_BOUND_INFO
#ifdef VM_TRACE
      if (debugFlags & DebugStat) {
        debugOut << "Seize stat op" << endl;
      }
#endif
    }
#ifdef VM_TRACE
    if (debugFlags & DebugScan) {
      debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
    }
#endif
    // conf
    AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
    conf->scanPtr = req->senderData;
    conf->accPtr = scanPtr.i;
    conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
    sendSignal(req->senderRef, GSN_ACC_SCANCONF,
               signal, AccScanConf::SignalLength, JBB);
    return;
  } while (0);
  if (scanPtr.i != RNIL) {
    jam();
    releaseScanOp(scanPtr);
  }
  // ref
  ndbrequire(errorCode != 0);
  AccScanRef* ref = (AccScanRef*)signal->getDataPtrSend();
  ref->scanPtr = req->senderData;
  ref->accPtr = RNIL;
  ref->errorCode = errorCode;
  sendSignal(req->senderRef, GSN_ACC_SCANREF,
             signal, AccScanRef::SignalLength, JBB);
}

/*
 * Receive bounds for scan in single direct call. The bounds can arrive
 * in any order. Attribute ids are those of index table.
 *
 * Replace EQ by equivalent LE + GE. Check for conflicting bounds.
 * Check that sets of lower and upper bounds are on initial sequences of
 * keys and that all but possibly last bound is non-strict.
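 *
 * For example, "a >= 10 AND a < 20" arrives as an inclusive lower
 * bound and a strict upper bound on attribute 0. The passes below
 * collect lower bounds first (idir 0) and upper bounds second.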
 *
 * Finally convert the sets of lower and upper bounds (i.e. start key
 * and end key) to NdbPack format. The data is saved in segmented
 * memory. The bound is reconstructed at use time via unpackBound().
 *
 * Error handling: Error code is set in the scan and also returned in
 * EXECUTE_DIRECT (the old way).
 */
void
Dbtux::execTUX_BOUND_INFO(Signal* signal)
{
  jamEntry();
  // get records
  TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
  ScanOpPtr scanPtr;
  scanPtr.i = req->tuxScanPtrI;
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
  const Index& index = *c_indexPool.getPtr(scan.m_indexId);
  const DescHead& descHead = getDescHead(index);
  const KeyType* keyTypes = getKeyTypes(descHead);
  // data passed in Signal
  const Uint32* const boundData = &req->data[0];
  Uint32 boundLen = req->boundAiLength;
  Uint32 boundOffset = 0;
  // initialize stats scan
  if (unlikely(scan.m_statOpPtrI != RNIL)) {
    // stats options before bounds
    StatOpPtr statPtr;
    statPtr.i = scan.m_statOpPtrI;
    c_statOpPool.getPtr(statPtr);
    Uint32 usedLen = 0;
    if (statScanInit(statPtr, boundData, boundLen, &usedLen) == -1) {
      jam();
      ndbrequire(scan.m_errorCode != 0);
      req->errorCode = scan.m_errorCode;
      return;
    }
    ndbrequire(usedLen <= boundLen);
    boundLen -= usedLen;
    boundOffset += usedLen;
  }
  // extract lower and upper bound in separate passes
  for (unsigned idir = 0; idir <= 1; idir++) {
    jam();
    struct BoundInfo {
      int type2; // with EQ -> LE/GE
      Uint32 offset; // word offset in signal data
      Uint32 bytes;
    };
    BoundInfo boundInfo[MaxIndexAttributes];
    // largest attrId seen plus one
    Uint32 maxAttrId = 0;
    const Uint32* const data = &boundData[boundOffset];
    Uint32 offset = 0;
    while (offset + 2 <= boundLen) {
      jam();
      const Uint32 type = data[offset];
      const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
      const Uint32 attrId = ah->getAttributeId();
      const Uint32 byteSize = ah->getByteSize();
      const Uint32 dataSize = ah->getDataSize();
      // check type
      if (unlikely(type > 4)) {
        jam();
        scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
        req->errorCode = scan.m_errorCode;
        return;
      }
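      // type encoding (NdbIndexScanOperation::BoundType):
      // 0 = inclusive lower, 1 = strict lower,
      // 2 = inclusive upper, 3 = strict upper, 4 = equality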
      Uint32 type2 = type;
      if (type2 == 4) {
        jam();
        type2 = (idir << 1); // LE=0 GE=2
      }
      // check if attribute belongs to this bound
      if ((type2 & 0x2) == (idir << 1)) {
        if (unlikely(attrId >= index.m_numAttrs)) {
          jam();
          scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
          req->errorCode = scan.m_errorCode;
          return;
        }
        // mark entries in any gap as undefined
        while (maxAttrId <= attrId) {
          jam();
          BoundInfo& b = boundInfo[maxAttrId];
          b.type2 = -1;
          maxAttrId++;
        }
        BoundInfo& b = boundInfo[attrId];
        // duplicate no longer allowed (wl#4163)
        if (unlikely(b.type2 != -1)) {
          jam();
          scan.m_errorCode = TuxBoundInfo::InvalidBounds;
          req->errorCode = scan.m_errorCode;
          return;
        }
        b.type2 = (int)type2;
        b.offset = offset + 1; // poai
        b.bytes = byteSize;
      }
      // jump to next
      offset += 2 + dataSize;
    }
    if (unlikely(offset != boundLen)) {
      jam();
      scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
      req->errorCode = scan.m_errorCode;
      return;
    }
    // check and pack the bound data
    KeyData searchBoundData(index.m_keySpec, true, 0);
    KeyBound searchBound(searchBoundData);
    searchBoundData.set_buf(c_ctx.c_searchKey, MaxAttrDataSize << 2);
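    // MaxAttrDataSize is counted in 32-bit words, hence << 2 for bytes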
    int strict = 0; // 0 or 1
    Uint32 i;
    for (i = 0; i < maxAttrId; i++) {
      jam();
      const BoundInfo& b = boundInfo[i];
      // check for gap or strict bound before last
      strict = (b.type2 & 0x1);
      if (unlikely(b.type2 == -1 || (i + 1 < maxAttrId && strict))) {
        jam();
        scan.m_errorCode = TuxBoundInfo::InvalidBounds;
        req->errorCode = scan.m_errorCode;
        return;
      }
      Uint32 len;
      if (unlikely(searchBoundData.add_poai(&data[b.offset], &len) == -1 ||
                   b.bytes != len)) {
        jam();
        scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
        req->errorCode = scan.m_errorCode;
        return;
      }
    }
    int side = 0;
    if (maxAttrId != 0) {
      // arithmetic is faster
      // side = (idir == 0 ? (strict ? +1 : -1) : (strict ? -1 : +1));
      side = (-1) * (1 - 2 * strict) * (1 - 2 * int(idir));
    }
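      // idir strict -> side:
      // 0 0 -> -1 (inclusive lower: start at first equal key)
      // 0 1 -> +1 (strict lower: start after equal keys)
      // 1 0 -> +1 (inclusive upper: end after equal keys)
      // 1 1 -> -1 (strict upper: end before first equal key)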
    }
    if (unlikely(searchBound.finalize(side) == -1)) {
      jam();
      scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
      req->errorCode = scan.m_errorCode;
      return;
    }
    ScanBound& scanBound = scan.m_scanBound[idir];
    scanBound.m_cnt = maxAttrId;
    scanBound.m_side = side;
    // save data words in segmented memory
    {
      DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
      LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
      const Uint32* data = (const Uint32*)searchBoundData.get_data_buf();
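      // round byte length up to whole 32-bit words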
      Uint32 size = (searchBoundData.get_data_len() + 3) / 4;
      bool ok = b.append(data, size);
      if (unlikely(!ok)) {
        jam();
        scan.m_errorCode = TuxBoundInfo::OutOfBuffers;
        req->errorCode = scan.m_errorCode;
        return;
      }
    }
  }
  if (ERROR_INSERTED(12009)) {
    jam();
    CLEAR_ERROR_INSERT_VALUE;
    scan.m_errorCode = TuxBoundInfo::InvalidBounds;
    req->errorCode = scan.m_errorCode;
    return;
  }
  // no error
  req->errorCode = 0;
}

void
Dbtux::execNEXT_SCANREQ(Signal* signal)
{
  jamEntry();
  const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
  const NextScanReq* const req = &reqCopy;
  ScanOpPtr scanPtr;
  scanPtr.i = req->accPtr;
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  // handle unlock previous and close scan
  switch (req->scanFlag) {
  case NextScanReq::ZSCAN_NEXT:
    jam();
    break;
  case NextScanReq::ZSCAN_NEXT_COMMIT:
    jam();
  case NextScanReq::ZSCAN_COMMIT:
    jam();
    if (! scan.m_readCommitted) {
      jam();
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::Unlock;
      lockReq->accOpPtr = req->accOperationPtr;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      removeAccLockOp(scanPtr, req->accOperationPtr);
    }
    if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
      jam();
      NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
      conf->scanPtr = scan.m_userPtr;
      unsigned signalLength = 1;
      sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
                 signal, signalLength, JBB);
      return;
    }
    break;
  case NextScanReq::ZSCAN_CLOSE:
    jam();
    // unlink from tree node first to avoid state changes
    if (scan.m_scanPos.m_loc != NullTupLoc) {
      jam();
      const TupLoc loc = scan.m_scanPos.m_loc;
      NodeHandle node(frag);
      selectNode(node, loc);
      unlinkScan(node, scanPtr);
      scan.m_scanPos.m_loc = NullTupLoc;
    }
    if (scan.m_lockwait) {
      jam();
      ndbrequire(scan.m_accLockOp != RNIL);
      // use ACC_ABORTCONF to flush out any reply in job buffer
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::AbortWithConf;
      lockReq->accOpPtr = scan.m_accLockOp;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
                     AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      scan.m_state = ScanOp::Aborting;
      return;
    }
    if (scan.m_state == ScanOp::Locked) {
      jam();
      ndbrequire(scan.m_accLockOp != RNIL);
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::Abort;
      lockReq->accOpPtr = scan.m_accLockOp;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
                     AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      scan.m_accLockOp = RNIL;
    }
    scan.m_state = ScanOp::Aborting;
    scanClose(signal, scanPtr);
    return;
  case NextScanReq::ZSCAN_NEXT_ABORT:
    jam();
  default:
    jam();
    ndbrequire(false);
    break;
  }
  // start looking for next scan result
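  // TUX implements the ACC scan interface, so this direct-executes
  // our own execACC_CHECK_SCAN handler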
  AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
  checkReq->accPtr = scanPtr.i;
  checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
  EXECUTE_DIRECT(DBTUX, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
  jamEntry();
}

void
Dbtux::execACC_CHECK_SCAN(Signal* signal)
{
  jamEntry();
  const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
  const AccCheckScan* const req = &reqCopy;
  ScanOpPtr scanPtr;
  scanPtr.i = req->accPtr;
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "ACC_CHECK_SCAN scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
    jam();
    signal->theData[0] = scan.m_userPtr;
    signal->theData[1] = true;
    EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
    jamEntry();
    return; // stop
  }
  if (scan.m_lockwait) {
    jam();
    // LQH asks if we are waiting for lock and we tell it to ask again
    const TreeEnt ent = scan.m_scanEnt;
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    conf->accOperationPtr = RNIL; // no tuple returned
    conf->fragId = frag.m_fragId;
    unsigned signalLength = 3;
    // if TC has ordered scan close, it will be detected here
    sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
               signal, signalLength, JBB);
    return; // stop
  }
  // check index online
  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
  if (unlikely(index.m_state != Index::Online) &&
      scanPtr.p->m_errorCode == 0) {
    jam();
#ifdef VM_TRACE
    if (debugFlags & (DebugMeta | DebugScan)) {
      debugOut << "Index dropping at execACC_CHECK_SCAN " << scanPtr.i << " " << *scanPtr.p << endl;
    }
#endif
    scanPtr.p->m_errorCode = AccScanRef::TuxIndexNotOnline;
  }
  if (scan.m_state == ScanOp::First) {
    jam();
    // search is done only once in single range scan
    scanFirst(scanPtr);
  }
  if (scan.m_state == ScanOp::Current ||
      scan.m_state == ScanOp::Next) {
    jam();
    // look for next
    scanFind(scanPtr);
  }
  // for reading tuple key in Found or Locked state
  Uint32* pkData = c_ctx.c_dataBuffer;
  unsigned pkSize = 0; // indicates not yet done
  if (scan.m_state == ScanOp::Found) {
    // found an entry to return
    jam();
    ndbrequire(scan.m_accLockOp == RNIL);
    if (! scan.m_readCommitted) {
      jam();
      const TreeEnt ent = scan.m_scanEnt;
      // read tuple key
      readTablePk(frag, ent, pkData, pkSize);
      // get read lock or exclusive lock
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo =
        scan.m_lockMode == 0 ? AccLockReq::LockShared : AccLockReq::LockExclusive;
      lockReq->accOpPtr = RNIL;
      lockReq->userPtr = scanPtr.i;
      lockReq->userRef = reference();
      lockReq->tableId = scan.m_tableId;
      lockReq->fragId = frag.m_fragId;
      lockReq->fragPtrI = frag.m_accTableFragPtrI;
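      // ACC identifies the row's lock queue by the hash of the
      // primary key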
      const Uint32* const buf32 = static_cast<Uint32*>(pkData);
      const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32);
      lockReq->hashValue = md5_hash(buf64, pkSize);
      Uint32 lkey1, lkey2;
      getTupAddr(frag, ent, lkey1, lkey2);
      lockReq->page_id = lkey1;
      lockReq->page_idx = lkey2;
      lockReq->transId1 = scan.m_transId1;
      lockReq->transId2 = scan.m_transId2;
      // execute
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::LockSignalLength);
      jamEntry();
      switch (lockReq->returnCode) {
      case AccLockReq::Success:
        jam();
        scan.m_state = ScanOp::Locked;
        scan.m_accLockOp = lockReq->accOpPtr;
#ifdef VM_TRACE
        if (debugFlags & (DebugScan | DebugLock)) {
          debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl;
        }
#endif
        break;
      case AccLockReq::IsBlocked:
        jam();
        // normal lock wait
        scan.m_state = ScanOp::Blocked;
        scan.m_lockwait = true;
        scan.m_accLockOp = lockReq->accOpPtr;
#ifdef VM_TRACE
        if (debugFlags & (DebugScan | DebugLock)) {
          debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl;
        }
#endif
        // LQH will wake us up
        signal->theData[0] = scan.m_userPtr;
        signal->theData[1] = true;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        jamEntry();
        return; // stop
        break;
      case AccLockReq::Refused:
        jam();
        // we cannot see deleted tuple (assert only)
        ndbassert(false);
        // skip it
        scan.m_state = ScanOp::Next;
        signal->theData[0] = scan.m_userPtr;
        signal->theData[1] = true;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        jamEntry();
        return; // stop
        break;
      case AccLockReq::NoFreeOp:
        jam();
        // max ops should depend on max scans (assert only)
        ndbassert(false);
        // stay in Found state
        scan.m_state = ScanOp::Found;
        signal->theData[0] = scan.m_userPtr;
        signal->theData[1] = true;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        jamEntry();
        return; // stop
        break;
      default:
        ndbrequire(false);
        break;
      }
    } else {
      scan.m_state = ScanOp::Locked;
    }
  }
  if (scan.m_state == ScanOp::Locked) {
    // we have lock or do not need one
    jam();
    // read keys if not already done (uses signal)
    const TreeEnt ent = scan.m_scanEnt;
    // conf signal
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    // the lock is passed to LQH
    Uint32 accLockOp = scan.m_accLockOp;
    if (accLockOp != RNIL) {
      scan.m_accLockOp = RNIL;
      // remember it until LQH unlocks it
      addAccLockOp(scanPtr, accLockOp);
    } else {
      ndbrequire(scan.m_readCommitted);
      // operation RNIL in LQH would signal no tuple returned
      accLockOp = (Uint32)-1;
    }
    conf->accOperationPtr = accLockOp;
    conf->fragId = frag.m_fragId;
    Uint32 lkey1, lkey2;
    getTupAddr(frag, ent, lkey1, lkey2);
    conf->localKey[0] = lkey1;
    conf->localKey[1] = lkey2;
    unsigned signalLength = 5;
    // add key info
    if (! scan.m_readCommitted) {
      sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
                 signal, signalLength, JBB);
    } else {
      Uint32 blockNo = refToMain(scan.m_userRef);
      EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
    }
    // next time look for next entry
    scan.m_state = ScanOp::Next;
    return;
  }
  // XXX in ACC this is checked before req->checkLcpStop
  if (scan.m_state == ScanOp::Last) {
    jam();
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    conf->accOperationPtr = RNIL;
    conf->fragId = RNIL;
    unsigned signalLength = 3;
    sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
               signal, signalLength, JBB);
    return;
  }
  ndbrequire(false);
}

/*
 * Lock succeeded (after delay) in ACC. If the lock is for current
 * entry, set state to Locked. If the lock is for an entry we were
 * moved away from, simply unlock it. Finally, if we are closing the
 * scan, do nothing since we have already sent an abort request.
 */
void
Dbtux::execACCKEYCONF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
  if (debugFlags & (DebugScan | DebugLock)) {
    debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
  scan.m_lockwait = false;
  if (scan.m_state == ScanOp::Blocked) {
    // the lock wait was for current entry
    jam();
    scan.m_state = ScanOp::Locked;
    // LQH has the ball
    return;
  }
  if (scan.m_state != ScanOp::Aborting) {
    // we were moved, release lock
    jam();
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
    lockReq->returnCode = RNIL;
    lockReq->requestInfo = AccLockReq::Abort;
    lockReq->accOpPtr = scan.m_accLockOp;
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
    jamEntry();
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
    scan.m_accLockOp = RNIL;
    // LQH has the ball
    return;
  }
  // lose the lock
  scan.m_accLockOp = RNIL;
  // continue at ACC_ABORTCONF
}

/*
 * Lock failed (after delay) in ACC. Probably means somebody ahead of
 * us in lock queue deleted the tuple.
 */
void
Dbtux::execACCKEYREF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
  if (debugFlags & (DebugScan | DebugLock)) {
    debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
  scan.m_lockwait = false;
  if (scan.m_state != ScanOp::Aborting) {
    jam();
    // release the operation
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
    lockReq->returnCode = RNIL;
    lockReq->requestInfo = AccLockReq::Abort;
    lockReq->accOpPtr = scan.m_accLockOp;
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
    jamEntry();
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
    scan.m_accLockOp = RNIL;
    // scan position should already have been moved (assert only)
    if (scan.m_state == ScanOp::Blocked) {
      jam();
      // can happen when Dropping
#ifdef VM_TRACE
      const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
      const Index& index = *c_indexPool.getPtr(frag.m_indexId);
      ndbassert(index.m_state != Index::Online);
#endif
      scan.m_state = ScanOp::Next;
    }
    // LQH has the ball
    return;
  }
  // lose the lock
  scan.m_accLockOp = RNIL;
  // continue at ACC_ABORTCONF
}

/*
 * Received when scan is closing. This signal arrives after any
 * ACCKEYCONF or ACCKEYREF which may have been in job buffer.
 */
void
Dbtux::execACC_ABORTCONF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];
  c_scanOpPool.getPtr(scanPtr);
  ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
  if (debugFlags & (DebugScan | DebugLock)) {
    debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  ndbrequire(scan.m_state == ScanOp::Aborting);
  // most likely we are still in lock wait
  if (scan.m_lockwait) {
    jam();
    scan.m_lockwait = false;
    scan.m_accLockOp = RNIL;
  }
  scanClose(signal, scanPtr);
}

/*
 * Find start position for single range scan.
 */
void
Dbtux::scanFirst(ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  // scan direction 0, 1
  const unsigned idir = scan.m_descending;
  // set up bound from segmented memory
  const ScanBound& scanBound = scan.m_scanBound[idir];
  KeyDataC searchBoundData(index.m_keySpec, true);
  KeyBoundC searchBound(searchBoundData);
  unpackBound(c_ctx, scanBound, searchBound);
  TreePos treePos;
  searchToScan(frag, idir, searchBound, treePos);
  if (treePos.m_loc != NullTupLoc) {
    scan.m_scanPos = treePos;
    // link the scan to node found
    NodeHandle node(frag);
    selectNode(node, treePos.m_loc);
    linkScan(node, scanPtr);
    if (treePos.m_dir == 3) {
      jam();
      // check upper bound
      TreeEnt ent = node.getEnt(treePos.m_pos);
      if (scanCheck(scanPtr, ent)) {
        jam();
        scan.m_state = ScanOp::Current;
      } else {
        jam();
        scan.m_state = ScanOp::Last;
      }
    } else {
      jam();
      scan.m_state = ScanOp::Next;
    }
  } else {
    jam();
    scan.m_state = ScanOp::Last;
  }
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Leave first scan " << scanPtr.i << " " << scan << endl;
  }
#endif
}

/*
 * Look for entry to return as scan result.
 */
void
Dbtux::scanFind(ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Enter find scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  ndbrequire(scan.m_state == ScanOp::Current || scan.m_state == ScanOp::Next);
  while (1) {
    jam();
    if (scan.m_state == ScanOp::Next)
      scanNext(scanPtr, false);
    if (scan.m_state == ScanOp::Current) {
      jam();
      const TreePos pos = scan.m_scanPos;
      NodeHandle node(frag);
      selectNode(node, pos.m_loc);
      const TreeEnt ent = node.getEnt(pos.m_pos);
      if (unlikely(scan.m_statOpPtrI != RNIL)) {
        StatOpPtr statPtr;
        statPtr.i = scan.m_statOpPtrI;
        c_statOpPool.getPtr(statPtr);
        // report row to stats, returns true if a sample is available
        int ret = statScanAddRow(statPtr, ent);
        if (ret == 1) {
          jam();
          scan.m_state = ScanOp::Found;
          // may not access non-pseudo cols but must return valid ent
          scan.m_scanEnt = ent;
          break;
        }
      } else if (scanVisible(scanPtr, ent)) {
        jam();
        scan.m_state = ScanOp::Found;
        scan.m_scanEnt = ent;
        break;
      }
    } else {
      jam();
      break;
    }
    scan.m_state = ScanOp::Next;
  }
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Leave find scan " << scanPtr.i << " " << scan << endl;
  }
#endif
}

/*
 * Move to next entry. The scan is already linked to some node. When
 * we leave, if an entry was found, it will be linked to a possibly
 * different node. The scan has a position, and a direction which tells
 * from where we came to this position. This is one of (all comments
 * are in terms of ascending scan):
 *
 * 0 - up from left child (scan this node next)
 * 1 - up from right child (proceed to parent)
 * 2 - up from root (the scan ends)
 * 3 - left to right within node (at end set state 5)
 * 4 - down from parent (proceed to left child)
 * 5 - at node end proceed to right child (state becomes 4)
 *
 * If an entry was found, scan direction is 3. Therefore tree
 * re-organizations need not worry about scan direction.
 *
 * This method is also used to move a scan when its entry is removed
 * (see moveScanList). If the scan is Blocked, we check if it remains
 * Blocked on a different version of the tuple. Otherwise the tuple is
 * lost and state becomes Current.
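 *
 * Example (ascending): after an entry at index i in a node is
 * returned, direction is 3 and the next call advances to i + 1. At
 * node end direction becomes 5, we descend to the right child if
 * any, then follow left links (direction 4) to the leftmost entry of
 * that subtree: a plain in-order walk of the tree.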
 */
void
Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
{
  ScanOp& scan = *scanPtr.p;
  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
  if (debugFlags & (DebugMaint | DebugScan)) {
    debugOut << "Enter next scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  // cannot be moved away from tuple we have locked
  ndbrequire(scan.m_state != ScanOp::Locked);
  // scan direction
  const unsigned idir = scan.m_descending; // 0, 1
  const int jdir = 1 - 2 * (int)idir; // 1, -1
  // use copy of position
  TreePos pos = scan.m_scanPos;
  // get and remember original node
  NodeHandle origNode(frag);
  selectNode(origNode, pos.m_loc);
  ndbrequire(islinkScan(origNode, scanPtr));
  // current node in loop
  NodeHandle node = origNode;
  // copy of entry found
  TreeEnt ent;
  while (true) {
    jam();
#ifdef VM_TRACE
    if (debugFlags & (DebugMaint | DebugScan)) {
      debugOut << "Current scan " << scanPtr.i << " pos " << pos << " node " << node << endl;
    }
#endif
    if (pos.m_dir == 2) {
      // coming up from root ends the scan
      jam();
      pos.m_loc = NullTupLoc;
      break;
    }
    if (node.m_loc != pos.m_loc) {
      jam();
      selectNode(node, pos.m_loc);
    }
    if (pos.m_dir == 4) {
      // coming down from parent proceed to left child
      jam();
      TupLoc loc = node.getLink(idir);
      if (loc != NullTupLoc) {
        jam();
        pos.m_loc = loc;
        pos.m_dir = 4; // unchanged
        continue;
      }
      // pretend we came from left child
      pos.m_dir = idir;
    }
    if (pos.m_dir == 5) {
      // at node end proceed to right child
      jam();
      TupLoc loc = node.getLink(1 - idir);
      if (loc != NullTupLoc) {
        jam();
        pos.m_loc = loc;
        pos.m_dir = 4; // down from parent as usual
        continue;
      }
      // pretend we came from right child
      pos.m_dir = 1 - idir;
    }
    const unsigned occup = node.getOccup();
    if (occup == 0) {
      jam();
      ndbrequire(fromMaintReq);
      // move back to parent - see comment in treeRemoveInner
      pos.m_loc = node.getLink(2);
      pos.m_dir = node.getSide();
      continue;
    }
    if (pos.m_dir == idir) {
      // coming up from left child scan current node
      jam();
      pos.m_pos = idir == 0 ? (Uint16)-1 : occup;
      pos.m_dir = 3;
    }
    if (pos.m_dir == 3) {
      // before or within node
      jam();
      // advance position - becomes ZNIL (> occup) if 0 and descending
      pos.m_pos += jdir;
      if (pos.m_pos < occup) {
        jam();
        pos.m_dir = 3; // unchanged
        ent = node.getEnt(pos.m_pos);
        if (! scanCheck(scanPtr, ent)) {
          jam();
          pos.m_loc = NullTupLoc;
        }
        break;
      }
      // after node proceed to right child
      pos.m_dir = 5;
      continue;
    }
    if (pos.m_dir == 1 - idir) {
      // coming up from right child proceed to parent
      jam();
      pos.m_loc = node.getLink(2);
      pos.m_dir = node.getSide();
      continue;
    }
    ndbrequire(false);
  }
  // copy back position
  scan.m_scanPos = pos;
  // relink
  if (pos.m_loc != NullTupLoc) {
    ndbrequire(pos.m_dir == 3);
    ndbrequire(pos.m_loc == node.m_loc);
    if (origNode.m_loc != node.m_loc) {
      jam();
      unlinkScan(origNode, scanPtr);
      linkScan(node, scanPtr);
    }
    if (scan.m_state != ScanOp::Blocked) {
      scan.m_state = ScanOp::Current;
    } else {
      jam();
      ndbrequire(fromMaintReq);
      TreeEnt& scanEnt = scan.m_scanEnt;
      ndbrequire(scanEnt.m_tupLoc != NullTupLoc);
      if (scanEnt.eqtuple(ent)) {
        // remains blocked on another version
        scanEnt = ent;
      } else {
        jam();
        scanEnt.m_tupLoc = NullTupLoc;
        scan.m_state = ScanOp::Current;
      }
    }
  } else {
    jam();
    unlinkScan(origNode, scanPtr);
    scan.m_state = ScanOp::Last;
  }
#ifdef VM_TRACE
  if (debugFlags & (DebugMaint | DebugScan)) {
    debugOut << "Leave next scan " << scanPtr.i << " " << scan << endl;
  }
#endif
}

/*
 * Check end key. Return true if scan is still within range.
 *
 * Error handling: If scan error code has been set, return false at
 * once. This terminates the scan and also avoids kernel crash on
 * invalid data.
 */
bool
Dbtux::scanCheck(ScanOpPtr scanPtr, TreeEnt ent)
{
  ScanOp& scan = *scanPtr.p;
  if (unlikely(scan.m_errorCode != 0)) {
    jam();
    return false;
  }
  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
  const unsigned idir = scan.m_descending;
  const int jdir = 1 - 2 * (int)idir;
  const ScanBound& scanBound = scan.m_scanBound[1 - idir];
  int ret = 0;
  if (scanBound.m_cnt != 0) {
    jam();
    // set up bound from segmented memory
    KeyDataC searchBoundData(index.m_keySpec, true);
    KeyBoundC searchBound(searchBoundData);
    unpackBound(c_ctx, scanBound, searchBound);
    // key data for the entry
    KeyData entryKey(index.m_keySpec, true, 0);
    entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
    readKeyAttrs(c_ctx, frag, ent, entryKey, index.m_numAttrs);
    // compare bound to key
    const Uint32 boundCount = searchBound.get_data().get_cnt();
    ret = cmpSearchBound(c_ctx, searchBound, entryKey, boundCount);
    ndbrequire(ret != 0);
    ret = (-1) * ret; // reverse for key vs bound
    ret = jdir * ret; // reverse for descending scan
  }
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Check scan " << scanPtr.i << " " << scan << " ret:" << dec << ret << endl;
  }
#endif
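  // ret > 0 here means the entry lies past the end bound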
  return (ret <= 0);
}

/*
 * Check if an entry is visible to the scan.
 *
 * There is a special check to never accept same tuple twice in a row.
 * This is faster than asking TUP. It also fixes some special cases
 * which are not analyzed or handled yet.
 *
 * Error handling: If scan error code has been set, return false since
 * no new result can be returned to LQH. The scan will then look for
 * the next result and terminate via scanCheck().
 */
bool
Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
{
  const ScanOp& scan = *scanPtr.p;
  if (unlikely(scan.m_errorCode != 0)) {
    jam();
    return false;
  }
  const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
  Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
  Uint32 pageId = ent.m_tupLoc.getPageId();
  Uint32 pageOffset = ent.m_tupLoc.getPageOffset();
  Uint32 tupVersion = ent.m_tupVersion;
  // check for same tuple twice in row
  if (scan.m_scanEnt.m_tupLoc == ent.m_tupLoc)
  {
    jam();
    return false;
  }
  Uint32 transId1 = scan.m_transId1;
  Uint32 transId2 = scan.m_transId2;
  bool dirty = scan.m_readCommitted;
  Uint32 savePointId = scan.m_savePointId;
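  // ask TUP whether this tuple version is visible to the scan's
  // transaction and savepoint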
  bool ret = c_tup->tuxQueryTh(tableFragPtrI, pageId, pageOffset, tupVersion, transId1, transId2, dirty, savePointId);
  jamEntry();
  return ret;
}

/*
 * Finish closing of scan and send conf. Any lock wait has been done
 * already.
 *
 * Error handling: Every scan ends here. If error code has been set,
 * send a REF.
 */
void
Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
  // unlock all not unlocked by LQH
  if (! scan.m_accLockOps.isEmpty()) {
    jam();
    abortAccLockOps(signal, scanPtr);
  }
  if (scanPtr.p->m_errorCode == 0) {
    jam();
    // send conf
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scanPtr.p->m_userPtr;
    conf->accOperationPtr = RNIL;
    conf->fragId = RNIL;
    unsigned signalLength = 3;
    sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
               signal, signalLength, JBB);
  } else {
    // send ref
    NextScanRef* ref = (NextScanRef*)signal->getDataPtr();
    ref->scanPtr = scanPtr.p->m_userPtr;
    ref->accOperationPtr = RNIL;
    ref->fragId = RNIL;
    ref->errorCode = scanPtr.p->m_errorCode;
    sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANREF,
               signal, NextScanRef::SignalLength, JBB);
  }
  releaseScanOp(scanPtr);
}

void
Dbtux::abortAccLockOps(Signal* signal, ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
  if (debugFlags & (DebugScan | DebugLock)) {
    debugOut << "Abort locks in scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
  ScanLockPtr lockPtr;
  while (list.first(lockPtr)) {
    jam();
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
    lockReq->returnCode = RNIL;
    lockReq->requestInfo = AccLockReq::Abort;
    lockReq->accOpPtr = lockPtr.p->m_accLockOp;
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
    jamEntry();
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
    list.release(lockPtr);
  }
}

void
Dbtux::addAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
{
  ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
  if (debugFlags & (DebugScan | DebugLock)) {
    debugOut << "Add lock " << hex << accLockOp << dec
             << " to scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
  ScanLockPtr lockPtr;
#ifdef VM_TRACE
  list.first(lockPtr);
  while (lockPtr.i != RNIL) {
    ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
    list.next(lockPtr);
  }
#endif
  bool ok = list.seize(lockPtr);
  ndbrequire(ok);
  ndbrequire(accLockOp != RNIL);
  lockPtr.p->m_accLockOp = accLockOp;
}

void
Dbtux::removeAccLockOp(ScanOpPtr scanPtr, Uint32 accLockOp)
{
  ScanOp& scan = *scanPtr.p;
#ifdef VM_TRACE
  if (debugFlags & (DebugScan | DebugLock)) {
    debugOut << "Remove lock " << hex << accLockOp << dec
             << " from scan " << scanPtr.i << " " << scan << endl;
  }
#endif
  LocalDLFifoList<ScanLock> list(c_scanLockPool, scan.m_accLockOps);
  ScanLockPtr lockPtr;
  list.first(lockPtr);
  while (lockPtr.i != RNIL) {
    if (lockPtr.p->m_accLockOp == accLockOp) {
      jam();
      break;
    }
    list.next(lockPtr);
  }
  ndbrequire(lockPtr.i != RNIL);
  list.release(lockPtr);
}

/*
 * Release allocated records.
 */
void
Dbtux::releaseScanOp(ScanOpPtr& scanPtr)
{
#ifdef VM_TRACE
  if (debugFlags & DebugScan) {
    debugOut << "Release scan " << scanPtr.i << " " << *scanPtr.p << endl;
  }
#endif
  Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
  for (unsigned i = 0; i <= 1; i++) {
    ScanBound& scanBound = scanPtr.p->m_scanBound[i];
    DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
    LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
    b.release();
  }
  if (unlikely(scanPtr.p->m_statOpPtrI != RNIL)) {
    jam();
    StatOpPtr statPtr;
    statPtr.i = scanPtr.p->m_statOpPtrI;
    c_statOpPool.getPtr(statPtr);
    c_statOpPool.release(statPtr);
  }
  // unlink from per-fragment list and release from pool
  frag.m_scanList.release(scanPtr);
}