1 /*
2    Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #define DBTUP_C
27 #define DBTUP_PAGE_MAP_CPP
28 #include "Dbtup.hpp"
29 #include <RefConvert.hpp>
30 #include <ndb_limits.h>
31 #include <pc.hpp>
32 #include <signaldata/RestoreImpl.hpp>
33 #include "../backup/Backup.hpp"
34 
35 #define JAM_FILE_ID 415
36 
37 #if (defined(VM_TRACE) || defined(ERROR_INSERT))
38 //#define DEBUG_LCP 1
39 //#define DEBUG_LCP_REL 1
40 //#define DEBUG_LCP_ALLOC 1
41 //#define DEBUG_LCP_FREE 1
42 //#define DEBUG_LCP_SKIP 1
43 //#define DEBUG_LCP_SCANNED_BIT 1
44 #endif
45 
46 #ifdef DEBUG_LCP
47 #define DEB_LCP(arglist) do { g_eventLogger->info arglist ; } while (0)
48 #else
49 #define DEB_LCP(arglist) do { } while (0)
50 #endif
51 
52 #ifdef DEBUG_LCP_REL
53 #define DEB_LCP_REL(arglist) do { g_eventLogger->info arglist ; } while (0)
54 #else
55 #define DEB_LCP_REL(arglist) do { } while (0)
56 #endif
57 
58 #ifdef DEBUG_LCP_ALLOC
59 #define DEB_LCP_ALLOC(arglist) do { g_eventLogger->info arglist ; } while (0)
60 #else
61 #define DEB_LCP_ALLOC(arglist) do { } while (0)
62 #endif
63 
64 #ifdef DEBUG_LCP_FREE
65 #define DEB_LCP_FREE(arglist) do { g_eventLogger->info arglist ; } while (0)
66 #else
67 #define DEB_LCP_FREE(arglist) do { } while (0)
68 #endif
69 
70 #ifdef DEBUG_LCP_SKIP
71 #define DEB_LCP_SKIP(arglist) do { g_eventLogger->info arglist ; } while (0)
72 #else
73 #define DEB_LCP_SKIP(arglist) do { } while (0)
74 #endif
75 
76 #ifdef DEBUG_LCP_SCANNED_BIT
77 #define DEB_LCP_SCANNED_BIT(arglist) \
78   do { g_eventLogger->info arglist ; } while (0)
79 #else
80 #define DEB_LCP_SCANNED_BIT(arglist) do { } while (0)
81 #endif
82 
83 #define DBUG_PAGE_MAP 0
84 
85 //
86 // PageMap is a service used by Dbtup to map logical page id's to physical
87 // page id's. The mapping needs the fragment and the logical page id to
88 // provide the physical id.
89 //
90 // This is a part of Dbtup which is the exclusive user of a certain set of
91 // variables on the fragment record and it is the exclusive user of the
92 // struct for page ranges.
93 //
94 // The use of the fragment page map is described in some detail in Backup.cpp
95 // as part of the LCP description. We use 2 bits for important state info on
96 // this and the previous LCP state for a page.
97 //
98 // The following methods operate on the data handled by the page map class.
99 //
100 // Public methods
101 // insertPageRange(Uint32 startPageId,     # In
102 //                 Uint32 noPages)         # In
103 // Inserts a range of pages into the mapping structure.
104 //
105 // void releaseFragPage()
106 // Releases a page belonging to a fragment.
107 //
108 // Uint32 allocFragPages(Uint32 tafpNoAllocRequested)
109 // Allocate a set of pages to the fragment from the page manager
110 //
111 // Uint32 getEmptyPage()
112 // Get an empty page from the pool of empty pages on the fragment.
113 // It returns the physical page id of the empty page.
114 // Returns RNIL if no empty page is available.
115 //
116 // Uint32 getRealpid(Uint32 logicalPageId)
117 // Return the physical page id provided the logical page id
118 //
119 // void initializePageRange()
120 // Initialise free list of page ranges and initialise the page range records.
121 //
122 // void initFragRange()
123 // Initialise the fragment variables when allocating a fragment to a table.
124 //
125 // void initPageRangeSize(Uint32 size)
126 // Initialise the number of page ranges.
127 //
128 // Uint32 getNoOfPages()
129 // Get the number of pages on the fragment currently.
130 //
131 //
132 // Private methods
133 // Uint32 leafPageRangeFull(PageRangePtr currPageRangePtr)
134 //
135 // void errorHandler()
136 // Method to crash NDB kernel in case of weird data set-up
137 //
138 // void allocMoreFragPages()
139 // When no more empty pages are attached to the fragment and we need more
140 // we allocate more pages from the page manager using this method.
141 //
142 // Private data
143 // On the fragment record
144 // currentPageRange    # The current page range where to insert the next range
145 // rootPageRange       # The root of the page ranges owned
146 // nextStartRange      # The next page id to assign when expanding the
147 //                     # page map
148 // noOfPages           # The number of pages in the fragment
149 // emptyPrimPage       # The first page of the empty pages in the fragment
150 //
151 // The full page range struct
152 
/**
 * Lazily create the page map entry for a logical page id.
 *
 * Each logical page id owns two consecutive words in the DynArr256 map:
 *   word 2*id     : physical page id plus state bits (the "next" word)
 *   word 2*id + 1 : free-list prev pointer plus last-LCP state bits
 *
 * Both words are allocated here; on out-of-memory for either word we
 * return 0 (NULL). The new page id is linked into the fragment's free
 * page id list and m_max_page_cnt is extended if needed.
 *
 * Returns a pointer to the "next" word (word 2*id), or 0 on failure.
 */
Uint32*
Dbtup::init_page_map_entry(Fragrecord *regFragPtr, Uint32 logicalPageId)
{
  DEB_LCP(("(%u)init_page_map_entry tab(%u,%u):%u",
          instance(),
          regFragPtr->fragTableId,
          regFragPtr->fragmentId,
          logicalPageId));
  DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
  /* Allocate the prev/last-LCP word first. */
  Uint32 *prev_ptr = map.set(2 * logicalPageId + 1);
  if (prev_ptr == 0)
  {
    jam();
    return 0;
  }
  Uint32 *ptr = map.set(2 * logicalPageId);
  if (ptr == 0)
  {
    jam();
    /**
     * The "next" word could not be allocated; leave the prev word in a
     * well-defined state (free, and free at last LCP) before failing.
     */
    (*prev_ptr) = FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
    return 0;
  }
  if (logicalPageId >= regFragPtr->m_max_page_cnt)
  {
    jam();
    /* First time this id is seen; grow the fragment's page id range. */
    regFragPtr->m_max_page_cnt = logicalPageId + 1;
    if (DBUG_PAGE_MAP)
    {
      g_eventLogger->info("(%u)allocIP: tab(%u,%u), new max: %u",
                          instance(),
                          regFragPtr->fragTableId,
                          regFragPtr->fragmentId,
                          regFragPtr->m_max_page_cnt);
    }
  }
  /* Put the (not yet allocated) page id on the free page id list. */
  (void)insert_free_page_id_list(regFragPtr,
                                 logicalPageId,
                                 ptr,
                                 prev_ptr,
                                 Uint32(0),
                                 Uint32(0));
  /* Re-read; insert_free_page_id_list may have rewritten the entry. */
  return map.get_dirty(2 * logicalPageId);
}
196 
getRealpid(Fragrecord * regFragPtr,Uint32 logicalPageId)197 Uint32 Dbtup::getRealpid(Fragrecord* regFragPtr, Uint32 logicalPageId)
198 {
199   DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
200   Uint32 *ptr = map.get(2 * logicalPageId);
201   if (likely(ptr != 0))
202   {
203     ndbrequire((*ptr) != RNIL)
204     return ((*ptr) & PAGE_BIT_MASK);
205   }
206   ndbabort();
207   return RNIL;
208 }
209 
210 Uint32
getRealpidCheck(Fragrecord * regFragPtr,Uint32 logicalPageId)211 Dbtup::getRealpidCheck(Fragrecord* regFragPtr, Uint32 logicalPageId)
212 {
213   DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
214   // logicalPageId might not be mapped yet,
215   // get_dirty returns NULL also in debug in this case.
216   Uint32 *ptr = map.get_dirty(2 * logicalPageId);
217   if (ptr == 0)
218   {
219     jam();
220     ptr = init_page_map_entry(regFragPtr, logicalPageId);
221   }
222   if (likely(ptr != 0))
223   {
224     Uint32 val = *ptr;
225     if ((val & FREE_PAGE_BIT) != 0)
226       return RNIL;
227     else
228       return (val & PAGE_BIT_MASK);
229   }
230   return RNIL;
231 }
232 
/* Accessor: the number of pages currently allocated to the fragment. */
Uint32 Dbtup::getNoOfPages(Fragrecord* const regFragPtr)
{
  return regFragPtr->noOfPages;
}//Dbtup::getNoOfPages()
237 
/**
 * Initialise the header of a freshly allocated fragment page.
 *
 * @param regFragPtr  owning fragment (unused here beyond context)
 * @param pagePtr     page to initialise; pagePtr.i is the physical id
 * @param pageId      logical (fragment) page id recorded in the header
 */
