/*
   Copyright (c) 2005, 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation. The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/

#define DBTUP_C
#define DBTUP_SCAN_CPP
#include "Dbtup.hpp"
#include "../backup/Backup.hpp"
#include <signaldata/AccScan.hpp>
#include <signaldata/NextScan.hpp>
#include <signaldata/AccLock.hpp>
#include <md5_hash.hpp>
#include <portlib/ndb_prefetch.h>
#include "../dblqh/Dblqh.hpp"

#define JAM_FILE_ID 408

#if (defined(VM_TRACE) || defined(ERROR_INSERT))
//#define DEBUG_LCP 1
//#define DEBUG_LCP_DEL2 1
//#define DEBUG_LCP_DEL_EXTRA 1
//#define DEBUG_LCP_SKIP_EXTRA 1
//#define DEBUG_LCP_KEEP 1
//#define DEBUG_LCP_REL 1
//#define DEBUG_NR_SCAN 1
//#define DEBUG_NR_SCAN_EXTRA 1
//#define DEBUG_LCP_SCANNED_BIT 1
//#define DEBUG_LCP_FILTER 1
//#define DEBUG_LCP_DEL 1
//#define DEBUG_LCP_DELAY 1
//#define DEBUG_LCP_SKIP 1
#endif

#ifdef DEBUG_LCP_DELAY
#define DEB_LCP_DELAY(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_DELAY(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_FILTER
#define DEB_LCP_FILTER(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_FILTER(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP
#define DEB_LCP(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_DEL
#define DEB_LCP_DEL(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_DEL(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_DEL2
#define DEB_LCP_DEL2(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_DEL2(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_DEL_EXTRA
#define DEB_LCP_DEL_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_DEL_EXTRA(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_SKIP
#define DEB_LCP_SKIP(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_SKIP(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_SKIP_EXTRA
#define DEB_LCP_SKIP_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_SKIP_EXTRA(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_KEEP
#define DEB_LCP_KEEP(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_KEEP(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_REL
#define DEB_LCP_REL(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_REL(arglist) do { } while (0)
#endif

#ifdef DEBUG_NR_SCAN
#define DEB_NR_SCAN(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_NR_SCAN(arglist) do { } while (0)
#endif

#ifdef DEBUG_NR_SCAN_EXTRA
#define DEB_NR_SCAN_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_NR_SCAN_EXTRA(arglist) do { } while (0)
#endif

#ifdef VM_TRACE
#define dbg(x) globalSignalLoggers.log x
#else
#define dbg(x)
#endif

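/*
 * Currently a no-op: TUP needs no per-scan context preparation,
 * so the scan pointer is ignored.
 */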
void
Dbtup::prepare_scan_ctx(Uint32 scanPtrI)
{
  (void)scanPtrI;
}

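/*
 * Set up a TUP scan operation. LCP and copy-fragment scans reuse
 * preallocated scan records; other scans seize one from the pool and
 * link it into the fragment's scan list. Replies with ACC_SCANCONF on
 * success, or ACC_SCANREF (TupNoFreeScanOp) if no scan record is free.
 */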
void
Dbtup::execACC_SCANREQ(Signal* signal)
{
  jamEntry();
  const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
  const AccScanReq* const req = &reqCopy;
  ScanOpPtr scanPtr;
  scanPtr.i = RNIL;
  do {
    // find table and fragment
    TablerecPtr tablePtr;
    tablePtr.i = req->tableId;
    ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
    FragrecordPtr fragPtr;
    Uint32 fragId = req->fragmentNo;
    fragPtr.i = RNIL;
    getFragmentrec(fragPtr, fragId, tablePtr.p);
    ndbrequire(fragPtr.i != RNIL);
    Fragrecord& frag = *fragPtr.p;
    // flags
    Uint32 bits = 0;

    if (AccScanReq::getLcpScanFlag(req->requestInfo))
    {
      jam();
      bits |= ScanOp::SCAN_LCP;
      scanPtr.i = c_lcp_scan_op;
      ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
      ndbrequire(scanPtr.p->m_fragPtrI == fragPtr.i);
      ndbrequire(scanPtr.p->m_state == ScanOp::First);
    }
    else if (AccScanReq::getCopyFragScanFlag(req->requestInfo))
    {
      jam();
      bits |= ScanOp::SCAN_COPY_FRAG;
      scanPtr.i = c_copy_frag_scan_op;
      ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
      ndbrequire(scanPtr.p->m_state == ScanOp::First);
      ndbrequire(scanPtr.p->m_bits == 0);
    }
    else
    {
      // seize from pool and link to per-fragment list
      if (!c_scanOpPool.seize(scanPtr))
      {
        jam();
        break;
      }
      Local_ScanOp_list list(c_scanOpPool, frag.m_scanList);
      list.addFirst(scanPtr);
      jam();
    }

    if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
        && tablePtr.p->m_no_of_disk_attributes)
    {
      jam();
      bits |= ScanOp::SCAN_DD;
    }

    bool mm = (bits & ScanOp::SCAN_DD);
    if ((tablePtr.p->m_attributes[mm].m_no_of_varsize +
         tablePtr.p->m_attributes[mm].m_no_of_dynamic) > 0)
    {
      if (bits & ScanOp::SCAN_DD)
      {
        // only dd scan varsize pages
        // mm always has a fixed part
        jam();
        bits |= ScanOp::SCAN_VS;
      }
    }

    if (! AccScanReq::getReadCommittedFlag(req->requestInfo))
    {
      if (AccScanReq::getLockMode(req->requestInfo) == 0)
      {
        jam();
        bits |= ScanOp::SCAN_LOCK_SH;
      }
      else
      {
        jam();
        bits |= ScanOp::SCAN_LOCK_EX;
      }
    }

    if (AccScanReq::getNRScanFlag(req->requestInfo))
    {
      jam();
      bits |= ScanOp::SCAN_NR;
      scanPtr.p->m_endPage = req->maxPage;
      if (req->maxPage != RNIL && req->maxPage > frag.m_max_page_cnt)
      {
        DEB_NR_SCAN(("%u %u endPage: %u (noOfPages: %u maxPage: %u)",
                     tablePtr.i,
                     fragId,
                     req->maxPage,
                     fragPtr.p->noOfPages,
                     fragPtr.p->m_max_page_cnt));
      }
    }
    else if (AccScanReq::getLcpScanFlag(req->requestInfo))
    {
      jam();
      ndbrequire((bits & ScanOp::SCAN_DD) == 0);
      ndbrequire((bits & ScanOp::SCAN_LOCK) == 0);
    }
    else
    {
      jam();
      scanPtr.p->m_endPage = RNIL;
    }

    if (bits & ScanOp::SCAN_VS)
    {
      jam();
      ndbrequire((bits & ScanOp::SCAN_NR) == 0);
      ndbrequire((bits & ScanOp::SCAN_LCP) == 0);
    }

    // set up scan op
    ScanOp& scan = *scanPtr.p;
    scan.m_state = ScanOp::First;
    scan.m_bits = bits;
    scan.m_userPtr = req->senderData;
    scan.m_userRef = req->senderRef;
    scan.m_tableId = tablePtr.i;
    scan.m_fragId = frag.fragmentId;
    scan.m_fragPtrI = fragPtr.i;
    scan.m_transId1 = req->transId1;
    scan.m_transId2 = req->transId2;
    scan.m_savePointId = req->savePointId;
    scan.m_accLockOp = RNIL;
    scan.m_last_seen = __LINE__;

    // conf
    AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
    conf->scanPtr = req->senderData;
    conf->accPtr = scanPtr.i;
    conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
    signal->theData[8] = 0;
    /* Return ACC_SCANCONF */
    return;
  } while (0);
  signal->theData[8] = AccScanRef::TupNoFreeScanOp; /* Failure */
  /* Return ACC_SCANREF */
}

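/*
 * Advance a scan on behalf of LQH. ZSCAN_COMMIT/ZSCAN_NEXT_COMMIT
 * release the lock held on the current tuple, ZSCAN_CLOSE aborts any
 * pending lock and closes the scan, and ZSCAN_NEXT proceeds directly
 * to ACC_CHECK_SCAN to look for the next scan result.
 */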
void
Dbtup::execNEXT_SCANREQ(Signal* signal)
{
  jamEntryDebug();
  const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
  const NextScanReq* const req = &reqCopy;
  ScanOpPtr scanPtr;
  scanPtr.i = req->accPtr;
  ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
  ScanOp& scan = *scanPtr.p;
  switch (req->scanFlag) {
  case NextScanReq::ZSCAN_NEXT:
    jam();
    break;
  case NextScanReq::ZSCAN_COMMIT:
    jam();
    // Fall through
  case NextScanReq::ZSCAN_NEXT_COMMIT:
    jam();
    if ((scan.m_bits & ScanOp::SCAN_LOCK) != 0) {
      jam();
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::Unlock;
      lockReq->accOpPtr = req->accOperationPtr;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
                     signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      removeAccLockOp(scan, req->accOperationPtr);
    }
    if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
      signal->theData[0] = 0; /* Success */
      /**
       * signal->theData[0] = 0 means return signal
       * NEXT_SCANCONF for NextScanReq::ZSCAN_COMMIT
       */
      return;
    }
    break;
  case NextScanReq::ZSCAN_CLOSE:
    jam();
    if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
      jam();
      ndbrequire(scan.m_accLockOp != RNIL);
      // use ACC_ABORTCONF to flush out any reply in job buffer
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::AbortWithConf;
      lockReq->accOpPtr = scan.m_accLockOp;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
                     signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      scan.m_last_seen = __LINE__;
      scan.m_state = ScanOp::Aborting;
      return;
    }
    if (scan.m_state == ScanOp::Locked) {
      jam();
      ndbrequire(scan.m_accLockOp != RNIL);
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = AccLockReq::Abort;
      lockReq->accOpPtr = scan.m_accLockOp;
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
                     signal, AccLockReq::UndoSignalLength);
      jamEntry();
      ndbrequire(lockReq->returnCode == AccLockReq::Success);
      scan.m_accLockOp = RNIL;
    }
    scan.m_last_seen = __LINE__;
    scan.m_state = ScanOp::Aborting;
    scanClose(signal, scanPtr);
    return;
  case NextScanReq::ZSCAN_NEXT_ABORT:
    ndbabort();
  default:
    ndbabort();
  }
  // start looking for next scan result
  AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
  checkReq->accPtr = scanPtr.i;
  checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
  EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
  jamEntryDebug();
}

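/*
 * Look for the next scan result. Handles waiting for row locks and
 * scan lock records (via CHECK_LCP_STOP), starts the scan on the first
 * call, and otherwise steps the scan forward before replying through
 * scanReply().
 */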
void
Dbtup::execACC_CHECK_SCAN(Signal* signal)
{
  jamEntryDebug();
  const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
  const AccCheckScan* const req = &reqCopy;
  ScanOpPtr scanPtr;
  scanPtr.i = req->accPtr;
  ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
  ScanOp& scan = *scanPtr.p;
  // fragment
  FragrecordPtr fragPtr;
  fragPtr.i = scan.m_fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  Fragrecord& frag = *fragPtr.p;
  bool wait_for_scan_lock_record = false;
  if (scan.m_bits & ScanOp::SCAN_LOCK &&
      c_freeScanLock == RNIL)
  {
    ScanLockPtr allocPtr;
    if (likely((scan.m_bits & ScanOp::SCAN_COPY_FRAG) == 0))
    {
      if (likely(c_scanLockPool.seize(allocPtr)))
      {
        c_freeScanLock = allocPtr.i;
      }
      else
      {
        jam();
        wait_for_scan_lock_record = true;
      }
    }
    else
    {
      jam();
      /**
       * Copy fragment scans use a preallocated scan lock record to avoid
       * the risk of not getting a scan lock record.
       */
      c_freeScanLock = c_copy_frag_scan_lock;
    }
  }
  if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP &&
      (scan.m_bits & ScanOp::SCAN_LOCK_WAIT ||
       wait_for_scan_lock_record))
  {
    /**
     * Go to sleep for 1 millisecond while we are waiting for a
     * row lock or the scan lock record to store the row lock in.
     *
     * Could also be that we are waiting for a lock record to become
     * available.
     */
    jam();
    CheckLcpStop* cls = (CheckLcpStop*) signal->theData;
    cls->scanPtrI = scan.m_userPtr;
    if (wait_for_scan_lock_record)
    {
      jam();
      cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT_STOPPABLE;
    }
    else
    {
      jam();
      cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT;
    }
    EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
    if (signal->theData[0] == CheckLcpStop::ZTAKE_A_BREAK)
    {
      jamEntry();
      release_c_free_scan_lock();
      return;
    }
    jamEntry();
    ndbrequire(signal->theData[0] == CheckLcpStop::ZABORT_SCAN);
    /* Fall through, we will send NEXT_SCANCONF, this will detect close */
  }
  if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT ||
      wait_for_scan_lock_record)
  {
    jam();
    /**
     * LQH asks if we are waiting for a lock and we tell it to ask again.
     * The reason to go back to LQH here is to ensure that the scan can be
     * closed if TC asked LQH to close the scan in the middle of the scan
     * process.
     * We go this path also when we could not allocate a lock record and
     * it is time to go to LQH to check status before we go to sleep.
     */
    release_c_free_scan_lock();
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    conf->accOperationPtr = RNIL; // no tuple returned
    conf->fragId = frag.fragmentId;
    // if TC has ordered scan close, it will be detected here
    sendSignal(scan.m_userRef,
               GSN_NEXT_SCANCONF,
               signal,
               NextScanConf::SignalLengthNoTuple,
               JBB);
    return; // stop
  }

  const bool lcp = (scan.m_bits & ScanOp::SCAN_LCP);

  if (scan.m_state == ScanOp::First)
  {
    if (lcp && ! fragPtr.p->m_lcp_keep_list_head.isNull())
    {
      jam();
      /**
       * Handle the LCP keep list already here, so that the scan state
       * is not altered if lcp_keep rows are found in ScanOp::First.
       */
      scan.m_last_seen = __LINE__;
      handle_lcp_keep(signal, fragPtr, scanPtr.p);
      release_c_free_scan_lock();
      return;
    }
    jam();
    scanFirst(signal, scanPtr);
  }
  if (scan.m_state == ScanOp::Next)
  {
    jam();
    bool immediate = scanNext(signal, scanPtr);
    if (! immediate) {
      jam();
      // time-slicing via TUP or PGMAN
      release_c_free_scan_lock();
      return;
    }
    jam();
  }
  scanReply(signal, scanPtr);
}

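/*
 * Send the reply for the current scan state: acquire the row lock if
 * the scan is locking (state Current), pass a found tuple and its lock
 * to LQH via NEXT_SCANCONF (state Locked), and report scan completion
 * (state Last) or an error (state Invalid).
 */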
void
Dbtup::scanReply(Signal* signal, ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  FragrecordPtr fragPtr;
  fragPtr.i = scan.m_fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  Fragrecord& frag = *fragPtr.p;
  // for reading tuple key in Current state
  Uint32* pkData = (Uint32*)c_dataBuffer;
  unsigned pkSize = 0;
  if (scan.m_state == ScanOp::Current) {
    // found an entry to return
    jamDebug();
    ndbrequire(scan.m_accLockOp == RNIL);
    Uint32 scan_bits = scan.m_bits;
    if (scan_bits & ScanOp::SCAN_LOCK)
    {
      jam();
      ndbrequire((scan_bits & ScanOp::SCAN_LCP) == 0);
      scan.m_last_seen = __LINE__;
      // read tuple key - use TUX routine
      const ScanPos& pos = scan.m_scanPos;
      const Local_key& key_mm = pos.m_key_mm;
      TablerecPtr tablePtr;
      tablePtr.i = fragPtr.p->fragTableId;
      ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
      int ret = tuxReadPk((Uint32*)fragPtr.p,
                          (Uint32*)tablePtr.p,
                          pos.m_realpid_mm,
                          key_mm.m_page_idx,
                          pkData, true);
      ndbrequire(ret > 0);
      pkSize = ret;
      dbg((DBTUP, "PK size=%d data=%08x", pkSize, pkData[0]));
      // get read lock or exclusive lock
      AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
      lockReq->returnCode = RNIL;
      lockReq->requestInfo = (scan.m_bits & ScanOp::SCAN_LOCK_SH) ?
        AccLockReq::LockShared : AccLockReq::LockExclusive;
      lockReq->accOpPtr = RNIL;
      lockReq->userPtr = scanPtr.i;
      lockReq->userRef = reference();
      lockReq->tableId = scan.m_tableId;
      lockReq->fragId = frag.fragmentId;
      lockReq->fragPtrI = RNIL; // no cached frag ptr yet
      lockReq->hashValue = md5_hash((Uint64*)pkData, pkSize);
      lockReq->page_id = key_mm.m_page_no;
      lockReq->page_idx = key_mm.m_page_idx;
      lockReq->transId1 = scan.m_transId1;
      lockReq->transId2 = scan.m_transId2;
      lockReq->isCopyFragScan = ((scan.m_bits & ScanOp::SCAN_COPY_FRAG) != 0);
      EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
                     signal, AccLockReq::LockSignalLength);
      jamEntryDebug();
      switch (lockReq->returnCode) {
      case AccLockReq::Success:
      {
        jam();
        scan.m_state = ScanOp::Locked;
        scan.m_accLockOp = lockReq->accOpPtr;
        break;
      }
      case AccLockReq::IsBlocked:
      {
        jam();
        // normal lock wait
        scan.m_state = ScanOp::Blocked;
        scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
        scan.m_accLockOp = lockReq->accOpPtr;
        // LQH will wake us up
        CheckLcpStop* cls = (CheckLcpStop*) signal->theData;
        cls->scanPtrI = scan.m_userPtr;
        cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        if (signal->theData[0] == CheckLcpStop::ZTAKE_A_BREAK)
        {
          jamEntry();
          /* Normal path */
          release_c_free_scan_lock();
          return;
        }
        jamEntry();
        /* DBTC has most likely aborted due to timeout */
        ndbrequire(signal->theData[0] == CheckLcpStop::ZABORT_SCAN);
        /* Ensure that we send NEXT_SCANCONF immediately to close */
        scan.m_state = ScanOp::Last;
        break;
      }
      case AccLockReq::Refused:
      {
        jam();
        // we cannot see deleted tuple (assert only)
        ndbassert(false);
        // skip it
        scan.m_state = ScanOp::Next;
        CheckLcpStop* cls = (CheckLcpStop*) signal->theData;
        cls->scanPtrI = scan.m_userPtr;
        cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        if (signal->theData[0] == CheckLcpStop::ZTAKE_A_BREAK)
        {
          jamEntry();
          release_c_free_scan_lock();
          return;
        }
        jamEntry();
        ndbrequire(signal->theData[0] == CheckLcpStop::ZABORT_SCAN);
        /* Ensure that we send NEXT_SCANCONF immediately to close */
        scan.m_state = ScanOp::Last;
        break;
      }
      case AccLockReq::NoFreeOp:
      {
        jam();
        // stay in Current state
        ndbrequire((scan.m_bits & ScanOp::SCAN_COPY_FRAG) == 0);
        scan.m_state = ScanOp::Current;
        CheckLcpStop* cls = (CheckLcpStop*) signal->theData;
        cls->scanPtrI = scan.m_userPtr;
        cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT_STOPPABLE;
        EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
        if (signal->theData[0] == CheckLcpStop::ZTAKE_A_BREAK)
        {
          jamEntry();
          release_c_free_scan_lock();
          return;
        }
        jamEntry();
        ndbrequire(signal->theData[0] == CheckLcpStop::ZABORT_SCAN);
        /* Ensure that we send NEXT_SCANCONF immediately to close */
        scan.m_state = ScanOp::Last;
        break;
      }
      default:
        ndbabort();
      }
      ndbassert(c_freeScanLock != RNIL);
    }
    else
    {
      ndbassert(c_freeScanLock == RNIL);
      scan.m_state = ScanOp::Locked;
    }
  }

  if (scan.m_state == ScanOp::Locked)
  {
    // we have lock or do not need one
    jamDebug();
    // conf signal
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    // the lock is passed to LQH
    Uint32 accLockOp = scan.m_accLockOp;
    if (accLockOp != RNIL) {
      scan.m_accLockOp = RNIL;
      // remember it until LQH unlocks it
      addAccLockOp(scan, accLockOp);
      scan.m_last_seen = __LINE__;
    } else {
      ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK));
      // operation RNIL in LQH would signal no tuple returned
      accLockOp = (Uint32)-1;
      scan.m_last_seen = __LINE__;
    }
    release_c_free_scan_lock();
    const ScanPos& pos = scan.m_scanPos;
    conf->accOperationPtr = accLockOp;
    conf->fragId = frag.fragmentId;
    conf->localKey[0] = pos.m_key_mm.m_page_no;
    conf->localKey[1] = pos.m_key_mm.m_page_idx;
    // next time look for next entry
    scan.m_state = ScanOp::Next;
    prepare_scanTUPKEYREQ(pos.m_key_mm.m_page_no,
                          pos.m_key_mm.m_page_idx);
    /**
     * Running the lock code takes some extra execution time. One could
     * let this affect the number of tuples to read in one time slot.
     * We decided to ignore this here.
     */
    signal->setLength(NextScanConf::SignalLengthNoGCI);
    c_lqh->exec_next_scan_conf(signal);
    return;
  }
  if (scan.m_state == ScanOp::Last)
  {
    jam();
    release_c_free_scan_lock();
    scan.m_last_seen = __LINE__;
    NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
    conf->scanPtr = scan.m_userPtr;
    conf->accOperationPtr = RNIL;
    conf->fragId = RNIL;
    signal->setLength(NextScanConf::SignalLengthNoTuple);
    c_lqh->exec_next_scan_conf(signal);
    return;
  }
  else if (scan.m_state == ScanOp::Invalid)
  {
    jam();
    scan.m_last_seen = __LINE__;
    NextScanRef* const ref = (NextScanRef*)signal->getDataPtrSend();
    ref->scanPtr = scan.m_userPtr;
    ref->errorCode = m_scan_error_code;
    c_lqh->exec_next_scan_ref(signal);
    return;
  }
  ndbabort();
}

/*
 * Lock succeeded (after delay) in ACC. If the lock is for the current
 * entry, set state to Locked. If the lock is for an entry we were
 * moved away from, simply unlock it. Finally, if we are closing the
 * scan, do nothing since we have already sent an abort request.
 */
void
Dbtup::execACCKEYCONF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];

  Uint32 localKey1 = signal->theData[3];
  Uint32 localKey2 = signal->theData[4];
  Local_key tmp;
  tmp.m_page_no = localKey1;
  tmp.m_page_idx = localKey2;

  ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
  ScanOp& scan = *scanPtr.p;
  ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
  scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
  if (scan.m_state == ScanOp::Blocked) {
    // the lock wait was for current entry
    jam();

    if (likely(scan.m_scanPos.m_key_mm.m_page_no == tmp.m_page_no &&
               scan.m_scanPos.m_key_mm.m_page_idx == tmp.m_page_idx))
    {
      jam();
      scan.m_state = ScanOp::Locked;
      // LQH has the ball
      return;
    }
    else
    {
      jam();
      /**
       * This means that there was a DEL/INS on the rowid that we tried
       * to lock, and the primary key that was previously located on
       * this rowid (scanPos.m_key_mm) has moved.
       * (DBACC keeps track of primary keys)
       *
       * We don't care about the primary key, but are interested in the
       * ROWID, so we rescan this position.
       * This is implemented by using execACCKEYREF...
       */
      ndbout << "execACCKEYCONF "
             << scan.m_scanPos.m_key_mm
             << " != " << tmp << " ";
      scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
      execACCKEYREF(signal);
      return;
    }
  }

  if (scan.m_state != ScanOp::Aborting) {
    // we were moved, release lock
    jam();
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
    lockReq->returnCode = RNIL;
    lockReq->requestInfo = AccLockReq::Abort;
    lockReq->accOpPtr = scan.m_accLockOp;
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
    jamEntry();
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
    scan.m_accLockOp = RNIL;
    // LQH has the ball
    return;
  }
  // lose the lock
  scan.m_accLockOp = RNIL;
  // continue at ACC_ABORTCONF
}

/*
 * Lock failed (after delay) in ACC. Probably means somebody ahead of
 * us in the lock queue deleted the tuple.
 */
void
Dbtup::execACCKEYREF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];
  ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
  ScanOp& scan = *scanPtr.p;
  ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
  scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
  if (scan.m_state != ScanOp::Aborting) {
    jam();
    // release the operation
    AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
    lockReq->returnCode = RNIL;
    lockReq->requestInfo = AccLockReq::Abort;
    lockReq->accOpPtr = scan.m_accLockOp;
    EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
    jamEntry();
    ndbrequire(lockReq->returnCode == AccLockReq::Success);
    scan.m_accLockOp = RNIL;
    // scan position should already have been moved (assert only)
    if (scan.m_state == ScanOp::Blocked) {
      jam();
      //ndbassert(false);
      if (scan.m_bits & ScanOp::SCAN_NR)
      {
        jam();
        /**
         * The tuple was locked and the transaction aborted. We need
         * to re-read the tuple again to ensure that we don't miss
         * out on deleting rows in the starting node that no longer
         * exist in the live node.
         */
        scan.m_state = ScanOp::Next;
        scan.m_scanPos.m_get = ScanPos::Get_tuple;
        DEB_NR_SCAN(("Ignoring scan.m_state == ScanOp::Blocked, refetch"));
      }
      else
      {
        jam();
        scan.m_state = ScanOp::Next;
        DEB_NR_SCAN(("Ignoring scan.m_state == ScanOp::Blocked"));
      }
    }
    // LQH has the ball
    return;
  }
  // lose the lock
  scan.m_accLockOp = RNIL;
  // continue at ACC_ABORTCONF
}

/*
 * Received when scan is closing. This signal arrives after any
 * ACCKEYCONF or ACCKEYREF which may have been in the job buffer.
 */
void
Dbtup::execACC_ABORTCONF(Signal* signal)
{
  jamEntry();
  ScanOpPtr scanPtr;
  scanPtr.i = signal->theData[0];
  ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
  ScanOp& scan = *scanPtr.p;
  ndbrequire(scan.m_state == ScanOp::Aborting);
  c_lqh->setup_scan_pointers(scan.m_userPtr);
  // most likely we are still in lock wait
  if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
    jam();
    scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
    scan.m_accLockOp = RNIL;
  }
  scanClose(signal, scanPtr);
}

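/*
 * Position the scan at the first page (or first disk extent for DD
 * scans) and let scanNext() do the actual work. Empty fragments
 * terminate immediately, except for LCP scans which may still need to
 * reset LCP scanned bits.
 */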
void
Dbtup::scanFirst(Signal*, ScanOpPtr scanPtr)
{
  ScanOp& scan = *scanPtr.p;
  ScanPos& pos = scan.m_scanPos;
  Local_key& key = pos.m_key;
  const Uint32 bits = scan.m_bits;
  // fragment
  FragrecordPtr fragPtr;
  fragPtr.i = scan.m_fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  Fragrecord& frag = *fragPtr.p;

  if (bits & ScanOp::SCAN_NR)
  {
    if (scan.m_endPage == 0 && frag.m_max_page_cnt == 0)
    {
      jam();
      scan.m_state = ScanOp::Last;
      return;
    }
  }
  else if (frag.noOfPages == 0)
  {
    jam();
    if (!(bits & ScanOp::SCAN_LCP))
    {
      jam();
      scan.m_state = ScanOp::Last;
      return;
    }
    /**
     * LCP scans must still go through all pages even when none remain,
     * to ensure that we reset the LCP scanned bits that may have been
     * set before arriving here.
     */
  }

  if (bits & ScanOp::SCAN_LCP)
  {
    jam();
    if (scan.m_endPage == 0)
    {
      jam();
      /**
       * Partition was empty at start of LCP, no records to report.
       * In this case we cannot have set any LCP scanned bit since
       * no page was around in the table when the scan was started.
       */
      scan.m_last_seen = __LINE__;
      scan.m_state = ScanOp::Last;
      return;
    }
    c_backup->init_lcp_scan(scan.m_scanGCI,
                            pos.m_lcp_scan_changed_rows_page);
    scan.m_last_seen = __LINE__;
  }

  if (! (bits & ScanOp::SCAN_DD)) {
    key.m_file_no = ZNIL;
    key.m_page_no = 0;
    pos.m_get = ScanPos::Get_page_mm;

    // for MM scan real page id is cached for efficiency
    pos.m_realpid_mm = RNIL;
  } else {
    Disk_alloc_info& alloc = frag.m_disk_alloc_info;
    // for now must check disk part explicitly
    if (alloc.m_extent_list.isEmpty()) {
      jam();
      scan.m_state = ScanOp::Last;
      return;
    }
    pos.m_extent_info_ptr_i = alloc.m_extent_list.getFirst();
    Extent_info* ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
    key.m_file_no = ext->m_key.m_file_no;
    key.m_page_no = ext->m_first_page_no;
    pos.m_get = ScanPos::Get_page_dd;
  }
  key.m_page_idx = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
  // let scanNext() do the work
  scan.m_state = ScanOp::Next;
}

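/* Result codes used by the page/row scan helpers below. */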
#define ZSCAN_FOUND_TUPLE 1
#define ZSCAN_FOUND_DELETED_ROWID 2
#define ZSCAN_FOUND_PAGE_END 3
#define ZSCAN_FOUND_DROPPED_CHANGE_PAGE 4
#define ZSCAN_FOUND_NEXT_ROW 5
/**
 * Start a scan of a page in LCP scan
 * ----------------------------------
 * We have seven options here for LCP scans:
 * 1) The page entry is empty and was empty at start of
 *    LCP. In this case there is no flag set in the page
 *    map indicating that the page was dropped since the
 *    LCP started.
 * 1a) The page belonged to the CHANGED ROWS pages and the
 *    last LCP state was A. In this case we need to record a
 *    DELETE by PAGEID in the LCP.
 *
 * 1b) The page belonged to the CHANGED ROWS pages and the last
 *    LCP state was D. In this case we can ignore the page.
 *
 * 1c) The page belonged to the ALL ROWS category.
 *    We can ignore it since we only record rows existing at start of
 *    the LCP.
 *    Then we continue with the next page.
 *
 * 2) The page entry is empty and it was recorded as being
 *    dropped since the LCP started. In this case the LCP scan
 *    has already taken care of this page, the needed information
 *    was sent to the LCP scan through the LCP keep list.
 * 3) The page entry was not empty but the page map indicates
 *    that the page was dropped after the LCP scan started. In this
 *    tricky case the LCP scan started, the page was dropped, the
 *    page was resurrected again and finally now we come here to
 *    handle the page. Again in this case we can move on since the
 *    page was handled at the time the page was dropped.
 *
 * 2) and 3) are found through either the LCP_SCANNED_BIT being
 * set in the page map, or by the page_to_skip_lcp bit being set
 * on the page object.
 *
 * 4) The page entry is non-empty. This is the normal page
 *    handling where we scan one row at a time.
 *
 * Finally the case 4) can have four distinct options as well.
 * 4a) The page existed before the LCP started and had rows
 *    in it that need to be checked one by one. This is the normal
 *    case and by far the most commonly executed.
 *
 * 4b) The page did not exist before the LCP scan was started, but
 *    it was allocated after the LCP scan started and before we scanned
 *    it (thus got the LCP skip bit set on the page). It belonged to
 *    the ALL ROWS pages and thus the page will be skipped.
 *
 *    Discovered either by LCP_SCANNED_BIT or by the page_to_skip_lcp
 *    bit being set on the page.
 *
 * 4c) Same as 4b) except that it belongs to the CHANGED ROWS pages.
 *    Also the last LCP state was D. The page is ignored.
 *
 * 4d) Same as 4c) except that the last LCP state was A. In this case
 *    we record the page as a DELETE by PAGEID in the LCP.
 */
Uint32
Dbtup::prepare_lcp_scan_page(ScanOp& scan,
                             Local_key& key,
                             Uint32 *next_ptr,
                             Uint32 *prev_ptr)
{
  ScanPos& pos = scan.m_scanPos;
  bool lcp_page_already_scanned = get_lcp_scanned_bit(next_ptr);
  if (lcp_page_already_scanned)
  {
    jam();
    /* Coverage tested */
#ifdef DEBUG_LCP_SCANNED_BIT
    if (next_ptr)
    {
      g_eventLogger->info("(%u)tab(%u,%u).%u"
                          " reset_lcp_scanned_bit(2)",
                          instance(),
                          m_curr_fragptr.p->fragTableId,
                          m_curr_fragptr.p->fragmentId,
                          key.m_page_no);
    }
#endif
    reset_lcp_scanned_bit(next_ptr);
    c_backup->skip_page_lcp_scanned_bit();
    /* Either 2) or 3) as described above */
    /**
     * No state in page map to update, the page hasn't been
     * defined yet, so the position in page map is empty.
     */
    pos.m_get = ScanPos::Get_next_page_mm;
    scan.m_last_seen = __LINE__;
    return ZSCAN_FOUND_PAGE_END; // incr loop count
  }
  else if (unlikely(pos.m_realpid_mm == RNIL))
  {
    bool is_last_lcp_state_A = !get_last_lcp_state(prev_ptr);
    bool need_record_dropped_change =
      pos.m_lcp_scan_changed_rows_page && is_last_lcp_state_A;
    /**
     * Case 1) from above
     * If we come here without having LCP_SCANNED_BIT set then
     * we haven't released the page during LCP scan. Thus the
     * new last LCP state is D. Ensure that LAST_LCP_FREE_BIT
     * is set to indicate that LCP state is D for this LCP.
     */
    DEB_LCP_DEL2(("(%u)tab(%u,%u) page(%u),"
                  " is_last_lcp_state_A: %u, CHANGED: %u",
                  instance(),
                  m_curr_fragptr.p->fragTableId,
                  m_curr_fragptr.p->fragmentId,
                  key.m_page_no,
                  is_last_lcp_state_A,
                  pos.m_lcp_scan_changed_rows_page));

    set_last_lcp_state(prev_ptr, true);
    if (!need_record_dropped_change)
    {
      jam();
      /* Coverage tested */
      /* LCP case 1b) and 1c) above goes this way */
      scan.m_last_seen = __LINE__;
      pos.m_get = ScanPos::Get_next_page_mm;
      c_backup->skip_empty_page_lcp();
      return ZSCAN_FOUND_PAGE_END; // incr loop count
    }
    else
    {
      jam();
      /* Coverage tested */
      /* 1a) as described above */
      scan.m_last_seen = __LINE__;
      pos.m_get = ScanPos::Get_next_page_mm;
      c_backup->record_dropped_empty_page_lcp();
      return ZSCAN_FOUND_DROPPED_CHANGE_PAGE;
    }
  }
  else
  {
    jam();
    /**
     * Case 4) above, we need to set the last LCP state flag
     * on the pos object to ensure that we know when a row
     * needs to be DELETE by ROWID or if it needs to be ignored.
     */
    pos.m_is_last_lcp_state_D = get_last_lcp_state(prev_ptr);
    scan.m_last_seen = __LINE__;
  }
  return ZSCAN_FOUND_TUPLE;
}

Uint32
Dbtup::handle_lcp_skip_page(ScanOp& scan,
                            Local_key key,
                            Page* page)
{
  ScanPos& pos = scan.m_scanPos;
  /**
   * The page was allocated after the LCP started, so it can only
   * contain rows that were allocated after the start of the LCP and
   * should thus not be part of the LCP. It is case 4b), 4c) or 4d).
   * We need to clear the skip bit on the page. We need to get the old
   * LCP state to be able to decide if it is 4c) or 4d). We also need
   * to set the last LCP state to D.
   */
  DEB_LCP_SKIP(("(%u)Clear LCP_SKIP on tab(%u,%u), page(%u), change: %u, D: %u",
                instance(),
                m_curr_fragptr.p->fragTableId,
                m_curr_fragptr.p->fragmentId,
                key.m_page_no,
                pos.m_lcp_scan_changed_rows_page,
                pos.m_is_last_lcp_state_D));

  page->clear_page_to_skip_lcp();
  set_last_lcp_state(m_curr_fragptr.p,
                     key.m_page_no,
                     true /* Set state to D */);

  if (pos.m_lcp_scan_changed_rows_page && !pos.m_is_last_lcp_state_D)
  {
    jam();
    /* Coverage tested */
    /**
     * Case 4d) from above
     * At start of LCP the page was dropped, we have information that
     * the page was dropped after the previous LCP. Thus we need to
     * record the entire page as DELETE by PAGEID.
     */
    scan.m_last_seen = __LINE__;
    pos.m_get = ScanPos::Get_next_page_mm;
    c_backup->record_late_alloc_page_lcp();
    return ZSCAN_FOUND_DROPPED_CHANGE_PAGE;
  }
  jam();
  /* Coverage tested */
  /**
   * Case 4b) and 4c) from above
   * For ALL ROWS pages the rows should be skipped for LCP, we clear
   * the LCP skip flag on page in this case to speed up skipping.
   *
   * We need to keep track of the state Get_next_page_mm when checking
   * if a rowid is part of the remaining lcp set. If we do a real-time
   * break right after setting Get_next_page_mm we need to move the
   * page number forward one step since we have actually completed the
   * current page number.
   */
  scan.m_last_seen = __LINE__;
  pos.m_get = ScanPos::Get_next_page_mm;
  c_backup->page_to_skip_lcp(!pos.m_is_last_lcp_state_D);
  return ZSCAN_FOUND_PAGE_END; // incr loop count
}

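/*
 * Decide how a single row on a CHANGED ROWS page is treated by the LCP
 * scan: recorded as a tuple, recorded as DELETE by ROWID, or skipped.
 * See the case analysis in the comment inside the function.
 */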
Uint32
Dbtup::handle_scan_change_page_rows(ScanOp& scan,
                                    Fix_page *fix_page,
                                    Tuple_header *tuple_header_ptr,
                                    Uint32 & foundGCI)
{
  ScanPos& pos = scan.m_scanPos;
  Local_key& key = pos.m_key;
  /**
   * Coming here means that the following condition is true.
   * bits & ScanOp::SCAN_LCP && pos.m_lcp_changed_page
   *
   * We have 3 cases here,
   * foundGCI == 0:
   *   This means that the row has not been committed yet
   *   and it has not had any previous rows in this row
   *   id either. However the previous LCP might still have
   *   had a row in this position since we could have
   *   deallocated a page and allocated it again between
   *   2 LCPs. In this case we have to ensure that the
   *   row id is deleted as part of the restore.
   *
   * foundGCI > scanGCI
   *   Record has changed since last LCP
   *   if header says tuple is free then the row is a deleted
   *   row and we record it
   *   otherwise it is a normal row to be recorded in normal
   *   manner for LCPs.
   *
   * We record deleted rowids only if scanGCI > 0, which indicates
   * that we are recording only changes for this part. We need
   * not record deleted rowids for those parts where we record
   * all rows.
   */
  Uint32 thbits = tuple_header_ptr->m_header_bits;
  if ((foundGCI = *tuple_header_ptr->get_mm_gci(m_curr_tabptr.p)) >
       scan.m_scanGCI)
  {
    if (unlikely(thbits & Tuple_header::LCP_DELETE))
    {
      jam();
      /* Ensure that LCP_DELETE bit is clear before we move on */
      /* Coverage tested */
      tuple_header_ptr->m_header_bits =
        thbits & (~Tuple_header::LCP_DELETE);
      updateChecksum(tuple_header_ptr,
                     m_curr_tabptr.p,
                     thbits,
                     tuple_header_ptr->m_header_bits);
      fix_page->set_change_maps(key.m_page_idx);
      jamDebug();
      jamLineDebug((Uint16)key.m_page_idx);
      DEB_LCP_DEL(("(%u)Reset LCP_DELETE on tab(%u,%u),"
                   " row(%u,%u), header: %x",
                   instance(),
                   m_curr_fragptr.p->fragTableId,
                   m_curr_fragptr.p->fragmentId,
                   key.m_page_no,
                   key.m_page_idx,
                   thbits));
      ndbrequire(!(thbits & Tuple_header::LCP_SKIP));
      scan.m_last_seen = __LINE__;
      return ZSCAN_FOUND_DELETED_ROWID;
    }
    else if (! (thbits & Tuple_header::FREE ||
                thbits & Tuple_header::DELETE_WAIT ||
                thbits & Tuple_header::ALLOC))
    {
      jam();
      /**
       * Tuple has changed since last LCP, we need to record
       * the row as a changed row unless the LCP_SKIP bit is
       * set on the rowid which means that the row was inserted
       * after starting the LCP.
       */
      scan.m_last_seen = __LINE__;
      return ZSCAN_FOUND_TUPLE;
    }
    else if (scan.m_scanGCI > 0 &&
             !(thbits & Tuple_header::LCP_SKIP))
    {
      jam();
      /**
       * We have found a row which is free, we are however scanning
       * CHANGED ROWS pages and thus we need to insert a DELETE by
       * ROWID in the LCP since the row was deleted since the last
       * LCP was executed. We check that the LCP_SKIP bit isn't set;
       * if the LCP_SKIP bit is set it means that the tuple was deleted
       * since the LCP started and we have already recorded the
       * row present at start of LCP when the tuple was deleted.
       *
       * If we delete it after LCP start we will certainly set
       * the GCI on the record > scanGCI, so it is an important
       * check for LCP_SKIP bit set.
       */
      scan.m_last_seen = __LINE__;
      return ZSCAN_FOUND_DELETED_ROWID;
    }
    else if (unlikely(thbits & Tuple_header::LCP_SKIP))
    {
      /* Ensure that LCP_SKIP bit is clear before we move on */
      jam();
      /* Coverage tested */
      tuple_header_ptr->m_header_bits =
        thbits & (~Tuple_header::LCP_SKIP);
      DEB_LCP_SKIP(("(%u) 2 Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
                    ", header: %x",
                    instance(),
                    m_curr_fragptr.p->fragTableId,
                    m_curr_fragptr.p->fragmentId,
                    key.m_page_no,
                    key.m_page_idx,
                    thbits));
      updateChecksum(tuple_header_ptr,
                     m_curr_tabptr.p,
                     thbits,
                     tuple_header_ptr->m_header_bits);
      fix_page->set_change_maps(key.m_page_idx);
      jamDebug();
      jamLineDebug((Uint16)key.m_page_idx);
    }
    else
    {
      jamDebug();
      DEB_LCP_SKIP_EXTRA(("(%u)Skipped tab(%u,%u), row(%u,%u),"
                          " foundGCI: %u, scanGCI: %u, header: %x",
                          instance(),
                          m_curr_fragptr.p->fragTableId,
                          m_curr_fragptr.p->fragmentId,
                          key.m_page_no,
                          key.m_page_idx,
                          foundGCI,
                          scan.m_scanGCI,
                          thbits));
      ndbrequire(!(thbits & Tuple_header::LCP_SKIP));
      /* Coverage tested */
    }
    jam();
    scan.m_last_seen = __LINE__;
    /* Continue with next row */
    return ZSCAN_FOUND_NEXT_ROW;
  }
  else
  {
    /**
     * When setting the LCP_DELETE flag we must also have deleted the
     * row and set rowGCI > scanGCI. So it can't be set if we arrive
     * here.
     */
    if (unlikely(thbits & Tuple_header::LCP_DELETE))
    {
      g_eventLogger->info("(%u) tab(%u,%u) row(%u,%u)"
                          " LCP_DELETE set on rowid not yet used",
                          instance(),
                          m_curr_fragptr.p->fragTableId,
                          m_curr_fragptr.p->fragmentId,
                          key.m_page_no,
                          key.m_page_idx);
      ndbrequire(!(thbits & Tuple_header::LCP_DELETE));
    }
    if (foundGCI == 0 && thbits & Tuple_header::LCP_SKIP)
    {
      jam();
      /* Coverage tested */
      /**
       * BUG28372628:
       * ------------
       * The LCP_SKIP flag is set when we perform a DELETE of a row
       * while an LCP is ongoing. During normal traffic operations
       * this means that the GCI is set to the GCI of the transaction.
       * The only other case where we can set LCP_SKIP is when a
       * DELETE operation arrives as part of COPY FRAG from live node
       * to starting node.
       *
       * In this case the GCI is set to the same GCI that the row in
       * the starting node has. If the GCI on the starting node is
       * not 0, then the GCI is always bigger than the GCI we are
       * storing locally, so we won't arrive in this path.
       *
       * There is however a case where the GCI is 0 in the live node.
       * This happens when the row has the state FREE_RECORD. This
       * means that the row is in a new page and the row hasn't been
       * used yet.
       * In this case we need to copy the row over to the starting node
       * to ensure that the row is deleted if it exists on the starting
       * node.
       *
       * If there is a row in this position AND a local LCP is ongoing,
       * in this case we could set the LCP_SKIP flag although the GCI
       * is set to 0.
       *
       * This case will only happen under the following condition.
       * 1) A row must have existed in this rowid before the starting node
       *    stopped and is thus restored in the RESTORE, REBUILD, execute
       *    REDO phase.
       * 2) The row must have been deleted together with all other rows
       *    in the same page such that the page of the row is dropped.
       * 3) At least one row in this page must have been inserted again,
       *    but the row in question must still be empty in the live node.
       * 4) A local LCP must be ongoing while COPY FRAGMENT of this
       *    fragment is ongoing, this can only happen if we start a
       *    full local LCP during COPY FRAGMENT. This in turn can only
       *    happen if the UNDO log for disk data parts is filled to the
       *    extent that we must ensure that an LCP is completed before
       *    the COPY FRAGMENT is completed.
       *
       * If all four conditions are met we could end up here with
       * LCP_SKIP bit set.
       */
      tuple_header_ptr->m_header_bits =
        thbits & (~Tuple_header::LCP_SKIP);
      DEB_LCP_SKIP(("(%u) 4 Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
                    ", header: %x",
                    instance(),
                    m_curr_fragptr.p->fragTableId,
                    m_curr_fragptr.p->fragmentId,
                    key.m_page_no,
                    key.m_page_idx,
                    thbits));
      updateChecksum(tuple_header_ptr,
                     m_curr_tabptr.p,
                     thbits,
                     tuple_header_ptr->m_header_bits);
      fix_page->set_change_maps(key.m_page_idx);
      jamDebug();
      jamLineDebug((Uint16)key.m_page_idx);
      ndbrequire(c_lqh->is_full_local_lcp_running());
    }
    else if (foundGCI == 0 && scan.m_scanGCI > 0)
    {
      /* Coverage tested */
      jam();
      scan.m_last_seen = __LINE__;
      return ZSCAN_FOUND_DELETED_ROWID;
    }
    else
    {
      jam();
      /* Coverage tested */
      ndbrequire(!(thbits & Tuple_header::LCP_SKIP));
      DEB_LCP_SKIP_EXTRA(("(%u)Skipped tab(%u,%u), row(%u,%u),"
                          " foundGCI: %u, scanGCI: %u, header: %x",
                          instance(),
                          m_curr_fragptr.p->fragTableId,
                          m_curr_fragptr.p->fragmentId,
                          key.m_page_no,
                          key.m_page_idx,
                          foundGCI,
                          scan.m_scanGCI,
                          thbits));
    }
  }
  scan.m_last_seen = __LINE__;
  /* Continue LCP scan, no need to handle this row in this LCP */
  return ZSCAN_FOUND_NEXT_ROW;
}

/**
 * LCP scanning of CHANGE ROW pages:
 * ---------------------------------
 * The below description is implemented by the setup_change_page_for_scan and
 * handle_scan_change_page_rows methods.
 *
 * When scanning changed pages we only need to record those rows that actually
 * changed. There are two things that we need to ensure here. The first is
 * that we need to ensure that we restore the correct data. The second is that
 * we ensure that each checkpoint maintains structural consistency.
 *
 * To prove that we will restore the correct data we notice that the last
 * change to restore is in a previous checkpoint.
 *
 * In the previous checkpoint we wrote all rows that changed in the first GCI
 * that wasn't completed before we started the GCI or in any later GCI.
 * From this follows that we will definitely have written all changes since
 * the last checkpoint and even more than that.
 *
 * Given that we restore using multiple LCPs there could be a risk that we cut
 * away the LCP part where the changed row was recorded. This is not possible
 * for the following reason:
 * Restore of a page always starts at an LCP where the page was fully written.
 * If this happened after the change we know that the record is there.
 * If the change happened after the LCP where ALL changes were recorded we
 * know that the LCP part is part of the restore AND we know that our change
 * is in this LCP part.
 *
 * From this it follows that we will restore the correct data since no changes
 * will be missing from the restored data.
 *
 * Next we need to verify that we maintain structural consistency. This means
 * that we must restore exactly the set of rows that was present at the start
 * of the LCP that we are restoring.
 *
 * To maintain this we need to ensure that any INSERTs that happened after
 * start of the previous LCP but before we scanned this row are not missed
 * because no changes occurred in this page since we last scanned it. To
 * ensure that we don't miss those rows we will notice that those rows will
 * always be marked with an LCP_DELETE flag for CHANGE pages. This means that
 * when we encounter a row with this flag we need to set the bit in the
 * change map to ensure that this row is recorded in the next LCP.
 *
 * Next we need to handle DELETEs that occur after the LCP started but before
 * we scanned the page. All these rows have the LCP_SKIP bit set. This means
 * that when we encounter the LCP_SKIP for CHANGE pages we should ensure that
 * the row is checked also in the next LCP by setting the change map to
 * indicate this.
 *
 * Finally if there are so many deletes that the state on the page is deleted
 * since the page is dropped, this we need not worry about since this is
 * handled in the same manner as the original partial LCP solution. So the
 * proof of this applies.
 *
 * Finally UPDATEs that occur after the LCP start but before we scan the row
 * will be recorded in the previous LCP and will not require setting any bits
 * in the change map. This is in line with normal behaviour of the LCPs, the
 * LCP is structurally consistent with the start of the LCP (the exact same
 * set of rows exists that existed at start of LCP). The data is however not
 * necessarily consistent since we rely on the REDO log to bring data
 * consistency.
 *
 * The major benefit of these change map pages comes when an entire page can
 * be skipped. In this case we can replace scanning hundreds of rows with a
 * simple check of a small bitmap on the page. To handle very large databases
 * well we implement the bitmaps using a sort of BLOOM filter.
 *
 * We have 8 bits that indicate changes in 4 kB of the page. If such a bit
 * isn't set we can skip an entire 4 kB part of the page that could easily
 * contain up to a bit more than one hundred rows.
 *
 * Finally we have a bitmap consisting of 128 bits where each bit, when not
 * set, means we can skip 256 bytes at a time.
 *
 * One problem with scanning using those bitmaps is that there is a cost
 * attached to skipping rows since it is harder to prefetch data. Thus we will
 * ignore the small area change bitmap when we have enough bits set and simply
 * scan all rows, we will still check the large area change bitmap though
 * also in this case.
 *
 * One special case we need to be careful with is when a new page has been
 * allocated. If this new page is reusing a previously used page slot and
 * thus reusing row ids we need to ensure that we scan the entire page.
 * This is required to generate DELETE BY ROWID for all row ids not yet
 * inserted into (there could be old inserts into these row ids in older
 * LCP data files, so it is important to remove those to get a consistent
 * LCP). We solve this by always ensuring that we scan the page the first
 * time by setting all bits in the change map and thus ensuring that
 * m_all_rows is set to true while scanning the page. We could be more
 * elaborate and only set it on pages that reuse a page slot or we could
 * even use a bit in the tuple header for it. But this method should be
 * good enough for now.
 */
Uint32
Dbtup::setup_change_page_for_scan(ScanOp& scan,
                                  Fix_page *fix_page,
                                  Local_key& key,
                                  Uint32 size)
{
  ScanPos& pos = scan.m_scanPos;
  /**
   * This is the first row of the page, we need to decide how
   * to scan this page or possibly even that we don't need to
   * scan it at all since no changes exist on the page. No need
   * to check this once we started scanning the page.
   */
  if (!fix_page->get_any_changes())
  {
    /**
     * We only check this condition for the first row in the page.
     * If we passed this point we will start clearing the bits on
     * the page piece by piece, thus this check is only ok at the
     * first row of the page.
     *
     * No one has touched the page since the start of the
     * previous LCP. It is possible that some updates occurred
     * after the start of the LCP but before the previous LCP
     * scanned this page. These updates will have been recorded
     * in the previous LCP and thus as proved above will be part
     * of the previous LCP that will be part of the recovery
     * processing.
     */
#ifdef VM_TRACE
    Uint32 debug_idx = key.m_page_idx;
    do
    {
      Tuple_header* tuple_header_ptr;
      tuple_header_ptr = (Tuple_header*)&fix_page->m_data[debug_idx];
      Uint32 thbits = tuple_header_ptr->m_header_bits;
      if (thbits & Tuple_header::LCP_DELETE ||
          thbits & Tuple_header::LCP_SKIP)
      {
        g_eventLogger->info("(%u)LCP_DELETE on page with no"
                            " changes tab(%u,%u), page(%u,%u)"
                            ", thbits: %x",
                            instance(),
                            m_curr_fragptr.p->fragTableId,
                            m_curr_fragptr.p->fragmentId,
                            key.m_page_no,
                            key.m_page_idx,
                            thbits);
        ndbrequire(!(thbits & Tuple_header::LCP_DELETE));
        ndbrequire(!(thbits & Tuple_header::LCP_SKIP));
      }
      debug_idx += size;
    } while ((debug_idx + size) <= Fix_page::DATA_WORDS);
#endif
    DEB_LCP_FILTER(("(%u) tab(%u,%u) page(%u) filtered out",
                    instance(),
                    m_curr_fragptr.p->fragTableId,
                    m_curr_fragptr.p->fragmentId,
                    fix_page->frag_page_id));
    scan.m_last_seen = __LINE__;
    pos.m_get = ScanPos::Get_next_page_mm;
    c_backup->skip_no_change_page();
    return ZSCAN_FOUND_PAGE_END;
  }
  Uint32 num_changes = fix_page->get_num_changes();
  /* Note: this override forces the all-rows path below, bypassing the
   * per-area change map filtering for this page.
   */
  num_changes = 16;
  if (num_changes <= 15)
  {
    jam();
    /**
     * We will check every individual small area and also
     * check the large areas. There are only a few areas
     * that actually contain changes.
     * In this case we will not use any prefetches since
     * it is hard to predict which cache lines we will
     * actually read.
     *
     * When NDB is used with very large data sizes this
     * will be the most common code path since this only
     * looks at one individual page. If there is
     * 1 TB of data memory this means that we have
     * 32M of 32kB pages and thus the update frequency
     * must be at least 500M updates per LCP for the
     * number of changes to exceed 15 on most pages.
     * This is clearly not going to be the common case.
     *
     * For smaller databases with say 1 GB of data memory
     * there will be only 32k pages and thus around
     * 500k updates per LCP will be sufficient to exceed
     * 15 updates per page in the common case. Thus much
     * more likely.
     *
     * We keep the bits here until we have passed them with
     * the scan. Exactly the same proof that this works on
     * a page level now applies on the row level.
     *
     * Thus when we check the large area bit and find that no
     * changes have occurred we also know that no small area
     * bits are set, so no need to reset those. We know that
     * no one has touched those pages since the start of the
     * last LCP apart possibly from updates that don't change
     * structural consistency of the LCP.
     *
     * We initialise both the small area check index and the
     * large area check index to 0 to ensure that we check
     * already at the first row both of those areas.
     */
    pos.m_all_rows = false;
    pos.m_next_small_area_check_idx = 0;
    pos.m_next_large_area_check_idx = 0;
    ndbrequire(!fix_page->get_and_clear_change_while_lcp_scan());
    fix_page->set_page_being_lcp_scanned();
  }
  else
  {
    jam();
    /**
     * There are more than 15 parts that have changed.
     * In this case we expect to gain more from checking
     * all rows since this means that we can prefetch
     * memory to the CPU caches when we scan in linear
     * order.
     *
     * In this case we can clear the small area change map and
     * the large area change map already here since we won't
     * clear any bits during the page scan.
     *
     * With 15 changes or more the likelihood is very high that all
     * 8 large areas are also set. So we will ignore checking these
     * to avoid the extra costs attached to checking this on
     * each row.
1664 *
1665 * We set area check indexes to an impossible value to ensure
1666 * that we don't use those by mistake.
1667 */
1668 pos.m_all_rows = true;
1669 fix_page->clear_small_change_map();
1670 fix_page->clear_large_change_map();
1671 pos.m_next_small_area_check_idx = RNIL;
1672 pos.m_next_large_area_check_idx = RNIL;
1673 ndbassert(fix_page->verify_change_maps(jamBuffer()));
1674 }
1675 return ZSCAN_FOUND_TUPLE;
1676 }
1677
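/**
 * A minimal sketch (kept in a comment, not compiled) of the decision rule
 * that setup_change_page_for_scan implements above. The helper name and the
 * enum are illustrative only; the real code additionally manages the area
 * check indexes and the page-being-scanned flag.
 *
 *   enum ChangePageScanMode { SCAN_FILTERED_ROWS, SCAN_ALL_ROWS };
 *
 *   static ChangePageScanMode
 *   choose_change_page_scan_mode(Uint32 num_changes)
 *   {
 *     if (num_changes <= 15)
 *     {
 *       // Few changed areas: follow the change maps and visit only rows
 *       // in changed areas; no prefetch since the access pattern is
 *       // unpredictable.
 *       return SCAN_FILTERED_ROWS;
 *     }
 *     // Many changed areas: clear both change maps up front and scan all
 *     // rows linearly, relying on prefetching.
 *     return SCAN_ALL_ROWS;
 *   }
 */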
1678 Uint32
1679 Dbtup::move_to_next_change_page_row(ScanOp & scan,
1680 Fix_page *fix_page,
1681 Tuple_header **tuple_header_ptr,
1682 Uint32 & loop_count,
1683 Uint32 size)
1684 {
1685 ScanPos& pos = scan.m_scanPos;
1686 Local_key& key = pos.m_key;
1687 jam();
1688 ndbrequire(pos.m_next_large_area_check_idx != RNIL &&
1689 pos.m_next_small_area_check_idx != RNIL);
1690 do
1691 {
1692 loop_count++;
1693 if (pos.m_next_large_area_check_idx == key.m_page_idx)
1694 {
1695 jamDebug();
1696 jamLineDebug(Uint16(key.m_page_idx));
1697 pos.m_next_large_area_check_idx =
1698 fix_page->get_next_large_idx(key.m_page_idx, size);
1699 if (!fix_page->get_large_change_map(key.m_page_idx))
1700 {
1701 jamDebug();
1702 DEB_LCP_FILTER(("(%u) tab(%u,%u) page(%u) large area filtered"
1703 ", start_idx: %u",
1704 instance(),
1705 m_curr_fragptr.p->fragTableId,
1706 m_curr_fragptr.p->fragmentId,
1707 fix_page->frag_page_id,
1708 key.m_page_idx));
1709
1710 if (unlikely((pos.m_next_large_area_check_idx + size) >
1711 Fix_page::DATA_WORDS))
1712 {
1713 jamDebug();
1714 return ZSCAN_FOUND_PAGE_END;
1715 }
1716 jamDebug();
1717 /**
1718 * We have moved forward to a new large area. We assume that all
1719 * small areas we move past don't have their bits set.
1720 * It is important to start checking immediately the small area
1721 * since we have no idea if the first small area is to be checked
1722 * or not.
1723 */
1724 Uint32 next_to_check = pos.m_next_large_area_check_idx;
1725 key.m_page_idx = next_to_check;
1726 pos.m_next_small_area_check_idx = next_to_check;
1727 continue;
1728 }
1729 }
1730 if (pos.m_next_small_area_check_idx == key.m_page_idx)
1731 {
1732 jamDebug();
1733 jamLineDebug(Uint16(key.m_page_idx));
1734 pos.m_next_small_area_check_idx =
1735 fix_page->get_next_small_idx(key.m_page_idx, size);
1736 if (!fix_page->get_and_clear_change_maps(key.m_page_idx))
1737 {
1738 jamDebug();
1739 DEB_LCP_FILTER(("(%u) tab(%u,%u) page(%u) small area filtered"
1740 ", start_idx: %u",
1741 instance(),
1742 m_curr_fragptr.p->fragTableId,
1743 m_curr_fragptr.p->fragmentId,
1744 fix_page->frag_page_id,
1745 key.m_page_idx));
1746 if (unlikely((pos.m_next_small_area_check_idx + size) >
1747 Fix_page::DATA_WORDS))
1748 {
1749 jamDebug();
1750 ndbassert(fix_page->verify_change_maps(jamBuffer()));
1751 return ZSCAN_FOUND_PAGE_END;
1752 }
1753 jamDebug();
1754 ndbassert(fix_page->verify_change_maps(jamBuffer()));
1755 /**
1756 * Since 1024 is a multiple of 64 there is no risk that we move
1757 * ourselves past the next large area check.
1758 */
1759 key.m_page_idx = pos.m_next_small_area_check_idx;
1760 ndbrequire(key.m_page_idx <= pos.m_next_large_area_check_idx);
1761 continue;
1762 }
1763 }
1764 break;
1765 } while (1);
1766 (*tuple_header_ptr) = (Tuple_header*)&fix_page->m_data[key.m_page_idx];
1767 jamDebug();
1768 jamLineDebug(Uint16(key.m_page_idx));
1769 ndbassert(fix_page->verify_change_maps(jamBuffer()));
1770 return ZSCAN_FOUND_TUPLE;
1771 }
1772
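/**
 * A condensed view (illustrative only, not compiled) of the control flow in
 * move_to_next_change_page_row above. Two cursors walk the same page: one
 * for the next large-area boundary and one for the next small-area
 * boundary; a cleared bit lets the scan jump straight to the next area.
 *
 *   while (true)
 *   {
 *     if (idx == next_large_boundary)
 *     {
 *       // advance next_large_boundary first
 *       if (!large_bit(idx))
 *       {
 *         // Skip the whole large area; restart small-area checks at the
 *         // first row of the next large area.
 *         idx = next_large_boundary; continue;
 *       }
 *     }
 *     if (idx == next_small_boundary)
 *     {
 *       // advance next_small_boundary first
 *       if (!small_bit_get_and_clear(idx))
 *       {
 *         idx = next_small_boundary; continue;  // skip the small area
 *       }
 *     }
 *     break;  // idx is now a row inside a changed small area
 *   }
 */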
1773 /**
1774 * Handling heavy insert and delete activity during LCP scans
1775 * ----------------------------------------------------------
1776 * As part of the LCP we need to record all rows that existed at the beginning
1777 * of the LCP. This means that any rows that are inserted after the LCP
1778 * started can be skipped. Inserts are common during database load
1779 * activity, so we ensure that the LCP can run quickly in this case to
1780 * leave more CPU resources for the insert activity. It is also important
1781 * to make good progress on LCPs to ensure that we can free REDO log space
1782 * and avoid running out of this resource.
1783 *
1784 * We use three ways to signal that a row or a set of rows need not be
1785 * recorded during an LCP.
1786 *
1787 * 1) We record the maximum page number at the start of the LCP; we never
1788 * need to scan beyond this point since pages there can only contain rows
1789 * that won't need recording in an LCP. We also avoid setting LCP_SKIP bits
1790 * on these pages and rows.
1791 * This will cover the common case of a small set of pages at the
1792 * start of the LCP that grows quickly during the LCP scan.
1793 *
1794 * 2) If a page was allocated after the LCP started, then it can only contain
1795 * rows that won't need recording in the LCP. If the page number was
1796 * within the maximum page number at start of LCP, and beyond the page
1797 * currently checked in LCP, then we will record the LCP skip information
1798 * in the page header. So when the LCP scan reaches this page it will
1799 * quickly move on to the next page since the page didn't have any records
1800 * eligible for LCP recording. After skipping the page we clear the LCP
1801 * skip flag since the rows should be recorded in the next LCP.
1802 *
1803 * 3) In case a row is allocated in a page that existed at start of LCP, then
1804 * we record the LCP skip information in the tuple header unless the row
1805 * has already been checked by the current LCP. We skip all rows with this
1806 * bit set and reset it to ensure that we record it in the next LCP.
1807 */
1808
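/**
 * The three mechanisms above can be read as one skip predicate evaluated
 * first per page and then per row. A hedged sketch (names are local to
 * this comment; the real checks are spread across scanNext and its
 * helpers):
 *
 *   bool skip_for_lcp(Uint32 page_no, const Page* page, Uint32 thbits)
 *   {
 *     if (page_no >= m_endPage)            // 1) beyond page count at LCP start
 *       return true;
 *     if (page->is_page_to_skip_lcp())     // 2) page allocated during the LCP
 *       return true;                       //    (flag cleared when skipped)
 *     if (thbits & Tuple_header::LCP_SKIP) // 3) row written during the LCP
 *       return true;                       //    (bit cleared when seen)
 *     return false;
 *   }
 */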
1809 bool
1810 Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
1811 {
1812 ScanOp& scan = *scanPtr.p;
1813 ScanPos& pos = scan.m_scanPos;
1814 Local_key& key = pos.m_key;
1815 const Uint32 bits = scan.m_bits;
1816 // table
1817 TablerecPtr tablePtr;
1818 tablePtr.i = scan.m_tableId;
1819 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
1820 Tablerec& table = *tablePtr.p;
1821 m_curr_tabptr = tablePtr;
1822 // fragment
1823 FragrecordPtr fragPtr;
1824 fragPtr.i = scan.m_fragPtrI;
1825 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
1826 Fragrecord& frag = *fragPtr.p;
1827 m_curr_fragptr = fragPtr;
1828 // tuple found
1829 Tuple_header* tuple_header_ptr = 0;
1830 Uint32 thbits = 0;
1831 Uint32 loop_count = 0;
1832 Uint32 foundGCI;
1833
1834 const bool mm_index = (bits & ScanOp::SCAN_DD);
1835 const bool lcp = (bits & ScanOp::SCAN_LCP);
1836
1837 const Uint32 size = ((bits & ScanOp::SCAN_VS) == 0) ?
1838 table.m_offsets[mm_index].m_fix_header_size : 1;
1839 const Uint32 first = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
1840
1841 if (lcp && ! fragPtr.p->m_lcp_keep_list_head.isNull())
1842 {
1843 jam();
1844 /**
1845 * Handle lcp keep list here too, due to scanCont
1846 */
1847 /* Coverage tested */
1848 handle_lcp_keep(signal, fragPtr, scanPtr.p);
1849 scan.m_last_seen = __LINE__;
1850 return false;
1851 }
1852
1853 switch(pos.m_get){
1854 case ScanPos::Get_next_tuple:
1855 jam();
1856 key.m_page_idx += size;
1857 pos.m_get = ScanPos::Get_page;
1858 pos.m_realpid_mm = RNIL;
1859 break;
1860 case ScanPos::Get_tuple:
1861 jam();
1862 /**
1863 * We need to refetch page after timeslice
1864 */
1865 pos.m_get = ScanPos::Get_page;
1866 pos.m_realpid_mm = RNIL;
1867 break;
1868 default:
1869 break;
1870 }
1871
1872 while (true) {
1873 switch (pos.m_get) {
1874 case ScanPos::Get_next_page:
1875 // move to next page
1876 jam();
1877 {
1878 if (! (bits & ScanOp::SCAN_DD))
1879 pos.m_get = ScanPos::Get_next_page_mm;
1880 else
1881 pos.m_get = ScanPos::Get_next_page_dd;
1882 }
1883 continue;
1884 case ScanPos::Get_page:
1885 // get real page
1886 jam();
1887 {
1888 if (! (bits & ScanOp::SCAN_DD))
1889 pos.m_get = ScanPos::Get_page_mm;
1890 else
1891 pos.m_get = ScanPos::Get_page_dd;
1892 }
1893 continue;
1894 case ScanPos::Get_next_page_mm:
1895 // move to next logical TUP page
1896 jam();
1897 {
1898 /**
1899 * Code for future activation, see below for more details.
1900 * bool break_flag;
1901 * break_flag = false;
1902 */
1903 key.m_page_no++;
1904 if (likely(bits & ScanOp::SCAN_LCP))
1905 {
1906 jam();
1907 /* Coverage tested path */
1908 /**
1909 * We could be scanning for a long time and only finding LCP_SKIP
1910 * records, so we need to keep the LCP watchdog aware that we are
1911 * progressing, so we report each change to a new page by reporting
1912 * the id of the next page to scan.
1913 */
1914 c_backup->update_lcp_pages_scanned(signal,
1915 c_lqh->get_scan_api_op_ptr(scan.m_userPtr),
1916 key.m_page_no,
1917 scan.m_scanGCI,
1918 pos.m_lcp_scan_changed_rows_page);
1919 scan.m_last_seen = __LINE__;
1920 }
1921 if (unlikely(key.m_page_no >= frag.m_max_page_cnt))
1922 {
1923 if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL))
1924 {
1925 if (key.m_page_no < scan.m_endPage)
1926 {
1927 jam();
1928 DEB_NR_SCAN(("scanning page %u", key.m_page_no));
1929 goto cont;
1930 }
1931 jam();
1932 // no more pages, scan ends
1933 pos.m_get = ScanPos::Get_undef;
1934 scan.m_state = ScanOp::Last;
1935 return true;
1936 }
1937 else if (bits & ScanOp::SCAN_LCP &&
1938 key.m_page_no < scan.m_endPage)
1939 {
1940 /**
1941 * We come here with ScanOp::SCAN_LCP set AND
1942 * frag.m_max_page_cnt < scan.m_endPage. In this case
1943 * it is still ok to finish the LCP scan. The missing
1944 * pages are handled when they are dropped, so before
1945 * we drop a page we record all entries that need
1946 * recording for the LCP. These have been sent to the
1947 * LCP keep list. Since when we come here the LCP keep
1948 * list is empty we are done with the scan.
1949 *
1950 * We will however continue the scan for LCP scans. The
1951 * reason is that we might have set the LCP_SCANNED_BIT
1952 * on pages already dropped. So we need to continue scanning
1953 * to ensure that all the lcp scanned bits are reset.
1954 *
1955 * For the moment this code is unreachable since m_max_page_cnt
1956 * cannot decrease. Thus m_max_page_cnt cannot be smaller
1957 * than scan.m_endPage since scan.m_endPage is initialised to
1958 * m_max_page_cnt at start of scan.
1959 *
1960 * This is currently not implemented. So we
1961 * will make this code path using an ndbrequire instead.
1962 *
1963 * We keep the code as comments to be activated when we implement
1964 * the possibility to release pages in the directory.
1965 */
1966 ndbabort();
1967 /* We will not scan this page, so reset flag immediately */
1968 // reset_lcp_scanned_bit(fragPtr.p, key.m_page_no);
1969 // scan.m_last_seen = __LINE__;
1970 // break_flag = true;
1971 }
1972 else
1973 {
1974 // no more pages, scan ends
1975 pos.m_get = ScanPos::Get_undef;
1976 scan.m_last_seen = __LINE__;
1977 scan.m_state = ScanOp::Last;
1978 return true;
1979 }
1980 }
1981 if (unlikely((bits & ScanOp::SCAN_LCP) &&
1982 (key.m_page_no >= scan.m_endPage)))
1983 {
1984 jam();
1985 /**
1986 * We have arrived at a page number that didn't exist at start of
1987 * LCP, we can quit the LCP scan since we cannot find any more
1988 * pages that are containing rows to be saved in LCP.
1989 */
1990 // no more pages, scan ends
1991 pos.m_get = ScanPos::Get_undef;
1992 scan.m_last_seen = __LINE__;
1993 scan.m_state = ScanOp::Last;
1994 return true;
1995 }
1996 /**
1997 * Activate this code if we implement support for decreasing
1998 * frag.m_max_page_cnt
1999 *
2000 * if (break_flag)
2001 * {
2002 * jam();
2003 * pos.m_get = ScanPos::Get_next_page_mm;
2004 * scan.m_last_seen = __LINE__;
2005 * break; // incr loop count
2006 * }
2007 */
2008 cont:
2009 key.m_page_idx = first;
2010 pos.m_get = ScanPos::Get_page_mm;
2011 // clear cached value
2012 pos.m_realpid_mm = RNIL;
2013 }
2014 /*FALLTHRU*/
2015 case ScanPos::Get_page_mm:
2016 // get TUP real page
2017 {
2018 PagePtr pagePtr;
2019 loop_count+= 4;
2020 if (pos.m_realpid_mm == RNIL)
2021 {
2022 Uint32 *next_ptr, *prev_ptr;
2023 pos.m_realpid_mm = getRealpidScan(fragPtr.p,
2024 key.m_page_no,
2025 &next_ptr,
2026 &prev_ptr);
2027 if (bits & ScanOp::SCAN_LCP)
2028 {
2029 jam();
2030 Uint32 ret_val = prepare_lcp_scan_page(scan,
2031 key,
2032 next_ptr,
2033 prev_ptr);
2034 if (ret_val == ZSCAN_FOUND_PAGE_END)
2035 break;
2036 else if (ret_val == ZSCAN_FOUND_DROPPED_CHANGE_PAGE)
2037 goto record_dropped_change_page;
2038 /* else continue */
2039 }
2040 else if (unlikely(pos.m_realpid_mm == RNIL))
2041 {
2042 jam();
2043 if (bits & ScanOp::SCAN_NR)
2044 {
2045 jam();
2046 goto nopage;
2047 }
2048 pos.m_get = ScanPos::Get_next_page_mm;
2049 break; // incr loop count
2050 }
2051 else
2052 {
2053 jam();
2054 }
2055 }
2056 else
2057 {
2058 jam();
2059 }
2060 c_page_pool.getPtr(pagePtr, pos.m_realpid_mm);
2061 /**
2062 * We are in the process of performing a full table scan. This can be
2063 * due to a user requesting a full table scan, it can also be part
2064 * of Node Recovery where we are assisting the starting node
2065 * to be synchronized (SCAN_NR set) and it is also used for LCP scans
2066 * (SCAN_LCP set).
2067 *
2068 * We know that we will touch all cache lines where there is a tuple
2069 * header and all scans using main memory pages are done on the fixed
2070 * pages. To speed up scan processing we will prefetch such that we
2071 * always are a few tuples ahead. We scan ahead 4 tuples here and then
2072 * we prefetch yet one more ahead at each new tuple we get to. We only
2073 * need to initialise by prefetching 3 rows ahead since we will
2074 * immediately fetch the fourth one before looking at the first row.
2075 *
2076 * PREFETCH_SCAN_TUPLE:
2077 */
2078 if (likely((key.m_page_idx + (size * 3)) <= Fix_page::DATA_WORDS))
2079 {
2080 struct Tup_fixsize_page *page_ptr =
2081 (struct Tup_fixsize_page*)pagePtr.p;
2082 NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx,
2083 size));
2084 NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx + size,
2085 size));
2086 NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx + (size * 2),
2087 size));
2088 }
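/**
 * Example of the resulting pipeline for rows r0, r1, r2, ... on the page:
 * on page entry r0, r1 and r2 are prefetched above; when the scan later
 * visits r0 it prefetches r3 (see the prefetch at Get_tuple), when it
 * visits r1 it prefetches r4, and so on, so the scan stays roughly three
 * rows ahead of its own reads.
 */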
2089 if (bits & ScanOp::SCAN_LCP)
2090 {
2091 if (pagePtr.p->is_page_to_skip_lcp())
2092 {
2093 Uint32 ret_val = handle_lcp_skip_page(scan,
2094 key,
2095 pagePtr.p);
2096 if (ret_val == ZSCAN_FOUND_PAGE_END)
2097 {
2098 jamDebug();
2099 break;
2100 }
2101 else
2102 {
2103 jamDebug();
2104 ndbrequire(ret_val == ZSCAN_FOUND_DROPPED_CHANGE_PAGE);
2105 goto record_dropped_change_page;
2106 }
2107 }
2108 else if (pos.m_lcp_scan_changed_rows_page)
2109 {
2110 /* CHANGE page is accessed */
2111 if (key.m_page_idx == 0)
2112 {
2113 jamDebug();
2114 /* First access of a CHANGE page */
2115 Uint32 ret_val = setup_change_page_for_scan(scan,
2116 (Fix_page*)pagePtr.p,
2117 key,
2118 size);
2119 if (ret_val == ZSCAN_FOUND_PAGE_END)
2120 {
2121 jamDebug();
2122 /* No changes found on page level bitmaps */
2123 break;
2124 }
2125 else
2126 {
2127 ndbrequire(ret_val == ZSCAN_FOUND_TUPLE);
2128 }
2129 }
2130 }
2131 else
2132 {
2133 /* LCP ALL page is accessed */
2134 jamDebug();
2135 /**
2136 * Make sure those variables have defined values if we were to enter
2137 * the wrong path for some reason. These values will lead to a
2138 * crash if we try to run the CHANGE page code for an ALL page.
2139 */
2140 pos.m_all_rows = false;
2141 pos.m_next_small_area_check_idx = RNIL;
2142 pos.m_next_large_area_check_idx = RNIL;
2143 }
2144 }
2145 /* LCP normal case 4a) above goes here */
2146
2147 nopage:
2148 pos.m_page = pagePtr.p;
2149 pos.m_get = ScanPos::Get_tuple;
2150 }
2151 continue;
2152 case ScanPos::Get_next_page_dd:
2153 // move to next disk page
2154 jam();
2155 {
2156 Disk_alloc_info& alloc = frag.m_disk_alloc_info;
2157 Local_fragment_extent_list list(c_extent_pool, alloc.m_extent_list);
2158 Ptr<Extent_info> ext_ptr;
2159 c_extent_pool.getPtr(ext_ptr, pos.m_extent_info_ptr_i);
2160 Extent_info* ext = ext_ptr.p;
2161 key.m_page_no++;
2162 if (key.m_page_no >= ext->m_first_page_no + alloc.m_extent_size) {
2163 // no more pages in this extent
2164 jam();
2165 if (! list.next(ext_ptr)) {
2166 // no more extents, scan ends
2167 jam();
2168 pos.m_get = ScanPos::Get_undef;
2169 scan.m_state = ScanOp::Last;
2170 return true;
2171 } else {
2172 // move to next extent
2173 jam();
2174 pos.m_extent_info_ptr_i = ext_ptr.i;
2175 ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
2176 key.m_file_no = ext->m_key.m_file_no;
2177 key.m_page_no = ext->m_first_page_no;
2178 }
2179 }
2180 key.m_page_idx = first;
2181 pos.m_get = ScanPos::Get_page_dd;
2182 /*
2183 read ahead for scan in disk order
2184 do read ahead every 8:th page
2185 */
2186 if ((bits & ScanOp::SCAN_DD) &&
2187 (((key.m_page_no - ext->m_first_page_no) & 7) == 0))
2188 {
2189 jam();
2190 // initialize PGMAN request
2191 Page_cache_client::Request preq;
2192 preq.m_page = pos.m_key;
2193 preq.m_callback = TheNULLCallback;
2194
2195 // set maximum read ahead
2196 Uint32 read_ahead = m_max_page_read_ahead;
2197
2198 while (true)
2199 {
2200 // prepare page read ahead in current extent
2201 Uint32 page_no = preq.m_page.m_page_no;
2202 Uint32 page_no_limit = page_no + read_ahead;
2203 Uint32 limit = ext->m_first_page_no + alloc.m_extent_size;
2204 if (page_no_limit > limit)
2205 {
2206 jam();
2207 // read ahead crosses extent, set limit for this extent
2208 read_ahead = page_no_limit - limit;
2209 page_no_limit = limit;
2210 // and make sure we only read one extra extent next time around
2211 if (read_ahead > alloc.m_extent_size)
2212 read_ahead = alloc.m_extent_size;
2213 }
2214 else
2215 {
2216 jam();
2217 read_ahead = 0; // no more to read ahead after this
2218 }
2219 // do read ahead pages for this extent
2220 while (page_no < page_no_limit)
2221 {
2222 // page request to PGMAN
2223 jam();
2224 preq.m_page.m_page_no = page_no;
2225 preq.m_table_id = frag.fragTableId;
2226 preq.m_fragment_id = frag.fragmentId;
2227 int flags = Page_cache_client::DISK_SCAN;
2228 // ignore result
2229 Page_cache_client pgman(this, c_pgman);
2230 pgman.get_page(signal, preq, flags);
2231 jamEntry();
2232 page_no++;
2233 }
2234 if (!read_ahead || !list.next(ext_ptr))
2235 {
2236 // no more extents after this or read ahead done
2237 jam();
2238 break;
2239 }
2240 // move to next extent and initialize PGMAN request accordingly
2241 Extent_info* ext = c_extent_pool.getPtr(ext_ptr.i);
2242 preq.m_page.m_file_no = ext->m_key.m_file_no;
2243 preq.m_page.m_page_no = ext->m_first_page_no;
2244 }
2245 } // if ScanOp::SCAN_DD read ahead
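/**
 * Worked example of the clamping above (illustrative numbers): with
 * read_ahead = 32, extent_size = 64, m_first_page_no = 0 and the scan
 * at page_no = 48, page_no_limit = 80 exceeds the extent limit 64, so
 * this round requests pages 48..63 and carries read_ahead = 80 - 64 = 16
 * over into the next extent.
 */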
2246 }
2247 /*FALLTHRU*/
2248 case ScanPos::Get_page_dd:
2249 // get global page in PGMAN cache
2250 jam();
2251 {
2252 // check if page is un-allocated or empty
2253 if (likely(! (bits & ScanOp::SCAN_NR)))
2254 {
2255 D("Tablespace_client - scanNext");
2256 Tablespace_client tsman(signal, this, c_tsman,
2257 frag.fragTableId,
2258 frag.fragmentId,
2259 c_lqh->getCreateSchemaVersion(frag.fragTableId),
2260 frag.m_tablespace_id);
2261 unsigned uncommitted, committed;
2262 uncommitted = committed = ~(unsigned)0;
2263 int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
2264 ndbrequire(ret == 0);
2265 if (committed == 0 && uncommitted == 0) {
2266 // skip empty page
2267 jam();
2268 pos.m_get = ScanPos::Get_next_page_dd;
2269 break; // incr loop count
2270 }
2271 }
2272 // page request to PGMAN
2273 Page_cache_client::Request preq;
2274 preq.m_page = pos.m_key;
2275 preq.m_table_id = frag.fragTableId;
2276 preq.m_fragment_id = frag.fragmentId;
2277 preq.m_callback.m_callbackData = scanPtr.i;
2278 preq.m_callback.m_callbackFunction =
2279 safe_cast(&Dbtup::disk_page_tup_scan_callback);
2280 int flags = Page_cache_client::DISK_SCAN;
2281 Page_cache_client pgman(this, c_pgman);
2282 Ptr<GlobalPage> pagePtr;
2283 int res = pgman.get_page(signal, preq, flags);
2284 pagePtr = pgman.m_ptr;
2285 jamEntry();
2286 if (res == 0) {
2287 jam();
2288 // request queued
2289 pos.m_get = ScanPos::Get_tuple;
2290 return false;
2291 }
2292 else if (res < 0)
2293 {
2294 jam();
2295 if (res == -1)
2296 {
2297 jam();
2298 m_scan_error_code = Uint32(~0);
2299 }
2300 else
2301 {
2302 jam();
2303 res = -res;
2304 m_scan_error_code = res;
2305 }
2306 /* Flag to reply code that we have an error */
2307 scan.m_state = ScanOp::Invalid;
2308 return true;
2309 }
2310 ndbrequire(res > 0);
2311 pos.m_page = (Page*)pagePtr.p;
2312 }
2313 pos.m_get = ScanPos::Get_tuple;
2314 continue;
2315 // get tuple
2316 // move to next tuple
2317 case ScanPos::Get_next_tuple:
2318 // move to next fixed size tuple
2319 jam();
2320 {
2321 key.m_page_idx += size;
2322 pos.m_get = ScanPos::Get_tuple;
2323 }
2324 /*FALLTHRU*/
2325 case ScanPos::Get_tuple:
2326 // get fixed size tuple
2327 jam();
2328 if ((bits & ScanOp::SCAN_VS) == 0)
2329 {
2330 Fix_page* page = (Fix_page*)pos.m_page;
2331 if (key.m_page_idx + size <= Fix_page::DATA_WORDS)
2332 {
2333 pos.m_get = ScanPos::Get_next_tuple;
2334 if (unlikely((bits & ScanOp::SCAN_NR) &&
2335 pos.m_realpid_mm == RNIL))
2336 {
2337 /**
2338 * pos.m_page isn't initialized on this path, so handle it early.
2339 * We're doing a node restart and we are scanning beyond our
2340 * existing rowids since the starting node had those rowids
2341 * defined.
2342 */
2343 jam();
2344 foundGCI = 0;
2345 goto found_deleted_rowid;
2346 }
2347 #ifdef VM_TRACE
2348 if (! (bits & ScanOp::SCAN_DD))
2349 {
2350 Uint32 realpid = getRealpidCheck(fragPtr.p, key.m_page_no);
2351 ndbrequire(pos.m_realpid_mm == realpid);
2352 }
2353 #endif
2354 tuple_header_ptr = (Tuple_header*)&page->m_data[key.m_page_idx];
2355
2356 if ((key.m_page_idx + (size * 4)) <= Fix_page::DATA_WORDS)
2357 {
2358 /**
2359 * Continue staying ahead of scan on this page by prefetching
2360 * a row 4 tuples ahead of this tuple, prefetched the first 3
2361 * at PREFETCH_SCAN_TUPLE.
2362 */
2363 struct Tup_fixsize_page *page_ptr =
2364 (struct Tup_fixsize_page*)page;
2365 NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx + (size * 3),
2366 size));
2367 }
2368 if (likely((! ((bits & ScanOp::SCAN_NR) ||
2369 (bits & ScanOp::SCAN_LCP))) ||
2370 ((bits & ScanOp::SCAN_LCP) &&
2371 !pos.m_lcp_scan_changed_rows_page)))
2372 {
2373 jam();
2374 /**
2375 * We come here for normal full table scans and also for LCP
2376 * scans where we scan ALL ROWS pages.
2377 *
2378 * We simply check if the row is free, if it isn't then we will
2379 * handle it. For LCP scans we will also check at found_tuple that
2380 * the LCP_SKIP bit isn't set. If it is then the rowid was empty
2381 * at start of LCP. If the rowid is free AND we are scanning an
2382 * ALL ROWS page then the LCP_SKIP cannot be set, this is set only
2383 * for CHANGED ROWS pages when deleting tuples.
2384 *
2385 * Free rowid's might have existed at start of LCP. This was
2386 * handled by using the LCP keep list when the tuple was deleted.
2387 * So when we come here we don't have to worry about LCP scanning
2388 * those rows.
2389 *
2390 * LCP_DELETE flag can never be set on ALL ROWS pages.
2391 *
2392 * The state Tuple_header::ALLOC means that the row is being
2393 * inserted, it thus has no current committed state and is
2394 * here equivalent to the FREE state for LCP scans.
2395 */
2396 thbits = tuple_header_ptr->m_header_bits;
2397 if ((bits & ScanOp::SCAN_LCP) &&
2398 (thbits & Tuple_header::LCP_DELETE))
2399 {
2400 g_eventLogger->info("(%u)LCP_DELETE on tab(%u,%u), row(%u,%u)"
2401 " ALL ROWS page, header: %x",
2402 instance(),
2403 fragPtr.p->fragTableId,
2404 fragPtr.p->fragmentId,
2405 key.m_page_no,
2406 key.m_page_idx,
2407 thbits);
2408 ndbabort();
2409 }
2410 if (! ((thbits & Tuple_header::FREE ||
2411 thbits & Tuple_header::DELETE_WAIT) ||
2412 ((bits & ScanOp::SCAN_LCP) &&
2413 (thbits & Tuple_header::ALLOC))))
2414 {
2415 jam();
2416 scan.m_last_seen = __LINE__;
2417 goto found_tuple;
2418 }
2419 /**
2420 * Ensure that LCP_SKIP bit is clear before we move on
2421 * It could be set if the row was inserted after LCP
2422 * start and then followed by a delete of the row before
2423 * we arrive here.
2424 */
2425 if ((bits & ScanOp::SCAN_LCP) &&
2426 (thbits & Tuple_header::LCP_SKIP))
2427 {
2428 jam();
2429 tuple_header_ptr->m_header_bits =
2430 thbits & (~Tuple_header::LCP_SKIP);
2431 DEB_LCP_SKIP(("(%u)Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
2432 ", header: %x"
2433 ", new header: %x"
2434 ", tuple_header_ptr: %p",
2435 instance(),
2436 fragPtr.p->fragTableId,
2437 fragPtr.p->fragmentId,
2438 key.m_page_no,
2439 key.m_page_idx,
2440 thbits,
2441 tuple_header_ptr->m_header_bits,
2442 tuple_header_ptr));
2443 updateChecksum(tuple_header_ptr,
2444 tablePtr.p,
2445 thbits,
2446 tuple_header_ptr->m_header_bits);
2447 }
2448 scan.m_last_seen = __LINE__;
2449 }
2450 else if (bits & ScanOp::SCAN_NR)
2451 {
2452 thbits = tuple_header_ptr->m_header_bits;
2453 if ((foundGCI = *tuple_header_ptr->get_mm_gci(tablePtr.p)) >
2454 scan.m_scanGCI ||
2455 foundGCI == 0)
2456 {
2457 /**
2458 * foundGCI == 0 means that the row is initialised but has not
2459 * yet been committed as part of an insert transaction. All other
2460 * rows have the GCI entry set to the last GCI they were changed in,
2461 * this is true even for deleted rows as long as the page is still
2462 * maintained by the fragment.
2463 *
2464 * When foundGCI == 0 there are two cases.
2465 * The first case is that thbits == Fix_page::FREE_RECORD.
2466 * In this case the tuple doesn't exist and should be
2467 * deleted if existing in the starting node.
2468 * As part of Fix_page::FREE_RECORD the Tuple_header::FREE
2469 * bit is set. So this is handled below.
2470 * The second case is that thbits == Tuple_header::ALLOC.
2471 * In this case the tuple is currently being inserted, but the
2472 * transaction isn't yet committed. In this case we will follow
2473 * the found_tuple path. This means that we will attempt to
2474 * lock the tuple, this will be unsuccessful since the row
2475 * is currently being inserted and is locked for write.
2476 * When the commit happens the row lock is released and the
2477 * copy scan will continue on this row. It will send an INSERT
2478 * to the starting node. Most likely the INSERT transaction
2479 * was started after the copy scan started, in this case the
2480 * INSERT will simply be converted to an UPDATE by the starting
2481 * node. If the insert was started before the new replica of
2482 * the fragment was included, the INSERT will be performed.
2483 * This is the reason why we have to go the extra mile here to
2484 * ensure that we don't lose records that are being inserted as
2485 * part of long transactions.
2486 *
2487 * The final problem is when the INSERT is aborted. In this case
2488 * we return from the lock row in execACCKEYREF. Since the row
2489 * is now in the Tuple_header::FREE state we must re-read the
2490 * row again. This is handled by changing the pos.m_get state
2491 * to Get_tuple instead of Get_next_tuple.
2492 */
2493 if (! (thbits & Tuple_header::FREE ||
2494 thbits & Tuple_header::DELETE_WAIT))
2495 {
2496 jam();
2497 goto found_tuple;
2498 }
2499 else
2500 {
2501 goto found_deleted_rowid;
2502 }
2503 }
2504 else if ((thbits & Fix_page::FREE_RECORD) != Fix_page::FREE_RECORD &&
2505 tuple_header_ptr->m_operation_ptr_i != RNIL)
2506 {
2507 jam();
2508 goto found_tuple; // Locked tuple...
2509 // skip free tuple
2510 }
2511 DEB_NR_SCAN_EXTRA(("(%u)NR_SCAN_SKIP:tab(%u,%u) row(%u,%u),"
2512 " recGCI: %u, scanGCI: %u, header: %x",
2513 instance(),
2514 fragPtr.p->fragTableId,
2515 fragPtr.p->fragmentId,
2516 key.m_page_no,
2517 key.m_page_idx,
2518 foundGCI,
2519 scan.m_scanGCI,
2520 thbits));
2521 }
2522 else
2523 {
2524 ndbrequire(c_backup->is_partial_lcp_enabled());
2525 ndbrequire((bits & ScanOp::SCAN_LCP) &&
2526 pos.m_lcp_scan_changed_rows_page);
2527 Uint32 ret_val;
2528 if (!pos.m_all_rows)
2529 {
2530 ret_val = move_to_next_change_page_row(scan,
2531 page,
2532 &tuple_header_ptr,
2533 loop_count,
2534 size);
2535 if (ret_val == ZSCAN_FOUND_PAGE_END)
2536 {
2537 /**
2538 * We have finished scanning a CHANGE PAGE where we
2539 * checked even the individual parts of the page. In this case we
2540 * perform a very detailed analysis where we clear all bits
2541 * while scanning. To handle this we will set a special
2542 * bit if anyone updates any row in the page while
2543 * we are scanning in this mode. This ensures that the
2544 * flag bits are in read-only mode and only updated by
2545 * LCP scanning. We don't track which part of page is
2546 * updated in this case, so if any updates have been
2547 * performed on page in this state, all bits on page
2548 * are set to ensure that we will scan the entire page
2549 * in the next LCP scan.
2550 */
2551 ndbassert(!page->get_any_changes());
2552 page->clear_page_being_lcp_scanned();
2553 if (page->get_and_clear_change_while_lcp_scan())
2554 {
2555 jamDebug();
2556 page->set_all_change_map();
2557 }
2558 /**
2559 * We've finished scanning a page that was using filtering using
2560 * the bitmaps on the page. We are ready to set the last LCP
2561 * state to A.
2562 */
2563 /* Coverage tested */
2564 set_last_lcp_state(fragPtr.p,
2565 key.m_page_no,
2566 false /* Set state to A */);
2567 scan.m_last_seen = __LINE__;
2568 pos.m_get = ScanPos::Get_next_page;
2569 break;
2570 }
2571 }
2572 ret_val = handle_scan_change_page_rows(scan,
2573 page,
2574 tuple_header_ptr,
2575 foundGCI);
2576 if (likely(ret_val == ZSCAN_FOUND_TUPLE))
2577 {
2578 thbits = tuple_header_ptr->m_header_bits;
2579 goto found_tuple;
2580 }
2581 else if (ret_val == ZSCAN_FOUND_DELETED_ROWID)
2582 goto found_deleted_rowid;
2583 ndbrequire(ret_val == ZSCAN_FOUND_NEXT_ROW);
2584 }
2585 }
2586 else
2587 {
2588 jam();
2589 /**
2590 * We've finished scanning a page, for LCPs we are ready to
2591 * set the last LCP state to A.
2592 */
2593 if (bits & ScanOp::SCAN_LCP)
2594 {
2595 jam();
2596 /* Coverage tested */
2597 set_last_lcp_state(fragPtr.p,
2598 key.m_page_no,
2599 false /* Set state to A */);
2600 if (!pos.m_all_rows)
2601 {
2602 ndbassert(page->verify_change_maps(jamBuffer()));
2603 }
2604 scan.m_last_seen = __LINE__;
2605 }
2606 // no more tuples on this page
2607 pos.m_get = ScanPos::Get_next_page;
2608 }
2609 }
2610 else
2611 {
2612 jam();
2613 Var_page * page = (Var_page*)pos.m_page;
2614 if (key.m_page_idx < page->high_index)
2615 {
2616 jam();
2617 pos.m_get = ScanPos::Get_next_tuple;
2618 if (!page->is_free(key.m_page_idx))
2619 {
2620 tuple_header_ptr = (Tuple_header*)page->get_ptr(key.m_page_idx);
2621 thbits = tuple_header_ptr->m_header_bits;
2622 goto found_tuple;
2623 }
2624 }
2625 else
2626 {
2627 jam();
2628 // no more tuples on this page
2629 pos.m_get = ScanPos::Get_next_page;
2630 break;
2631 }
2632 }
2633 break; // incr loop count
2634 found_tuple:
2635 // found possible tuple to return
2636 jam();
2637 {
2638 // caller has already set pos.m_get to next tuple
2639 if (likely(! (bits & ScanOp::SCAN_LCP &&
2640 thbits & Tuple_header::LCP_SKIP)))
2641 {
2642 Local_key& key_mm = pos.m_key_mm;
2643 if (likely(! (bits & ScanOp::SCAN_DD)))
2644 {
2645 key_mm = pos.m_key;
2646 // real page id is already set
2647 if (bits & ScanOp::SCAN_LCP)
2648 {
2649 c_backup->update_pause_lcp_counter(loop_count);
2650 }
2651 }
2652 else
2653 {
2654 tuple_header_ptr->get_base_record_ref(key_mm);
2655 // recompute for each disk tuple
2656 pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
2657 }
2658 // TUPKEYREQ handles savepoint stuff
2659 scan.m_state = ScanOp::Current;
2660 return true;
2661 }
2662 else
2663 {
2664 jam();
2665 /* Clear LCP_SKIP bit so that it will not show up in next LCP */
2666 tuple_header_ptr->m_header_bits =
2667 thbits & ~(Uint32)Tuple_header::LCP_SKIP;
2668
2669 DEB_LCP_SKIP(("(%u) 3 Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
2670 ", header: %x",
2671 instance(),
2672 fragPtr.p->fragTableId,
2673 fragPtr.p->fragmentId,
2674 key.m_page_no,
2675 key.m_page_idx,
2676 thbits));
2677
2678 updateChecksum(tuple_header_ptr,
2679 tablePtr.p,
2680 thbits,
2681 tuple_header_ptr->m_header_bits);
2682 scan.m_last_seen = __LINE__;
2683 }
2684 }
2685 break;
2686
2687 record_dropped_change_page:
2688 {
2689 ndbrequire(c_backup->is_partial_lcp_enabled());
2690 c_backup->update_pause_lcp_counter(loop_count);
2691 record_delete_by_pageid(signal,
2692 frag.fragTableId,
2693 frag.fragmentId,
2694 scan,
2695 key.m_page_no,
2696 size,
2697 true);
2698 return false;
2699 }
2700
2701 found_deleted_rowid:
2702
2703 ndbrequire((bits & ScanOp::SCAN_NR) ||
2704 (bits & ScanOp::SCAN_LCP));
2705 if (!(bits & ScanOp::SCAN_LCP && pos.m_is_last_lcp_state_D))
2706 {
2707 ndbrequire(bits & ScanOp::SCAN_NR ||
2708 pos.m_lcp_scan_changed_rows_page);
2709
2710 Local_key& key_mm = pos.m_key_mm;
2711 if (! (bits & ScanOp::SCAN_DD))
2712 {
2713 jam();
2714 key_mm = pos.m_key;
2715 // caller has already set pos.m_get to next tuple
2716 // real page id is already set
2717 }
2718 else
2719 {
2720 jam();
2721 /**
2722 * Currently dead code since NR scans never use Disk data scans.
2723 */
2724 ndbrequire(bits & ScanOp::SCAN_NR);
2725 tuple_header_ptr->get_base_record_ref(key_mm);
2726 // recompute for each disk tuple
2727 pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
2728
2729 Fix_page *mmpage = (Fix_page*)c_page_pool.getPtr(pos.m_realpid_mm);
2730 tuple_header_ptr =
2731 (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
2732 if ((foundGCI = *tuple_header_ptr->get_mm_gci(tablePtr.p)) >
2733 scan.m_scanGCI ||
2734 foundGCI == 0)
2735 {
2736 thbits = tuple_header_ptr->m_header_bits;
2737 if (! (thbits & Tuple_header::FREE ||
2738 thbits & Tuple_header::DELETE_WAIT))
2739 {
2740 jam();
2741 break;
2742 }
2743 jam();
2744 }
2745 }
2746 /**
2747 * This code handles Node recovery, the row might still exist at the
2748 * starting node although it no longer exists at this live node. We
2749 * send a DELETE by ROWID to the starting node.
2750 *
2751 * This code is also used by LCPs to record deleted row ids.
2752 */
2753 c_backup->update_pause_lcp_counter(loop_count);
2754 record_delete_by_rowid(signal,
2755 frag.fragTableId,
2756 frag.fragmentId,
2757 scan,
2758 pos.m_key_mm,
2759 foundGCI,
2760 true);
2761 // TUPKEYREQ handles savepoint stuff
2762 return false;
2763 }
2764 scan.m_last_seen = __LINE__;
2765 break; // incr loop count
2766 default:
2767 ndbabort();
2768 }
2769 loop_count+= 4;
2770 if (loop_count >= 512)
2771 {
2772 jam();
2773 if (bits & ScanOp::SCAN_LCP)
2774 {
2775 jam();
2776 c_backup->update_pause_lcp_counter(loop_count);
2777 if (!c_backup->check_pause_lcp())
2778 {
2779 loop_count = 0;
2780 continue;
2781 }
2782 c_backup->pausing_lcp(5,loop_count);
2783 }
2784 break;
2785 }
2786 }
2787 // TODO: at drop table we have to flush and terminate these
2788 jam();
2789 scan.m_last_seen = __LINE__;
2790 signal->theData[0] = ZTUP_SCAN;
2791 signal->theData[1] = scanPtr.i;
2792 if (!c_lqh->rt_break_is_scan_prioritised(scan.m_userPtr))
2793 {
2794 jam();
2795 sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2796 }
2797 else
2798 {
2799 /**
2800 * Sending with bounded delay means that we allow all signals in the job
2801 * buffer to be executed until the maximum is reached, which is currently
2802 * 100. So sending with bounded delay gives us a more predictable delay.
2803 * It might be longer than with priority B, but it will never be longer
2804 * than 100 signals.
2805 */
2806 jam();
2807 //#ifdef VM_TRACE
2808 c_debug_count++;
2809 if (c_debug_count % 10000 == 0)
2810 {
2811 DEB_LCP_DELAY(("(%u)TupScan delayed 10000 times", instance()));
2812 }
2813 //#endif
2814 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, BOUNDED_DELAY, 2);
2815 }
2816 return false;
2817 }
2818
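/**
 * A hedged sketch (comment only) of the scheduling skeleton in scanNext
 * above: work is metered in loop units (each page or tuple step adds 4)
 * against a budget of 512 before yielding through CONTINUEB.
 *
 *   Uint32 loop_count = 0;
 *   while (more_work())
 *   {
 *     do_one_step();               // page move or tuple inspection
 *     loop_count += 4;
 *     if (loop_count >= 512)
 *     {
 *       if (lcp_scan && !c_backup->check_pause_lcp())
 *       {
 *         loop_count = 0;          // LCP not asked to pause, keep going
 *         continue;
 *       }
 *       break;                     // real-time break, resume via CONTINUEB
 *     }
 *   }
 */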
2819 void
2820 Dbtup::record_delete_by_rowid(Signal *signal,
2821 Uint32 tableId,
2822 Uint32 fragmentId,
2823 ScanOp &scan,
2824 Local_key &key,
2825 Uint32 foundGCI,
2826 bool set_scan_state)
2827 {
2828 const Uint32 bits = scan.m_bits;
2829 DEB_LCP_DEL_EXTRA(("(%u)Delete by rowid tab(%u,%u), row(%u,%u)",
2830 instance(),
2831 tableId,
2832 fragmentId,
2833 key.m_page_no,
2834 key.m_page_idx));
2835 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
2836 conf->scanPtr = scan.m_userPtr;
2837 conf->accOperationPtr = (bits & ScanOp::SCAN_LCP) ? Uint32(-1) : RNIL;
2838 conf->fragId = fragmentId;
2839 conf->localKey[0] = key.m_page_no;
2840 conf->localKey[1] = key.m_page_idx;
2841 conf->gci = foundGCI;
2842 if (set_scan_state)
2843 scan.m_state = ScanOp::Next;
2844 signal->setLength(NextScanConf::SignalLengthNoKeyInfo);
2845 c_lqh->exec_next_scan_conf(signal);
2846 return;
2847 }
2848
2849 void
2850 Dbtup::record_delete_by_pageid(Signal *signal,
2851 Uint32 tableId,
2852 Uint32 fragmentId,
2853 ScanOp &scan,
2854 Uint32 page_no,
2855 Uint32 record_size,
2856 bool set_scan_state)
2857 {
2858 DEB_LCP_DEL_EXTRA(("(%u)Delete by pageid tab(%u,%u), page(%u)",
2859 instance(),
2860 tableId,
2861 fragmentId,
2862 page_no));
2863 jam();
2864 /**
2865 * Set page_idx to a flag value to signal to LQH that this is a
2866 * DELETE by PAGEID, this also ensures that we go to the next
2867 * page when we return to continue the LCP scan.
2868 */
2869 Uint32 page_idx = ZNIL;
2870
2871 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
2872 conf->scanPtr = scan.m_userPtr;
2873 conf->accOperationPtr = Uint32(-1);
2874 conf->fragId = fragmentId;
2875 conf->localKey[0] = page_no;
2876 conf->localKey[1] = page_idx;
2877 conf->gci = record_size; /* Used to transport record size */
2878 if (set_scan_state)
2879 scan.m_state = ScanOp::Next;
2880 signal->setLength(NextScanConf::SignalLengthNoKeyInfo);
2881 c_lqh->exec_next_scan_conf(signal);
2882 }
2883
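/**
 * The two record_delete_* helpers above reuse NextScanConf as carrier:
 * accOperationPtr = Uint32(-1) marks an LCP-generated entry,
 * localKey[1] == ZNIL distinguishes DELETE by PAGEID from DELETE by ROWID,
 * and in the PAGEID case the gci field transports the record size rather
 * than a GCI. An illustrative decoding on the receiving side:
 *
 *   if (conf->localKey[1] == ZNIL)
 *   {
 *     Uint32 page_no = conf->localKey[0];
 *     Uint32 record_size = conf->gci;    // not a GCI in this case
 *     // handle DELETE by PAGEID
 *   }
 *   else
 *   {
 *     // handle DELETE by ROWID at row(localKey[0], localKey[1])
 *   }
 */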
2884 /**
2885 * The LCP requires that some rows which are deleted during the main-memory
2886 * scan of fragments with disk-data parts are included in the main-memory LCP.
2887 * This is done so that during recovery, the main-memory part can be used to
2888 * find the disk-data part again, so that it can be deleted during Redo
2889 * application.
2890 *
2891 * This is implemented by copying the row content into
2892 * 'undo memory' / copy tuple space, and adding it to a per-fragment
2893 * 'lcp keep list', before deleting it at transaction commit time.
2894 * The row content is then only reachable via the lcp keep list, and does not
2895 * cause any ROWID reuse issues (899).
2896 *
2897 * The LCP scan treats the fragment's 'lcp keep list' as a top-priority source
2898 * of rows to be included in the fragment LCP, so rows should only be kept
2899 * momentarily.
2900 *
2901 * As these rows exist solely in DBTUP undo memory, it is not necessary to
2902 * perform the normal ACC locking protocols etc, but it is necessary to prepare
2903 * TUP for the coming TUPKEYREQ...
2904 *
2905 * The principle behind the LCP keep list is described in more detail in
2906 * the research paper:
2907 * Recovery Principles of MySQL Cluster 5.1 presented at VLDB in 2005.
2908 * The main thought is that we restore the disk data part to the point in time
2909 * when we start the LCP on the fragment. Thus we need to ensure that any rows
2910 * that exist at start of LCP also exist in the LCP and vice versa any row
2911 * that didn't exist at start of LCP doesn't exist in LCP. Updates of rows
2912 * don't matter since the REDO log application will ensure that the row
2913 * gets synchronized.
2914 *
2915 * An important part of this is to record the number of pages at start of LCP.
2916 * We don't need to worry about scanning pages deleted during LCP since the
2917 * LCP keep list ensures that those rows were checkpointed before being
2918 * deleted.
2919 */
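/**
 * Layout of a keep-list entry in copy tuple space, as written by
 * insert_lcp_keep_list and handle_lcp_drop_change_page and consumed by
 * handle_lcp_keep below:
 *
 *   copytuple[0..1]  original rowid; page_no == FREE_PAGE_RNIL flags a
 *                    DELETE by ROWID / DELETE by PAGEID entry instead of
 *                    a kept row
 *   copytuple[2..3]  next pointer in the keep list (a Local_key, null at
 *                    the tail)
 *   copytuple[4]     number of page indexes stored (delete entries only)
 *   copytuple[5]     logical page id (delete entries only)
 *   copytuple[6..]   Uint16 page index array; a single ZNIL entry means
 *                    DELETE by PAGEID
 */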
2920 void
2921 Dbtup::handle_lcp_keep(Signal* signal,
2922 FragrecordPtr fragPtr,
2923 ScanOp* scanPtrP)
2924 {
2925 TablerecPtr tablePtr;
2926 tablePtr.i = scanPtrP->m_tableId;
2927 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
2928
2929 ndbrequire(!fragPtr.p->m_lcp_keep_list_head.isNull());
2930 Local_key tmp = fragPtr.p->m_lcp_keep_list_head;
2931 Uint32 * copytuple = get_copy_tuple_raw(&tmp);
2932 if (copytuple[0] == FREE_PAGE_RNIL)
2933 {
2934 jam();
2935 ndbrequire(c_backup->is_partial_lcp_enabled());
2936 /* Handle DELETE by ROWID or DELETE by PAGEID */
2937 Uint32 num_entries = copytuple[4];
2938 Uint32 page_id = copytuple[5];
2939 Uint16 *page_index_array = (Uint16*)&copytuple[6];
2940 c_backup->change_current_page_temp(page_id);
2941 if (page_index_array[0] == ZNIL)
2942 {
2943 jam();
2944 /* DELETE by PAGEID */
2945 const Uint32 size = tablePtr.p->m_offsets[MM].m_fix_header_size;
2946 Local_key key;
2947 key.m_page_no = page_id;
2948 key.m_page_idx = ZNIL;
2949 ndbrequire(num_entries == 1);
2950 DEB_LCP_KEEP(("(%u)tab(%u,%u) page(%u): Handle LCP keep DELETE by PAGEID",
2951 instance(),
2952 fragPtr.p->fragTableId,
2953 fragPtr.p->fragmentId,
2954 page_id));
2955 remove_top_from_lcp_keep_list(fragPtr.p, copytuple, tmp);
2956 c_backup->lcp_keep_delete_by_page_id();
2957 record_delete_by_pageid(signal,
2958 fragPtr.p->fragTableId,
2959 fragPtr.p->fragmentId,
2960 *scanPtrP,
2961 page_id,
2962 size,
2963 false);
2964 c_undo_buffer.free_copy_tuple(&tmp);
2965 }
2966 else
2967 {
2968 jam();
2969 /* DELETE by ROWID */
2970 Local_key key;
2971 key.m_page_no = page_id;
2972 ndbrequire(num_entries > 0);
2973 num_entries--;
2975 key.m_page_idx = page_index_array[num_entries];
2976 copytuple[4] = num_entries;
2977 c_backup->lcp_keep_delete_row();
2978 DEB_LCP_KEEP(("(%u)tab(%u,%u) page(%u,%u): "
2979 "Handle LCP keep DELETE by ROWID",
2980 instance(),
2981 fragPtr.p->fragTableId,
2982 fragPtr.p->fragmentId,
2983 key.m_page_no,
2984 key.m_page_idx));
2985 if (num_entries == 0)
2986 {
2987 jam();
2988 remove_top_from_lcp_keep_list(fragPtr.p, copytuple, tmp);
2989 }
2990 record_delete_by_rowid(signal,
2991 fragPtr.p->fragTableId,
2992 fragPtr.p->fragmentId,
2993 *scanPtrP,
2994 key,
2995 0,
2996 false);
2997 if (num_entries == 0)
2998 {
2999 jam();
3000 c_undo_buffer.free_copy_tuple(&tmp);
3001 }
3002 }
3003 }
3004 else
3005 {
3006 jam();
3007 /**
3008 * tmp points to the copy tuple. We need the real page id to temporarily
3009 * change to the correct current page. It can be found in copytuple[0],
3010 * where handle_lcp_keep_commit puts it.
3011 */
3012 c_backup->change_current_page_temp(copytuple[0]);
3013 c_backup->lcp_keep_row();
3014 remove_top_from_lcp_keep_list(fragPtr.p, copytuple, tmp);
3015 DEB_LCP_KEEP(("(%u)tab(%u,%u) row(%u,%u) page(%u,%u): Handle LCP keep"
3016 " insert entry",
3017 instance(),
3018 fragPtr.p->fragTableId,
3019 fragPtr.p->fragmentId,
3020 copytuple[0],
3021 copytuple[1],
3022 tmp.m_page_no,
3023 tmp.m_page_idx));
3024 Local_key save = tmp;
3025 setCopyTuple(tmp.m_page_no, tmp.m_page_idx);
3026 prepare_scanTUPKEYREQ(tmp.m_page_no, tmp.m_page_idx);
3027 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
3028 conf->scanPtr = scanPtrP->m_userPtr;
3029 conf->accOperationPtr = (Uint32)-1;
3030 conf->fragId = fragPtr.p->fragmentId;
3031 conf->localKey[0] = tmp.m_page_no;
3032 conf->localKey[1] = tmp.m_page_idx;
3033 signal->setLength(NextScanConf::SignalLengthNoGCI);
3034 c_lqh->exec_next_scan_conf(signal);
3035 c_undo_buffer.free_copy_tuple(&save);
3036 return;
3037 }
3038 }
3039
3040 void
3041 Dbtup::remove_top_from_lcp_keep_list(Fragrecord *fragPtrP,
3042 Uint32 *copytuple,
3043 Local_key tmp)
3044 {
3045 memcpy(&fragPtrP->m_lcp_keep_list_head,
3046 copytuple+2,
3047 sizeof(Local_key));
3048
3049 if (fragPtrP->m_lcp_keep_list_head.isNull())
3050 {
3051 jam();
3052 DEB_LCP_KEEP(("(%u) tab(%u,%u) tmp(%u,%u) keep_list(%u,%u):"
3053 " LCP keep list empty again",
3054 instance(),
3055 fragPtrP->fragTableId,
3056 fragPtrP->fragmentId,
3057 tmp.m_page_no,
3058 tmp.m_page_idx,
3059 fragPtrP->m_lcp_keep_list_tail.m_page_no,
3060 fragPtrP->m_lcp_keep_list_tail.m_page_idx));
3061 ndbrequire(tmp.m_page_no == fragPtrP->m_lcp_keep_list_tail.m_page_no);
3062 ndbrequire(tmp.m_page_idx == fragPtrP->m_lcp_keep_list_tail.m_page_idx);
3063 fragPtrP->m_lcp_keep_list_tail.setNull();
3064 }
3065 else
3066 {
3067 jam();
3068 DEB_LCP_KEEP(("(%u)tab(%u,%u) move LCP keep head(%u,%u),tail(%u,%u)",
3069 instance(),
3070 fragPtrP->fragTableId,
3071 fragPtrP->fragmentId,
3072 fragPtrP->m_lcp_keep_list_head.m_page_no,
3073 fragPtrP->m_lcp_keep_list_head.m_page_idx,
3074 fragPtrP->m_lcp_keep_list_tail.m_page_no,
3075 fragPtrP->m_lcp_keep_list_tail.m_page_idx));
3076 }
3077 }
3078
3079 void
3080 Dbtup::handle_lcp_drop_change_page(Fragrecord *fragPtrP,
3081 Uint32 logicalPageId,
3082 PagePtr pagePtr,
3083 bool delete_by_pageid)
3084 {
3085 /**
3086 * We are performing an LCP scan currently. This page is part of the
3087 * CHANGED ROWS pages. This means that we need to record all rows
3088 * that were deleted at start of LCP. If the row was deleted since the
3089 * last LCP scan then we need to record it as a DELETE by ROWID in
3090 * the LCP. The rows that were deleted after LCP start have already
3091 * been handled. Those that have been handled have got the LCP_SKIP
3092 * bit set in the tuple header. For those not handled we need to check
3093 * the row GCI to see if it is either 0 or >= scanGCI. If so then
3094 * we need to record them as part of the LCP.
3095 *
3096 * We store all the rowids we find to record as DELETE by ROWID
3097 * in a local data array on the stack before we start writing them
3098 * into the LCP keep list.
3099 *
3100 * We depend on the allocation of the copy tuple always succeeding.
3101 * Since we will always release the page we are scanning, we hold
3102 * that page until we know that copy tuple allocation succeeded.
3103 * If not, we do not release the scanned page, rather we only change
3104 * its resource type in the memory manager. The latter is done by
3105 * a two-step operation. First the page is accounted as unused but not
3106 * put in any kind of free list. Then it is accounted as a copy
3107 * tuple page.
3108 *
3109 * This procedure will guarantee that we have space to record the
3110 * DELETE by ROWIDs in the LCP keep list.
3111 *
3112 * An especially complex case happens when the LCP scan is in the
3113 * middle of scanning this page. This could happen due to an
3114 * inopportune real-time break in combination with multiple
3115 * deletes happening within this real-time break.
3116 *
3117 * If page_to_skip_lcp bit was set we will perform delete_by_pageid
3118 * here. So we need not worry about this flag in the call to
3119 * is_rowid_in_remaining_lcp_set for each row in the loop; this call will
3120 * ensure that we will skip any rows already handled by the LCP scan.
3121 */
3122 ScanOpPtr scanPtr;
3123 TablerecPtr tablePtr;
3124 scanPtr.i = fragPtrP->m_lcp_scan_op;
3125 ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
3126 tablePtr.i = fragPtrP->fragTableId;
3127 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
3128 Uint32 scanGCI = scanPtr.p->m_scanGCI;
3129 Uint32 idx = 0; /* First record index */
3130 Uint32 size = tablePtr.p->m_offsets[MM].m_fix_header_size; /* Row size */
3131 Fix_page *page = (Fix_page*)pagePtr.p;
3132 Uint32 found_idx_count = 0;
3133 ndbrequire(size >= 4);
3134 Uint16 found_idx[2048]; /* Fixed size header never smaller than 16 bytes */
3135 DEB_LCP_REL(("(%u)tab(%u,%u)page(%u) handle_lcp_drop_page,"
3136 " delete_by_page: %u",
3137 instance(),
3138 fragPtrP->fragTableId,
3139 fragPtrP->fragmentId,
3140 logicalPageId,
3141 delete_by_pageid));
3142 if (!delete_by_pageid)
3143 {
3144 jam();
3145 Local_key key;
3146 /* Coverage tested */
3147 key.m_page_no = logicalPageId;
3148 while ((idx + size) <= Fix_page::DATA_WORDS)
3149 {
3150 Tuple_header *th = (Tuple_header*)&page->m_data[idx];
3151 Uint32 thbits = th->m_header_bits;
3152 Uint32 rowGCI = *th->get_mm_gci(tablePtr.p);
3153 bool lcp_skip_not_set =
3154 (thbits & Tuple_header::LCP_SKIP) ? false : true;
3155 ndbrequire(thbits & Tuple_header::FREE);
3156 ndbrequire(!(thbits & Tuple_header::LCP_DELETE) || lcp_skip_not_set);
3157 /**
3158 * We ignore LCP_DELETE on row here since if it is set then we also
3159 * know that LCP_SKIP isn't set, also we know rowGCI > scanGCI since the
3160 * row was inserted after start of LCP. So we will definitely record it
3161 * here for DELETE by ROWID.
3162 */
3163 key.m_page_idx = idx;
3164 bool is_in_remaining_lcp_set =
3165 is_rowid_in_remaining_lcp_set(pagePtr.p,
3166 fragPtrP,
3167 key,
3168 *scanPtr.p,
3169 0);
3170 if ((rowGCI > scanGCI || rowGCI == 0) &&
3171 lcp_skip_not_set &&
3172 is_in_remaining_lcp_set)
3173 {
3174 /* Coverage tested */
3175 jam();
3176 jamLine((Uint16)idx);
3177 found_idx[found_idx_count] = idx;
3178 found_idx_count++;
3179 DEB_LCP_REL(("(%u)tab(%u,%u)page(%u,%u) Keep_list DELETE_BY_ROWID",
3180 instance(),
3181 fragPtrP->fragTableId,
3182 fragPtrP->fragmentId,
3183 logicalPageId,
3184 idx));
3185 }
3186 else
3187 {
3188 /* Coverage tested */
3189 DEB_LCP_REL(("(%u)tab(%u,%u)page(%u,%u) skipped "
3190 "lcp_skip_not_set: %u, rowGCI: %u"
3191 " scanGCI: %u, in LCP set: %u",
3192 instance(),
3193 fragPtrP->fragTableId,
3194 fragPtrP->fragmentId,
3195 logicalPageId,
3196 idx,
3197 lcp_skip_not_set,
3198 rowGCI,
3199 scanGCI,
3200 is_in_remaining_lcp_set));
3201 }
3202 idx += size;
3203 }
3204 }
3205 else
3206 {
3207 jam();
3208 //ndbassert(false); //COVERAGE TEST
3209 found_idx_count = 1;
3210 found_idx[0] = ZNIL; /* Indicates DELETE by PAGEID */
3211 DEB_LCP_REL(("(%u)tab(%u,%u)page(%u) Keep_list DELETE_BY_PAGEID",
3212 instance(),
3213 fragPtrP->fragTableId,
3214 fragPtrP->fragmentId,
3215 logicalPageId));
3216 }
3217 Local_key location;
3218 /**
3219 * We store the following content into the copy tuple with a set of
3220 * DELETE by ROWID.
3221 * 1) Header (4 words)
3222 * 2) Number of rowids stored (1 word)
3223 * 3) Page Id (1 word)
3224 * 4) Array of Page indexes (1/2 word per entry)
3225 */
3226 if (found_idx_count == 0)
3227 {
3228 /* Nothing to store, all rows were already handled. */
3229 jam();
3230 returnCommonArea(pagePtr.i, 1);
3231 return;
3232 }
3233 Uint32 words = 6 + ((found_idx_count + 1) / 2);
3234 if (likely(c_undo_buffer.alloc_copy_tuple(&location, words) != nullptr))
3235 {
3236 jam();
3237 returnCommonArea(pagePtr.i, 1);
3238 }
3239 else
3240 {
3241 jam();
3242 ndbrequire(returnCommonArea_for_reuse(pagePtr.i, 1));
3243 ndbrequire(c_undo_buffer.reuse_page_for_copy_tuple(pagePtr.i));
3244 ndbrequire(c_undo_buffer.alloc_copy_tuple(&location, words) != nullptr);
3245 }
3246 Uint32 * copytuple = get_copy_tuple_raw(&location);
3247 Local_key flag_key;
3248 flag_key.m_page_no = FREE_PAGE_RNIL;
3249 flag_key.m_page_idx = 0;
3250 flag_key.m_file_no = 0;
3251
3252 copytuple[4] = found_idx_count;
3253 copytuple[5] = logicalPageId;
3254 memcpy(&copytuple[6], &found_idx[0], 2 * found_idx_count);
3255 insert_lcp_keep_list(fragPtrP,
3256 location,
3257 copytuple,
3258 &flag_key);
3259 }
3260
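/**
 * Example of the allocation size computed in handle_lcp_drop_change_page
 * above: an entry needs 6 words of header and metadata plus the Uint16
 * index array packed two per word, i.e.
 * words = 6 + (found_idx_count + 1) / 2. For 3 rowids this gives
 * 6 + 2 = 8 words, and the memcpy copies 2 * 3 = 6 bytes of page indexes.
 */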
3261 void
3262 Dbtup::insert_lcp_keep_list(Fragrecord *fragPtrP,
3263 Local_key location,
3264 Uint32 *copytuple,
3265 const Local_key *rowid)
3266 {
3267 /**
3268 * Store original row-id in copytuple[0,1]
3269 * Store next-ptr in copytuple[2,3] (set to RNIL/RNIL)
3270 */
3271 assert(sizeof(Local_key) == 8);
3272 memcpy(copytuple+0, rowid, sizeof(Local_key));
3273 Local_key nil;
3274 nil.setNull();
3275 memcpy(copytuple+2, &nil, sizeof(Local_key));
3276 DEB_LCP_KEEP(("(%u)tab(%u,%u) Insert LCP keep for row(%u,%u)"
3277 " from location page(%u,%u)",
3278 instance(),
3279 fragPtrP->fragTableId,
3280 fragPtrP->fragmentId,
3281 rowid->m_page_no,
3282 rowid->m_page_idx,
3283 location.m_page_no,
3284 location.m_page_idx));
3285
3286 /**
3287 * Link in the copy tuple into the LCP keep list.
3288 */
3289 if (fragPtrP->m_lcp_keep_list_tail.isNull())
3290 {
3291 jam();
3292 fragPtrP->m_lcp_keep_list_head = location;
3293 }
3294 else
3295 {
3296 jam();
3297 Uint32 *tail = get_copy_tuple_raw(&fragPtrP->m_lcp_keep_list_tail);
3298 Local_key nextptr;
3299 memcpy(&nextptr, tail+2, sizeof(Local_key));
3300 ndbrequire(nextptr.isNull());
3301 memcpy(tail+2, &location, sizeof(Local_key));
3302 }
3303 fragPtrP->m_lcp_keep_list_tail = location;
3304 }
3305
3306 void
3307 Dbtup::scanCont(Signal* signal, ScanOpPtr scanPtr)
3308 {
3309 bool immediate = scanNext(signal, scanPtr);
3310 if (! immediate) {
3311 jam();
3312 // time-slicing again
3313 return;
3314 }
3315 scanReply(signal, scanPtr);
3316 }
3317
3318 void
3319 Dbtup::disk_page_tup_scan_callback(Signal* signal, Uint32 scanPtrI, Uint32 page_i)
3320 {
3321 ScanOpPtr scanPtr;
3322 scanPtr.i = scanPtrI;
3323 ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
3324 ScanOp& scan = *scanPtr.p;
3325 c_lqh->setup_scan_pointers(scan.m_userPtr);
3326 ScanPos& pos = scan.m_scanPos;
3327 // get cache page
3328 Ptr<GlobalPage> gptr;
3329 m_global_page_pool.getPtr(gptr, page_i);
3330 pos.m_page = (Page*)gptr.p;
3331 // continue
3332 ndbrequire((scan.m_bits & ScanOp::SCAN_LOCK) == 0);
3333 /**
3334 * Since Disk scans can only scan read only and without locks we can bypass
3335 * the code in execACC_CHECK_SCAN and move directly to scanNext and
3336 * scanReply.
3337 */
3338 scanCont(signal, scanPtr);
3339 }
3340
3341 void
3342 Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr)
3343 {
3344 ScanOp& scan = *scanPtr.p;
3345 ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) && scan.m_accLockOp == RNIL);
3346 {
3347 /**
3348 * unlock all not unlocked by LQH
3349 * Ensure that LocalDLFifoList is destroyed before calling
3350 * EXECUTE_DIRECT on NEXT_SCANCONF which might end up
3351 * creating the same object further down the stack.
3352 */
3353 Local_ScanLock_fifo list(c_scanLockPool, scan.m_accLockOps);
3354 ScanLockPtr lockPtr;
3355 while (list.first(lockPtr)) {
3356 jam();
3357 AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
3358 lockReq->returnCode = RNIL;
3359 lockReq->requestInfo = AccLockReq::Abort;
3360 lockReq->accOpPtr = lockPtr.p->m_accLockOp;
3361 EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
3362 jamEntry();
3363 ndbrequire(lockReq->returnCode == AccLockReq::Success);
3364 list.remove(lockPtr);
3365 release_scan_lock(lockPtr);
3366 }
3367 }
3368 checkPoolShrinkNeed(DBTUP_SCAN_LOCK_TRANSIENT_POOL_INDEX,
3369 c_scanLockPool);
3370 // send conf
3371 scan.m_last_seen = __LINE__;
3372 NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
3373 conf->scanPtr = scanPtr.p->m_userPtr;
3374 conf->accOperationPtr = RNIL;
3375 conf->fragId = RNIL;
3376 releaseScanOp(scanPtr);
3377 signal->setLength(NextScanConf::SignalLengthNoTuple);
3378 c_lqh->exec_next_scan_conf(signal);
3379 return;
3380 }
3381
3382 void Dbtup::release_scan_lock(ScanLockPtr releasePtr)
3383 {
3384 if (likely(releasePtr.i != c_copy_frag_scan_lock))
3385 {
3386 c_scanLockPool.release(releasePtr);
3387 }
3388 else
3389 {
3390 jam();
3391 releasePtr.p->m_accLockOp = RNIL;
3392 releasePtr.p->prevList = RNIL;
3393 releasePtr.p->nextList = RNIL;
3394 }
3395 }
3396
3397 void Dbtup::release_c_free_scan_lock()
3398 {
3399 if (c_freeScanLock != RNIL)
3400 {
3401 ScanLockPtr releasePtr;
3402 releasePtr.i = c_freeScanLock;
3403 ndbrequire(c_scanLockPool.getValidPtr(releasePtr));
3404 release_scan_lock(releasePtr);
3405 c_freeScanLock = RNIL;
3406 checkPoolShrinkNeed(DBTUP_SCAN_LOCK_TRANSIENT_POOL_INDEX,
3407 c_scanLockPool);
3408 }
3409 }
3410
3411 void
3412 Dbtup::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
3413 {
3414 Local_ScanLock_fifo list(c_scanLockPool, scan.m_accLockOps);
3415 ScanLockPtr lockPtr;
3416 #ifdef VM_TRACE
3417 list.first(lockPtr);
3418 while (lockPtr.i != RNIL) {
3419 ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
3420 list.next(lockPtr);
3421 }
3422 #endif
3423 lockPtr.i = c_freeScanLock;
3424 c_freeScanLock = RNIL;
3425 ndbrequire(c_scanLockPool.getValidPtr(lockPtr));
3426 lockPtr.p->m_accLockOp = accLockOp;
3427 list.addLast(lockPtr);
3428 }
3429
3430 void
3431 Dbtup::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
3432 {
3433 Local_ScanLock_fifo list(c_scanLockPool, scan.m_accLockOps);
3434 ScanLockPtr lockPtr;
3435 list.first(lockPtr);
3436 while (lockPtr.i != RNIL) {
3437 if (lockPtr.p->m_accLockOp == accLockOp) {
3438 jam();
3439 break;
3440 }
3441 list.next(lockPtr);
3442 }
3443 ndbrequire(lockPtr.i != RNIL);
3444 list.remove(lockPtr);
3445 release_scan_lock(lockPtr);
3446 checkPoolShrinkNeed(DBTUP_SCAN_LOCK_TRANSIENT_POOL_INDEX,
3447 c_scanLockPool);
3448 }
3449
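/**
 * Detach the LCP scan operation from the fragment when the LCP scan of
 * the fragment stops. The scan record itself stays allocated; it is
 * owned by c_lcp_scan_op and is not released here.
 */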
void
Dbtup::stop_lcp_scan(Uint32 tableId, Uint32 fragId)
{
  jamEntry();
  TablerecPtr tablePtr;
  tablePtr.i = tableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  FragrecordPtr fragPtr;
  fragPtr.i = RNIL;
  getFragmentrec(fragPtr, fragId, tablePtr.p);
  ndbrequire(fragPtr.i != RNIL);
  Fragrecord& frag = *fragPtr.p;

  ndbrequire(frag.m_lcp_scan_op != RNIL && c_lcp_scan_op != RNIL);
  ScanOpPtr scanPtr;
  scanPtr.i = frag.m_lcp_scan_op;
  ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
  ndbrequire(scanPtr.p->m_fragPtrI != RNIL);

  fragPtr.p->m_lcp_scan_op = RNIL;
  scanPtr.p->m_fragPtrI = RNIL;
  scanPtr.p->m_tableId = RNIL;
}

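/**
 * Release a scan operation record. LCP scans and copy fragment scans
 * use long-lived records that are reset rather than returned to the
 * pool; ordinary scans are removed from the fragment's scan list and
 * released.
 */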
void
Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
{
  FragrecordPtr fragPtr;
  fragPtr.i = scanPtr.p->m_fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);

  if (scanPtr.p->m_bits & ScanOp::SCAN_LCP)
  {
    jam();
    /**
     * Ignore; this is handled in release_lcp_scan. An LCP of a
     * fragment can consist of several scans, one per LCP file.
     */
  }
  else if ((scanPtr.p->m_bits & ScanOp::SCAN_COPY_FRAG) != 0)
  {
    jam();
    ndbrequire(c_copy_frag_scan_op == scanPtr.i);
    scanPtr.p->m_state = ScanOp::First;
    scanPtr.p->m_bits = 0;
  }
  else
  {
    jam();
    Local_ScanOp_list list(c_scanOpPool, fragPtr.p->m_scanList);
    list.remove(scanPtr);
    c_scanOpPool.release(scanPtr);
    checkPoolShrinkNeed(DBTUP_SCAN_OPERATION_TRANSIENT_POOL_INDEX,
                        c_scanOpPool);
  }
}

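/**
 * Attach the LCP scan operation (c_lcp_scan_op) to the fragment and
 * initialise it for a new LCP scan. The scan's end page is fixed at
 * the fragment's current maximum page count, which is also returned
 * to the caller through max_page_cnt.
 */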
void
Dbtup::start_lcp_scan(Uint32 tableId,
                      Uint32 fragId,
                      Uint32 & max_page_cnt)
{
  jamEntry();
  TablerecPtr tablePtr;
  tablePtr.i = tableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  FragrecordPtr fragPtr;
  fragPtr.i = RNIL;
  getFragmentrec(fragPtr, fragId, tablePtr.p);
  ndbrequire(fragPtr.i != RNIL);
  Fragrecord& frag = *fragPtr.p;

  ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
  frag.m_lcp_scan_op = c_lcp_scan_op;
  ScanOpPtr scanPtr;
  scanPtr.i = frag.m_lcp_scan_op;
  ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
  ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
  new (scanPtr.p) ScanOp;
  scanPtr.p->m_fragPtrI = fragPtr.i;
  scanPtr.p->m_tableId = tableId;
  scanPtr.p->m_state = ScanOp::First;
  scanPtr.p->m_last_seen = __LINE__;
  scanPtr.p->m_endPage = frag.m_max_page_cnt;
  max_page_cnt = frag.m_max_page_cnt;

  ndbrequire(frag.m_lcp_keep_list_head.isNull());
  ndbrequire(frag.m_lcp_keep_list_tail.isNull());
}

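/**
 * Called by the LCP fragment watchdog to report progress of a slow LCP
 * scan. Prints the scan state and the source line where the scan was
 * last seen, or crashes the node if the internal LCP scan state is
 * inconsistent.
 */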
void
Dbtup::lcp_frag_watchdog_print(Uint32 tableId, Uint32 fragId)
{
  TablerecPtr tablePtr;
  tablePtr.i = tableId;
  if (tableId >= cnoOfTablerec)
  {
    jam();
    // table id out of range; valid ids are 0 .. cnoOfTablerec - 1
    return;
  }
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  FragrecordPtr fragPtr;
  fragPtr.i = RNIL;
  getFragmentrec(fragPtr, fragId, tablePtr.p);
  ndbrequire(fragPtr.i != RNIL);
  Fragrecord& frag = *fragPtr.p;

  if (c_lcp_scan_op == RNIL)
  {
    jam();
    g_eventLogger->info("No LCP scan ongoing in TUP tab(%u,%u)",
                        tableId, fragId);
    ndbabort();
  }
  else if (frag.m_lcp_scan_op == RNIL)
  {
    jam();
    DEB_LCP(("LCP scan stopped, signal to stop watchdog still in flight"
             " tab(%u,%u)",
             tableId, fragId));
  }
  else if (frag.m_lcp_scan_op != c_lcp_scan_op)
  {
    jam();
    g_eventLogger->info("Internal corruption: LCP scan not on correct"
                        " tab(%u,%u)",
                        tableId, fragId);
    ndbabort();
  }
  else
  {
    jam();
    ScanOpPtr scanPtr;
    scanPtr.i = frag.m_lcp_scan_op;
    ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
    g_eventLogger->info("LCP Frag watchdog: tab(%u,%u), state: %u,"
                        " last seen line %u",
                        tableId, fragId,
                        scanPtr.p->m_state,
                        scanPtr.p->m_last_seen);
  }
}