1 /*
2    Copyright (c) 2005, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #define DBTUP_C
26 #define DBTUP_SCAN_CPP
27 #include "Dbtup.hpp"
28 #include "../backup/Backup.hpp"
29 #include <signaldata/AccScan.hpp>
30 #include <signaldata/NextScan.hpp>
31 #include <signaldata/AccLock.hpp>
32 #include <md5_hash.hpp>
33 #include <portlib/ndb_prefetch.h>
34 #include "../dblqh/Dblqh.hpp"
35 
36 #define JAM_FILE_ID 408
37 
38 #if (defined(VM_TRACE) || defined(ERROR_INSERT))
39 //#define DEBUG_LCP 1
40 //#define DEBUG_LCP_DEL2 1
41 //#define DEBUG_LCP_DEL_EXTRA 1
42 //#define DEBUG_LCP_SKIP_EXTRA 1
43 //#define DEBUG_LCP_KEEP 1
44 //#define DEBUG_LCP_REL 1
45 //#define DEBUG_NR_SCAN 1
46 //#define DEBUG_NR_SCAN_EXTRA 1
47 //#define DEBUG_LCP_SCANNED_BIT 1
48 //#define DEBUG_LCP_FILTER 1
49 //#define DEBUG_LCP_DEL 1
50 //#define DEBUG_LCP_DELAY 1
51 //#define DEBUG_LCP_SKIP 1
54 #endif
55 
56 #ifdef DEBUG_LCP_DELAY
57 #define DEB_LCP_DELAY(arglist) do { g_eventLogger->info arglist ; } while (0)
58 #else
59 #define DEB_LCP_DELAY(arglist) do { } while (0)
60 #endif
61 
62 #ifdef DEBUG_LCP_FILTER
63 #define DEB_LCP_FILTER(arglist) do { g_eventLogger->info arglist ; } while (0)
64 #else
65 #define DEB_LCP_FILTER(arglist) do { } while (0)
66 #endif
67 
68 #ifdef DEBUG_LCP
69 #define DEB_LCP(arglist) do { g_eventLogger->info arglist ; } while (0)
70 #else
71 #define DEB_LCP(arglist) do { } while (0)
72 #endif
73 
74 #ifdef DEBUG_LCP_DEL
75 #define DEB_LCP_DEL(arglist) do { g_eventLogger->info arglist ; } while (0)
76 #else
77 #define DEB_LCP_DEL(arglist) do { } while (0)
78 #endif
79 
80 #ifdef DEBUG_LCP_DEL2
81 #define DEB_LCP_DEL2(arglist) do { g_eventLogger->info arglist ; } while (0)
82 #else
83 #define DEB_LCP_DEL2(arglist) do { } while (0)
84 #endif
85 
86 #ifdef DEBUG_LCP_DEL_EXTRA
87 #define DEB_LCP_DEL_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
88 #else
89 #define DEB_LCP_DEL_EXTRA(arglist) do { } while (0)
90 #endif
91 
92 #ifdef DEBUG_LCP_SKIP
93 #define DEB_LCP_SKIP(arglist) do { g_eventLogger->info arglist ; } while (0)
94 #else
95 #define DEB_LCP_SKIP(arglist) do { } while (0)
96 #endif
97 
98 #ifdef DEBUG_LCP_SKIP_EXTRA
99 #define DEB_LCP_SKIP_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
100 #else
101 #define DEB_LCP_SKIP_EXTRA(arglist) do { } while (0)
102 #endif
103 
104 #ifdef DEBUG_LCP_KEEP
105 #define DEB_LCP_KEEP(arglist) do { g_eventLogger->info arglist ; } while (0)
106 #else
107 #define DEB_LCP_KEEP(arglist) do { } while (0)
108 #endif
109 
110 #ifdef DEBUG_LCP_REL
111 #define DEB_LCP_REL(arglist) do { g_eventLogger->info arglist ; } while (0)
112 #else
113 #define DEB_LCP_REL(arglist) do { } while (0)
114 #endif
115 
116 #ifdef DEBUG_NR_SCAN
117 #define DEB_NR_SCAN(arglist) do { g_eventLogger->info arglist ; } while (0)
118 #else
119 #define DEB_NR_SCAN(arglist) do { } while (0)
120 #endif
121 
122 #ifdef DEBUG_NR_SCAN_EXTRA
123 #define DEB_NR_SCAN_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
124 #else
125 #define DEB_NR_SCAN_EXTRA(arglist) do { } while (0)
126 #endif
127 
128 #ifdef VM_TRACE
129 #define dbg(x) globalSignalLoggers.log x
130 #else
131 #define dbg(x)
132 #endif
133 
134 void
135 Dbtup::prepare_scan_ctx(Uint32 scanPtrI)
136 {
137   (void)scanPtrI;
138 }
139 
140 void
141 Dbtup::execACC_SCANREQ(Signal* signal)
142 {
143   jamEntry();
144   const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
145   const AccScanReq* const req = &reqCopy;
146   ScanOpPtr scanPtr;
147   scanPtr.i = RNIL;
148   do {
149     // find table and fragment
150     TablerecPtr tablePtr;
151     tablePtr.i = req->tableId;
152     ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
153     FragrecordPtr fragPtr;
154     Uint32 fragId = req->fragmentNo;
155     fragPtr.i = RNIL;
156     getFragmentrec(fragPtr, fragId, tablePtr.p);
157     ndbrequire(fragPtr.i != RNIL);
158     Fragrecord& frag = *fragPtr.p;
159     // flags
160     Uint32 bits = 0;
161 
162     if (AccScanReq::getLcpScanFlag(req->requestInfo))
163     {
164       jam();
165       bits |= ScanOp::SCAN_LCP;
166       scanPtr.i = c_lcp_scan_op;
167       ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
168       ndbrequire(scanPtr.p->m_fragPtrI == fragPtr.i);
169       ndbrequire(scanPtr.p->m_state == ScanOp::First);
170     }
171     else if (AccScanReq::getCopyFragScanFlag(req->requestInfo))
172     {
173       jam();
174       bits |= ScanOp::SCAN_COPY_FRAG;
175       scanPtr.i = c_copy_frag_scan_op;
176       ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
177       ndbrequire(scanPtr.p->m_state == ScanOp::First);
178       ndbrequire(scanPtr.p->m_bits == 0);
179     }
180     else
181     {
182       // seize from pool and link to per-fragment list
183       if (!c_scanOpPool.seize(scanPtr))
184       {
185         jam();
186         break;
187       }
188       Local_ScanOp_list list(c_scanOpPool, frag.m_scanList);
189       list.addFirst(scanPtr);
190       jam();
191     }
192 
193     if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
194         && tablePtr.p->m_no_of_disk_attributes)
195     {
196       jam();
197       bits |= ScanOp::SCAN_DD;
198     }
199 
200     bool mm = (bits & ScanOp::SCAN_DD);
201     if ((tablePtr.p->m_attributes[mm].m_no_of_varsize +
202          tablePtr.p->m_attributes[mm].m_no_of_dynamic) > 0)
203     {
204       if (bits & ScanOp::SCAN_DD)
205       {
206         // only dd scan varsize pages
207         // mm always has a fixed part
208         jam();
209         bits |= ScanOp::SCAN_VS;
210       }
211     }
212 
213     if (! AccScanReq::getReadCommittedFlag(req->requestInfo))
214     {
215       if (AccScanReq::getLockMode(req->requestInfo) == 0)
216       {
217         jam();
218         bits |= ScanOp::SCAN_LOCK_SH;
219       }
220       else
221       {
222         jam();
223         bits |= ScanOp::SCAN_LOCK_EX;
224       }
225     }
226 
227     if (AccScanReq::getNRScanFlag(req->requestInfo))
228     {
229       jam();
230       bits |= ScanOp::SCAN_NR;
231       scanPtr.p->m_endPage = req->maxPage;
232       if (req->maxPage != RNIL && req->maxPage > frag.m_max_page_cnt)
233       {
234         DEB_NR_SCAN(("%u %u endPage: %u (noOfPages: %u maxPage: %u)",
235                      tablePtr.i,
236                      fragId,
237                      req->maxPage,
238                      fragPtr.p->noOfPages,
239                      fragPtr.p->m_max_page_cnt));
240       }
241     }
242     else if (AccScanReq::getLcpScanFlag(req->requestInfo))
243     {
244       jam();
245       ndbrequire((bits & ScanOp::SCAN_DD) == 0);
246       ndbrequire((bits & ScanOp::SCAN_LOCK) == 0);
247     }
248     else
249     {
250       jam();
251       scanPtr.p->m_endPage = RNIL;
252     }
253 
254     if (bits & ScanOp::SCAN_VS)
255     {
256       jam();
257       ndbrequire((bits & ScanOp::SCAN_NR) == 0);
258       ndbrequire((bits & ScanOp::SCAN_LCP) == 0);
259     }
260 
261     // set up scan op
262     ScanOp& scan = *scanPtr.p;
263     scan.m_state = ScanOp::First;
264     scan.m_bits = bits;
265     scan.m_userPtr = req->senderData;
266     scan.m_userRef = req->senderRef;
267     scan.m_tableId = tablePtr.i;
268     scan.m_fragId = frag.fragmentId;
269     scan.m_fragPtrI = fragPtr.i;
270     scan.m_transId1 = req->transId1;
271     scan.m_transId2 = req->transId2;
272     scan.m_savePointId = req->savePointId;
273     scan.m_accLockOp = RNIL;
274     scan.m_last_seen = __LINE__;
275 
276     // conf
277     AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
278     conf->scanPtr = req->senderData;
279     conf->accPtr = scanPtr.i;
280     conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
281     signal->theData[8] = 0;
282     /* Return ACC_SCANCONF */
283     return;
284   } while (0);
285   signal->theData[8] = AccScanRef::TupNoFreeScanOp; /* Failure */
286   /* Return ACC_SCANREF */
287 }
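/**
 * Informal summary (derived from execACC_SCANREQ() above, not an
 * authoritative specification) of how AccScanReq::requestInfo flags are
 * mapped onto ScanOp::m_bits:
 *
 *   getLcpScanFlag()          -> SCAN_LCP, reuses the preallocated c_lcp_scan_op
 *   getCopyFragScanFlag()     -> SCAN_COPY_FRAG, reuses c_copy_frag_scan_op
 *   getNoDiskScanFlag() == 0  -> SCAN_DD when the table has disk attributes
 *                                (plus SCAN_VS for varsize/dynamic disk parts)
 *   getReadCommittedFlag()==0 -> SCAN_LOCK_SH when lockMode == 0,
 *                                SCAN_LOCK_EX otherwise
 *   getNRScanFlag()           -> SCAN_NR (node recovery), m_endPage = maxPage
 *
 * On success theData[8] is set to 0 and the result is treated as
 * ACC_SCANCONF; on failure theData[8] carries AccScanRef::TupNoFreeScanOp
 * and the result is treated as ACC_SCANREF (see the comments above).
 */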
288 
289 void
290 Dbtup::execNEXT_SCANREQ(Signal* signal)
291 {
292   jamEntryDebug();
293   const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
294   const NextScanReq* const req = &reqCopy;
295   ScanOpPtr scanPtr;
296   scanPtr.i = req->accPtr;
297   ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
298   ScanOp& scan = *scanPtr.p;
299   switch (req->scanFlag) {
300   case NextScanReq::ZSCAN_NEXT:
301     jam();
302     break;
303   case NextScanReq::ZSCAN_COMMIT:
304     jam();
305     // Fall through
306   case NextScanReq::ZSCAN_NEXT_COMMIT:
307     jam();
308     if ((scan.m_bits & ScanOp::SCAN_LOCK) != 0) {
309       jam();
310       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
311       lockReq->returnCode = RNIL;
312       lockReq->requestInfo = AccLockReq::Unlock;
313       lockReq->accOpPtr = req->accOperationPtr;
314       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
315           signal, AccLockReq::UndoSignalLength);
316       jamEntry();
317       ndbrequire(lockReq->returnCode == AccLockReq::Success);
318       removeAccLockOp(scan, req->accOperationPtr);
319     }
320     if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
321       signal->theData[0] = 0; /* Success */
322       /**
323        * signal->theData[0] = 0 means return signal
324        * NEXT_SCANCONF for NextScanReq::ZSCAN_COMMIT
325        */
326       return;
327     }
328     break;
329   case NextScanReq::ZSCAN_CLOSE:
330     jam();
331     if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
332       jam();
333       ndbrequire(scan.m_accLockOp != RNIL);
334       // use ACC_ABORTCONF to flush out any reply in job buffer
335       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
336       lockReq->returnCode = RNIL;
337       lockReq->requestInfo = AccLockReq::AbortWithConf;
338       lockReq->accOpPtr = scan.m_accLockOp;
339       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
340 		     signal, AccLockReq::UndoSignalLength);
341       jamEntry();
342       ndbrequire(lockReq->returnCode == AccLockReq::Success);
343       scan.m_last_seen = __LINE__;
344       scan.m_state = ScanOp::Aborting;
345       return;
346     }
347     if (scan.m_state == ScanOp::Locked) {
348       jam();
349       ndbrequire(scan.m_accLockOp != RNIL);
350       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
351       lockReq->returnCode = RNIL;
352       lockReq->requestInfo = AccLockReq::Abort;
353       lockReq->accOpPtr = scan.m_accLockOp;
354       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
355 		     signal, AccLockReq::UndoSignalLength);
356       jamEntry();
357       ndbrequire(lockReq->returnCode == AccLockReq::Success);
358       scan.m_accLockOp = RNIL;
359     }
360     scan.m_last_seen = __LINE__;
361     scan.m_state = ScanOp::Aborting;
362     scanClose(signal, scanPtr);
363     return;
364   case NextScanReq::ZSCAN_NEXT_ABORT:
365     ndbabort();
366   default:
367     ndbabort();
368   }
369   // start looking for next scan result
370   AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
371   checkReq->accPtr = scanPtr.i;
372   checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
373   EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
374   jamEntryDebug();
375 }
376 
377 void
378 Dbtup::execACC_CHECK_SCAN(Signal* signal)
379 {
380   jamEntryDebug();
381   const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
382   const AccCheckScan* const req = &reqCopy;
383   ScanOpPtr scanPtr;
384   scanPtr.i = req->accPtr;
385   ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
386   ScanOp& scan = *scanPtr.p;
387   // fragment
388   FragrecordPtr fragPtr;
389   fragPtr.i = scan.m_fragPtrI;
390   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
391   Fragrecord& frag = *fragPtr.p;
392   bool wait_for_scan_lock_record = false;
393   if (scan.m_bits & ScanOp::SCAN_LOCK &&
394       c_freeScanLock == RNIL)
395   {
396     ScanLockPtr allocPtr;
397     if (likely((scan.m_bits & ScanOp::SCAN_COPY_FRAG) == 0))
398     {
399       if (likely(c_scanLockPool.seize(allocPtr)))
400       {
401         c_freeScanLock = allocPtr.i;
402       }
403       else
404       {
405         jam();
406         wait_for_scan_lock_record = true;
407       }
408     }
409     else
410     {
411       jam();
412       /**
413        * Copy fragment scans use a preallocated scan lock record to avoid
414        * risk of not getting a scan lock record.
415        */
416       c_freeScanLock = c_copy_frag_scan_lock;
417     }
418   }
419   if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP &&
420       (scan.m_bits & ScanOp::SCAN_LOCK_WAIT ||
421        wait_for_scan_lock_record))
422   {
423     /**
424      * Go to sleep for 1 millisecond while we are waiting for a
425      * row lock or the scan lock record to store the row lock in.
426      *
427      * Could also be that we are waiting for a lock record to become
428      * available.
429      */
430     jam();
431     CheckLcpStop* cls = (CheckLcpStop*) signal->theData;
432     cls->scanPtrI = scan.m_userPtr;
433     if (wait_for_scan_lock_record)
434     {
435       jam();
436       cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT_STOPPABLE;
437     }
438     else
439     {
440       jam();
441       cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT;
442     }
443     EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
444     if (signal->theData[0] == CheckLcpStop::ZTAKE_A_BREAK)
445     {
446       jamEntry();
447       release_c_free_scan_lock();
448       return;
449     }
450     jamEntry();
451     ndbrequire(signal->theData[0] == CheckLcpStop::ZABORT_SCAN);
452     /* Fall through, we will send NEXT_SCANCONF, this will detect close */
453   }
454   if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT ||
455       wait_for_scan_lock_record)
456   {
457     jam();
458     /**
459      * LQH asks if we are waiting for lock and we tell it to ask again
460      * The reason to go back to LQH here is to ensure that the scan can be
461      * closed if TC asked LQH to close the scan in the middle of the scan
462      * process.
463      * We go this path also when we could not allocate a lock record and
464      * it is time to go to LQH to check status before we go to sleep.
465      */
466     release_c_free_scan_lock();
467     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
468     conf->scanPtr = scan.m_userPtr;
469     conf->accOperationPtr = RNIL;       // no tuple returned
470     conf->fragId = frag.fragmentId;
471     // if TC has ordered scan close, it will be detected here
472     sendSignal(scan.m_userRef,
473                GSN_NEXT_SCANCONF,
474                signal,
475                NextScanConf::SignalLengthNoTuple,
476                JBB);
477     return;     // stop
478   }
479 
480   const bool lcp = (scan.m_bits & ScanOp::SCAN_LCP);
481 
482   if (scan.m_state == ScanOp::First)
483   {
484     if (lcp && ! fragPtr.p->m_lcp_keep_list_head.isNull())
485     {
486       jam();
487       /**
488        * Handle lcp keep list already here
489        *   So that scan state is not altered
490        *   if lcp_keep rows are found in ScanOp::First
491        */
492       scan.m_last_seen = __LINE__;
493       handle_lcp_keep(signal, fragPtr, scanPtr.p);
494       release_c_free_scan_lock();
495       return;
496     }
497     jam();
498     scanFirst(signal, scanPtr);
499   }
500   if (scan.m_state == ScanOp::Next)
501   {
502     jam();
503     bool immediate = scanNext(signal, scanPtr);
504     if (! immediate) {
505       jam();
506       // time-slicing via TUP or PGMAN
507       release_c_free_scan_lock();
508       return;
509     }
510     jam();
511   }
512   scanReply(signal, scanPtr);
513 }
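/**
 * Minimal sketch of the CHECK_LCP_STOP round trip used in
 * execACC_CHECK_SCAN() above (and again in scanReply() below) whenever the
 * scan has to pause; the exact semantics are owned by the DBLQH side of
 * GSN_CHECK_LCP_STOP, so treat this only as an illustration:
 *
 *   CheckLcpStop* cls = (CheckLcpStop*)signal->theData;
 *   cls->scanPtrI  = scan.m_userPtr;                    // LQH scan record
 *   cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT; // why we pause
 *   EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
 *   if (signal->theData[0] == CheckLcpStop::ZTAKE_A_BREAK)
 *     return;   // real-time break, we will be resumed later
 *   ndbrequire(signal->theData[0] == CheckLcpStop::ZABORT_SCAN);
 *   // otherwise fall through and reply so that LQH can close the scan
 */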
514 
515 void
516 Dbtup::scanReply(Signal* signal, ScanOpPtr scanPtr)
517 {
518   ScanOp& scan = *scanPtr.p;
519   FragrecordPtr fragPtr;
520   fragPtr.i = scan.m_fragPtrI;
521   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
522   Fragrecord& frag = *fragPtr.p;
523   // for reading tuple key in Current state
524   Uint32* pkData = (Uint32*)c_dataBuffer;
525   unsigned pkSize = 0;
526   if (scan.m_state == ScanOp::Current) {
527     // found an entry to return
528     jamDebug();
529     ndbrequire(scan.m_accLockOp == RNIL);
530     Uint32 scan_bits = scan.m_bits;
531     if (scan_bits & ScanOp::SCAN_LOCK)
532     {
533       jam();
534       ndbrequire((scan_bits & ScanOp::SCAN_LCP) == 0);
535       scan.m_last_seen = __LINE__;
536       // read tuple key - use TUX routine
537       const ScanPos& pos = scan.m_scanPos;
538       const Local_key& key_mm = pos.m_key_mm;
539       TablerecPtr tablePtr;
540       tablePtr.i = fragPtr.p->fragTableId;
541       ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
542       int ret = tuxReadPk((Uint32*)fragPtr.p,
543                           (Uint32*)tablePtr.p,
544                           pos.m_realpid_mm,
545                           key_mm.m_page_idx,
546 			  pkData, true);
547       ndbrequire(ret > 0);
548       pkSize = ret;
549       dbg((DBTUP, "PK size=%d data=%08x", pkSize, pkData[0]));
550       // get read lock or exclusive lock
551       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
552       lockReq->returnCode = RNIL;
553       lockReq->requestInfo = (scan.m_bits & ScanOp::SCAN_LOCK_SH) ?
554         AccLockReq::LockShared : AccLockReq::LockExclusive;
555       lockReq->accOpPtr = RNIL;
556       lockReq->userPtr = scanPtr.i;
557       lockReq->userRef = reference();
558       lockReq->tableId = scan.m_tableId;
559       lockReq->fragId = frag.fragmentId;
560       lockReq->fragPtrI = RNIL; // no cached frag ptr yet
561       lockReq->hashValue = md5_hash((Uint64*)pkData, pkSize);
562       lockReq->page_id = key_mm.m_page_no;
563       lockReq->page_idx = key_mm.m_page_idx;
564       lockReq->transId1 = scan.m_transId1;
565       lockReq->transId2 = scan.m_transId2;
566       lockReq->isCopyFragScan = ((scan.m_bits & ScanOp::SCAN_COPY_FRAG) != 0);
567       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
568           signal, AccLockReq::LockSignalLength);
569       jamEntryDebug();
570       switch (lockReq->returnCode) {
571       case AccLockReq::Success:
572       {
573         jam();
574         scan.m_state = ScanOp::Locked;
575         scan.m_accLockOp = lockReq->accOpPtr;
576         break;
577       }
578       case AccLockReq::IsBlocked:
579       {
580         jam();
581         // normal lock wait
582         scan.m_state = ScanOp::Blocked;
583         scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
584         scan.m_accLockOp = lockReq->accOpPtr;
585         // LQH will wake us up
586         CheckLcpStop* cls = (CheckLcpStop*) signal->theData;
587         cls->scanPtrI = scan.m_userPtr;
588         cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT;
589         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
590         if (signal->theData[0] == CheckLcpStop::ZTAKE_A_BREAK)
591         {
592           jamEntry();
593           /* Normal path */
594           release_c_free_scan_lock();
595           return;
596         }
597         jamEntry();
598         /* DBTC has most likely aborted due to timeout */
599         ndbrequire(signal->theData[0] == CheckLcpStop::ZABORT_SCAN);
600         /* Ensure that we send NEXT_SCANCONF immediately to close */
601         scan.m_state = ScanOp::Last;
602         break;
603       }
604       case AccLockReq::Refused:
605       {
606         jam();
607         // we cannot see deleted tuple (assert only)
608         ndbassert(false);
609         // skip it
610         scan.m_state = ScanOp::Next;
611         CheckLcpStop* cls = (CheckLcpStop*) signal->theData;
612         cls->scanPtrI = scan.m_userPtr;
613         cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT;
614         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
615         if (signal->theData[0] == CheckLcpStop::ZTAKE_A_BREAK)
616         {
617           jamEntry();
618           release_c_free_scan_lock();
619           return;
620         }
621         jamEntry();
622         ndbrequire(signal->theData[0] == CheckLcpStop::ZABORT_SCAN);
623         /* Ensure that we send NEXT_SCANCONF immediately to close */
624         scan.m_state = ScanOp::Last;
625         break;
628       }
629       case AccLockReq::NoFreeOp:
630       {
631         jam();
632         // stay in Current state
633         ndbrequire((scan.m_bits & ScanOp::SCAN_COPY_FRAG) == 0);
634         scan.m_state = ScanOp::Current;
635         CheckLcpStop* cls = (CheckLcpStop*) signal->theData;
636         cls->scanPtrI = scan.m_userPtr;
637         cls->scanState = CheckLcpStop::ZSCAN_RESOURCE_WAIT_STOPPABLE;
638         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
639         if (signal->theData[0] == CheckLcpStop::ZTAKE_A_BREAK)
640         {
641           jamEntry();
642           release_c_free_scan_lock();
643           return;
644         }
645         jamEntry();
646         ndbrequire(signal->theData[0] == CheckLcpStop::ZABORT_SCAN);
647         /* Ensure that we send NEXT_SCANCONF immediately to close */
648         scan.m_state = ScanOp::Last;
649         break;
650       }
651       default:
652         ndbabort();
653       }
654       ndbassert(c_freeScanLock != RNIL);
655     }
656     else
657     {
658       ndbassert(c_freeScanLock == RNIL);
659       scan.m_state = ScanOp::Locked;
660     }
661   }
662 
663   if (scan.m_state == ScanOp::Locked)
664   {
665     // we have lock or do not need one
666     jamDebug();
667     // conf signal
668     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
669     conf->scanPtr = scan.m_userPtr;
670     // the lock is passed to LQH
671     Uint32 accLockOp = scan.m_accLockOp;
672     if (accLockOp != RNIL) {
673       scan.m_accLockOp = RNIL;
674       // remember it until LQH unlocks it
675       addAccLockOp(scan, accLockOp);
676       scan.m_last_seen = __LINE__;
677     } else {
678       ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK));
679       // operation RNIL in LQH would signal no tuple returned
680       accLockOp = (Uint32)-1;
681       scan.m_last_seen = __LINE__;
682     }
683     release_c_free_scan_lock();
684     const ScanPos& pos = scan.m_scanPos;
685     conf->accOperationPtr = accLockOp;
686     conf->fragId = frag.fragmentId;
687     conf->localKey[0] = pos.m_key_mm.m_page_no;
688     conf->localKey[1] = pos.m_key_mm.m_page_idx;
689     // next time look for next entry
690     scan.m_state = ScanOp::Next;
691     prepare_scanTUPKEYREQ(pos.m_key_mm.m_page_no,
692                           pos.m_key_mm.m_page_idx);
693     /**
694      * Running the lock code takes some extra execution time. One could
695      * let this affect the number of tuples to read in one time slot.
696      * We decided to ignore this here.
697      */
698     signal->setLength(NextScanConf::SignalLengthNoGCI);
699     c_lqh->exec_next_scan_conf(signal);
700     return;
701   }
702   if (scan.m_state == ScanOp::Last)
703   {
704     jam();
705     release_c_free_scan_lock();
706     scan.m_last_seen = __LINE__;
707     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
708     conf->scanPtr = scan.m_userPtr;
709     conf->accOperationPtr = RNIL;
710     conf->fragId = RNIL;
711     signal->setLength(NextScanConf::SignalLengthNoTuple);
712     c_lqh->exec_next_scan_conf(signal);
713     return;
714   }
715   else if (scan.m_state == ScanOp::Invalid)
716   {
717     jam();
718     scan.m_last_seen = __LINE__;
719     NextScanRef* const ref = (NextScanRef*)signal->getDataPtrSend();
720     ref->scanPtr = scan.m_userPtr;
721     ref->errorCode = m_scan_error_code;
722     c_lqh->exec_next_scan_ref(signal);
723     return;
724   }
725   ndbabort();
726 }
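/**
 * Informal summary of the locking step in scanReply() above: for a locking
 * scan the primary key is read with tuxReadPk(), hashed with md5_hash() and
 * an ACC_LOCKREQ is executed directly in DBACC. The outcomes handled above
 * are:
 *
 *   AccLockReq::Success   -> ScanOp::Locked, the lock operation is handed
 *                            over to LQH together with NEXT_SCANCONF
 *   AccLockReq::IsBlocked -> ScanOp::Blocked + SCAN_LOCK_WAIT, an
 *                            ACCKEYCONF/ACCKEYREF will wake the scan up
 *   AccLockReq::Refused   -> the row disappeared, skip it (ScanOp::Next)
 *   AccLockReq::NoFreeOp  -> stay in ScanOp::Current and retry after a
 *                            CHECK_LCP_STOP resource wait
 */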
727 
728 /*
729  * Lock succeeded (after delay) in ACC.  If the lock is for current
730  * entry, set state to Locked.  If the lock is for an entry we were
731  * moved away from, simply unlock it.  Finally, if we are closing the
732  * scan, do nothing since we have already sent an abort request.
733  */
734 void
735 Dbtup::execACCKEYCONF(Signal* signal)
736 {
737   jamEntry();
738   ScanOpPtr scanPtr;
739   scanPtr.i = signal->theData[0];
740 
741   Uint32 localKey1 = signal->theData[3];
742   Uint32 localKey2 = signal->theData[4];
743   Local_key tmp;
744   tmp.m_page_no = localKey1;
745   tmp.m_page_idx = localKey2;
746 
747   ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
748   ScanOp& scan = *scanPtr.p;
749   ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
750   scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
751   if (scan.m_state == ScanOp::Blocked) {
752     // the lock wait was for current entry
753     jam();
754 
755     if (likely(scan.m_scanPos.m_key_mm.m_page_no == tmp.m_page_no &&
756                scan.m_scanPos.m_key_mm.m_page_idx == tmp.m_page_idx))
757     {
758       jam();
759       scan.m_state = ScanOp::Locked;
760       // LQH has the ball
761       return;
762     }
763     else
764     {
765       jam();
766       /**
767        * This means that there was DEL/INS on rowid that we tried to lock
768        *   and the primary key that was previously located on this rowid
769        *   (scanPos.m_key_mm) has moved.
770        *   (DBACC keeps track of primary keys)
771        *
772        * We don't care about the primary key, but we are interested in the
773        *   ROWID, so rescan this position.
774        *   This is implemented by calling execACCKEYREF...
775        */
776       ndbout << "execACCKEYCONF "
777              << scan.m_scanPos.m_key_mm
778              << " != " << tmp << " ";
779       scan.m_bits |= ScanOp::SCAN_LOCK_WAIT;
780       execACCKEYREF(signal);
781       return;
782     }
783   }
784 
785   if (scan.m_state != ScanOp::Aborting) {
786     // we were moved, release lock
787     jam();
788     AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
789     lockReq->returnCode = RNIL;
790     lockReq->requestInfo = AccLockReq::Abort;
791     lockReq->accOpPtr = scan.m_accLockOp;
792     EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
793     jamEntry();
794     ndbrequire(lockReq->returnCode == AccLockReq::Success);
795     scan.m_accLockOp = RNIL;
796     // LQH has the ball
797     return;
798   }
799   // lose the lock
800   scan.m_accLockOp = RNIL;
801   // continue at ACC_ABORTCONF
802 }
803 
804 /*
805  * Lock failed (after delay) in ACC.  Probably means somebody ahead of
806  * us in lock queue deleted the tuple.
807  */
808 void
809 Dbtup::execACCKEYREF(Signal* signal)
810 {
811   jamEntry();
812   ScanOpPtr scanPtr;
813   scanPtr.i = signal->theData[0];
814   ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
815   ScanOp& scan = *scanPtr.p;
816   ndbrequire(scan.m_bits & ScanOp::SCAN_LOCK_WAIT && scan.m_accLockOp != RNIL);
817   scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
818   if (scan.m_state != ScanOp::Aborting) {
819     jam();
820     // release the operation
821     AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
822     lockReq->returnCode = RNIL;
823     lockReq->requestInfo = AccLockReq::Abort;
824     lockReq->accOpPtr = scan.m_accLockOp;
825     EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
826     jamEntry();
827     ndbrequire(lockReq->returnCode == AccLockReq::Success);
828     scan.m_accLockOp = RNIL;
829     // scan position should already have been moved (assert only)
830     if (scan.m_state == ScanOp::Blocked) {
831       jam();
832       //ndbassert(false);
833       if (scan.m_bits & ScanOp::SCAN_NR)
834       {
835 	jam();
836         /**
837          * The tuple was locked and the transaction aborted. We need
838          * to re-read the tuple to ensure that we don't miss
839          * out on deleting rows in the starting node that no longer
840          * exist in the live node.
841          */
842 	scan.m_state = ScanOp::Next;
843 	scan.m_scanPos.m_get = ScanPos::Get_tuple;
844 	DEB_NR_SCAN(("Ignoring scan.m_state == ScanOp::Blocked, refetch"));
845       }
846       else
847       {
848 	jam();
849 	scan.m_state = ScanOp::Next;
850 	DEB_NR_SCAN(("Ignoring scan.m_state == ScanOp::Blocked"));
851       }
852     }
853     // LQH has the ball
854     return;
855   }
856   // lose the lock
857   scan.m_accLockOp = RNIL;
858   // continue at ACC_ABORTCONF
859 }
860 
861 /*
862  * Received when scan is closing.  This signal arrives after any
863  * ACCKEYCONF or ACCKEYREF which may have been in the job buffer.
864  */
865 void
866 Dbtup::execACC_ABORTCONF(Signal* signal)
867 {
868   jamEntry();
869   ScanOpPtr scanPtr;
870   scanPtr.i = signal->theData[0];
871   ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
872   ScanOp& scan = *scanPtr.p;
873   ndbrequire(scan.m_state == ScanOp::Aborting);
874   c_lqh->setup_scan_pointers(scan.m_userPtr);
875   // most likely we are still in lock wait
876   if (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) {
877     jam();
878     scan.m_bits &= ~ ScanOp::SCAN_LOCK_WAIT;
879     scan.m_accLockOp = RNIL;
880   }
881   scanClose(signal, scanPtr);
882 }
883 
884 void
885 Dbtup::scanFirst(Signal*, ScanOpPtr scanPtr)
886 {
887   ScanOp& scan = *scanPtr.p;
888   ScanPos& pos = scan.m_scanPos;
889   Local_key& key = pos.m_key;
890   const Uint32 bits = scan.m_bits;
891   // fragment
892   FragrecordPtr fragPtr;
893   fragPtr.i = scan.m_fragPtrI;
894   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
895   Fragrecord& frag = *fragPtr.p;
896 
897   if (bits & ScanOp::SCAN_NR)
898   {
899     if (scan.m_endPage == 0 && frag.m_max_page_cnt == 0)
900     {
901       jam();
902       scan.m_state = ScanOp::Last;
903       return;
904     }
905   }
906   else if (frag.noOfPages == 0)
907   {
908     jam();
909     if (!(bits & ScanOp::SCAN_LCP))
910     {
911       jam();
912       scan.m_state = ScanOp::Last;
913       return;
914     }
915     /**
916      * LCP scans still have to go through all pages even if none remain,
917      * to ensure that we reset any LCP scanned bits that may have been
918      * set before arriving here.
919      */
920   }
921 
922   if (bits & ScanOp::SCAN_LCP)
923   {
924     jam();
925     if (scan.m_endPage == 0)
926     {
927       jam();
928       /**
929        * Partition was empty at start of LCP, no records to report.
930        * In this case we cannot have set any LCP scanned bit since
931        * no page was present in the table when the scan was started.
932        */
933       scan.m_last_seen = __LINE__;
934       scan.m_state = ScanOp::Last;
935       return;
936     }
937     c_backup->init_lcp_scan(scan.m_scanGCI,
938                             pos.m_lcp_scan_changed_rows_page);
939     scan.m_last_seen = __LINE__;
940   }
941 
942   if (! (bits & ScanOp::SCAN_DD)) {
943     key.m_file_no = ZNIL;
944     key.m_page_no = 0;
945     pos.m_get = ScanPos::Get_page_mm;
946 
947     // for MM scan real page id is cached for efficiency
948     pos.m_realpid_mm = RNIL;
949   } else {
950     Disk_alloc_info& alloc = frag.m_disk_alloc_info;
951     // for now must check disk part explicitly
952     if (alloc.m_extent_list.isEmpty()) {
953       jam();
954       scan.m_state = ScanOp::Last;
955       return;
956     }
957     pos.m_extent_info_ptr_i = alloc.m_extent_list.getFirst();
958     Extent_info* ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
959     key.m_file_no = ext->m_key.m_file_no;
960     key.m_page_no = ext->m_first_page_no;
961     pos.m_get = ScanPos::Get_page_dd;
962   }
963   key.m_page_idx = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
964   // let scanNext() do the work
965   scan.m_state = ScanOp::Next;
966 }
967 
968 #define ZSCAN_FOUND_TUPLE 1
969 #define ZSCAN_FOUND_DELETED_ROWID 2
970 #define ZSCAN_FOUND_PAGE_END 3
971 #define ZSCAN_FOUND_DROPPED_CHANGE_PAGE 4
972 #define ZSCAN_FOUND_NEXT_ROW 5
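/**
 * Informal meaning of the ZSCAN_* codes above as they are used by the LCP
 * scan helpers that follow (the callers are authoritative):
 *
 *   ZSCAN_FOUND_TUPLE               - current position holds a row (or a page
 *                                     of rows) that the LCP scan must process
 *   ZSCAN_FOUND_DELETED_ROWID       - record a DELETE by ROWID in the LCP
 *   ZSCAN_FOUND_PAGE_END            - done with this page, continue with the
 *                                     next page
 *   ZSCAN_FOUND_DROPPED_CHANGE_PAGE - record a DELETE by PAGEID in the LCP
 *   ZSCAN_FOUND_NEXT_ROW            - skip this row and continue with the
 *                                     next row
 */
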
973 /**
974  * Start a scan of a page in LCP scan
975  * ----------------------------------
976  * We have seven options here for LCP scans:
977  * 1) The page entry is empty and was empty at start of
978  * LCP. In this case there is no flag set in the page
979  * map indicating that page was dropped since last it
980  * map indicating that the page was dropped since the
981  * LCP scan started.
982  * last LCP state was A. In this case we need to record a
983  * DELETE by PAGEID in the LCP.
984  *
985  * 1b) The page belonged to the CHANGED ROWS pages and the last
986  * LCP state was D. In this case we can ignore the page.
987  *
988  * 1c) The page was belonging to the ALL ROWS category.
989  * We can ignore it since we only record rows existing at start of
990  * the LCP.
991  * Then we continue with the next page.
992  *
993  * 2) The page entry is empty and it was recorded as being
994  * dropped since the LCP started. In this case the LCP scan
995  * has already taken care of this page; the needed information
996  * was sent to the LCP scan through the LCP keep list.
997  * 3) The page entry was not empty but the page map indicates
998  * that the page was dropped after the LCP scan started. In this
999  * tricky case the LCP scan started, the page was dropped, the
1000  * page was resurrected again and finally now we come here to
1001  * handle the page. Again in this case we can move on since the
1002  * page was handled at the time the page was dropped.
1003  *
1004  * 2) and 3) are found through either the LCP_SCANNED_BIT being
1005  * set in the page map, or by the page_to_skip_lcp bit being set
1006  * on the page object.
1007  *
1008  * 4) The page entry is non-empty. This is the normal page
1009  * handling where we scan one row at a time.
1010  *
1011  * Finally the case 4) can have four distinct options as well.
1012  * 4a) The page existed before the LCP started and had rows
1013  * in it that need to be checked one by one. This is the normal
1014  * case and by far the most commonly executed.
1015  *
1016  * 4b) The page did not exist before the LCP scan was started, but
1017  * it was allocated after the LCP scan started and before we scanned
1018  * it (thus got the LCP skip bit set on the page). It belonged to
1019  * the ALL ROWS pages and thus the page will be skipped.
1020  *
1021  * Discovered either by LCP_SCANNED_BIT or by page_to_skip_lcp bit
1022  * being set on the page.
1023  *
1024  * 4c) Same as 4b) except that it belongs to the CHANGED ROWS pages.
1025  * Also the last LCP state was D. Page is ignored.
1026  *
1027  * 4d) Same as 4c) except that last LCP state was A. In this we
1028  * record the page as a DELETE by PAGEID in the LCP.
1029  */
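/**
 * The cases above condensed into a rough decision table for
 * prepare_lcp_scan_page(); this is only an informal aid, the code below is
 * authoritative:
 *
 *   LCP_SCANNED_BIT set              -> case 2)/3): page already handled,
 *                                       reset bit, next page
 *   page entry empty, CHANGED page,
 *   last LCP state A                 -> case 1a): DELETE by PAGEID
 *   page entry empty, otherwise      -> case 1b)/1c): ignore page
 *   page entry non-empty             -> case 4): scan the rows on the page
 */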
1030 Uint32
1031 Dbtup::prepare_lcp_scan_page(ScanOp& scan,
1032                              Local_key& key,
1033                              Uint32 *next_ptr,
1034                              Uint32 *prev_ptr)
1035 {
1036   ScanPos& pos = scan.m_scanPos;
1037   bool lcp_page_already_scanned = get_lcp_scanned_bit(next_ptr);
1038   if (lcp_page_already_scanned)
1039   {
1040     jam();
1041     /* Coverage tested */
1042 #ifdef DEBUG_LCP_SCANNED_BIT
1043     if (next_ptr)
1044     {
1045       g_eventLogger->info("(%u)tab(%u,%u).%u"
1046                           " reset_lcp_scanned_bit(2)",
1047                           instance(),
1048                           m_curr_fragptr.p->fragTableId,
1049                           m_curr_fragptr.p->fragmentId,
1050                           key.m_page_no);
1051     }
1052 #endif
1053     reset_lcp_scanned_bit(next_ptr);
1054     c_backup->skip_page_lcp_scanned_bit();
1055     /* Either 2) or 3) as described above */
1056     /**
1057      * No state in page map to update, the page hasn't been
1058      * defined yet, so the position in page map is empty.
1059      */
1060     pos.m_get = ScanPos::Get_next_page_mm;
1061     scan.m_last_seen = __LINE__;
1062     return ZSCAN_FOUND_PAGE_END; // incr loop count
1063   }
1064   else if (unlikely(pos.m_realpid_mm == RNIL))
1065   {
1066     bool is_last_lcp_state_A = !get_last_lcp_state(prev_ptr);
1067     bool need_record_dropped_change =
1068       pos.m_lcp_scan_changed_rows_page && is_last_lcp_state_A;
1069     /**
1070      * Case 1) from above
1071      * If we come here without having LCP_SCANNED_BIT set then
1072      * we haven't released the page during LCP scan. Thus the
1073      * new last LCP state is D. Ensure that LAST_LCP_FREE_BIT
1074      * is set to indicate that LCP state is D for this LCP.
1075      */
1076     DEB_LCP_DEL2(("(%u)tab(%u,%u) page(%u),"
1077                   " is_last_lcp_state_A: %u, CHANGED: %u",
1078                   instance(),
1079                   m_curr_fragptr.p->fragTableId,
1080                   m_curr_fragptr.p->fragmentId,
1081                   key.m_page_no,
1082                   is_last_lcp_state_A,
1083                   pos.m_lcp_scan_changed_rows_page));
1084 
1085     set_last_lcp_state(prev_ptr, true);
1086     if (!need_record_dropped_change)
1087     {
1088       jam();
1089       /* Coverage tested */
1090       /* LCP case 1b) and 1c) above goes this way */
1091       scan.m_last_seen = __LINE__;
1092       pos.m_get = ScanPos::Get_next_page_mm;
1093       c_backup->skip_empty_page_lcp();
1094       return ZSCAN_FOUND_PAGE_END; // incr loop count
1095     }
1096     else
1097     {
1098       jam();
1099       /* Coverage tested */
1100       /* 1a) as described above */
1101       scan.m_last_seen = __LINE__;
1102       pos.m_get = ScanPos::Get_next_page_mm;
1103       c_backup->record_dropped_empty_page_lcp();
1104       return ZSCAN_FOUND_DROPPED_CHANGE_PAGE;
1105     }
1106   }
1107   else
1108   {
1109     jam();
1110     /**
1111      * Case 4) above, we need to set the last LCP state flag
1112      * on the pos object to ensure that we know when a row
1113      * needs to be DELETE by ROWID or if it needs to be ignored.
1114      */
1115     pos.m_is_last_lcp_state_D = get_last_lcp_state(prev_ptr);
1116     scan.m_last_seen = __LINE__;
1117   }
1118   return ZSCAN_FOUND_TUPLE;
1119 }
1120 
1121 Uint32
1122 Dbtup::handle_lcp_skip_page(ScanOp& scan,
1123                             Local_key key,
1124                             Page* page)
1125 {
1126   ScanPos& pos = scan.m_scanPos;
1127   /**
1128    * The page was allocated after the LCP started, so it can only
1129    * contain rows that were allocated after the start of the LCP and
1130    * should thus not be part of the LCP. It is case 4b), 4c) or 4d). We
1131    * need to clear the skip bit on the page. We need to get the old LCP
1132    * state to be able to decide if it is 4c) or 4d). We also need to set
1133    * the last LCP state to D.
1134    */
1135   DEB_LCP_SKIP(("(%u)Clear LCP_SKIP on tab(%u,%u), page(%u), change: %u, D: %u",
1136                 instance(),
1137                 m_curr_fragptr.p->fragTableId,
1138                 m_curr_fragptr.p->fragmentId,
1139                 key.m_page_no,
1140                 pos.m_lcp_scan_changed_rows_page,
1141                 pos.m_is_last_lcp_state_D));
1142 
1143   page->clear_page_to_skip_lcp();
1144   set_last_lcp_state(m_curr_fragptr.p,
1145                      key.m_page_no,
1146                      true /* Set state to D */);
1147 
1148   if (pos.m_lcp_scan_changed_rows_page && !pos.m_is_last_lcp_state_D)
1149   {
1150     jam();
1151     /* Coverage tested */
1152     /**
1153      * Case 4d) from above
1154      * At start of LCP the page was dropped, we have information that
1155      * the page was dropped after the previous LCP. Thus we need to
1156      * record the entire page as DELETE by PAGEID.
1157      */
1158     scan.m_last_seen = __LINE__;
1159     pos.m_get = ScanPos::Get_next_page_mm;
1160     c_backup->record_late_alloc_page_lcp();
1161     return ZSCAN_FOUND_DROPPED_CHANGE_PAGE;
1162   }
1163   jam();
1164   /* Coverage tested */
1165   /**
1166    * Case 4b) and 4c) from above
1167    * For ALL ROWS pages the rows should be skipped for LCP, we clear
1168    * the LCP skip flag on page in this case to speed up skipping.
1169    *
1170    * We need to keep track of the state Get_next_page_mm when checking
1171    * if a rowid is part of the remaining lcp set. If we do a real-time
1172    * break right after setting Get_next_page_mm we need to move the
1173    * page number forward one step since we have actually completed the
1174    * current page number.
1175    */
1176   scan.m_last_seen = __LINE__;
1177   pos.m_get = ScanPos::Get_next_page_mm;
1178   c_backup->page_to_skip_lcp(!pos.m_is_last_lcp_state_D);
1179   return ZSCAN_FOUND_PAGE_END; //incr loop count
1180 }
1181 
1182 Uint32
1183 Dbtup::handle_scan_change_page_rows(ScanOp& scan,
1184                                     Fix_page *fix_page,
1185                                     Tuple_header *tuple_header_ptr,
1186                                     Uint32 & foundGCI)
1187 {
1188   ScanPos& pos = scan.m_scanPos;
1189   Local_key& key = pos.m_key;
1190   /**
1191    * Coming here means that the following condition is true.
1192    * bits & ScanOp::SCAN_LCP && pos.m_lcp_changed_page
1193    *
1194    * We have 3 cases here,
1195    * foundGCI == 0:
1196    *   This means that the row has not been committed yet
1197    *   and no previous row has been stored in this row
1198    *   id either. However the previous LCP might still have
1199    *   had a row in this position since we could have
1200    *   deallocated a page and allocated it again between
1201    *   2 LCPs. In this case we have to ensure that the
1202    *   row id is deleted as part of the restore.
1203    *
1204    * foundGCI > scanGCI
1205    * Record has changed since last LCP
1206    *   if header says tuple is free then the row is a deleted
1207    *   row and we record it
1208    *   otherwise it is a normal row to be recorded in normal
1209    *   manner for LCPs.
1210    *
1211    * We record deleted rowids only if scanGCI is set, which indicates
1212    * that we are only recording changes for this part. We need
1213    * not record deleted rowids for those parts where we record
1214    * all rows.
1215    */
1216   Uint32 thbits = tuple_header_ptr->m_header_bits;
1217   if ((foundGCI = *tuple_header_ptr->get_mm_gci(m_curr_tabptr.p)) >
1218        scan.m_scanGCI)
1219   {
1220     if (unlikely(thbits & Tuple_header::LCP_DELETE))
1221     {
1222       jam();
1223       /* Ensure that LCP_DELETE bit is clear before we move on */
1224       /* Coverage tested */
1225       tuple_header_ptr->m_header_bits =
1226         thbits & (~Tuple_header::LCP_DELETE);
1227       updateChecksum(tuple_header_ptr,
1228                      m_curr_tabptr.p,
1229                      thbits,
1230                      tuple_header_ptr->m_header_bits);
1231       fix_page->set_change_maps(key.m_page_idx);
1232       jamDebug();
1233       jamLineDebug((Uint16)key.m_page_idx);
1234       DEB_LCP_DEL(("(%u)Reset LCP_DELETE on tab(%u,%u),"
1235                    " row(%u,%u), header: %x",
1236                    instance(),
1237                    m_curr_fragptr.p->fragTableId,
1238                    m_curr_fragptr.p->fragmentId,
1239                    key.m_page_no,
1240                    key.m_page_idx,
1241                    thbits));
1242       ndbrequire(!(thbits & Tuple_header::LCP_SKIP));
1243       scan.m_last_seen = __LINE__;
1244       return ZSCAN_FOUND_DELETED_ROWID;
1245     }
1246     else if (! (thbits & Tuple_header::FREE ||
1247                 thbits & Tuple_header::DELETE_WAIT ||
1248                 thbits & Tuple_header::ALLOC))
1249     {
1250       jam();
1251       /**
1252        * Tuple has changed since last LCP, we need to record
1253        * the row as a changed row unless the LCP_SKIP bit is
1254        * set on the rowid which means that the row was inserted
1255        * after starting the LCP.
1256        */
1257       scan.m_last_seen = __LINE__;
1258       return ZSCAN_FOUND_TUPLE;
1259     }
1260     else if (scan.m_scanGCI > 0 &&
1261              !(thbits & Tuple_header::LCP_SKIP))
1262     {
1263       jam();
1264       /**
1265        * We have found a row which is free, we are however scanning
1266        * ROWID in the LCP since the row was deleted after the last
1267        * LCP was executed. We check that the LCP_SKIP bit isn't set; if
1268        * LCP was executed. We check that LCP_SKIP bit isn't set, if
1269        * LCP_SKIP bit is set it means that the tuple was deleted
1270        * since the LCP started and we have already recorded the
1271        * row present at start of LCP when the tuple was deleted.
1272        *
1273        * If we delete it after LCP start we will certainly set
1274        * the GCI on the record > scanGCI, so it is an important
1275        * check for LCP_SKIP bit set.
1276        */
1277       scan.m_last_seen = __LINE__;
1278       return ZSCAN_FOUND_DELETED_ROWID;
1279     }
1280     else if (unlikely(thbits & Tuple_header::LCP_SKIP))
1281     {
1282       /* Ensure that LCP_SKIP bit is clear before we move on */
1283       jam();
1284       /* Coverage tested */
1285       tuple_header_ptr->m_header_bits =
1286         thbits & (~Tuple_header::LCP_SKIP);
1287       DEB_LCP_SKIP(("(%u) 2 Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
1288                     ", header: %x",
1289                     instance(),
1290                     m_curr_fragptr.p->fragTableId,
1291                     m_curr_fragptr.p->fragmentId,
1292                     key.m_page_no,
1293                     key.m_page_idx,
1294                     thbits));
1295       updateChecksum(tuple_header_ptr,
1296                      m_curr_tabptr.p,
1297                      thbits,
1298                      tuple_header_ptr->m_header_bits);
1299       fix_page->set_change_maps(key.m_page_idx);
1300       jamDebug();
1301       jamLineDebug((Uint16)key.m_page_idx);
1302     }
1303     else
1304     {
1305       jamDebug();
1306       DEB_LCP_SKIP_EXTRA(("(%u)Skipped tab(%u,%u), row(%u,%u),"
1307                     " foundGCI: %u, scanGCI: %u, header: %x",
1308                     instance(),
1309                     m_curr_fragptr.p->fragTableId,
1310                     m_curr_fragptr.p->fragmentId,
1311                     key.m_page_no,
1312                     key.m_page_idx,
1313                     foundGCI,
1314                     scan.m_scanGCI,
1315                     thbits));
1316       ndbrequire(!(thbits & Tuple_header::LCP_SKIP));
1317       /* Coverage tested */
1318     }
1319     jam();
1320     scan.m_last_seen = __LINE__;
1321     /* Continue with next row */
1322     return ZSCAN_FOUND_NEXT_ROW;
1323   }
1324   else
1325   {
1326     /**
1327      * When setting the LCP_DELETE flag we must also have deleted the
1328      * row and set rowGCI > scanGCI, so it cannot be set if we arrive
1329      * here.
1330      */
1331     if (unlikely(thbits & Tuple_header::LCP_DELETE))
1332     {
1333       g_eventLogger->info("(%u) tab(%u,%u) row(%u,%u)"
1334                           " LCP_DELETE set on rowid not yet used",
1335                           instance(),
1336                           m_curr_fragptr.p->fragTableId,
1337                           m_curr_fragptr.p->fragmentId,
1338                           key.m_page_no,
1339                           key.m_page_idx);
1340       ndbrequire(!(thbits & Tuple_header::LCP_DELETE));
1341     }
1342     if (foundGCI == 0 && thbits & Tuple_header::LCP_SKIP)
1343     {
1344       jam();
1345       /* Coverage tested */
1346       /**
1347        * BUG28372628:
1348        * ------------
1349        * LCP_SKIP flag is set when we perform a DELETE of a row
1350        * while an LCP is ongoing. During normal traffic operations
1351        * this means that the GCI is set to the GCI of the transaction.
1352        * The only other case where we can set LCP_SKIP is when a
1353        * DELETE operation arrives as part of COPY FRAG from live node
1354        * to starting node.
1355        *
1356        * In this case the GCI is set to the same GCI that the row in
1357        * the starting node have. If the GCI on the starting node is
1358        * not 0, then the GCI is always bigger than the GCI we are
1359        * storing locally, so we won't arrive in this path.
1360        *
1361        * There is however a case where the GCI is 0 in the live node.
1362        * This happens when the row has the state FREE_RECORD. This
1363        * means that the row is in a new page and the row hasn't been
1364        * used yet.
1365        * In this case we need to copy the row over to the starting node
1366        * to ensure that the row is deleted if it exists on the starting
1367        * node.
1368        *
1369        * If there is a row in this position AND a local LCP is ongoing,
1370        * in this case we could set the LCP_SKIP flag although the GCI
1371        * is set to 0.
1372        *
1373        * This case will only happen under the following condition.
1374        * 1) A row must have existed in this rowid before the starting node
1375        * stopped and is thus restored in the RESTORE, REBUILD, execute
1376        * REDO phase.
1377        * 2) The row must have been deleted together with all other rows
1378        *    in the same page such that the page of the row is dropped.
1379        * 3) At least one row in this page must have been inserted again,
1380        *    but the row in question must still be empty in the live node.
1381        * 4) A local LCP must be ongoing while COPY FRAGMENT of this
1382        *    fragment is ongoing, this can only happen if we start a
1383        *    full local LCP during COPY FRAGMENT. This in turn can only
1384        *    happen if the UNDO log for disk data parts is filled to the
1385        *    extent that we must ensure that an LCP is completed before
1386        *    the COPY FRAGMENT is completed.
1387        *
1388        * If all four conditions are met we could end up here with
1389        * LCP_SKIP bit set.
1390        */
1391       tuple_header_ptr->m_header_bits =
1392         thbits & (~Tuple_header::LCP_SKIP);
1393       DEB_LCP_SKIP(("(%u) 4 Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
1394                     ", header: %x",
1395                     instance(),
1396                     m_curr_fragptr.p->fragTableId,
1397                     m_curr_fragptr.p->fragmentId,
1398                     key.m_page_no,
1399                     key.m_page_idx,
1400                     thbits));
1401       updateChecksum(tuple_header_ptr,
1402                      m_curr_tabptr.p,
1403                      thbits,
1404                      tuple_header_ptr->m_header_bits);
1405       fix_page->set_change_maps(key.m_page_idx);
1406       jamDebug();
1407       jamLineDebug((Uint16)key.m_page_idx);
1408       ndbrequire(c_lqh->is_full_local_lcp_running());
1410     }
1411     else if (foundGCI == 0 && scan.m_scanGCI > 0)
1412     {
1413       /* Coverage tested */
1414       jam();
1415       scan.m_last_seen = __LINE__;
1416       return ZSCAN_FOUND_DELETED_ROWID;
1417     }
1418     else
1419     {
1420       jam();
1421       /* Coverage tested */
1422       ndbrequire(!(thbits & Tuple_header::LCP_SKIP));
1423       DEB_LCP_SKIP_EXTRA(("(%u)Skipped tab(%u,%u), row(%u,%u),"
1424                     " foundGCI: %u, scanGCI: %u, header: %x",
1425                     instance(),
1426                     m_curr_fragptr.p->fragTableId,
1427                     m_curr_fragptr.p->fragmentId,
1428                     key.m_page_no,
1429                     key.m_page_idx,
1430                     foundGCI,
1431                     scan.m_scanGCI,
1432                     thbits));
1433     }
1434   }
1435   scan.m_last_seen = __LINE__;
1436   return ZSCAN_FOUND_NEXT_ROW;
1437   /* Continue LCP scan, no need to handle this row in this LCP */
1438 }
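/**
 * Rough decision summary of handle_scan_change_page_rows() above; informal
 * aid only, the code is authoritative:
 *
 *   foundGCI >  scanGCI, LCP_DELETE set        -> clear flag, DELETE by ROWID
 *   foundGCI >  scanGCI, row in use            -> record the row
 *   foundGCI >  scanGCI, row free, no LCP_SKIP -> DELETE by ROWID
 *   foundGCI >  scanGCI, LCP_SKIP set          -> clear flag, next row
 *   foundGCI == 0, LCP_SKIP set                -> clear flag, next row
 *                                                 (COPY FRAGMENT corner case)
 *   foundGCI == 0, scanGCI > 0                 -> DELETE by ROWID
 *   otherwise                                  -> next row
 */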
1439 
1440  /**
1441  * LCP scanning of CHANGE ROW pages:
1442  * ---------------------------------
1443  * The below description is implemented by the setup_change_page_for_scan and
1444  * handle_scan_change_page_rows methods.
1445  *
1446  * When scanning changed pages we only need to record those rows that actually
1447  * changed. There are two things that we need to ensure here. The first is
1448  * that we need to ensure that we restore the correct data. The second is that
1449  * we ensure that each checkpoint maintains structural consistency.
1450  *
1451  * To prove that we will restore the correct data we notice that the last
1452  * change to restore is in a previous checkpoint.
1453  *
1454  * In the previous checkpoint we wrote all rows that changed in the first GCI
1455  * that wasn't completed before we started the GCI or in any later GCI.
1456  * From this follows that we will definitely have written all changes since
1457  * the last checkpoint and even more than that.
1458  *
1459  * Given that we restore using multiple LCPs there could be a risk that we cut
1460  * away the LCP part where the changed row was recorded. This is not possible
1461  * for the following reason:
1462  * Restore of a page always start at a LCP where the page was fully written.
1463  * If this happened after the change we know that the record is there.
1464  * If the change happened after the LCP where ALL changes were recorded we
1465  * know that the LCP part is part of the restore AND we know that our change is
1466  * in this LCP part.
1467  *
1468  * From this it follows that we will restore the correct data since no changes
1469  * will be missing from the restored data.
1470  *
1471  * Next we need to verify that we maintain structural consistency. This means that
1472  * we must restore exactly the set of rows that was present at the start of
1473  * the LCP that we are restoring.
1474  *
1475  * To maintain this we need to ensure that any INSERTs that happened after
1476  * the start of the previous LCP but before we scanned this row are not missed
1477  * just because no changes occurred in this page since we last scanned it. To ensure
1478  * that we don't miss those rows we will notice that those rows will always
1479  * be marked with an LCP_DELETE flag for CHANGE pages. This means that when we
1480  * encounter a row with this flag we need to set the bit in the change map to
1481  * ensure that this row is recorded in the next LCP.
1482  *
1483  * Next we need to handle DELETEs that occur after the LCP started but before
1484  * we scanned the page. All these rows have the LCP_SKIP bit set. This means
1485  * that when we encounter the LCP_SKIP for CHANGE pages we should ensure that
1486  * the row is checked also in the next LCP by setting the change map to
1487  * indicate this.
1488  *
1489  * Finally if there are so many deletes that the state on the page is deleted
1490  * since the page is dropped, this we need not worry about since this is
1491  * handled in the same manner as the original partial LCP solution. So the
1492  * proof of this applies.
1493  *
1494  * Finally UPDATEs that occur after the LCP start but before we scan the row
1495  * will be recorded in the previous LCP and will not require setting any bits
1496  * in the change map. This is in line with normal behaviour of the LCPs, the
1497  * LCP is structurally consistent with the start of the LCP (the exact same
1498  * set of rows exists that existed at start of LCP). The data is however not
1499  * necessarily consistent since we rely on the REDO log to bring data
1500  * consistency.
1501  *
1502  * The major benefit of these change map pages comes when an entire page can
1503  * be skipped. In this case we can change scanning hundreds of rows to a
1504  * simple check of a small bitmap on the page. To handle very large databases
1505  * well we implement the bitmaps using a sort of BLOOM filter.
1506  *
1507  * We have 8 bits where each bit indicates changes in a 4 kB part of the page.
1508  * If such a bit isn't set we can skip an entire 4 kB part of the page that
1509  * could easily contain up to a bit more than one hundred rows.
1510  *
1511  * Finally we have a bitmap consisting of 128 bits where each bit that isn't
1512  * set means we can skip 256 bytes at a time (see the sketch after this comment).
1513  *
1514  * One problem with scanning using those bitmaps is that there is a cost
1515  * attached to skipping rows since it is harder to prefetch data. Thus we will
1516  * ignore the small area change bitmap when we have enough bits set and simply
1517  * scan all rows; we will still check the large area change bitmap in this
1518  * case as well.
1519  *
1520  * One special case we need to be careful with is when a new page has been
1521  * allocated. If this new page is reusing a previously used page slot and
1522  * thus reusing row ids we need to ensure that we scan the entire page.
1523  * This is required to generate DELETE BY ROWID for all row ids not yet
1524  * inserted into (there could be old inserts into these row ids in older
1525  * LCP data files, so it is important to remove those to get a consistent LCP).
1526  * We solve this by always ensuring that we scan the page the first time
1527  * by setting all bits in the change map and thus ensuring that
1528  * m_all_rows is set to true while scanning the page. We could be more
1529  * elaborate and only set it on pages that reuse a page slot or we could
1530  * even use a bit in the tuple header for it. But this method should be
1531  * good enough for now.
1532  */
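/**
 * Illustrative sketch (not part of the block logic): the arithmetic behind
 * the two change-map levels described above, assuming a 32 kB fixed size
 * page as stated in the comment (8 large-area bits covering 4 kB each,
 * 128 small-area bits covering 256 bytes each). The helper below is
 * hypothetical and only mirrors what the comment says; it is not the actual
 * Fix_page implementation.
 *
 *   // page_word_idx: word offset of a row within the page data (4-byte words)
 *   // large area index: 4 kB = 1024 words per bit, 8 bits in total
 *   // small area index: 256 bytes = 64 words per bit, 128 bits in total
 *   static inline void change_map_bits(Uint32 page_word_idx,
 *                                      Uint32 *large_bit,
 *                                      Uint32 *small_bit)
 *   {
 *     *large_bit = page_word_idx / 1024;  // 0 .. 7
 *     *small_bit = page_word_idx / 64;    // 0 .. 127
 *   }
 *
 * A page can thus be skipped piecewise: a cleared large-area bit skips 1024
 * words at a time and a cleared small-area bit skips 64 words at a time,
 * which is what move_to_next_change_page_row below does with the real maps.
 */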
1533 Uint32
1534 Dbtup::setup_change_page_for_scan(ScanOp& scan,
1535                                   Fix_page *fix_page,
1536                                   Local_key& key,
1537                                   Uint32 size)
1538 {
1539   ScanPos& pos = scan.m_scanPos;
1540   /**
1541    * This is the first row of the page, we need to decide how
1542    * to scan this page or possibly even that we don't need to
1543    * scan it at all since no changes exist on the page. No need
1544    * to check this once we started scanning the page.
1545    */
1546   if (!fix_page->get_any_changes())
1547   {
1548     /**
1549      * We only check this condition for the first row in the page.
1550      * If we passed this point we will start clearing the bits on
1551      * the page piece by piece, thus this check is only ok at the
1552      * first row of the page.
1553      *
1554      * No one has touched the page since the start of the
1555      * previous LCP. It is possible that some updates occurred
1556      * after the start of the LCP but before the previous LCP
1557      * scanned this page. These updates will have been recorded
1558      * in the previous LCP and thus as proved above will be part
1559      * of the previous LCP that will be part of the recovery
1560      * processing.
1561      */
1562 #ifdef VM_TRACE
1563     Uint32 debug_idx = key.m_page_idx;
1564     do
1565     {
1566       Tuple_header* tuple_header_ptr;
1567       tuple_header_ptr = (Tuple_header*)&fix_page->m_data[debug_idx];
1568       Uint32 thbits = tuple_header_ptr->m_header_bits;
1569       if (thbits & Tuple_header::LCP_DELETE ||
1570           thbits & Tuple_header::LCP_SKIP)
1571       {
1572         g_eventLogger->info("(%u)LCP_DELETE on page with no"
1573                             " changes tab(%u,%u), page(%u,%u)"
1574                             ", thbits: %x",
1575                             instance(),
1576                             m_curr_fragptr.p->fragTableId,
1577                             m_curr_fragptr.p->fragmentId,
1578                             key.m_page_no,
1579                             key.m_page_idx,
1580                             thbits);
1581         ndbrequire(!(thbits & Tuple_header::LCP_DELETE));
1582         ndbrequire(!(thbits & Tuple_header::LCP_SKIP));
1583       }
1584       debug_idx += size;
1585     } while ((debug_idx + size) <= Fix_page::DATA_WORDS);
1586 #endif
1587     DEB_LCP_FILTER(("(%u) tab(%u,%u) page(%u) filtered out",
1588                     instance(),
1589                     m_curr_fragptr.p->fragTableId,
1590                     m_curr_fragptr.p->fragmentId,
1591                     fix_page->frag_page_id));
1592     scan.m_last_seen = __LINE__;
1593     pos.m_get = ScanPos::Get_next_page_mm;
1594     c_backup->skip_no_change_page();
1595     return ZSCAN_FOUND_PAGE_END;
1596   }
1597   Uint32 num_changes = fix_page->get_num_changes();
1599   if (num_changes <= 15)
1600   {
1601     jam();
1602     /**
1603      * We will check every individual small area and also
1604      * check the large areas. There are only a few areas
1605      * that actually contain changes.
1606      * In this case we will not use any prefetches since
1607      * it is hard to predict which cache lines we will
1608      * actually read.
1609      *
1610      * When NDB is used with very large data sizes this
1611      * will be the most common code path since this only
1612      * looks at one individual page. If there is
1613      * 1 TB of data memory this means that we have
1614      * 32M of 32kB pages and thus the update frequency
1615      * must be at least 500M updates per LCP for the
1616      * number of changes to exceed 15 on most pages.
1617      * This is clearly not going to be the common case.
1618      *
1619      * For smaller databases with say 1 GB of data memory
1620      * there will be only 32k pages and thus around
1621      * 500k updates per LCP will be sufficient to exceed
1622      * 15 updates per page in the common case. Thus much
1623      * more likely.
1624      *
1625      * We keep the bits here until we have passed them with
1626      * the scan. Exactly the same proof that this works on
1627      * a page level now applies on the row level.
1628      *
1629      * Thus when we check the large area bit and find that no
1630      * changes have occurred we also know that no small area
1631      * bits are set, so no need to reset those. We know that
1632      * no one has touched those pages since the start of the
1633      * last LCP apart possibly from updates that don't change
1634      * structural consistency of the LCP.
1635      *
1636      * We initialise both the small area check index and the
1637      * large area check index to 0 to ensure that we check
1638      * already at the first row both of those areas.
1639      */
1640     pos.m_all_rows = false;
1641     pos.m_next_small_area_check_idx = 0;
1642     pos.m_next_large_area_check_idx = 0;
1643     ndbrequire(!fix_page->get_and_clear_change_while_lcp_scan());
1644     fix_page->set_page_being_lcp_scanned();
1645   }
1646   else
1647   {
1648     jam();
1649     /**
1650      * There are more than 15 parts that have changed.
1651      * In this case we expect to gain more from checking
1652      * all rows since this means that we can prefetch
1653      * memory to the CPU caches when we scan in linear
1654      * order.
1655      *
1656      * In this case we can clear the small area change map and
1657      * the large area change map already here since we won't
1658      * clear any bits during the page scan.
1659      *
1660      * With 15 changes or more the likelihood is very high that all
1661      * 8 large areas are also set. So we will ignore checking these
1662      * to avoid extra costs attached to checking this on
1663      * each row.
1664      *
1665      * We set area check indexes to an impossible value to ensure
1666      * that we don't use those by mistake.
1667      */
1668     pos.m_all_rows = true;
1669     fix_page->clear_small_change_map();
1670     fix_page->clear_large_change_map();
1671     pos.m_next_small_area_check_idx = RNIL;
1672     pos.m_next_large_area_check_idx = RNIL;
1673     ndbassert(fix_page->verify_change_maps(jamBuffer()));
1674   }
1675   return ZSCAN_FOUND_TUPLE;
1676 }
1677 
1678 Uint32
1679 Dbtup::move_to_next_change_page_row(ScanOp & scan,
1680                                     Fix_page *fix_page,
1681                                     Tuple_header **tuple_header_ptr,
1682                                     Uint32 & loop_count,
1683                                     Uint32 size)
1684 {
1685   ScanPos& pos = scan.m_scanPos;
1686   Local_key& key = pos.m_key;
1687   jam();
1688   ndbrequire(pos.m_next_large_area_check_idx != RNIL &&
1689              pos.m_next_small_area_check_idx != RNIL);
1690   do
1691   {
1692     loop_count++;
1693     if (pos.m_next_large_area_check_idx == key.m_page_idx)
1694     {
1695       jamDebug();
1696       jamLineDebug(Uint16(key.m_page_idx));
1697       pos.m_next_large_area_check_idx =
1698         fix_page->get_next_large_idx(key.m_page_idx, size);
1699       if (!fix_page->get_large_change_map(key.m_page_idx))
1700       {
1701         jamDebug();
1702         DEB_LCP_FILTER(("(%u) tab(%u,%u) page(%u) large area filtered"
1703                         ", start_idx: %u",
1704                         instance(),
1705                         m_curr_fragptr.p->fragTableId,
1706                         m_curr_fragptr.p->fragmentId,
1707                         fix_page->frag_page_id,
1708                         key.m_page_idx));
1709 
1710         if (unlikely((pos.m_next_large_area_check_idx + size) >
1711                       Fix_page::DATA_WORDS))
1712         {
1713           jamDebug();
1714           return ZSCAN_FOUND_PAGE_END;
1715         }
1716         jamDebug();
1717         /**
1718          * We have moved forward to a new large area. We assume that all
1719          * small areas we move past don't have their bits set.
1720          * It is important to start checking immediately the small area
1721          * since we have no idea if the first small area is to be checked
1722          * or not.
1723          */
1724         Uint32 next_to_check = pos.m_next_large_area_check_idx;
1725         key.m_page_idx = next_to_check;
1726         pos.m_next_small_area_check_idx = next_to_check;
1727         continue;
1728       }
1729     }
1730     if (pos.m_next_small_area_check_idx == key.m_page_idx)
1731     {
1732       jamDebug();
1733       jamLineDebug(Uint16(key.m_page_idx));
1734       pos.m_next_small_area_check_idx =
1735         fix_page->get_next_small_idx(key.m_page_idx, size);
1736       if (!fix_page->get_and_clear_change_maps(key.m_page_idx))
1737       {
1738         jamDebug();
1739         DEB_LCP_FILTER(("(%u) tab(%u,%u) page(%u) small area filtered"
1740                         ", start_idx: %u",
1741                         instance(),
1742                         m_curr_fragptr.p->fragTableId,
1743                         m_curr_fragptr.p->fragmentId,
1744                         fix_page->frag_page_id,
1745                         key.m_page_idx));
1746         if (unlikely((pos.m_next_small_area_check_idx + size) >
1747                       Fix_page::DATA_WORDS))
1748         {
1749           jamDebug();
1750           ndbassert(fix_page->verify_change_maps(jamBuffer()));
1751           return ZSCAN_FOUND_PAGE_END;
1752         }
1753         jamDebug();
1754         ndbassert(fix_page->verify_change_maps(jamBuffer()));
1755         /**
1756          * Since 1024 is a multiple of 64 there is no risk that we move
1757          * ourselves past the next large area check.
1758          */
1759         key.m_page_idx = pos.m_next_small_area_check_idx;
1760         ndbrequire(key.m_page_idx <= pos.m_next_large_area_check_idx);
1761         continue;
1762       }
1763     }
1764     break;
1765   } while (1);
1766   (*tuple_header_ptr) = (Tuple_header*)&fix_page->m_data[key.m_page_idx];
1767   jamDebug();
1768   jamLineDebug(Uint16(key.m_page_idx));
1769   ndbassert(fix_page->verify_change_maps(jamBuffer()));
1770   return ZSCAN_FOUND_TUPLE;
1771 }
1772 
1773 /**
1774  * Handling heavy insert and delete activity during LCP scans
1775  * ----------------------------------------------------------
1776  * As part of the LCP we need to record all rows that existed at the beginning
1777  * of the LCP. This means that any rows that are inserted after the LCP
1778  * started can be skipped. This is a common activity during database load
1779  * activity, so we ensure that the LCP can run quickly in this case to
1780  * provide much CPU resources for the insert activity. It is also important
1781  * to make good progress on LCPs to ensure that we can free REDO log space
1782  * and avoid running out of this resource.
1783  *
1784  * We use three ways to signal that a row or a set of rows is not needed to
1785  * record during an LCP.
1786  *
1787  * 1) We record the maximum page number at the start of the LCP, we never
1788  *    need to scan beyond this point, there can only be pages here that
1789  *    won't need recording in an LCP. We also avoid setting LCP_SKIP bits
1790  *    on these pages and rows.
1791  *    This will cover the common case of a small set of pages at the
1792  *    start of the LCP that grows quickly during the LCP scan.
1793  *
1794  * 2) If a page was allocated after the LCP started, then it can only contain
1795  *    rows that won't need recording in the LCP. If the page number was
1796  *    within the maximum page number at start of LCP, and beyond the page
1797  *    currently checked in LCP, then we will record the LCP skip information
1798  *    in the page header. So when the LCP scan reaches this page it will
1799  *    quickly move on to the next page since the page didn't have any records
1800  *    eligible for LCP recording. After skipping the page we clear the LCP
1801  *    skip flag since the rows should be recorded in the next LCP.
1802  *
1803  * 3) In case a row is allocated in a page that existed at start of LCP, then
1804  *    we record the LCP skip information in the tuple header unless the row
1805  *    has already been checked by the current LCP. We skip all rows with this
1806  *    bit set and reset it to ensure that we record it in the next LCP.
1807  */
1808 
1809 bool
1810 Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
1811 {
1812   ScanOp& scan = *scanPtr.p;
1813   ScanPos& pos = scan.m_scanPos;
1814   Local_key& key = pos.m_key;
1815   const Uint32 bits = scan.m_bits;
1816   // table
1817   TablerecPtr tablePtr;
1818   tablePtr.i = scan.m_tableId;
1819   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
1820   Tablerec& table = *tablePtr.p;
1821   m_curr_tabptr = tablePtr;
1822   // fragment
1823   FragrecordPtr fragPtr;
1824   fragPtr.i = scan.m_fragPtrI;
1825   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
1826   Fragrecord& frag = *fragPtr.p;
1827   m_curr_fragptr = fragPtr;
1828   // tuple found
1829   Tuple_header* tuple_header_ptr = 0;
1830   Uint32 thbits = 0;
1831   Uint32 loop_count = 0;
1832   Uint32 foundGCI;
1833 
1834   const bool mm_index = (bits & ScanOp::SCAN_DD);
1835   const bool lcp = (bits & ScanOp::SCAN_LCP);
1836 
1837   const Uint32 size = ((bits & ScanOp::SCAN_VS) == 0) ?
1838     table.m_offsets[mm_index].m_fix_header_size : 1;
1839   const Uint32 first = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
1840 
1841   if (lcp && ! fragPtr.p->m_lcp_keep_list_head.isNull())
1842   {
1843     jam();
1844     /**
1845      * Handle lcp keep list here too, due to scanCont
1846      */
1847     /* Coverage tested */
1848     handle_lcp_keep(signal, fragPtr, scanPtr.p);
1849     scan.m_last_seen = __LINE__;
1850     return false;
1851   }
1852 
1853   switch(pos.m_get){
1854   case ScanPos::Get_next_tuple:
1855     jam();
1856     key.m_page_idx += size;
1857     pos.m_get = ScanPos::Get_page;
1858     pos.m_realpid_mm = RNIL;
1859     break;
1860   case ScanPos::Get_tuple:
1861     jam();
1862     /**
1863      * We need to refetch page after timeslice
1864      */
1865     pos.m_get = ScanPos::Get_page;
1866     pos.m_realpid_mm = RNIL;
1867     break;
1868   default:
1869     break;
1870   }
1871 
1872   while (true) {
1873     switch (pos.m_get) {
1874     case ScanPos::Get_next_page:
1875       // move to next page
1876       jam();
1877       {
1878         if (! (bits & ScanOp::SCAN_DD))
1879           pos.m_get = ScanPos::Get_next_page_mm;
1880         else
1881           pos.m_get = ScanPos::Get_next_page_dd;
1882       }
1883       continue;
1884     case ScanPos::Get_page:
1885       // get real page
1886       jam();
1887       {
1888         if (! (bits & ScanOp::SCAN_DD))
1889           pos.m_get = ScanPos::Get_page_mm;
1890         else
1891           pos.m_get = ScanPos::Get_page_dd;
1892       }
1893       continue;
1894     case ScanPos::Get_next_page_mm:
1895       // move to next logical TUP page
1896       jam();
1897       {
1898         /**
1899          * Code for future activation, see below for more details.
1900          * bool break_flag;
1901          * break_flag = false;
1902          */
1903         key.m_page_no++;
1904         if (likely(bits & ScanOp::SCAN_LCP))
1905         {
1906           jam();
1907           /* Coverage tested path */
1908           /**
1909            * We could be scanning for a long time and only finding LCP_SKIP
1910            * records, we need to keep the LCP watchdog aware that we are
1911            * progressing, so we report each change to a new page by reporting
1912            * the id of the next page to scan.
1913            */
1914           c_backup->update_lcp_pages_scanned(signal,
1915                       c_lqh->get_scan_api_op_ptr(scan.m_userPtr),
1916                       key.m_page_no,
1917                       scan.m_scanGCI,
1918                       pos.m_lcp_scan_changed_rows_page);
1919           scan.m_last_seen = __LINE__;
1920         }
1921         if (unlikely(key.m_page_no >= frag.m_max_page_cnt))
1922         {
1923           if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL))
1924           {
1925             if (key.m_page_no < scan.m_endPage)
1926             {
1927               jam();
1928               DEB_NR_SCAN(("scanning page %u", key.m_page_no));
1929               goto cont;
1930             }
1931             jam();
1932             // no more pages, scan ends
1933             pos.m_get = ScanPos::Get_undef;
1934             scan.m_state = ScanOp::Last;
1935             return true;
1936           }
1937           else if (bits & ScanOp::SCAN_LCP &&
1938                    key.m_page_no < scan.m_endPage)
1939           {
1940             /**
1941              * We come here with ScanOp::SCAN_LCP set AND
1942              * frag.m_max_page_cnt < scan.m_endPage. In this case
1943              * it is still ok to finish the LCP scan. The missing
1944              * pages are handled when they are dropped, so before
1945              * we drop a page we record all entries that needs
1946              * recording for the LCP. These have been sent to the
1947              * LCP keep list. Since when we come here the LCP keep
1948              * list is empty we are done with the scan.
1949              *
1950              * We will however continue the scan for LCP scans. The
1951              * reason is that we might have set the LCP_SCANNED_BIT
1952              * on pages already dropped. So we need to continue scanning
1953              * to ensure that all the lcp scanned bits are reset.
1954              *
1955              * For the moment this code is unreachable since m_max_page_cnt
1956              * cannot decrease. Thus m_max_page_cnt cannot be smaller
1957              * than scan.m_endPage since scan.m_endPage is initialised to
1958              * m_max_page_cnt at start of scan.
1959              *
1960              * This is currently not implemented. So we
1961              * will make this code path using an ndbrequire instead.
1962              *
1963              * We keep the code as comments to be activated when we implement
1964              * the possibility to release pages in the directory.
1965              */
1966             ndbabort();
1967             /* We will not scan this page, so reset flag immediately */
1968             // reset_lcp_scanned_bit(fragPtr.p, key.m_page_no);
1969             // scan.m_last_seen = __LINE__;
1970             // break_flag = true;
1971           }
1972           else
1973           {
1974             // no more pages, scan ends
1975             pos.m_get = ScanPos::Get_undef;
1976             scan.m_last_seen = __LINE__;
1977             scan.m_state = ScanOp::Last;
1978             return true;
1979           }
1980         }
1981         if (unlikely((bits & ScanOp::SCAN_LCP) &&
1982                      (key.m_page_no >= scan.m_endPage)))
1983         {
1984           jam();
1985           /**
1986            * We have arrived at a page number that didn't exist at start of
1987            * LCP, we can quit the LCP scan since we cannot find any more
1988            * pages that are containing rows to be saved in LCP.
1989            */
1990           // no more pages, scan ends
1991           pos.m_get = ScanPos::Get_undef;
1992           scan.m_last_seen = __LINE__;
1993           scan.m_state = ScanOp::Last;
1994           return true;
1995         }
1996         /**
1997          * Activate this code if we implement support for decreasing
1998          * frag.m_max_page_cnt
1999          *
2000          * if (break_flag)
2001          * {
2002          * jam();
2003          * pos.m_get = ScanPos::Get_next_page_mm;
2004          * scan.m_last_seen = __LINE__;
2005          * break; // incr loop count
2006          * }
2007          */
2008     cont:
2009         key.m_page_idx = first;
2010         pos.m_get = ScanPos::Get_page_mm;
2011         // clear cached value
2012         pos.m_realpid_mm = RNIL;
2013       }
2014       /*FALLTHRU*/
2015     case ScanPos::Get_page_mm:
2016       // get TUP real page
2017       {
2018         PagePtr pagePtr;
2019         loop_count+= 4;
2020         if (pos.m_realpid_mm == RNIL)
2021         {
2022           Uint32 *next_ptr, *prev_ptr;
2023           pos.m_realpid_mm = getRealpidScan(fragPtr.p,
2024                                             key.m_page_no,
2025                                             &next_ptr,
2026                                             &prev_ptr);
2027           if (bits & ScanOp::SCAN_LCP)
2028           {
2029             jam();
2030             Uint32 ret_val = prepare_lcp_scan_page(scan,
2031                                                    key,
2032                                                    next_ptr,
2033                                                    prev_ptr);
2034             if (ret_val == ZSCAN_FOUND_PAGE_END)
2035               break;
2036             else if (ret_val == ZSCAN_FOUND_DROPPED_CHANGE_PAGE)
2037              goto record_dropped_change_page;
2038             /* else continue */
2039           }
2040           else if (unlikely(pos.m_realpid_mm == RNIL))
2041           {
2042             jam();
2043             if (bits & ScanOp::SCAN_NR)
2044             {
2045               jam();
2046               goto nopage;
2047             }
2048             pos.m_get = ScanPos::Get_next_page_mm;
2049             break; // incr loop count
2050           }
2051           else
2052           {
2053             jam();
2054           }
2055         }
2056         else
2057         {
2058           jam();
2059         }
2060 	c_page_pool.getPtr(pagePtr, pos.m_realpid_mm);
2061         /**
2062          * We are in the process of performing a Full table scan, this can be
2063          * either due to a user requesting a full table scan, it can also be
2064          * as part of Node Recovery where we are assisting the starting node
2065          * to be synchronized (SCAN_NR set) and it is also used for LCP scans
2066          * (SCAN_LCP set).
2067          *
2068          * We know that we will touch all cache lines where there is a tuple
2069          * header and all scans using main memory pages are done on the fixed
2070          * pages. To speed up scan processing we will prefetch such that we
2071          * always are a few tuples ahead. We scan ahead 4 tuples here and then
2072          * we scan yet one more ahead at each new tuple we get to. We only need
2073          * to initialise by scanning 3 rows ahead since we will immediately fetch
2074          * the fourth one before looking at the first row.
2075          *
2076          * PREFETCH_SCAN_TUPLE:
2077          */
2078         if (likely((key.m_page_idx + (size * 3)) <= Fix_page::DATA_WORDS))
2079         {
2080           struct Tup_fixsize_page *page_ptr =
2081             (struct Tup_fixsize_page*)pagePtr.p;
2082           NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx,
2083                                               size));
2084           NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx + size,
2085                                               size));
2086           NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx + (size * 2),
2087                                               size));
2088         }
2089         if (bits & ScanOp::SCAN_LCP)
2090         {
2091           if (pagePtr.p->is_page_to_skip_lcp())
2092           {
2093             Uint32 ret_val = handle_lcp_skip_page(scan,
2094                                                   key,
2095                                                   pagePtr.p);
2096             if (ret_val == ZSCAN_FOUND_PAGE_END)
2097             {
2098               jamDebug();
2099               break;
2100             }
2101             else
2102             {
2103               jamDebug();
2104               ndbrequire(ret_val == ZSCAN_FOUND_DROPPED_CHANGE_PAGE);
2105               goto record_dropped_change_page;
2106             }
2107           }
2108           else if (pos.m_lcp_scan_changed_rows_page)
2109           {
2110             /* CHANGE page is accessed */
2111             if (key.m_page_idx == 0)
2112             {
2113               jamDebug();
2114               /* First access of a CHANGE page */
2115               Uint32 ret_val = setup_change_page_for_scan(scan,
2116                                                           (Fix_page*)pagePtr.p,
2117                                                           key,
2118                                                           size);
2119               if (ret_val == ZSCAN_FOUND_PAGE_END)
2120               {
2121                 jamDebug();
2122                 /* No changes found on page level bitmaps */
2123                 break;
2124               }
2125               else
2126               {
2127                 ndbrequire(ret_val == ZSCAN_FOUND_TUPLE);
2128               }
2129             }
2130           }
2131           else
2132           {
2133             /* LCP ALL page is accessed */
2134             jamDebug();
2135             /**
2136              * Make sure those values have defined values if we were to enter
2137              * the wrong path for some reason. These values will lead to a
2138              * crash if we try to run the CHANGE page code for an ALL page.
2139              */
2140             pos.m_all_rows = false;
2141             pos.m_next_small_area_check_idx = RNIL;
2142             pos.m_next_large_area_check_idx = RNIL;
2143           }
2144         }
2145         /* LCP normal case 4a) above goes here */
2146 
2147     nopage:
2148         pos.m_page = pagePtr.p;
2149         pos.m_get = ScanPos::Get_tuple;
2150       }
2151       continue;
2152     case ScanPos::Get_next_page_dd:
2153       // move to next disk page
2154       jam();
2155       {
2156         Disk_alloc_info& alloc = frag.m_disk_alloc_info;
2157         Local_fragment_extent_list list(c_extent_pool, alloc.m_extent_list);
2158         Ptr<Extent_info> ext_ptr;
2159         c_extent_pool.getPtr(ext_ptr, pos.m_extent_info_ptr_i);
2160         Extent_info* ext = ext_ptr.p;
2161         key.m_page_no++;
2162         if (key.m_page_no >= ext->m_first_page_no + alloc.m_extent_size) {
2163           // no more pages in this extent
2164           jam();
2165           if (! list.next(ext_ptr)) {
2166             // no more extents, scan ends
2167             jam();
2168             pos.m_get = ScanPos::Get_undef;
2169             scan.m_state = ScanOp::Last;
2170             return true;
2171           } else {
2172             // move to next extent
2173             jam();
2174             pos.m_extent_info_ptr_i = ext_ptr.i;
2175             ext = c_extent_pool.getPtr(pos.m_extent_info_ptr_i);
2176             key.m_file_no = ext->m_key.m_file_no;
2177             key.m_page_no = ext->m_first_page_no;
2178           }
2179         }
2180         key.m_page_idx = first;
2181         pos.m_get = ScanPos::Get_page_dd;
2182         /*
2183           read ahead for scan in disk order
2184           do read ahead every 8:th page
2185         */
2186         if ((bits & ScanOp::SCAN_DD) &&
2187             (((key.m_page_no - ext->m_first_page_no) & 7) == 0))
2188         {
2189           jam();
2190           // initialize PGMAN request
2191           Page_cache_client::Request preq;
2192           preq.m_page = pos.m_key;
2193           preq.m_callback = TheNULLCallback;
2194 
2195           // set maximum read ahead
2196           Uint32 read_ahead = m_max_page_read_ahead;
2197 
2198           while (true)
2199           {
2200             // prepare page read ahead in current extent
2201             Uint32 page_no = preq.m_page.m_page_no;
2202             Uint32 page_no_limit = page_no + read_ahead;
2203             Uint32 limit = ext->m_first_page_no + alloc.m_extent_size;
2204             if (page_no_limit > limit)
2205             {
2206               jam();
2207               // read ahead crosses extent, set limit for this extent
2208               read_ahead = page_no_limit - limit;
2209               page_no_limit = limit;
2210               // and make sure we only read one extra extent next time around
2211               if (read_ahead > alloc.m_extent_size)
2212                 read_ahead = alloc.m_extent_size;
2213             }
2214             else
2215             {
2216               jam();
2217               read_ahead = 0; // no more to read ahead after this
2218             }
2219             // do read ahead pages for this extent
2220             while (page_no < page_no_limit)
2221             {
2222               // page request to PGMAN
2223               jam();
2224               preq.m_page.m_page_no = page_no;
2225               preq.m_table_id = frag.fragTableId;
2226               preq.m_fragment_id = frag.fragmentId;
2227               int flags = Page_cache_client::DISK_SCAN;
2228               // ignore result
2229               Page_cache_client pgman(this, c_pgman);
2230               pgman.get_page(signal, preq, flags);
2231               jamEntry();
2232               page_no++;
2233             }
2234             if (!read_ahead || !list.next(ext_ptr))
2235             {
2236               // no more extents after this or read ahead done
2237               jam();
2238               break;
2239             }
2240             // move to next extent and initialize PGMAN request accordingly
2241             Extent_info* ext = c_extent_pool.getPtr(ext_ptr.i);
2242             preq.m_page.m_file_no = ext->m_key.m_file_no;
2243             preq.m_page.m_page_no = ext->m_first_page_no;
2244           }
2245         } // if ScanOp::SCAN_DD read ahead
2246       }
2247       /*FALLTHRU*/
2248     case ScanPos::Get_page_dd:
2249       // get global page in PGMAN cache
2250       jam();
2251       {
2252         // check if page is un-allocated or empty
2253 	if (likely(! (bits & ScanOp::SCAN_NR)))
2254 	{
2255           D("Tablespace_client - scanNext");
2256 	  Tablespace_client tsman(signal, this, c_tsman,
2257                          frag.fragTableId,
2258                          frag.fragmentId,
2259                          c_lqh->getCreateSchemaVersion(frag.fragTableId),
2260                          frag.m_tablespace_id);
2261 	  unsigned uncommitted, committed;
2262 	  uncommitted = committed = ~(unsigned)0;
2263 	  int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
2264 	  ndbrequire(ret == 0);
2265 	  if (committed == 0 && uncommitted == 0) {
2266 	    // skip empty page
2267 	    jam();
2268 	    pos.m_get = ScanPos::Get_next_page_dd;
2269 	    break; // incr loop count
2270 	  }
2271 	}
2272         // page request to PGMAN
2273         Page_cache_client::Request preq;
2274         preq.m_page = pos.m_key;
2275         preq.m_table_id = frag.fragTableId;
2276         preq.m_fragment_id = frag.fragmentId;
2277         preq.m_callback.m_callbackData = scanPtr.i;
2278         preq.m_callback.m_callbackFunction =
2279           safe_cast(&Dbtup::disk_page_tup_scan_callback);
2280         int flags = Page_cache_client::DISK_SCAN;
2281         Page_cache_client pgman(this, c_pgman);
2282         Ptr<GlobalPage> pagePtr;
2283         int res = pgman.get_page(signal, preq, flags);
2284         pagePtr = pgman.m_ptr;
2285         jamEntry();
2286         if (res == 0) {
2287           jam();
2288           // request queued
2289           pos.m_get = ScanPos::Get_tuple;
2290           return false;
2291         }
2292         else if (res < 0)
2293         {
2294           jam();
2295           if (res == -1)
2296           {
2297             jam();
2298             m_scan_error_code = Uint32(~0);
2299           }
2300           else
2301           {
2302             jam();
2303             res = -res;
2304             m_scan_error_code = res;
2305           }
2306           /* Flag to reply code that we have an error */
2307           scan.m_state = ScanOp::Invalid;
2308           return true;
2309         }
2310         ndbrequire(res > 0);
2311         pos.m_page = (Page*)pagePtr.p;
2312       }
2313       pos.m_get = ScanPos::Get_tuple;
2314       continue;
2315       // get tuple
2316       // move to next tuple
2317     case ScanPos::Get_next_tuple:
2318       // move to next fixed size tuple
2319       jam();
2320       {
2321         key.m_page_idx += size;
2322         pos.m_get = ScanPos::Get_tuple;
2323       }
2324       /*FALLTHRU*/
2325     case ScanPos::Get_tuple:
2326       // get fixed size tuple
2327       jam();
2328       if ((bits & ScanOp::SCAN_VS) == 0)
2329       {
2330         Fix_page* page = (Fix_page*)pos.m_page;
2331         if (key.m_page_idx + size <= Fix_page::DATA_WORDS)
2332 	{
2333 	  pos.m_get = ScanPos::Get_next_tuple;
2334 	  if (unlikely((bits & ScanOp::SCAN_NR) &&
2335               pos.m_realpid_mm == RNIL))
2336           {
2337             /**
2338              * pos.m_page isn't initialized this path, so handle early
2339              * We're doing a node restart and we are scanning beyond our
2340              * existing rowid's since the starting node had those rowid's
2341              * defined.
2342              */
2343             jam();
2344             foundGCI = 0;
2345             goto found_deleted_rowid;
2346           }
2347 #ifdef VM_TRACE
2348           if (! (bits & ScanOp::SCAN_DD))
2349           {
2350             Uint32 realpid = getRealpidCheck(fragPtr.p, key.m_page_no);
2351             ndbrequire(pos.m_realpid_mm == realpid);
2352           }
2353 #endif
2354           tuple_header_ptr = (Tuple_header*)&page->m_data[key.m_page_idx];
2355 
2356           if ((key.m_page_idx + (size * 4)) <= Fix_page::DATA_WORDS)
2357           {
2358             /**
2359              * Continue staying ahead of scan on this page by prefetching
2360              * a row 4 tuples ahead of this tuple, prefetched the first 3
2361              * at PREFETCH_SCAN_TUPLE.
2362              */
2363             struct Tup_fixsize_page *page_ptr =
2364               (struct Tup_fixsize_page*)page;
2365             NDB_PREFETCH_READ(page_ptr->get_ptr(key.m_page_idx + (size * 3),
2366                                                 size));
2367           }
2368 	  if (likely((! ((bits & ScanOp::SCAN_NR) ||
2369                          (bits & ScanOp::SCAN_LCP))) ||
2370                      ((bits & ScanOp::SCAN_LCP) &&
2371                       !pos.m_lcp_scan_changed_rows_page)))
2372           {
2373             jam();
2374             /**
2375              * We come here for normal full table scans and also for LCP
2376              * scans where we scan ALL ROWS pages.
2377              *
2378              * We simply check if the row is free, if it isn't then we will
2379              * handle it. For LCP scans we will also check at found_tuple that
2380              * the LCP_SKIP bit isn't set. If it is then the rowid was empty
2381              * at start of LCP. If the rowid is free AND we are scanning an
2382              * ALL ROWS page then the LCP_SKIP cannot be set, this is set only
2383              * for CHANGED ROWS pages when deleting tuples.
2384              *
2385              * Free rowid's might have existed at start of LCP. This was
2386              * handled by using the LCP keep list when tuple was deleted.
2387              * So when we come here we don't have to worry about LCP scanning
2388              * those rows.
2389              *
2390              * LCP_DELETE flag can never be set on ALL ROWS pages.
2391              *
2392              * The state Tuple_header::ALLOC means that the row is being
2393              * inserted, it thus has no current committed state and is
2394              * here equivalent to the FREE state for LCP scans.
2395              */
2396             thbits = tuple_header_ptr->m_header_bits;
2397             if ((bits & ScanOp::SCAN_LCP) &&
2398                 (thbits & Tuple_header::LCP_DELETE))
2399             {
2400               g_eventLogger->info("(%u)LCP_DELETE on tab(%u,%u), row(%u,%u)"
2401                                   " ALL ROWS page, header: %x",
2402                                   instance(),
2403                                   fragPtr.p->fragTableId,
2404                                   fragPtr.p->fragmentId,
2405                                   key.m_page_no,
2406                                   key.m_page_idx,
2407                                   thbits);
2408               ndbabort();
2409             }
2410 	    if (! ((thbits & Tuple_header::FREE ||
2411                     thbits & Tuple_header::DELETE_WAIT) ||
2412                    ((bits & ScanOp::SCAN_LCP) &&
2413                     (thbits & Tuple_header::ALLOC))))
2414 	    {
2415               jam();
2416               scan.m_last_seen = __LINE__;
2417               goto found_tuple;
2418 	    }
2419             /**
2420              * Ensure that LCP_SKIP bit is clear before we move on
2421              * It could be set if the row was inserted after LCP
2422              * start and then followed by a delete of the row before
2423              * we arrive here.
2424              */
2425             if ((bits & ScanOp::SCAN_LCP) &&
2426                 (thbits & Tuple_header::LCP_SKIP))
2427             {
2428               jam();
2429               tuple_header_ptr->m_header_bits =
2430                 thbits & (~Tuple_header::LCP_SKIP);
2431               DEB_LCP_SKIP(("(%u)Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
2432                             ", header: %x"
2433                             ", new header: %x"
2434                             ", tuple_header_ptr: %p",
2435                             instance(),
2436                             fragPtr.p->fragTableId,
2437                             fragPtr.p->fragmentId,
2438                             key.m_page_no,
2439                             key.m_page_idx,
2440                             thbits,
2441                             tuple_header_ptr->m_header_bits,
2442                             tuple_header_ptr));
2443               updateChecksum(tuple_header_ptr,
2444                              tablePtr.p,
2445                              thbits,
2446                              tuple_header_ptr->m_header_bits);
2447             }
2448             scan.m_last_seen = __LINE__;
2449 	  }
2450 	  else if (bits & ScanOp::SCAN_NR)
2451 	  {
2452             thbits = tuple_header_ptr->m_header_bits;
2453 	    if ((foundGCI = *tuple_header_ptr->get_mm_gci(tablePtr.p)) >
2454                  scan.m_scanGCI ||
2455                 foundGCI == 0)
2456 	    {
2457               /**
2458                * foundGCI == 0 means that the row is initialised but has not
2459                * yet been committed as part of insert transaction. All other
2460                * rows have the GCI entry set to last GCI it was changed, this
2461                * is true for even deleted rows as long as the page is still
2462                * maintained by the fragment.
2463                *
2464                * When foundGCI == 0 there are two cases.
2465                * The first case is that thbits == Fix_page::FREE_RECORD.
2466                * In this case the tuple doesn't exist and should be
2467                * deleted if existing in the starting node.
2468                * As part of Fix_page::FREE_RECORD the Tuple_header::FREE
2469                * bit is set. So this is handled below.
2470                * The second case is that thbits == Tuple_header::ALLOC.
2471                * In this case the tuple is currently being inserted, but the
2472                * transaction isn't yet committed. In this case we will follow
2473                * the found_tuple path. This means that we will attempt to
2474                * lock the tuple, this will be unsuccessful since the row
2475                * is currently being inserted and is locked for write.
2476                * When the commit happens the row lock is released and the
2477                * copy scan will continue on this row. It will send an INSERT
2478                * to the starting node. Most likely the INSERT transaction
2479                * was started after the copy scan started, in this case the
2480                * INSERT will simply be converted to an UPDATE by the starting
2481                * node. If the insert was started before the new replica of
2482                * the fragment was included, the INSERT will be performed.
2483                * This is the reason why we have to go the extra mile here to
2484                * ensure that we don't lose records that are being inserted as
2485                * part of long transactions.
2486                *
2487                * The final problem is when the INSERT is aborted. In this case
2488                * we return from the lock row in execACCKEYREF. Since the row
2489                * is now in the Tuple_header::FREE state we must re-read the
2490                * row again. This is handled by changing the pos.m_get state
2491                * to Get_tuple instead of Get_next_tuple.
2492                */
2493               if (! (thbits & Tuple_header::FREE ||
2494                      thbits & Tuple_header::DELETE_WAIT))
2495 	      {
2496 		jam();
2497 		goto found_tuple;
2498 	      }
2499 	      else
2500 	      {
2501 		goto found_deleted_rowid;
2502 	      }
2503 	    }
2504 	    else if ((thbits & Fix_page::FREE_RECORD) != Fix_page::FREE_RECORD &&
2505 		      tuple_header_ptr->m_operation_ptr_i != RNIL)
2506 	    {
2507 	      jam();
2508 	      goto found_tuple; // Locked tuple...
2509 	      // skip free tuple
2510 	    }
2511             DEB_NR_SCAN_EXTRA(("(%u)NR_SCAN_SKIP:tab(%u,%u) row(%u,%u),"
2512                                " recGCI: %u, scanGCI: %u, header: %x",
2513                                instance(),
2514                                fragPtr.p->fragTableId,
2515                                fragPtr.p->fragmentId,
2516                                key.m_page_no,
2517                                key.m_page_idx,
2518                                foundGCI,
2519                                scan.m_scanGCI,
2520                                thbits));
2521 	  }
2522           else
2523           {
2524             ndbrequire(c_backup->is_partial_lcp_enabled());
2525             ndbrequire((bits & ScanOp::SCAN_LCP) &&
2526                        pos.m_lcp_scan_changed_rows_page);
2527             Uint32 ret_val;
2528             if (!pos.m_all_rows)
2529             {
2530               ret_val = move_to_next_change_page_row(scan,
2531                                                      page,
2532                                                      &tuple_header_ptr,
2533                                                      loop_count,
2534                                                      size);
2535               if (ret_val == ZSCAN_FOUND_PAGE_END)
2536               {
2537                 /**
2538                  * We have finished scanning a CHANGE page where we
2539                  * checked the individual parts of the page. In this case
2540                  * we perform a very detailed analysis and clear all bits
2541                  * while scanning. To handle this we will set a special
2542                  * bit if anyone updates any row in the page while
2543                  * we are scanning in this mode. This ensures that the
2544                  * flag bits are in read-only mode and only updated by
2545                  * LCP scanning. We don't track which part of page is
2546                  * updated in this case, so if any updates have been
2547                  * performed on page in this state, all bits on page
2548                  * are set to ensure that we will scan the entire page
2549                  * in the next LCP scan.
2550                  */
2551                 ndbassert(!page->get_any_changes());
2552                 page->clear_page_being_lcp_scanned();
2553                 if (page->get_and_clear_change_while_lcp_scan())
2554                 {
2555                   jamDebug();
2556                   page->set_all_change_map();
2557                 }
2558                 /**
2559                  * We've finished scanning a page that was using filtering using
2560                  * the bitmaps on the page. We are ready to set the last LCP
2561                  * state to A.
2562                  */
2563                 /* Coverage tested */
2564                 set_last_lcp_state(fragPtr.p,
2565                                    key.m_page_no,
2566                                    false /* Set state to A */);
2567                 scan.m_last_seen = __LINE__;
2568                 pos.m_get = ScanPos::Get_next_page;
2569                 break;
2570               }
2571             }
2572             ret_val = handle_scan_change_page_rows(scan,
2573                                                    page,
2574                                                    tuple_header_ptr,
2575                                                    foundGCI);
2576             if (likely(ret_val == ZSCAN_FOUND_TUPLE))
2577             {
2578               thbits = tuple_header_ptr->m_header_bits;
2579               goto found_tuple;
2580             }
2581             else if (ret_val == ZSCAN_FOUND_DELETED_ROWID)
2582               goto found_deleted_rowid;
2583             ndbrequire(ret_val == ZSCAN_FOUND_NEXT_ROW);
2584           }
2585         }
2586         else
2587         {
2588           jam();
2589           /**
2590            * We've finished scanning a page, for LCPs we are ready to
2591            * set the last LCP state to A.
2592            */
2593           if (bits & ScanOp::SCAN_LCP)
2594           {
2595             jam();
2596             /* Coverage tested */
2597             set_last_lcp_state(fragPtr.p,
2598                                key.m_page_no,
2599                                false /* Set state to A */);
2600             if (!pos.m_all_rows)
2601             {
2602               ndbassert(page->verify_change_maps(jamBuffer()));
2603             }
2604             scan.m_last_seen = __LINE__;
2605           }
2606           // no more tuples on this page
2607           pos.m_get = ScanPos::Get_next_page;
2608         }
2609       }
2610       else
2611       {
2612         jam();
2613         Var_page * page = (Var_page*)pos.m_page;
2614         if (key.m_page_idx < page->high_index)
2615         {
2616           jam();
2617           pos.m_get = ScanPos::Get_next_tuple;
2618           if (!page->is_free(key.m_page_idx))
2619           {
2620             tuple_header_ptr = (Tuple_header*)page->get_ptr(key.m_page_idx);
2621             thbits = tuple_header_ptr->m_header_bits;
2622             goto found_tuple;
2623           }
2624         }
2625         else
2626         {
2627           jam();
2628           // no more tuples on this page
2629           pos.m_get = ScanPos::Get_next_page;
2630           break;
2631         }
2632       }
2633       break; // incr loop count
2634   found_tuple:
2635       // found possible tuple to return
2636       jam();
2637       {
2638         // caller has already set pos.m_get to next tuple
2639         if (likely(! (bits & ScanOp::SCAN_LCP &&
2640                       thbits & Tuple_header::LCP_SKIP)))
2641         {
2642           Local_key& key_mm = pos.m_key_mm;
2643           if (likely(! (bits & ScanOp::SCAN_DD)))
2644           {
2645             key_mm = pos.m_key;
2646             // real page id is already set
2647             if (bits & ScanOp::SCAN_LCP)
2648             {
2649               c_backup->update_pause_lcp_counter(loop_count);
2650             }
2651           }
2652           else
2653           {
2654             tuple_header_ptr->get_base_record_ref(key_mm);
2655             // recompute for each disk tuple
2656             pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
2657           }
2658           // TUPKEYREQ handles savepoint stuff
2659           scan.m_state = ScanOp::Current;
2660           return true;
2661         }
2662         else
2663         {
2664           jam();
2665           /* Clear LCP_SKIP bit so that it will not show up in next LCP */
2666           tuple_header_ptr->m_header_bits =
2667             thbits & ~(Uint32)Tuple_header::LCP_SKIP;
2668 
2669           DEB_LCP_SKIP(("(%u) 3 Reset LCP_SKIP on tab(%u,%u), row(%u,%u)"
2670                         ", header: %x",
2671                         instance(),
2672                         fragPtr.p->fragTableId,
2673                         fragPtr.p->fragmentId,
2674                         key.m_page_no,
2675                         key.m_page_idx,
2676                         thbits));
2677 
2678           updateChecksum(tuple_header_ptr,
2679                          tablePtr.p,
2680                          thbits,
2681                          tuple_header_ptr->m_header_bits);
2682           scan.m_last_seen = __LINE__;
2683         }
2684       }
2685       break;
2686 
2687   record_dropped_change_page:
2688       {
2689         ndbrequire(c_backup->is_partial_lcp_enabled());
2690         c_backup->update_pause_lcp_counter(loop_count);
2691         record_delete_by_pageid(signal,
2692                                 frag.fragTableId,
2693                                 frag.fragmentId,
2694                                 scan,
2695                                 key.m_page_no,
2696                                 size,
2697                                 true);
2698         return false;
2699       }
2700 
2701   found_deleted_rowid:
2702 
2703       ndbrequire((bits & ScanOp::SCAN_NR) ||
2704                  (bits & ScanOp::SCAN_LCP));
2705       if (!(bits & ScanOp::SCAN_LCP && pos.m_is_last_lcp_state_D))
2706       {
2707         ndbrequire(bits & ScanOp::SCAN_NR ||
2708                   pos.m_lcp_scan_changed_rows_page);
2709 
2710         Local_key& key_mm = pos.m_key_mm;
2711         if (! (bits & ScanOp::SCAN_DD))
2712         {
2713           jam();
2714           key_mm = pos.m_key;
2715           // caller has already set pos.m_get to next tuple
2716           // real page id is already set
2717         }
2718         else
2719         {
2720           jam();
2721           /**
2722            * Currently dead code since NR scans never use Disk data scans.
2723            */
2724           ndbrequire(bits & ScanOp::SCAN_NR);
2725           tuple_header_ptr->get_base_record_ref(key_mm);
2726           // recompute for each disk tuple
2727           pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
2728 
2729           Fix_page *mmpage = (Fix_page*)c_page_pool.getPtr(pos.m_realpid_mm);
2730           tuple_header_ptr =
2731             (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
2732           if ((foundGCI = *tuple_header_ptr->get_mm_gci(tablePtr.p)) >
2733                scan.m_scanGCI ||
2734               foundGCI == 0)
2735           {
2736             thbits = tuple_header_ptr->m_header_bits;
2737             if (! (thbits & Tuple_header::FREE ||
2738                    thbits & Tuple_header::DELETE_WAIT))
2739             {
2740               jam();
2741               break;
2742             }
2743             jam();
2744           }
2745         }
2746         /**
2747          * This code handles Node recovery, the row might still exist at the
2748          * starting node although it no longer exists at this live node. We
2749          * send a DELETE by ROWID to the starting node.
2750          *
2751          * This code is also used by LCPs to record deleted row ids.
2752          */
2753         c_backup->update_pause_lcp_counter(loop_count);
2754         record_delete_by_rowid(signal,
2755                                frag.fragTableId,
2756                                frag.fragmentId,
2757                                scan,
2758                                pos.m_key_mm,
2759                                foundGCI,
2760                                true);
2761         // TUPKEYREQ handles savepoint stuff
2762         return false;
2763       }
2764       scan.m_last_seen = __LINE__;
2765       break; // incr loop count
2766     default:
2767       ndbabort();
2768     }
2769     loop_count+= 4;
2770     if (loop_count >= 512)
2771     {
2772       jam();
2773       if (bits & ScanOp::SCAN_LCP)
2774       {
2775         jam();
2776         c_backup->update_pause_lcp_counter(loop_count);
2777         if (!c_backup->check_pause_lcp())
2778         {
2779           loop_count = 0;
2780           continue;
2781         }
2782         c_backup->pausing_lcp(5,loop_count);
2783       }
2784       break;
2785     }
2786   }
2787   // TODO: at drop table we have to flush and terminate these
2788   jam();
2789   scan.m_last_seen = __LINE__;
2790   signal->theData[0] = ZTUP_SCAN;
2791   signal->theData[1] = scanPtr.i;
2792   if (!c_lqh->rt_break_is_scan_prioritised(scan.m_userPtr))
2793   {
2794     jam();
2795     sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
2796   }
2797   else
2798   {
2799     /**
2800      * Sending with bounded delay means that we allow all signals in the job
2801      * buffer to be executed until the maximum is reached, which is currently 100.
2802      * So sending with bounded delay means that we get more predictable delay.
2803      * It might be longer than with priority B, but it will never be longer
2804      * than 100 signals.
2805      */
2806     jam();
2807 //#ifdef VM_TRACE
2808     c_debug_count++;
2809     if (c_debug_count % 10000 == 0)
2810     {
2811       DEB_LCP_DELAY(("(%u)TupScan delayed 10000 times", instance()));
2812     }
2813 //#endif
2814     sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, BOUNDED_DELAY, 2);
2815   }
2816   return false;
2817 }
2818 
2819 void
2820 Dbtup::record_delete_by_rowid(Signal *signal,
2821                               Uint32 tableId,
2822                               Uint32 fragmentId,
2823                               ScanOp &scan,
2824                               Local_key &key,
2825                               Uint32 foundGCI,
2826                               bool set_scan_state)
2827 {
2828   const Uint32 bits = scan.m_bits;
2829   DEB_LCP_DEL_EXTRA(("(%u)Delete by rowid tab(%u,%u), row(%u,%u)",
2830                      instance(),
2831                      tableId,
2832                      fragmentId,
2833                      key.m_page_no,
2834                      key.m_page_idx));
2835   NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
2836   conf->scanPtr = scan.m_userPtr;
2837   conf->accOperationPtr = (bits & ScanOp::SCAN_LCP) ? Uint32(-1) : RNIL;
2838   conf->fragId = fragmentId;
2839   conf->localKey[0] = key.m_page_no;
2840   conf->localKey[1] = key.m_page_idx;
2841   conf->gci = foundGCI;
2842   if (set_scan_state)
2843     scan.m_state = ScanOp::Next;
2844   signal->setLength(NextScanConf::SignalLengthNoKeyInfo);
2845   c_lqh->exec_next_scan_conf(signal);
2846   return;
2847 }
2848 
2849 void
2850 Dbtup::record_delete_by_pageid(Signal *signal,
2851                                Uint32 tableId,
2852                                Uint32 fragmentId,
2853                                ScanOp &scan,
2854                                Uint32 page_no,
2855                                Uint32 record_size,
2856                                bool set_scan_state)
2857 {
2858   DEB_LCP_DEL_EXTRA(("(%u)Delete by pageid tab(%u,%u), page(%u)",
2859                      instance(),
2860                      tableId,
2861                      fragmentId,
2862                      page_no));
2863   jam();
2864   /**
2865    * Set page_idx to ZNIL to flag to LQH that this is a
2866    * DELETE by PAGEID; this also ensures that we move on to the next
2867    * page when we return to continue the LCP scan.
2868    */
2869   Uint32 page_idx = ZNIL;
2870 
2871   NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
2872   conf->scanPtr = scan.m_userPtr;
2873   conf->accOperationPtr = Uint32(-1);
2874   conf->fragId = fragmentId;
2875   conf->localKey[0] = page_no;
2876   conf->localKey[1] = page_idx;
2877   conf->gci = record_size; /* Used to transport record size */
2878   if (set_scan_state)
2879     scan.m_state = ScanOp::Next;
2880   signal->setLength(NextScanConf::SignalLengthNoKeyInfo);
2881   c_lqh->exec_next_scan_conf(signal);
2882 }
2883 
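/**
 * Summary of the two helpers above (editorial illustration only, derived
 * from the code in this file): both pack their result into a NextScanConf
 * that is passed directly to LQH via exec_next_scan_conf().
 *   DELETE by ROWID:  localKey = {page_no, page_idx}, gci = foundGCI
 *   DELETE by PAGEID: localKey = {page_no, ZNIL},     gci = record size
 * The page_idx value ZNIL is what tells LQH that the entry refers to a
 * whole page rather than a single row.
 */
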
2884 /**
2885  * The LCP requires that some rows which are deleted during the main-memory
2886  * scan of fragments with disk-data parts are included in the main-memory LCP.
2887  * This is done so that during recovery, the main-memory part can be used to
2888  * find the disk-data part again, so that it can be deleted during Redo
2889  * application.
2890  *
2891  * This is implemented by copying the row content into
2892  * 'undo memory' / copy tuple space, and adding it to a per-fragment
2893  * 'lcp keep list', before deleting it at transaction commit time.
2894  * The row content is then only reachable via the lcp keep list, and does not
2895  * cause any ROWID reuse issues (899).
2896  *
2897  * The LCP scan treats the fragment's 'lcp keep list' as a top-priority source
2898  * of rows to be included in the fragment LCP, so rows should only be kept
2899  * momentarily.
2900  *
2901  * As these rows exist solely in DBTUP undo memory, it is not necessary to
2902  * perform the normal ACC locking protocols etc, but it is necessary to prepare
2903  * TUP for the coming TUPKEYREQ...
2904  *
2905  * The principle behind the LCP keep list is described in more detail in
2906  * the research paper:
2907  * Recovery Principles of MySQL Cluster 5.1 presented at VLDB in 2005.
2908  * The main thought is that we restore the disk data part to the point in time
2909  * when we start the LCP on the fragment. Thus we need to ensure that any row
2910  * that exists at the start of the LCP also exists in the LCP and, conversely,
2911  * that any row that didn't exist at the start of the LCP doesn't exist in the
2912  * LCP. Updates of rows don't matter since the REDO log application will
2913  * ensure that the row gets synchronized.
2914  *
2915  * An important part of this is to record the number of pages at start of LCP.
2916  * We don't need to worry about scanning pages deleted during LCP since the
2917  * LCP keep list ensures that those rows were checkpointed before being
2918  * deleted.
2919  */
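/*
 * Rough sketch of the list structure used below (illustration only, based
 * on the code in this file): the Fragrecord holds head and tail Local_keys
 * that point into copy tuple space, and each copy tuple stores the original
 * rowid in words 0-1 and the next-pointer in words 2-3:
 *
 *   m_lcp_keep_list_head --> [rowid|next] --> [rowid|next] --> [rowid|null]
 *   m_lcp_keep_list_tail -------------------------------------------^
 *
 * insert_lcp_keep_list() appends at the tail and handle_lcp_keep() consumes
 * one entry from the head per call.
 */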
2920 void
2921 Dbtup::handle_lcp_keep(Signal* signal,
2922                        FragrecordPtr fragPtr,
2923                        ScanOp* scanPtrP)
2924 {
2925   TablerecPtr tablePtr;
2926   tablePtr.i = scanPtrP->m_tableId;
2927   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
2928 
2929   ndbrequire(!fragPtr.p->m_lcp_keep_list_head.isNull());
2930   Local_key tmp = fragPtr.p->m_lcp_keep_list_head;
2931   Uint32 * copytuple = get_copy_tuple_raw(&tmp);
2932   if (copytuple[0] == FREE_PAGE_RNIL)
2933   {
2934     jam();
2935     ndbrequire(c_backup->is_partial_lcp_enabled());
2936     /* Handle DELETE by ROWID or DELETE by PAGEID */
2937     Uint32 num_entries = copytuple[4];
2938     Uint32 page_id = copytuple[5];
2939     Uint16 *page_index_array = (Uint16*)&copytuple[6];
2940     c_backup->change_current_page_temp(page_id);
2941     if (page_index_array[0] == ZNIL)
2942     {
2943       jam();
2944       /* DELETE by PAGEID */
2945       const Uint32 size = tablePtr.p->m_offsets[MM].m_fix_header_size;
2946       Local_key key;
2947       key.m_page_no = page_id;
2948       key.m_page_idx = ZNIL;
2949       ndbrequire(num_entries == 1);
2950       DEB_LCP_KEEP(("(%u)tab(%u,%u) page(%u): Handle LCP keep DELETE by PAGEID",
2951                     instance(),
2952                     fragPtr.p->fragTableId,
2953                     fragPtr.p->fragmentId,
2954                     page_id));
2955       remove_top_from_lcp_keep_list(fragPtr.p, copytuple, tmp);
2956       c_backup->lcp_keep_delete_by_page_id();
2957       record_delete_by_pageid(signal,
2958                               fragPtr.p->fragTableId,
2959                               fragPtr.p->fragmentId,
2960                               *scanPtrP,
2961                               page_id,
2962                               size,
2963                               false);
2964       c_undo_buffer.free_copy_tuple(&tmp);
2965     }
2966     else
2967     {
2968       jam();
2969       /* DELETE by ROWID */
2970       Local_key key;
2971       key.m_page_no = page_id;
2972       ndbrequire(num_entries > 0);
2973       num_entries--;
2974       key.m_page_no = page_id;
2975       key.m_page_idx = page_index_array[num_entries];
2976       copytuple[4] = num_entries;
2977       c_backup->lcp_keep_delete_row();
2978       DEB_LCP_KEEP(("(%u)tab(%u,%u) page(%u,%u): "
2979                     "Handle LCP keep DELETE by ROWID",
2980                     instance(),
2981                     fragPtr.p->fragTableId,
2982                     fragPtr.p->fragmentId,
2983                     key.m_page_no,
2984                     key.m_page_idx));
2985       if (num_entries == 0)
2986       {
2987         jam();
2988         remove_top_from_lcp_keep_list(fragPtr.p, copytuple, tmp);
2989       }
2990       record_delete_by_rowid(signal,
2991                              fragPtr.p->fragTableId,
2992                              fragPtr.p->fragmentId,
2993                              *scanPtrP,
2994                              key,
2995                              0,
2996                              false);
2997       if (num_entries == 0)
2998       {
2999         jam();
3000         c_undo_buffer.free_copy_tuple(&tmp);
3001       }
3002     }
3003   }
3004   else
3005   {
3006     jam();
3007     /**
3008      * tmp points to the copy tuple. We need the real page id to temporarily
3009      * change to the correct current page. This can be found in copytuple[0],
3010      * where handle_lcp_keep_commit puts it.
3011      */
3012     c_backup->change_current_page_temp(copytuple[0]);
3013     c_backup->lcp_keep_row();
3014     remove_top_from_lcp_keep_list(fragPtr.p, copytuple, tmp);
3015     DEB_LCP_KEEP(("(%u)tab(%u,%u) row(%u,%u) page(%u,%u): Handle LCP keep"
3016                   " insert entry",
3017                   instance(),
3018                   fragPtr.p->fragTableId,
3019                   fragPtr.p->fragmentId,
3020                   copytuple[0],
3021                   copytuple[1],
3022                   tmp.m_page_no,
3023                   tmp.m_page_idx));
3024     Local_key save = tmp;
3025     setCopyTuple(tmp.m_page_no, tmp.m_page_idx);
3026     prepare_scanTUPKEYREQ(tmp.m_page_no, tmp.m_page_idx);
3027     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
3028     conf->scanPtr = scanPtrP->m_userPtr;
3029     conf->accOperationPtr = (Uint32)-1;
3030     conf->fragId = fragPtr.p->fragmentId;
3031     conf->localKey[0] = tmp.m_page_no;
3032     conf->localKey[1] = tmp.m_page_idx;
3033     signal->setLength(NextScanConf::SignalLengthNoGCI);
3034     c_lqh->exec_next_scan_conf(signal);
3035     c_undo_buffer.free_copy_tuple(&save);
3036     return;
3037   }
3038 }
3039 
3040 void
3041 Dbtup::remove_top_from_lcp_keep_list(Fragrecord *fragPtrP,
3042                                      Uint32 *copytuple,
3043                                      Local_key tmp)
3044 {
3045   memcpy(&fragPtrP->m_lcp_keep_list_head,
3046          copytuple+2,
3047          sizeof(Local_key));
3048 
3049   if (fragPtrP->m_lcp_keep_list_head.isNull())
3050   {
3051     jam();
3052     DEB_LCP_KEEP(("(%u) tab(%u,%u) tmp(%u,%u) keep_list(%u,%u):"
3053                   " LCP keep list empty again",
3054                   instance(),
3055                   fragPtrP->fragTableId,
3056                   fragPtrP->fragmentId,
3057                   tmp.m_page_no,
3058                   tmp.m_page_idx,
3059                   fragPtrP->m_lcp_keep_list_tail.m_page_no,
3060                   fragPtrP->m_lcp_keep_list_tail.m_page_idx));
3061     ndbrequire(tmp.m_page_no == fragPtrP->m_lcp_keep_list_tail.m_page_no);
3062     ndbrequire(tmp.m_page_idx == fragPtrP->m_lcp_keep_list_tail.m_page_idx);
3063     fragPtrP->m_lcp_keep_list_tail.setNull();
3064   }
3065   else
3066   {
3067     jam();
3068     DEB_LCP_KEEP(("(%u)tab(%u,%u) move LCP keep head(%u,%u),tail(%u,%u)",
3069                   instance(),
3070                   fragPtrP->fragTableId,
3071                   fragPtrP->fragmentId,
3072                   fragPtrP->m_lcp_keep_list_head.m_page_no,
3073                   fragPtrP->m_lcp_keep_list_head.m_page_idx,
3074                   fragPtrP->m_lcp_keep_list_tail.m_page_no,
3075                   fragPtrP->m_lcp_keep_list_tail.m_page_idx));
3076   }
3077 }
3078 
3079 void
3080 Dbtup::handle_lcp_drop_change_page(Fragrecord *fragPtrP,
3081                                    Uint32 logicalPageId,
3082                                    PagePtr pagePtr,
3083                                    bool delete_by_pageid)
3084 {
3085   /**
3086    * We are currently performing an LCP scan. This page is part of the
3087    * CHANGED ROWS pages. This means that we need to record all rows
3088    * that were deleted at the start of the LCP. If a row was deleted since
3089    * the last LCP scan then we need to record it as a DELETE by ROWID in
3090    * the LCP. The rows that were deleted after the LCP start have already
3091    * been handled. Those that have been handled have the LCP_SKIP
3092    * bit set in the tuple header. For those not handled we need to check
3093    * the row GCI to see if it is either 0 or >= scanGCI. If so then
3094    * we need to record them as part of the LCP.
3095    *
3096    * We store all the rowids that we find to record as DELETE by ROWID
3097    * in a local data array on the stack before we start writing them
3098    * into the LCP keep list.
3099    *
3100    * We depend on the allocation of the copy tuple always succeeding.
3101    * Since we will always release the page we are scanning, we hold on
3102    * to that page until we know that the copy tuple allocation succeeded.
3103    * If it did not, we do not release the scanned page, but only change
3104    * its resource type in the memory manager.  The latter is done as
3105    * a two-step operation.  First account the page as unused but do not
3106    * put it in any kind of free list.  Then account it as a copy
3107    * tuple page.
3108    *
3109    * This procedure guarantees that we have space to record the
3110    * DELETE by ROWIDs in the LCP keep list.
3111    *
3112    * An especially complex case happens when the LCP scan is in the
3113    * middle of scanning this page. This could happen due to an
3114    * inopportune real-time break in combination with multiple
3115    * deletes happening within this real-time break.
3116    *
3117    * If the page_to_skip_lcp bit was set we will perform delete_by_pageid
3118    * here. So we need not worry about this flag in the call to
3119    * is_rowid_in_remaining_lcp_set for each row in the loop; this call will
3120    * ensure that we skip any rows already handled by the LCP scan.
3121    */
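  /*
   * Sketch of the allocation fallback described above (illustration only,
   * mirroring the code at the end of this function):
   *   if (c_undo_buffer.alloc_copy_tuple(...) != nullptr)
   *     returnCommonArea(pagePtr.i, 1);           // page no longer needed
   *   else
   *     returnCommonArea_for_reuse(pagePtr.i, 1); // unaccount, keep the page
   *     reuse_page_for_copy_tuple(pagePtr.i);     // re-account as copy tuple page
   *     alloc_copy_tuple(...);                    // retry, must now succeed
   */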
3122   ScanOpPtr scanPtr;
3123   TablerecPtr tablePtr;
3124   scanPtr.i = fragPtrP->m_lcp_scan_op;
3125   ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
3126   tablePtr.i = fragPtrP->fragTableId;
3127   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
3128   Uint32 scanGCI = scanPtr.p->m_scanGCI;
3129   Uint32 idx = 0; /* First record index */
3130   Uint32 size = tablePtr.p->m_offsets[MM].m_fix_header_size; /* Row size */
3131   Fix_page *page = (Fix_page*)pagePtr.p;
3132   Uint32 found_idx_count = 0;
3133   ndbrequire(size >= 4);
3134   Uint16 found_idx[2048]; /* Fixed size header never smaller than 16 bytes */
3135   DEB_LCP_REL(("(%u)tab(%u,%u)page(%u) handle_lcp_drop_page,"
3136                " delete_by_page: %u",
3137                instance(),
3138                fragPtrP->fragTableId,
3139                fragPtrP->fragmentId,
3140                logicalPageId,
3141                delete_by_pageid));
3142   if (!delete_by_pageid)
3143   {
3144     jam();
3145     Local_key key;
3146     /* Coverage tested */
3147     key.m_page_no = logicalPageId;
3148     while ((idx + size) <= Fix_page::DATA_WORDS)
3149     {
3150       Tuple_header *th = (Tuple_header*)&page->m_data[idx];
3151       Uint32 thbits = th->m_header_bits;
3152       Uint32 rowGCI = *th->get_mm_gci(tablePtr.p);
3153       bool lcp_skip_not_set =
3154         (thbits & Tuple_header::LCP_SKIP) ? false : true;
3155       ndbrequire(thbits & Tuple_header::FREE);
3156       ndbrequire(!(thbits & Tuple_header::LCP_DELETE) || lcp_skip_not_set);
3157       /**
3158        * We ignore LCP_DELETE on the row here since if it is set then we also
3159        * know that LCP_SKIP isn't set; we also know rowGCI > scanGCI since the
3160        * row was inserted after the start of the LCP. So we will definitely
3161        * record it here for DELETE by ROWID.
3162        */
3163       key.m_page_idx = idx;
3164       bool is_in_remaining_lcp_set =
3165         is_rowid_in_remaining_lcp_set(pagePtr.p,
3166                                       fragPtrP,
3167                                       key,
3168                                       *scanPtr.p,
3169                                       0);
3170       if ((rowGCI > scanGCI || rowGCI == 0) &&
3171           lcp_skip_not_set &&
3172           is_in_remaining_lcp_set)
3173       {
3174         /* Coverage tested */
3175         jam();
3176         jamLine((Uint16)idx);
3177         found_idx[found_idx_count] = idx;
3178         found_idx_count++;
3179         DEB_LCP_REL(("(%u)tab(%u,%u)page(%u,%u) Keep_list DELETE_BY_ROWID",
3180                      instance(),
3181                      fragPtrP->fragTableId,
3182                      fragPtrP->fragmentId,
3183                      logicalPageId,
3184                      idx));
3185       }
3186       else
3187       {
3188         /* Coverage tested */
3189         DEB_LCP_REL(("(%u)tab(%u,%u)page(%u,%u) skipped "
3190                      "lcp_skip_not_set: %u, rowGCI: %u"
3191                      " scanGCI: %u, in LCP set: %u",
3192                      instance(),
3193                      fragPtrP->fragTableId,
3194                      fragPtrP->fragmentId,
3195                      logicalPageId,
3196                      idx,
3197                      lcp_skip_not_set,
3198                      rowGCI,
3199                      scanGCI,
3200                      is_in_remaining_lcp_set));
3201       }
3202       idx += size;
3203     }
3204   }
3205   else
3206   {
3207     jam();
3208     //ndbassert(false); //COVERAGE TEST
3209     found_idx_count = 1;
3210     found_idx[0] = ZNIL; /* Indicates DELETE by PAGEID */
3211     DEB_LCP_REL(("(%u)tab(%u,%u)page(%u) Keep_list DELETE_BY_PAGEID",
3212                  instance(),
3213                  fragPtrP->fragTableId,
3214                  fragPtrP->fragmentId,
3215                  logicalPageId));
3216   }
3217   Local_key location;
3218   /**
3219    * We store the following content into the copy tuple with a set of
3220    * DELETE by ROWID.
3221    * 1) Header (4 words)
3222    * 2) Number of rowids stored (1 word)
3223    * 3) Page Id (1 word)
3224    * 4) Array of Page indexes (1/2 word per entry)
3225    */
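  /*
   * Worked example (illustration only): with found_idx_count = 3 the
   * allocation below needs words = 6 + ((3 + 1) / 2) = 8 words, laid out as
   *   copytuple[0..3]  header: original rowid and next-ptr (filled in by
   *                    insert_lcp_keep_list)
   *   copytuple[4]     3 (number of page indexes)
   *   copytuple[5]     logicalPageId
   *   copytuple[6..7]  the three Uint16 page indexes, packed two per word
   */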
3226   if (found_idx_count == 0)
3227   {
3228     /* Nothing to store, all rows were already handled. */
3229     jam();
3230     returnCommonArea(pagePtr.i, 1);
3231     return;
3232   }
3233   Uint32 words = 6 + ((found_idx_count + 1) / 2);
3234   if (likely(c_undo_buffer.alloc_copy_tuple(&location, words) != nullptr))
3235   {
3236     jam();
3237     returnCommonArea(pagePtr.i, 1);
3238   }
3239   else
3240   {
3241     jam();
3242     ndbrequire(returnCommonArea_for_reuse(pagePtr.i, 1));
3243     ndbrequire(c_undo_buffer.reuse_page_for_copy_tuple(pagePtr.i));
3244     ndbrequire(c_undo_buffer.alloc_copy_tuple(&location, words) != nullptr);
3245   }
3246   Uint32 * copytuple = get_copy_tuple_raw(&location);
3247   Local_key flag_key;
3248   flag_key.m_page_no = FREE_PAGE_RNIL;
3249   flag_key.m_page_idx = 0;
3250   flag_key.m_file_no = 0;
3251 
3252   copytuple[4] = found_idx_count;
3253   copytuple[5] = logicalPageId;
3254   memcpy(&copytuple[6], &found_idx[0], 2 * found_idx_count);
3255   insert_lcp_keep_list(fragPtrP,
3256                        location,
3257                        copytuple,
3258                        &flag_key);
3259 }
3260 
3261 void
3262 Dbtup::insert_lcp_keep_list(Fragrecord *fragPtrP,
3263                             Local_key location,
3264                             Uint32 *copytuple,
3265                             const Local_key *rowid)
3266 {
3267   /**
3268    * Store original row-id in copytuple[0,1]
3269    * Store next-ptr in copytuple[2,3] (set to RNIL/RNIL)
3270    */
3271   assert(sizeof(Local_key) == 8);
3272   memcpy(copytuple+0, rowid, sizeof(Local_key));
3273   Local_key nil;
3274   nil.setNull();
3275   memcpy(copytuple+2, &nil, sizeof(Local_key));
3276   DEB_LCP_KEEP(("(%u)tab(%u,%u) Insert LCP keep for row(%u,%u)"
3277                 " from location page(%u,%u)",
3278                 instance(),
3279                 fragPtrP->fragTableId,
3280                 fragPtrP->fragmentId,
3281                 rowid->m_page_no,
3282                 rowid->m_page_idx,
3283                 location.m_page_no,
3284                 location.m_page_idx));
3285 
3286   /**
3287    * Link in the copy tuple into the LCP keep list.
3288    */
3289   if (fragPtrP->m_lcp_keep_list_tail.isNull())
3290   {
3291     jam();
3292     fragPtrP->m_lcp_keep_list_head = location;
3293   }
3294   else
3295   {
3296     jam();
3297     Uint32 *tail = get_copy_tuple_raw(&fragPtrP->m_lcp_keep_list_tail);
3298     Local_key nextptr;
3299     memcpy(&nextptr, tail+2, sizeof(Local_key));
3300     ndbrequire(nextptr.isNull());
3301     memcpy(tail+2, &location, sizeof(Local_key));
3302   }
3303   fragPtrP->m_lcp_keep_list_tail = location;
3304 }
3305 
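/*
 * Usage note (illustration only): a single keep list entry created by
 * handle_lcp_drop_change_page() can carry many page indexes. handle_lcp_keep()
 * then emits one DELETE by ROWID per call, decrementing the count in
 * copytuple[4], and only frees the copy tuple once the count reaches zero.
 */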
3306 void
3307 Dbtup::scanCont(Signal* signal, ScanOpPtr scanPtr)
3308 {
3309   bool immediate = scanNext(signal, scanPtr);
3310   if (! immediate) {
3311     jam();
3312     // time-slicing again
3313     return;
3314   }
3315   scanReply(signal, scanPtr);
3316 }
3317 
3318 void
3319 Dbtup::disk_page_tup_scan_callback(Signal* signal, Uint32 scanPtrI, Uint32 page_i)
3320 {
3321   ScanOpPtr scanPtr;
3322   scanPtr.i = scanPtrI;
3323   ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
3324   ScanOp& scan = *scanPtr.p;
3325   c_lqh->setup_scan_pointers(scan.m_userPtr);
3326   ScanPos& pos = scan.m_scanPos;
3327   // get cache page
3328   Ptr<GlobalPage> gptr;
3329   m_global_page_pool.getPtr(gptr, page_i);
3330   pos.m_page = (Page*)gptr.p;
3331   // continue
3332   ndbrequire((scan.m_bits & ScanOp::SCAN_LOCK) == 0);
3333   /**
3334    * Since Disk scans can only scan read only and without locks we can bypass
3335    * the code in execACC_CHECK_SCAN and move directly to scanNext and
3336    * scanReply.
3337    */
3338   scanCont(signal, scanPtr);
3339 }
3340 
3341 void
3342 Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr)
3343 {
3344   ScanOp& scan = *scanPtr.p;
3345   ndbrequire(! (scan.m_bits & ScanOp::SCAN_LOCK_WAIT) && scan.m_accLockOp == RNIL);
3346   {
3347     /**
3348      * Unlock everything not already unlocked by LQH.
3349      * Ensure that the LocalDLFifoList is destroyed before calling
3350      * EXECUTE_DIRECT on NEXT_SCANCONF, which might end up
3351      * creating the same object further down the stack.
3352      */
3353     Local_ScanLock_fifo list(c_scanLockPool, scan.m_accLockOps);
3354     ScanLockPtr lockPtr;
3355     while (list.first(lockPtr)) {
3356       jam();
3357       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
3358       lockReq->returnCode = RNIL;
3359       lockReq->requestInfo = AccLockReq::Abort;
3360       lockReq->accOpPtr = lockPtr.p->m_accLockOp;
3361       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
3362       jamEntry();
3363       ndbrequire(lockReq->returnCode == AccLockReq::Success);
3364       list.remove(lockPtr);
3365       release_scan_lock(lockPtr);
3366     }
3367   }
3368   checkPoolShrinkNeed(DBTUP_SCAN_LOCK_TRANSIENT_POOL_INDEX,
3369                       c_scanLockPool);
3370   // send conf
3371   scan.m_last_seen = __LINE__;
3372   NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
3373   conf->scanPtr = scanPtr.p->m_userPtr;
3374   conf->accOperationPtr = RNIL;
3375   conf->fragId = RNIL;
3376   releaseScanOp(scanPtr);
3377   signal->setLength(NextScanConf::SignalLengthNoTuple);
3378   c_lqh->exec_next_scan_conf(signal);
3379   return;
3380 }
3381 
3382 void Dbtup::release_scan_lock(ScanLockPtr releasePtr)
3383 {
3384   if (likely(releasePtr.i != c_copy_frag_scan_lock))
3385   {
3386     c_scanLockPool.release(releasePtr);
3387   }
3388   else
3389   {
3390     jam();
3391     releasePtr.p->m_accLockOp = RNIL;
3392     releasePtr.p->prevList = RNIL;
3393     releasePtr.p->nextList = RNIL;
3394   }
3395 }
3396 
3397 void Dbtup::release_c_free_scan_lock()
3398 {
3399   if (c_freeScanLock != RNIL)
3400   {
3401     ScanLockPtr releasePtr;
3402     releasePtr.i = c_freeScanLock;
3403     ndbrequire(c_scanLockPool.getValidPtr(releasePtr));
3404     release_scan_lock(releasePtr);
3405     c_freeScanLock = RNIL;
3406     checkPoolShrinkNeed(DBTUP_SCAN_LOCK_TRANSIENT_POOL_INDEX,
3407                         c_scanLockPool);
3408   }
3409 }
3410 
3411 void
3412 Dbtup::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
3413 {
3414   Local_ScanLock_fifo list(c_scanLockPool, scan.m_accLockOps);
3415   ScanLockPtr lockPtr;
3416 #ifdef VM_TRACE
3417   list.first(lockPtr);
3418   while (lockPtr.i != RNIL) {
3419     ndbrequire(lockPtr.p->m_accLockOp != accLockOp);
3420     list.next(lockPtr);
3421   }
3422 #endif
3423   lockPtr.i = c_freeScanLock;
3424   c_freeScanLock = RNIL;
3425   ndbrequire(c_scanLockPool.getValidPtr(lockPtr));
3426   lockPtr.p->m_accLockOp = accLockOp;
3427   list.addLast(lockPtr);
3428 }
3429 
3430 void
3431 Dbtup::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
3432 {
3433   Local_ScanLock_fifo list(c_scanLockPool, scan.m_accLockOps);
3434   ScanLockPtr lockPtr;
3435   list.first(lockPtr);
3436   while (lockPtr.i != RNIL) {
3437     if (lockPtr.p->m_accLockOp == accLockOp) {
3438       jam();
3439       break;
3440     }
3441     list.next(lockPtr);
3442   }
3443   ndbrequire(lockPtr.i != RNIL);
3444   list.remove(lockPtr);
3445   release_scan_lock(lockPtr);
3446   checkPoolShrinkNeed(DBTUP_SCAN_LOCK_TRANSIENT_POOL_INDEX,
3447                       c_scanLockPool);
3448 }
3449 
3450 void
3451 Dbtup::stop_lcp_scan(Uint32 tableId, Uint32 fragId)
3452 {
3453   jamEntry();
3454   TablerecPtr tablePtr;
3455   tablePtr.i = tableId;
3456   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
3457 
3458   FragrecordPtr fragPtr;
3459   fragPtr.i = RNIL;
3460   getFragmentrec(fragPtr, fragId, tablePtr.p);
3461   ndbrequire(fragPtr.i != RNIL);
3462   Fragrecord& frag = *fragPtr.p;
3463 
3464   ndbrequire(frag.m_lcp_scan_op != RNIL && c_lcp_scan_op != RNIL);
3465   ScanOpPtr scanPtr;
3466   scanPtr.i = frag.m_lcp_scan_op;
3467   ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
3468   ndbrequire(scanPtr.p->m_fragPtrI != RNIL);
3469 
3470   fragPtr.p->m_lcp_scan_op = RNIL;
3471   scanPtr.p->m_fragPtrI = RNIL;
3472   scanPtr.p->m_tableId = RNIL;
3473 }
3474 
3475 void
3476 Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
3477 {
3478   FragrecordPtr fragPtr;
3479   fragPtr.i = scanPtr.p->m_fragPtrI;
3480   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
3481 
3482   if (scanPtr.p->m_bits & ScanOp::SCAN_LCP)
3483   {
3484     jam();
3485     /**
3486      * Ignore; this is handled in release_lcp_scan. An LCP of a fragment
3487      * can consist of several scans, one per LCP file.
3488      */
3489   }
3490   else if ((scanPtr.p->m_bits & ScanOp::SCAN_COPY_FRAG) != 0)
3491   {
3492     jam();
3493     ndbrequire(c_copy_frag_scan_op == scanPtr.i);
3494     scanPtr.p->m_state = ScanOp::First;
3495     scanPtr.p->m_bits = 0;
3496   }
3497   else
3498   {
3499     jam();
3500     Local_ScanOp_list list(c_scanOpPool, fragPtr.p->m_scanList);
3501     list.remove(scanPtr);
3502     c_scanOpPool.release(scanPtr);
3503     checkPoolShrinkNeed(DBTUP_SCAN_OPERATION_TRANSIENT_POOL_INDEX,
3504                         c_scanOpPool);
3505   }
3506 }
3507 
3508 void
3509 Dbtup::start_lcp_scan(Uint32 tableId,
3510                       Uint32 fragId,
3511                       Uint32 & max_page_cnt)
3512 {
3513   jamEntry();
3514   TablerecPtr tablePtr;
3515   tablePtr.i = tableId;
3516   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
3517 
3518   FragrecordPtr fragPtr;
3519   fragPtr.i = RNIL;
3520   getFragmentrec(fragPtr, fragId, tablePtr.p);
3521   ndbrequire(fragPtr.i != RNIL);
3522   Fragrecord& frag = *fragPtr.p;
3523 
3524   ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
3525   frag.m_lcp_scan_op = c_lcp_scan_op;
3526   ScanOpPtr scanPtr;
3527   scanPtr.i = frag.m_lcp_scan_op;
3528   ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
3529   ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
3530   new (scanPtr.p) ScanOp;
3531   scanPtr.p->m_fragPtrI = fragPtr.i;
3532   scanPtr.p->m_tableId = tableId;
3533   scanPtr.p->m_state = ScanOp::First;
3534   scanPtr.p->m_last_seen = __LINE__;
3535   scanPtr.p->m_endPage = frag.m_max_page_cnt;
3536   max_page_cnt = frag.m_max_page_cnt;
3537 
3538   ndbrequire(frag.m_lcp_keep_list_head.isNull());
3539   ndbrequire(frag.m_lcp_keep_list_tail.isNull());
3540 }
3541 
3542 void
3543 Dbtup::lcp_frag_watchdog_print(Uint32 tableId, Uint32 fragId)
3544 {
3545   TablerecPtr tablePtr;
3546   tablePtr.i = tableId;
3547   if (tableId >= cnoOfTablerec)
3548   {
3549     jam();
3550     return;
3551   }
3552   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
3553 
3554   FragrecordPtr fragPtr;
3555   fragPtr.i = RNIL;
3556   getFragmentrec(fragPtr, fragId, tablePtr.p);
3557   ndbrequire(fragPtr.i != RNIL);
3558   Fragrecord& frag = *fragPtr.p;
3559 
3560   if (c_lcp_scan_op == RNIL)
3561   {
3562     jam();
3563     g_eventLogger->info("No LCP scan ongoing in TUP tab(%u,%u)",
3564                         tableId, fragId);
3565     ndbabort();
3566   }
3567   else if (frag.m_lcp_scan_op == RNIL)
3568   {
3569     jam();
3570     DEB_LCP(("LCP scan stopped, signal to stop watchdog still in flight tab(%u,%u)",
3571              tableId, fragId));
3572   }
3573   else if (frag.m_lcp_scan_op != c_lcp_scan_op)
3574   {
3575     jam();
3576     g_eventLogger->info("Corrupt internal, LCP scan not on correct tab(%u,%u)",
3577                         tableId, fragId);
3578     ndbabort();
3579   }
3580   else
3581   {
3582     jam();
3583     ScanOpPtr scanPtr;
3584     scanPtr.i = frag.m_lcp_scan_op;
3585     ndbrequire(c_scanOpPool.getValidPtr(scanPtr));
3586     g_eventLogger->info("LCP Frag watchdog: tab(%u,%u), state: %u,"
3587                         " last seen line %u",
3588                         tableId, fragId,
3589                         scanPtr.p->m_state,
3590                         scanPtr.p->m_last_seen);
3591   }
3592 }
3593