void
Dbtup::init_page(Fragrecord* regFragPtr, PagePtr pagePtr, Uint32 pageId)
{
  pagePtr.p->page_state = ~0;
  pagePtr.p->frag_page_id = pageId;
  pagePtr.p->physical_page_id = pagePtr.i;
  pagePtr.p->nextList = RNIL;
  pagePtr.p->prevList = RNIL;
  pagePtr.p->m_flags = 0;
  Tup_fixsize_page* fix_page = (Tup_fixsize_page*)pagePtr.p;
  /**
   * A new page is required to be fully scanned the first LCP after
   * allocation to ensure that we generate DELETE BY ROWID for all
   * positions that are not yet inserted into, this ensures that
   * we don't leave deleted rows in change pages after an LCP.
   */
  fix_page->set_all_change_map();
  fix_page->clear_max_gci();
  ndbassert(fix_page->verify_change_maps(jamBuffer()));
}
258 
259 #ifdef VM_TRACE
260 #define do_check_page_map(x) check_page_map(x)
261 #if DBUG_PAGE_MAP
/**
 * Debug helper (DBUG_PAGE_MAP): walk the fragment's free page id list
 * and return true if pageId is a member. Also validates, for every
 * visited entry, that the stored prev pointer matches the actual
 * predecessor and that FREE_PAGE_BIT is set in both map words.
 */
bool
Dbtup::find_page_id_in_list(Fragrecord* fragPtrP, Uint32 pageId)
{
  /* Don't use jam's here unless a jamBuf is sent in */
  DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);

  Uint32 prev = FREE_PAGE_RNIL;
  Uint32 curr = fragPtrP->m_free_page_id_list;

  while (curr != FREE_PAGE_RNIL)
  {
    const Uint32 *prevPtr = map.get(2 * curr + 1);
    ndbrequire(prevPtr != 0);
    /* Stored prev must agree with the predecessor we walked through. */
    ndbrequire(prev == ((*prevPtr) & PAGE_BIT_MASK));
    ndbrequire(((*prevPtr) & FREE_PAGE_BIT) == FREE_PAGE_BIT);

    Uint32 *nextPtr = map.get(2 * curr);
    ndbrequire(nextPtr != 0);
    ndbrequire(((*nextPtr) & FREE_PAGE_BIT) == FREE_PAGE_BIT);

    if (curr == pageId)
      return true;

    prev = curr;
    curr = (*nextPtr);
    curr &= PAGE_BIT_MASK;
  }
  return false;
}
291 
/**
 * Debug helper (DBUG_PAGE_MAP): verify page map / free list consistency
 * for every logical page id below m_max_page_cnt:
 *  - unmapped or RNIL entries must not be on the free list,
 *  - entries with FREE_PAGE_BIT set must be on the free list,
 *  - allocated entries must point at a page whose header ids match,
 *    and must not be on the free list.
 */
void
Dbtup::check_page_map(Fragrecord* fragPtrP)
{
  /* Don't use jam's here unless a jamBuf is sent in */
  Uint32 max = fragPtrP->m_max_page_cnt;
  DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);

  for (Uint32 i = 0; i<max; i++)
  {
    const Uint32 *ptr = map.get(2 * i);
    if (ptr == 0)
    {
      /* No map entry at all: must not be in the free list. */
      ndbrequire(find_page_id_in_list(fragPtrP, i) == false);
    }
    else
    {
      if ((*ptr) == RNIL)
      {
        /* Entry allocated but never used: not in the free list. */
        ndbrequire(find_page_id_in_list(fragPtrP, i) == false);
      }
      else
      {
        /* Ignore LCP_SCANNED_BIT when interpreting the entry. */
        Uint32 realpid = ((*ptr) & (Uint32)~LCP_SCANNED_BIT);
        if (realpid & FREE_PAGE_BIT)
        {
          ndbrequire(find_page_id_in_list(fragPtrP, i) == true);
        }
        else
        {
          /* Allocated page: header must agree with the map. */
          PagePtr pagePtr;
          c_page_pool.getPtr(pagePtr, realpid);
          ndbrequire(pagePtr.p->frag_page_id == i);
          ndbrequire(pagePtr.p->physical_page_id == realpid);
          ndbrequire(find_page_id_in_list(fragPtrP, i) == false);
        }
      }
    }
  }
}
331 #else
/* DBUG_PAGE_MAP == 0: consistency checking compiled out to a no-op. */
void Dbtup::check_page_map(Fragrecord*) {}
333 #endif
334 #else
335 #define do_check_page_map(x)
336 #endif
337 
/**
 * Page map lookup used by the LCP scan.
 *
 * Creates the map entry on demand if missing. On success the out
 * parameters are set to the two map words:
 *   *next_ptr : the realpid/state word (2 * logicalPageId)
 *   *prev_ptr : the free-list prev / last-LCP word (2 * logicalPageId + 1)
 * Both are set to NULL when the entry could not be created.
 *
 * @return physical page id, or RNIL when the page is free or unmapped.
 */
Uint32
Dbtup::getRealpidScan(Fragrecord* regFragPtr,
                      Uint32 logicalPageId,
                      Uint32 **next_ptr,
                      Uint32 **prev_ptr)
{
  DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
  Uint32 * ptr = map.get_dirty(2 * logicalPageId);
  if (ptr == 0 || (*ptr) == RNIL)
  {
    jam();
    ptr = init_page_map_entry(regFragPtr, logicalPageId);
    if (ptr == 0)
    {
      /**
       * This logical page id doesn't have any reference at all in the page
       * map. This means that it cannot have been used since the data node
       * was started or since the fragment was created. So it can definitely
       * not have any LCP_SCANNED_BIT set since this only happens when a
       * page is being dropped, to be dropped a page has to be mapped and once
       * it is mapped the map isn't removed.
       */
      jam();
      *next_ptr = *prev_ptr = 0;
      return RNIL;
    }
  }
  ndbrequire(ptr != 0);
  *next_ptr = ptr;
  *prev_ptr = map.get_dirty(2 * logicalPageId + 1);
  Uint32 val = *ptr;
  ndbassert(val != RNIL);
  if ((val & FREE_PAGE_BIT) != 0)
  {
    jam();
    /* Page id exists in the map but is currently free. */
    return RNIL;
  }
  else
  {
    jam();
    return (val & PAGE_BIT_MASK);
  }
}
381 
/**
 * Record the last-LCP state for a logical page id in the prev word of
 * its page map entry (state D = page was free at the last LCP).
 * The entry must already exist; crashes otherwise.
 */
void
Dbtup::set_last_lcp_state(Fragrecord *regFragPtr,
                          Uint32 logicalPageId,
                          bool is_new_state_D)
{
  DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
  Uint32 *ptr = map.set(2 * logicalPageId + 1);
  ndbrequire(ptr != (Uint32*)0);
  ndbassert((*ptr) != RNIL);
  set_last_lcp_state(ptr, is_new_state_D);
  do_check_page_map(regFragPtr);
}
394 
395 void
set_last_lcp_state(Uint32 * ptr,bool is_new_state_D)396 Dbtup::set_last_lcp_state(Uint32 *ptr, bool is_new_state_D)
397 {
398   if (unlikely(ptr == 0))
399   {
400     jam();
401     return;
402   }
403   Uint32 val = *ptr;
404   ndbassert((val & FREE_PAGE_BIT) == FREE_PAGE_BIT);
405   Uint32 new_last_lcp_state =
406     is_new_state_D ? LAST_LCP_FREE_BIT : 0;
407   val &= (Uint32)~LAST_LCP_FREE_BIT;
408   val |= new_last_lcp_state;
409   *ptr = val;
410 }
411 
412 bool
get_lcp_scanned_bit(Uint32 * next_ptr)413 Dbtup::get_lcp_scanned_bit(Uint32 *next_ptr)
414 {
415   if (next_ptr == 0)
416   {
417     jam();
418     return true;
419   }
420   if (((*next_ptr) & LCP_SCANNED_BIT) != 0)
421   {
422     jam();
423     return true;
424   }
425   jam();
426   return false;
427 }
428 
429 bool
get_lcp_scanned_bit(Fragrecord * regFragPtr,Uint32 logicalPageId)430 Dbtup::get_lcp_scanned_bit(Fragrecord *regFragPtr, Uint32 logicalPageId)
431 {
432   DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
433   Uint32 *ptr = map.set(2 * logicalPageId);
434   return get_lcp_scanned_bit(ptr);
435 }
436 
437 /**
438  * Currently not used code, can be activated when we can decrease
439  * m_max_page_cnt.
440  *
441  *void
442  *Dbtup::reset_lcp_scanned_bit(Fragrecord *regFragPtr, Uint32 logicalPageId)
443  *{
444  *  DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
445  *  Uint32 *ptr = map.set(2 * logicalPageId);
446  *  ndbassert(ptr != 0);
447  *  ndbassert((*ptr) != RNIL);
448  *#ifdef DEBUG_LCP_SCANNED_BIT
449  *  if ((*ptr) & LCP_SCANNED_BIT)
450  *  {
451  *    g_eventLogger->info("(%u)tab(%u,%u):%u reset_lcp_scanned_bit",
452  *      instance(),
453  *      regFragPtr->fragTableId,
454  *      regFragPtr->fragmentId,
455  *      logicalPageId);
456  *  }
457  *#endif
458  *  *ptr = (*ptr) & (Uint32)~LCP_SCANNED_BIT;
459  *  do_check_page_map(regFragPtr);
460  *}
461  */
462 
463 void
reset_lcp_scanned_bit(Uint32 * next_ptr)464 Dbtup::reset_lcp_scanned_bit(Uint32 *next_ptr)
465 {
466   if (next_ptr == 0)
467   {
468     jam();
469     return;
470   }
471   *next_ptr = (*next_ptr) & (Uint32)~LCP_SCANNED_BIT;
472 }
473 
474 bool
get_last_lcp_state(Uint32 * prev_ptr)475 Dbtup::get_last_lcp_state(Uint32 *prev_ptr)
476 {
477   if (prev_ptr == 0)
478   {
479     jam();
480     /**
481      * If getRealpidScan returned a NULL pointer then the page
482      * definitely didn't exist at the last LCP.
483      */
484     return true;
485   }
486   if (((*prev_ptr) & LAST_LCP_FREE_BIT) != 0)
487   {
488     jam();
489     return true;
490   }
491   else
492   {
493     jam();
494     return false;
495   }
496 }
497 
/**
 * Append a brand-new logical page id (m_max_page_cnt) to the page map
 * and bind it to the newly allocated physical page.
 *
 * On any failure (id range exhausted or either map word unallocatable)
 * the physical pages are returned to the common area and RNIL is
 * returned; a successfully allocated prev word is left initialised.
 *
 * @return the new logical page id, or RNIL on failure.
 */
Uint32
Dbtup::insert_new_page_into_page_map(EmulatedJamBuffer *jamBuf,
                                     Fragrecord *regFragPtr,
                                     PagePtr pagePtr,
                                     Uint32 noOfPagesAllocated)
{
  DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
  Uint32 pageId = regFragPtr->m_max_page_cnt;
  Uint32 *ptr;
  Uint32 *prev_ptr = 0;
  if (pageId >= MAX_PAGES_IN_DYN_ARRAY ||
      ((prev_ptr = map.set(2 * pageId + 1)) == 0) ||
      ((ptr = map.set(2 * pageId)) == 0))
  {
    thrjam(jamBuf);
    if (prev_ptr != 0)
    {
      jam();
      /* prev word was created; leave it in a well-defined free state. */
      *prev_ptr = FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
    }
    returnCommonArea(pagePtr.i, noOfPagesAllocated);
    return RNIL;
  }
  /**
   * This should always get a new entry and this always is set initialised
   * to RNIL.
   */
  ndbrequire(*ptr == RNIL);
  *ptr = pagePtr.i;
  /* Ensure LAST_LCP_FREE_BIT is initialised to 1 */
  *prev_ptr = FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
  regFragPtr->m_max_page_cnt = pageId + 1;
  if (DBUG_PAGE_MAP)
  {
    g_eventLogger->info("(%u)tab(%u,%u), new maxII: %u",
                        instance(),
                        regFragPtr->fragTableId,
                        regFragPtr->fragmentId,
                        regFragPtr->m_max_page_cnt);
  }
  return pageId;
}
540 
/**
 * Pop the head of the fragment's free page id list and bind that
 * logical page id to the newly allocated physical page (pagePtr.i).
 * LCP_SCANNED_BIT is preserved across the rebind; the prev word of the
 * new list head (if any) is rewritten to FREE_PAGE_RNIL, keeping its
 * LAST_LCP_FREE_BIT.
 *
 * @return the reused logical page id.
 */
Uint32
Dbtup::remove_first_free_from_page_map(EmulatedJamBuffer *jamBuf,
                                       Fragrecord *regFragPtr,
                                       PagePtr pagePtr)
{
  Uint32 pageId = regFragPtr->m_free_page_id_list;
  DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
  Uint32 *ptr = map.set(2 * pageId);
  ndbrequire(ptr != 0);
  ndbassert((*ptr) != RNIL);
  Uint32 ptr_val = *ptr;
  ndbrequire((ptr_val & FREE_PAGE_BIT) != 0);
  Uint32 lcp_scanned_bit = ptr_val & LCP_SCANNED_BIT;
  Uint32 next = ptr_val & PAGE_BIT_MASK;
  /* Rebind the entry to the new physical page, keeping scan state. */
  *ptr = (pagePtr.i | lcp_scanned_bit);

#ifdef DEBUG_LCP_SCANNED_BIT
  if (lcp_scanned_bit)
  {
    g_eventLogger->info("(%u)tab(%u,%u):%u remove_first_free_from_page_map",
                        instance(),
                        regFragPtr->fragTableId,
                        regFragPtr->fragmentId,
                        pageId);
  }
#endif

  if (next != FREE_PAGE_RNIL)
  {
    thrjam(jamBuf);
    /* New head's prev pointer becomes end-of-list. */
    Uint32 * nextPrevPtr = map.set(2 * next + 1);
    ndbrequire(nextPrevPtr != 0);
    ndbassert((*nextPrevPtr) != RNIL);
    ndbassert(((*nextPrevPtr) & FREE_PAGE_BIT) == FREE_PAGE_BIT);
    Uint32 last_lcp_free_bit = (*nextPrevPtr) & LAST_LCP_FREE_BIT;
    *nextPrevPtr = FREE_PAGE_RNIL | FREE_PAGE_BIT | last_lcp_free_bit;
  }
  regFragPtr->m_free_page_id_list = next;
  DEB_LCP_FREE(("(%u)m_free_page_id_list(1), tab(%u,%u):%u",
                instance(),
                regFragPtr->fragTableId,
                regFragPtr->fragmentId,
                next));
  return pageId;
}
586 
/**
 * Unlink page_no from the doubly linked free page id list that is kept
 * inside the page map words, and rebind the entry to the new physical
 * page (pagePtrI). Handles the four head/tail cases separately.
 *
 * @param fragPtrP  fragment owning the page map and free list
 * @param page_no   logical page id being removed from the free list
 * @param pagePtrI  physical page id to store in the entry
 * @param ptr       pointer to the entry's "next" word (2 * page_no)
 */
void
Dbtup::remove_page_id_from_dll(Fragrecord *fragPtrP,
                               Uint32 page_no,
                               Uint32 pagePtrI,
                               Uint32 *ptr)
{
  DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
  const Uint32 *prevPtr = map.set(2 * page_no + 1);
  ndbrequire(prevPtr != 0);
  ndbassert((*prevPtr) != RNIL);
  ndbassert(((*prevPtr) & FREE_PAGE_BIT) == FREE_PAGE_BIT);
  Uint32 next = *ptr;
  Uint32 prev = *prevPtr;
  {
    /**
     * Set new entry in DynArray before list manipulations, ensure that
     * we don't forget the LCP_SCANNED_BIT.
     */
    Uint32 lcp_scanned_bit = next & LCP_SCANNED_BIT;
    *ptr = pagePtrI | lcp_scanned_bit;
#ifdef DEBUG_LCP_SCANNED_BIT
    if (lcp_scanned_bit)
    {
      g_eventLogger->info("(%u)tab(%u,%u):%u remove_page_id_from_dll",
                          instance(),
                          fragPtrP->fragTableId,
                          fragPtrP->fragmentId,
                          page_no);
    }
#endif
  }
  /* Strip state bits; from here next/prev are plain page ids. */
  next &= PAGE_BIT_MASK;
  prev &= PAGE_BIT_MASK;
  if (next == FREE_PAGE_RNIL)
  {
    jam();
    // This should be end of list...
    if (prev == FREE_PAGE_RNIL)
    {
      jam();
      /* page_no is both head and tail */
      if (fragPtrP->m_free_page_id_list != page_no)
      {
        /* Log before crashing to aid post-mortem analysis. */
        g_eventLogger->info("(%u)m_free_page_id_list = %u,"
                            " tab(%u,%u):%u",
                            instance(),
                            fragPtrP->m_free_page_id_list,
                            fragPtrP->fragTableId,
                            fragPtrP->fragmentId,
                            page_no);
        ndbrequire(fragPtrP->m_free_page_id_list == page_no);
      }
      fragPtrP->m_free_page_id_list = FREE_PAGE_RNIL;
      DEB_LCP_FREE(("(%u)m_free_page_id_list(2), tab(%u,%u):FREE_PAGE_RNIL",
                     instance(),
                     fragPtrP->fragTableId,
                     fragPtrP->fragmentId));
    }
    else
    {
      jam();
      /* page_no is tail, but not head */
      Uint32 *prevNextPtr = map.set(2 * prev);
      ndbrequire(prevNextPtr != 0);
      ndbassert((*prevNextPtr) != RNIL);
      Uint32 prevNext = *prevNextPtr;
      ndbrequire(prevNext & FREE_PAGE_BIT);
      Uint32 lcp_scanned_bit = prevNext & LCP_SCANNED_BIT;
      ndbrequire((prevNext & PAGE_BIT_MASK) == page_no);
      *prevNextPtr = FREE_PAGE_RNIL | FREE_PAGE_BIT | lcp_scanned_bit;
    }
  }
  else
  {
    jam();
    /* There is a successor: repoint its prev word at our predecessor. */
    Uint32 *nextPrevPtr = map.set(2 * next + 1);
    ndbrequire(nextPrevPtr != 0);
    ndbassert((*nextPrevPtr) != RNIL);
    ndbassert(((*nextPrevPtr) & FREE_PAGE_BIT) == FREE_PAGE_BIT);
    Uint32 nextPrev = (*nextPrevPtr) & PAGE_BIT_MASK;
    Uint32 last_lcp_free_bit = (*nextPrevPtr) & LAST_LCP_FREE_BIT;
    ndbrequire(nextPrev == page_no);
    *nextPrevPtr = prev | last_lcp_free_bit | FREE_PAGE_BIT;
    if (prev == FREE_PAGE_RNIL)
    {
      jam();
      /* page_no is head but not tail */
      ndbrequire(fragPtrP->m_free_page_id_list == page_no);
      fragPtrP->m_free_page_id_list = next;
      DEB_LCP_FREE(("(%u)m_free_page_id_list(3), tab(%u,%u):%u",
                     instance(),
                     fragPtrP->fragTableId,
                     fragPtrP->fragmentId,
                     next));
    }
    else
    {
      jam();
      /* page_no is neither head nor tail */
      Uint32 *prevNextPtr = map.get(2 * prev);
      ndbrequire(prevNextPtr != 0);
      Uint32 prevNext = *prevNextPtr;
      Uint32 lcp_scanned_bit = prevNext & LCP_SCANNED_BIT;
      ndbrequire(prevNext & FREE_PAGE_BIT);
      prevNext &= PAGE_BIT_MASK;
      ndbrequire(prevNext == page_no);
      *prevNextPtr = next | FREE_PAGE_BIT | lcp_scanned_bit;
    }
  }
}
697 
/**
 * Handle LCP interaction for a page allocated while an LCP scan of this
 * fragment is ongoing. If the page lies in the not-yet-scanned part of
 * the LCP set and was not already handled at drop time (no
 * LCP_SCANNED_BIT), mark the whole page to be skipped by the LCP scan
 * and notify the Backup block of the post-LCP-start allocation.
 * No-op when no LCP scan is active on the fragment.
 */
void
Dbtup::handle_lcp_skip_bit(EmulatedJamBuffer *jamBuf,
                           Fragrecord *fragPtrP,
                           PagePtr pagePtr,
                           Uint32 page_no)
{
  Uint32 lcp_scan_ptr_i = fragPtrP->m_lcp_scan_op;
  if (lcp_scan_ptr_i != RNIL)
  {
    thrjam(jamBuf);
    DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
    const Uint32 *ptr = map.set(2 * page_no);
    ndbrequire(ptr != 0);
    ndbassert((*ptr) != RNIL);
    Uint32 lcp_scanned_bit = (*ptr) & LCP_SCANNED_BIT;
    ScanOpPtr scanOp;
    scanOp.i = lcp_scan_ptr_i;
    ndbrequire(c_scanOpPool.getValidPtr(scanOp));
    Local_key key;
    key.m_page_no = page_no;
    key.m_page_idx = ZNIL;
    if (is_rowid_in_remaining_lcp_set(pagePtr.p,
                                      fragPtrP,
                                      key,
                                      *scanOp.p,
                                      2 /* Debug for LCP skip bit */))
    {
      thrjam(jamBuf);
      if (lcp_scanned_bit == 0)
      {
        thrjam(jamBuf);
        /**
         * We allocated a page during an LCP, it was within the pages that
         * will be checked during the LCP scan. The page has also not yet
         * been scanned by the LCP. Given that we know that the page will
         * only contain rows that would set the LCP_SKIP bit we will
         * set the LCP skip on the page level instead to speed up LCP
         * processing.
         *
         * We use this bit both for ALL ROWS pages and CHANGED ROWS pages.
         * When we come to the scanning of this page we will decide what
         * to do with the page whether to skip or record it as DELETE by
         * PAGEID.
         */
        /* Coverage tested */
        DEB_LCP_SKIP(("(%u)LCP_SKIP in tab(%u,%u):%u",
                       instance(),
                       fragPtrP->fragTableId,
                       fragPtrP->fragmentId,
                       page_no));
        pagePtr.p->set_page_to_skip_lcp();
        c_backup->alloc_page_after_lcp_start(page_no);
      }
      else
      {
        jam();
        /* Coverage tested */
        /**
         * The page had already been handled since it had been dropped
         * after LCP start and is now allocated again still before the
         * LCP scan reached it. No need to do anything since its LCP
         * scanning was handled at drop time.
         */
      }
    }
    else
    {
      /* Page outside the remaining LCP set must not carry the bit. */
      if (lcp_scanned_bit)
      {
        g_eventLogger->info("(%u):lcp_scanned_bit crash on tab(%u,%u):%u",
                            instance(),
                            fragPtrP->fragTableId,
                            fragPtrP->fragmentId,
                            page_no);
      }
      ndbrequire(lcp_scanned_bit == 0);
    }
  }
}
777 
/**
 * Finalise a newly allocated fragment page: initialise the page header,
 * resolve LCP skip/scanned state if an LCP scan is ongoing, format the
 * page for fixed-size rows (convertThPage) and put it first on the
 * fragment's free page list (thFreeFirst).
 */
void
Dbtup::handle_new_page(EmulatedJamBuffer *jamBuf,
                       Fragrecord *fragPtrP,
                       Tablerec* tabPtrP,
                       PagePtr pagePtr,
                       Uint32 page_no)
{
  DEB_LCP_ALLOC(("(%u)allocFragPage: tab(%u,%u) page(%u)",
                 instance(),
                 fragPtrP->fragTableId,
                 fragPtrP->fragmentId,
                 page_no));
  c_page_pool.getPtr(pagePtr);
  init_page(fragPtrP, pagePtr, page_no);
  handle_lcp_skip_bit(jamBuf, fragPtrP, pagePtr, page_no);
  convertThPage((Fix_page*)pagePtr.p, tabPtrP, MM);
  {
    LocalDLFifoList<Page_pool> free_pages(c_page_pool, fragPtrP->thFreeFirst);
    pagePtr.p->page_state = ZTH_MM_FREE;
    free_pages.addFirst(pagePtr);
  }
  if (DBUG_PAGE_MAP)
  {
    g_eventLogger->info("(%u)tab(%u,%u):%u alloc -> (%u max: %u)",
                        instance(),
                        fragPtrP->fragTableId,
                        fragPtrP->fragmentId,
                        page_no,
                        pagePtr.i,
                        fragPtrP->m_max_page_cnt);
  }

  do_check_page_map(fragPtrP);
}
812 
/**
 * Allocate one more page for the fragment.
 *
 * The physical page comes from the common pool (allocConsPages). The
 * logical page id is either reused from the head of the fragment's free
 * page id list, or a new id is appended to the page map.
 *
 * @param jamBuf      jam buffer of the calling thread
 * @param err[out]    set to ZMEM_NOMEM_ERROR on failure
 * @param regFragPtr  fragment to extend
 * @param regTabPtr   table record, needed to format the new page
 * @return physical page id (pagePtr.i), or RNIL on out-of-memory.
 */
Uint32
Dbtup::allocFragPage(EmulatedJamBuffer* jamBuf,
                     Uint32 * err,
                     Fragrecord* regFragPtr,
                     Tablerec *regTabPtr)
{
  PagePtr pagePtr;
  Uint32 noOfPagesAllocated = 0;
  Uint32 list = regFragPtr->m_free_page_id_list;

  allocConsPages(jamBuf, 1, noOfPagesAllocated, pagePtr.i);
  if (noOfPagesAllocated == 0)
  {
    thrjam(jamBuf);
    * err = ZMEM_NOMEM_ERROR;
    return RNIL;
  }//if

  Uint32 pageId;
  if (list == FREE_PAGE_RNIL)
  {
    thrjam(jamBuf);
    /* No reusable page ids: append a new id to the page map. */
    pageId = insert_new_page_into_page_map(jamBuf,
                                           regFragPtr,
                                           pagePtr,
                                           noOfPagesAllocated);
    DEB_LCP(("(%u)allocFragPage(1): tab(%u,%u):%u",
            instance(),
            regFragPtr->fragTableId,
            regFragPtr->fragmentId,
            pageId));
    if (pageId == RNIL)
    {
      thrjam(jamBuf);
      /* Page map extension failed; pages already returned there. */
      * err = ZMEM_NOMEM_ERROR;
      return RNIL;
    }
  }
  else
  {
    thrjam(jamBuf);
    /* Reuse the first free logical page id. */
    pageId = remove_first_free_from_page_map(jamBuf, regFragPtr, pagePtr);
    DEB_LCP(("(%u)allocFragPage(2): tab(%u,%u):%u",
            instance(),
            regFragPtr->fragTableId,
            regFragPtr->fragmentId,
            pageId));
  }
  if (DBUG_PAGE_MAP)
  {
    DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
    Uint32 *ptr = map.set(2 * pageId);
    ndbrequire(ptr != 0);
    ndbassert((*ptr) != RNIL);
    g_eventLogger->info("(%u)tab(%u,%u) allocRI(%u %u max: %u next: %x)",
                        instance(),
                        regFragPtr->fragTableId,
                        regFragPtr->fragmentId,
                        pageId,
                        pagePtr.i,
                        regFragPtr->m_max_page_cnt,
                        *ptr);
  }
  regFragPtr->noOfPages++;
  handle_new_page(jamBuf, regFragPtr, regTabPtr, pagePtr, pageId);
  return pagePtr.i;
}//Dbtup::allocFragPage()
880 
/**
 * Allocate the page with logical page id page_no within the fragment.
 * If the page id is already mapped to a real page, that page is returned.
 * Otherwise a new page is taken from the common area and inserted into the
 * page map, removing the page id from the free page id list if it was
 * linked there.
 *
 * @param err      [out] set to ZMEM_NOMEM_ERROR when the page map cannot
 *                 be extended or no page can be allocated
 * @param tabPtrP  table record of the fragment
 * @param fragPtrP fragment record owning the page map
 * @param page_no  logical page id to allocate
 * @return i-value of the real page, or RNIL on failure
 */
Uint32
Dbtup::allocFragPage(Uint32 * err,
                     Tablerec* tabPtrP, Fragrecord* fragPtrP, Uint32 page_no)
{
  PagePtr pagePtr;
  ndbrequire(page_no < MAX_PAGES_IN_DYN_ARRAY);
  DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
  DEB_LCP(("(%u)allocFragPage(3): tab(%u,%u):%u",
          instance(),
          fragPtrP->fragTableId,
          fragPtrP->fragmentId,
          page_no));
  /**
   * Each logical page id occupies two positions in the page map:
   * position 2 * page_no holds the real page id (or free list next
   * reference) plus flag bits, position 2 * page_no + 1 holds the free
   * list prev reference plus flag bits. Make sure both positions exist
   * in the DynArr256 before touching anything else.
   */
  Uint32 *prev_ptr = map.set(2 * page_no + 1);
  if (unlikely(prev_ptr == 0))
  {
    jam();
    *err = ZMEM_NOMEM_ERROR;
    return RNIL;
  }
  Uint32 * ptr = map.set(2 * page_no);
  if (unlikely(ptr == 0))
  {
    jam();
    /* Leave the prev position in its pristine "free, free at last LCP"
     * state so drop fragment does not act on a half-initialised entry. */
    *prev_ptr = FREE_PAGE_RNIL | LAST_LCP_FREE_BIT;
    * err = ZMEM_NOMEM_ERROR;
    return RNIL;
  }
  pagePtr.i = * ptr;
  if (likely(pagePtr.i != RNIL && (pagePtr.i & FREE_PAGE_BIT) == 0))
  {
    jam();
    /* Page id already mapped to an allocated page, return it. */
    return (pagePtr.i & PAGE_BIT_MASK);
  }

  Uint32 noOfPagesAllocated = 0;
  allocConsPages(jamBuffer(), 1, noOfPagesAllocated, pagePtr.i);
  if (unlikely(noOfPagesAllocated == 0))
  {
    jam();
    * err = ZMEM_NOMEM_ERROR;
    return RNIL;
  }

  if ((*ptr) == RNIL)
  {
    /**
     * DynArr256 delivered a fresh new entry, so no flags are initialised
     * and we will treat it as if it returned FREE_PAGE_BIT set,
     * LCP_SCANNED_BIT not set, but also not in any free list, so no need
     * to remove it from the doubly linked list. We will make use of entry
     * for a new page, so we don't set the FREE_PAGE_BIT either, we simply
     * insert the page pointer.
     *
     * Since it is the first occurrence of the page we initialise that the
     * page was free at the last LCP. We always need to set the FREE_PAGE_BIT
     * also to ensure that drop fragment doesn't drop things from the
     * prev_ptr position.
     */
    jam();
    *ptr = pagePtr.i;
    *prev_ptr = FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
  }
  else
  {
    jam();
    /**
     * This page id was in the doubly linked list free list, we need to remove
     * it from this list.
     */
    remove_page_id_from_dll(fragPtrP, page_no, pagePtr.i, ptr);
  }
  if (DBUG_PAGE_MAP)
  {
    g_eventLogger->info("(%u)tab(%u,%u):%u alloc(%u max: %u next: %x)",
                        instance(),
                        fragPtrP->fragTableId,
                        fragPtrP->fragmentId,
                        page_no,
                        pagePtr.i,
                        fragPtrP->m_max_page_cnt,
                        *ptr);
  }
  Uint32 max = fragPtrP->m_max_page_cnt;
  fragPtrP->noOfPages++;

  /* Raise the fragment's high-water mark of logical page ids if needed. */
  if (page_no + 1 > max)
  {
    jam();
    fragPtrP->m_max_page_cnt = page_no + 1;
    if (DBUG_PAGE_MAP)
    {
      g_eventLogger->info("(%u)tab(%u,%u) new max: %u",
                          instance(),
                          fragPtrP->fragTableId,
                          fragPtrP->fragmentId,
                          fragPtrP->m_max_page_cnt);
    }
  }
  handle_new_page(jamBuffer(), fragPtrP, tabPtrP, pagePtr, page_no);
  return pagePtr.i;
}
982 
/**
 * Release the real page backing logical page id logicalPageId in the
 * fragment and insert the logical page id into the fragment's free page
 * id list.
 *
 * If an LCP scan is ongoing and has not yet passed this page (or the page
 * carries the skip-LCP flag), the release must first be recorded for the
 * checkpoint (DELETE BY ROWID / DELETE BY PAGEID via the LCP keep list,
 * see in-line comments). In one case the page itself is "stolen" for the
 * LCP keep list (page_freed == true) and is then not returned to the
 * common area here.
 *
 * @param fragPtrP      fragment record owning the page map
 * @param logicalPageId logical page id being released
 * @param pagePtr       the real page being released
 */
void
Dbtup::releaseFragPage(Fragrecord* fragPtrP,
                       Uint32 logicalPageId,
                       PagePtr pagePtr)
{
  DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
  DEB_LCP_REL(("(%u)releaseFragPage: tab(%u,%u) page(%u)",
               instance(),
               fragPtrP->fragTableId,
               fragPtrP->fragmentId,
               logicalPageId));
  /* Page map positions for this page id: 'next' (2*id) holds the real
   * page id / free list next reference + flags, 'prev' (2*id + 1) holds
   * the free list prev reference + flags. */
  Uint32 *next = map.set(2 * logicalPageId);
  Uint32 *prev = map.set(2 * logicalPageId + 1);
  ndbrequire(next != 0 && prev != 0);
  ndbassert(((*prev) & FREE_PAGE_BIT) == FREE_PAGE_BIT);

  bool page_freed = false;
  Uint32 lcp_scanned_bit = (*next) & LCP_SCANNED_BIT;
  Uint32 last_lcp_state = (*prev) & LAST_LCP_FREE_BIT;
  Uint32 lcp_scan_ptr_i = fragPtrP->m_lcp_scan_op;
  bool lcp_to_scan = false;
  if (lcp_scan_ptr_i != RNIL)
  {
    /**
     * We use the method is_rowid_in_remaining_lcp_set. We set the
     * key to the page and beyond the last row in the page. This means
     * that if the page is not fully scanned yet we will set the
     * LCP_SCANNED_BIT, in this case we might have already handled
     * some of the rows, so there is a small probability that we will
     * duplicate some DELETE BY ROWID, but it should only have a minor
     * performance impact. Otherwise we will ignore it.
     */
    ScanOpPtr scanOp;
    Local_key key;
    scanOp.i = lcp_scan_ptr_i;
    ndbrequire(c_scanOpPool.getValidPtr(scanOp));
    key.m_page_no = logicalPageId;
    key.m_page_idx = ZNIL;
    if (is_rowid_in_remaining_lcp_set(pagePtr.p,
                                      fragPtrP,
                                      key,
                                      *scanOp.p,
                                      1 /* Debug for LCP scanned bit */) ||
        pagePtr.p->is_page_to_skip_lcp())
    {
      jam();
      lcp_to_scan = true;
      if (lcp_scanned_bit == 0)
      {
        jam();
        /**
         * Page is being dropped and it is part of LCP and has not
         * yet been scanned by LCP. This means we need to act right
         * now before we release the page and record the needed
         * information. Also we haven't already dropped the page
         * already before in this LCP scan.
         *
         * is_rowid_in_remaining_lcp_set is normally false when
         * is_page_to_skip_lcp is true. The problem however is that
         * this is a state on the page, and the page is being dropped,
         * so in this case we need to ensure that the page is not
         * containing any row at restore. We ensure this by using
         * a DELETE BY PAGEID in this case. We also flag that the
         * page have been scanned for LCP in the page map. It is
         * possible to arrive here after allocate, drop, allocate
         * and drop again. In this case the LCP scanned bit will
         * still be set and we can ignore the page.
         *
         * We will release the page during handle_lcp_drop_change
         * to ensure that we are certain to get the space we
         * need to store the space needed to store things into the
         * LCP keep list.
         *
         * If the SKIP_LCP flag was set on the page then this page
         * was added since the start of the LCP and in that case
         * we record the page as a DELETE by PAGEID and then sets
         * the LCP_SCANNED_BIT to ensure that any further allocation
         * and release of the fragment before LCP scan has passed it
         * is ignored.
         *
         * For ALL ROWS pages the LCP scan is always completed in
         * this state, all the rows existing at start of LCP has been
         * deleted and all of those were put into LCP through the LCP
         * keep list. So we ensure that the page is ignored for the
         * rest of the LCP scan by setting the LCP_SCANNED_BIT here
         * as well.
         *
         * No need to clear the page to skip lcp flag here since the
         * page is dropped immediately following this.
         *
         * If last page state was D the page will be empty at the
         * previous LCP, so this means that there is no need to
         * delete rows that won't be there. There is not really any
         * problem in performing deletes in this case, but it would
         * cause unnecessary work. The rows that was present in the
         * page at start of LCP have already been handled through
         * LCP keep list.
         *
         * If last state was A there was a set of rows installed by
         * the previous LCP, some of those rows will remain if we
         * don't ensure that they are removed. So in this case we
         * remove all rows on the page that hasn't got the LCP_SKIP
         * flag set. Those rows with this flag set have been handled
         * by the LCP keep list before arriving here.
         */
        bool is_change_part = c_backup->is_change_part_state(logicalPageId);
        DEB_LCP_SCANNED_BIT(("(%u)Set lcp_scanned_bit on tab(%u,%u):%u",
                             instance(),
                             fragPtrP->fragTableId,
                             fragPtrP->fragmentId,
                             logicalPageId));
        lcp_scanned_bit = LCP_SCANNED_BIT;
        bool page_to_skip_lcp = pagePtr.p->is_page_to_skip_lcp();
        Uint32 new_last_lcp_state;
        if (page_to_skip_lcp)
        {
          new_last_lcp_state = LAST_LCP_FREE_BIT;
          c_backup->alloc_dropped_page_after_lcp_start(is_change_part);
        }
        else
        {
          new_last_lcp_state = 0;
          c_backup->dropped_page_after_lcp_start(is_change_part,
                                                 (last_lcp_state == 0));
        }
        if (is_change_part && (last_lcp_state == 0))
        {
          /**
           * Page is a change page and last LCP state was A.
           * We set page_freed to true, the reason is that we're
           * "stealing" the page to be deleted for use by the
           * LCP keep free list. This removes any possibility that
           * we will run out of memory for this operation.
           *
           * The page is removed later when the LCP keep list operation
           * is completed.
           */
          jam();
          /* Coverage tested */
          c_page_pool.getPtr(pagePtr);
          bool delete_by_pageid = page_to_skip_lcp;
          page_freed = true;
          ndbrequire(c_backup->is_partial_lcp_enabled());
          handle_lcp_drop_change_page(fragPtrP,
                                      logicalPageId,
                                      pagePtr,
                                      delete_by_pageid);
        }
        else
        {
          /* Coverage tested */
          DEB_LCP_REL(("(%u) change_part: %u, last_lcp_state: %u "
                    "in tab(%u,%u) page(%u)",
                   instance(),
                   is_change_part,
                   last_lcp_state,
                   fragPtrP->fragTableId,
                   fragPtrP->fragmentId,
                   logicalPageId));
        }
        last_lcp_state = new_last_lcp_state;
      }
      else
      {
        DEB_LCP_REL(("(%u)lcp_scanned_bit already set when page released"
                     "in tab(%u,%u) page(%u)",
                     instance(),
                     fragPtrP->fragTableId,
                     fragPtrP->fragmentId,
                     logicalPageId));
        /* Coverage tested */
      }
    }
    else
    {
      /* Coverage tested */
    }
  }
  if (!lcp_to_scan)
  {
    /* The LCP scan (if any) has already passed this page; finding the
     * LCP_SCANNED_BIT still set here would be an inconsistency. */
    if (unlikely(lcp_scanned_bit != 0))
    {
      g_eventLogger->info("(%u)tab(%u,%u):%u crash lcp_scanned_bit set",
                          instance(),
                          fragPtrP->fragTableId,
                          fragPtrP->fragmentId,
                          logicalPageId);
      ndbrequire(lcp_scanned_bit == 0);
    }
  }
  if (!page_freed)
  {
    jam();
    /* Page was not stolen for the LCP keep list: return it to the
     * common area now. */
    returnCommonArea(pagePtr.i, 1);
  }

#ifdef DEBUG_LCP_SCANNED_BIT
  if (lcp_scanned_bit)
  {
    g_eventLogger->info("(%u)tab(%u,%u):%u set lcp_scanned_bit",
                        instance(),
                        fragPtrP->fragTableId,
                        fragPtrP->fragmentId,
                        logicalPageId);

  }
#endif

  const char *where = insert_free_page_id_list(fragPtrP,
                                               logicalPageId,
                                               next,
                                               prev,
                                               lcp_scanned_bit,
                                               last_lcp_state);
  fragPtrP->noOfPages--;
  if (DBUG_PAGE_MAP)
  {
    g_eventLogger->info("(%u)tab(%u,%u):%u release(%u)@%s",
                        instance(),
                        fragPtrP->fragTableId,
                        fragPtrP->fragmentId,
                        logicalPageId,
                        pagePtr.i,
                        where);
  }
  do_check_page_map(fragPtrP);
}
1210 
1211 const char*
insert_free_page_id_list(Fragrecord * fragPtrP,Uint32 logicalPageId,Uint32 * next,Uint32 * prev,Uint32 lcp_scanned_bit,Uint32 last_lcp_state)1212 Dbtup::insert_free_page_id_list(Fragrecord *fragPtrP,
1213                                 Uint32 logicalPageId,
1214                                 Uint32 *next,
1215                                 Uint32 *prev,
1216                                 Uint32 lcp_scanned_bit,
1217                                 Uint32 last_lcp_state)
1218 {
1219   /**
1220    * Add to head or tail of list...
1221    */
1222   DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
1223   Uint32 list = fragPtrP->m_free_page_id_list;
1224   const char * where = 0;
1225   if (list == FREE_PAGE_RNIL)
1226   {
1227     jam();
1228     *next = FREE_PAGE_RNIL | FREE_PAGE_BIT | lcp_scanned_bit;
1229     *prev = FREE_PAGE_RNIL | FREE_PAGE_BIT | last_lcp_state;
1230     fragPtrP->m_free_page_id_list = logicalPageId;
1231     DEB_LCP_FREE(("(%u)m_free_page_id_list(4), tab(%u,%u):%u",
1232                   instance(),
1233                   fragPtrP->fragTableId,
1234                   fragPtrP->fragmentId,
1235                   logicalPageId));
1236     where = (const char*)"empty";
1237   }
1238   else
1239   {
1240     jam();
1241     *next = list | FREE_PAGE_BIT | lcp_scanned_bit;
1242     *prev = FREE_PAGE_RNIL | FREE_PAGE_BIT | last_lcp_state;
1243     fragPtrP->m_free_page_id_list = logicalPageId;
1244     DEB_LCP_FREE(("(%u)m_free_page_id_list(5), tab(%u,%u):%u",
1245                   instance(),
1246                   fragPtrP->fragTableId,
1247                   fragPtrP->fragmentId,
1248                   logicalPageId));
1249     Uint32 *nextPrevPtr = map.set(2 * list + 1);
1250     ndbrequire(nextPrevPtr != 0);
1251     ndbassert((*nextPrevPtr) != RNIL);
1252     Uint32 nextPrev = *nextPrevPtr;
1253     Uint32 this_last_lcp_state = nextPrev & LAST_LCP_FREE_BIT;
1254     ndbrequire(((*nextPrevPtr) & PAGE_BIT_MASK) == FREE_PAGE_RNIL);
1255     *nextPrevPtr = logicalPageId | FREE_PAGE_BIT | this_last_lcp_state;
1256     where = (const char*)"head";
1257   }
1258   return where;
1259 }
1260 
errorHandler(Uint32 errorCode)1261 void Dbtup::errorHandler(Uint32 errorCode)
1262 {
1263   switch (errorCode) {
1264   case 0:
1265     jam();
1266     break;
1267   case 1:
1268     jam();
1269     break;
1270   case 2:
1271     jam();
1272     break;
1273   default:
1274     jam();
1275   }
1276   ndbabort();
1277 }//Dbtup::errorHandler()
1278 
/**
 * CONTINUEB-driven rebuild of a fragment's free page id list after
 * restore. Processes one logical page id per invocation:
 *   theData[1] = fragment operation record i-value
 *   theData[2] = current logical page id to inspect
 *   theData[3] = current tail of the free list being built (RNIL if none)
 * Unallocated page ids are appended at the tail of the free list;
 * allocated ones get their LAST_LCP_FREE_BIT cleared. When all page ids
 * up to m_max_page_cnt are processed, RESTORE_LCP_CONF is sent and the
 * fragment operation record is released; otherwise the routine re-sends
 * itself via CONTINUEB with the next page id.
 */
void
Dbtup::rebuild_page_free_list(Signal* signal)
{
  Ptr<Fragoperrec> fragOpPtr;
  fragOpPtr.i = signal->theData[1];
  Uint32 pageId = signal->theData[2];
  Uint32 tail = signal->theData[3];
  ptrCheckGuard(fragOpPtr, cnoOfFragoprec, fragoperrec);

  Ptr<Fragrecord> fragPtr;
  fragPtr.i= fragOpPtr.p->fragPointer;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);

  if (pageId == fragPtr.p->m_max_page_cnt)
  {
    /* All page ids processed: verify the map and confirm the restore. */
    do_check_page_map(fragPtr.p);
    RestoreLcpConf* conf = (RestoreLcpConf*)signal->getDataPtrSend();
    conf->senderRef = reference();
    conf->senderData = fragOpPtr.p->m_senderData;
    conf->restoredLcpId = fragOpPtr.p->m_restoredLcpId;
    conf->restoredLocalLcpId = fragOpPtr.p->m_restoredLocalLcpId;
    conf->maxGciCompleted = fragOpPtr.p->m_maxGciCompleted;
    conf->afterRestore = 1;
    sendSignal(fragOpPtr.p->m_senderRef,
	       GSN_RESTORE_LCP_CONF, signal,
	       RestoreLcpConf::SignalLength, JBB);

    releaseFragoperrec(fragOpPtr);
    return;
  }

  DynArr256 map(c_page_map_pool, fragPtr.p->m_page_map);
  Uint32 *nextPtr = map.set(2 * pageId);
  Uint32 *prevPtr = map.set(2 * pageId + 1);

  // Out of memory ?? Should not be possible here/now
  ndbrequire(nextPtr != 0 && prevPtr != 0);

  /**
   * This is called as part of restore, the pages that are defined here
   * should have the LAST_LCP_FREE_BIT initialised to 0, those that are
   * not yet allocated should have their state initialised to 1.
   *
   * LAST_LCP_FREE_BIT indicates state at last LCP, at restart the last
   * LCP is the restore and this has now been completed. So after this
   * we need to have a well defined state of LAST_LCP_FREE.
   * Later allocations of new page ids are always assumed to not be part
   * of the last LCP.
   *
   * Using Partial LCP the restore process can call releaseFragPage.
   * In this case *nextPtr isn't equal to RNIL, but FREE_PAGE_BIT of
   * *nextPagePtr is set instead. So need to check for either of those
   * two events.
   */
  if (*nextPtr == RNIL || ((((*nextPtr) & FREE_PAGE_BIT) != 0)))
  {
    jam();
    /**
     * An unallocated page id...put in free list
     */

#if DBUG_PAGE_MAP
    const char * where;
#endif
    if (tail == RNIL)
    {
      jam();
      /* First free page id found: it becomes the head of the list. */
      ndbrequire(fragPtr.p->m_free_page_id_list == FREE_PAGE_RNIL);
      fragPtr.p->m_free_page_id_list = pageId;
      DEB_LCP_FREE(("(%u)m_free_page_id_list(6), tab(%u,%u):%u",
                    instance(),
                    fragPtr.p->fragTableId,
                    fragPtr.p->fragmentId,
                    pageId));
      *nextPtr = FREE_PAGE_RNIL | FREE_PAGE_BIT;
      *prevPtr = FREE_PAGE_RNIL | FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
#if DBUG_PAGE_MAP
      where = "head";
#endif
    }
    else
    {
      jam();
      /* Append behind the current tail of the list being built. */
      ndbrequire(fragPtr.p->m_free_page_id_list != FREE_PAGE_RNIL);

      *nextPtr = FREE_PAGE_RNIL | FREE_PAGE_BIT;
      *prevPtr = tail | FREE_PAGE_BIT | LAST_LCP_FREE_BIT;

      Uint32 * prevNextPtr = map.set(2 * tail);
      ndbrequire(prevNextPtr != 0);
      ndbrequire((*prevNextPtr) == (FREE_PAGE_RNIL | FREE_PAGE_BIT));
      *prevNextPtr = pageId | FREE_PAGE_BIT;
#if DBUG_PAGE_MAP
      where = "tail";
#endif
      DEB_LCP_FREE(("(%u)tab(%u,%u):%u into free page id list",
                    instance(),
                    fragPtr.p->fragTableId,
                    fragPtr.p->fragmentId,
                    pageId));
    }
    tail = pageId;
#if DBUG_PAGE_MAP
    g_eventLogger->info("(%u)tab(%u,%u):%u adding page to free list @ %s",
                        instance(),
                        fragPtr.p->fragTableId,
                        fragPtr.p->fragmentId,
                        pageId,
                        where);
#endif
  }
  else
  {
    jam();
    /* Clear LAST_LCP_FREE_BIT and set FREE_PAGE_BIT */
    DEB_LCP_FREE(("(%u)tab(%u,%u):%u, next: %x, prev: %x",
                   instance(),
                   fragPtr.p->fragTableId,
                   fragPtr.p->fragmentId,
                   pageId,
                   *nextPtr,
                   *prevPtr));
    *prevPtr = (*prevPtr) & PAGE_BIT_MASK;
    *prevPtr = (*prevPtr) | FREE_PAGE_BIT;
  }

  /* Continue with the next page id in a new CONTINUEB round. */
  signal->theData[0] = ZREBUILD_FREE_PAGE_LIST;
  signal->theData[1] = fragOpPtr.i;
  signal->theData[2] = pageId + 1;
  signal->theData[3] = tail;
  sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB);
}
1411