1 /*
2 Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #define DBTUP_C
27 #define DBTUP_PAGE_MAP_CPP
28 #include "Dbtup.hpp"
29 #include <RefConvert.hpp>
30 #include <ndb_limits.h>
31 #include <pc.hpp>
32 #include <signaldata/RestoreImpl.hpp>
33 #include "../backup/Backup.hpp"
34
#define JAM_FILE_ID 415

/**
 * Debug logging switches for the page map code. They can only be turned
 * on in builds that define VM_TRACE or ERROR_INSERT; uncomment the ones
 * of interest.
 */
#if (defined(VM_TRACE) || defined(ERROR_INSERT))
//#define DEBUG_LCP 1
//#define DEBUG_LCP_REL 1
//#define DEBUG_LCP_ALLOC 1
//#define DEBUG_LCP_FREE 1
//#define DEBUG_LCP_SKIP 1
//#define DEBUG_LCP_SCANNED_BIT 1
#endif

/**
 * Shared expansions for the DEB_* macros below: either log the
 * parenthesised printf-style argument list through g_eventLogger or
 * expand to an empty statement.
 */
#define DBTUP_PAGE_MAP_DEB_ON(arglist) \
  do { g_eventLogger->info arglist ; } while (0)
#define DBTUP_PAGE_MAP_DEB_OFF(arglist) do { } while (0)

#ifdef DEBUG_LCP
#define DEB_LCP(arglist) DBTUP_PAGE_MAP_DEB_ON(arglist)
#else
#define DEB_LCP(arglist) DBTUP_PAGE_MAP_DEB_OFF(arglist)
#endif

#ifdef DEBUG_LCP_REL
#define DEB_LCP_REL(arglist) DBTUP_PAGE_MAP_DEB_ON(arglist)
#else
#define DEB_LCP_REL(arglist) DBTUP_PAGE_MAP_DEB_OFF(arglist)
#endif

#ifdef DEBUG_LCP_ALLOC
#define DEB_LCP_ALLOC(arglist) DBTUP_PAGE_MAP_DEB_ON(arglist)
#else
#define DEB_LCP_ALLOC(arglist) DBTUP_PAGE_MAP_DEB_OFF(arglist)
#endif

#ifdef DEBUG_LCP_FREE
#define DEB_LCP_FREE(arglist) DBTUP_PAGE_MAP_DEB_ON(arglist)
#else
#define DEB_LCP_FREE(arglist) DBTUP_PAGE_MAP_DEB_OFF(arglist)
#endif

#ifdef DEBUG_LCP_SKIP
#define DEB_LCP_SKIP(arglist) DBTUP_PAGE_MAP_DEB_ON(arglist)
#else
#define DEB_LCP_SKIP(arglist) DBTUP_PAGE_MAP_DEB_OFF(arglist)
#endif

#ifdef DEBUG_LCP_SCANNED_BIT
#define DEB_LCP_SCANNED_BIT(arglist) DBTUP_PAGE_MAP_DEB_ON(arglist)
#else
#define DEB_LCP_SCANNED_BIT(arglist) DBTUP_PAGE_MAP_DEB_OFF(arglist)
#endif

/* Set to 1 to get verbose page map logging (and, with VM_TRACE, checks). */
#define DBUG_PAGE_MAP 0
84
85 //
86 // PageMap is a service used by Dbtup to map logical page id's to physical
// page id's. The mapping needs the fragment and the logical page id to
88 // provide the physical id.
89 //
90 // This is a part of Dbtup which is the exclusive user of a certain set of
91 // variables on the fragment record and it is the exclusive user of the
92 // struct for page ranges.
93 //
94 // The use of the fragment page map is described in some detail in Backup.cpp
95 // as part of the LCP description. We use 2 bits for important state info on
96 // this and the previous LCP state for a page.
97 //
98 // The following methods operate on the data handled by the page map class.
99 //
100 // Public methods
101 // insertPageRange(Uint32 startPageId, # In
102 // Uint32 noPages) # In
103 // Inserts a range of pages into the mapping structure.
104 //
105 // void releaseFragPage()
106 // Releases a page belonging to a fragment.
107 //
108 // Uint32 allocFragPages(Uint32 tafpNoAllocRequested)
109 // Allocate a set of pages to the fragment from the page manager
110 //
111 // Uint32 getEmptyPage()
112 // Get an empty page from the pool of empty pages on the fragment.
113 // It returns the physical page id of the empty page.
114 // Returns RNIL if no empty page is available.
115 //
116 // Uint32 getRealpid(Uint32 logicalPageId)
117 // Return the physical page id provided the logical page id
118 //
119 // void initializePageRange()
// Initialise free list of page ranges and initialise the page range records.
121 //
122 // void initFragRange()
123 // Initialise the fragment variables when allocating a fragment to a table.
124 //
125 // void initPageRangeSize(Uint32 size)
126 // Initialise the number of page ranges.
127 //
128 // Uint32 getNoOfPages()
129 // Get the number of pages on the fragment currently.
130 //
131 //
132 // Private methods
133 // Uint32 leafPageRangeFull(PageRangePtr currPageRangePtr)
134 //
135 // void errorHandler()
136 // Method to crash NDB kernel in case of weird data set-up
137 //
138 // void allocMoreFragPages()
139 // When no more empty pages are attached to the fragment and we need more
140 // we allocate more pages from the page manager using this method.
141 //
142 // Private data
143 // On the fragment record
144 // currentPageRange # The current page range where to insert the next range
145 // rootPageRange # The root of the page ranges owned
146 // nextStartRange # The next page id to assign when expanding the
147 // # page map
148 // noOfPages # The number of pages in the fragment
149 // emptyPrimPage # The first page of the empty pages in the fragment
150 //
151 // The full page range struct
152
153 Uint32*
init_page_map_entry(Fragrecord * regFragPtr,Uint32 logicalPageId)154 Dbtup::init_page_map_entry(Fragrecord *regFragPtr, Uint32 logicalPageId)
155 {
156 DEB_LCP(("(%u)init_page_map_entry tab(%u,%u):%u",
157 instance(),
158 regFragPtr->fragTableId,
159 regFragPtr->fragmentId,
160 logicalPageId));
161 DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
162 Uint32 *prev_ptr = map.set(2 * logicalPageId + 1);
163 if (prev_ptr == 0)
164 {
165 jam();
166 return 0;
167 }
168 Uint32 *ptr = map.set(2 * logicalPageId);
169 if (ptr == 0)
170 {
171 jam();
172 (*prev_ptr) = FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
173 return 0;
174 }
175 if (logicalPageId >= regFragPtr->m_max_page_cnt)
176 {
177 jam();
178 regFragPtr->m_max_page_cnt = logicalPageId + 1;
179 if (DBUG_PAGE_MAP)
180 {
181 g_eventLogger->info("(%u)allocIP: tab(%u,%u), new max: %u",
182 instance(),
183 regFragPtr->fragTableId,
184 regFragPtr->fragmentId,
185 regFragPtr->m_max_page_cnt);
186 }
187 }
188 (void)insert_free_page_id_list(regFragPtr,
189 logicalPageId,
190 ptr,
191 prev_ptr,
192 Uint32(0),
193 Uint32(0));
194 return map.get_dirty(2 * logicalPageId);
195 }
196
getRealpid(Fragrecord * regFragPtr,Uint32 logicalPageId)197 Uint32 Dbtup::getRealpid(Fragrecord* regFragPtr, Uint32 logicalPageId)
198 {
199 DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
200 Uint32 *ptr = map.get(2 * logicalPageId);
201 if (likely(ptr != 0))
202 {
203 ndbrequire((*ptr) != RNIL)
204 return ((*ptr) & PAGE_BIT_MASK);
205 }
206 ndbabort();
207 return RNIL;
208 }
209
210 Uint32
getRealpidCheck(Fragrecord * regFragPtr,Uint32 logicalPageId)211 Dbtup::getRealpidCheck(Fragrecord* regFragPtr, Uint32 logicalPageId)
212 {
213 DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
214 // logicalPageId might not be mapped yet,
215 // get_dirty returns NULL also in debug in this case.
216 Uint32 *ptr = map.get_dirty(2 * logicalPageId);
217 if (ptr == 0)
218 {
219 jam();
220 ptr = init_page_map_entry(regFragPtr, logicalPageId);
221 }
222 if (likely(ptr != 0))
223 {
224 Uint32 val = *ptr;
225 if ((val & FREE_PAGE_BIT) != 0)
226 return RNIL;
227 else
228 return (val & PAGE_BIT_MASK);
229 }
230 return RNIL;
231 }
232
getNoOfPages(Fragrecord * const regFragPtr)233 Uint32 Dbtup::getNoOfPages(Fragrecord* const regFragPtr)
234 {
235 return regFragPtr->noOfPages;
236 }//Dbtup::getNoOfPages()
237
238 void
init_page(Fragrecord * regFragPtr,PagePtr pagePtr,Uint32 pageId)239 Dbtup::init_page(Fragrecord* regFragPtr, PagePtr pagePtr, Uint32 pageId)
240 {
241 pagePtr.p->page_state = ~0;
242 pagePtr.p->frag_page_id = pageId;
243 pagePtr.p->physical_page_id = pagePtr.i;
244 pagePtr.p->nextList = RNIL;
245 pagePtr.p->prevList = RNIL;
246 pagePtr.p->m_flags = 0;
247 Tup_fixsize_page* fix_page = (Tup_fixsize_page*)pagePtr.p;
248 /**
249 * A new page is required to be fully scanned the first LCP after
250 * allocation to ensure that we generate DELETE BY ROWID for all
251 * positions that are not yet inserted into, this ensures that
252 * we don't leave deleted rows in change pages after an LCP.
253 */
254 fix_page->set_all_change_map();
255 fix_page->clear_max_gci();
256 ndbassert(fix_page->verify_change_maps(jamBuffer()));
257 }
258
#ifdef VM_TRACE
#define do_check_page_map(x) check_page_map(x)
#if DBUG_PAGE_MAP
/**
 * Walk the fragment's free page id list, verifying the links on the way,
 * and report whether pageId is a member.
 *
 * For each list member the entry at 2 * id + 1 must point back to the
 * previous member and carry FREE_PAGE_BIT, and the entry at 2 * id must
 * carry FREE_PAGE_BIT.
 *
 * @return true when pageId was found on the list, false otherwise.
 */
bool
Dbtup::find_page_id_in_list(Fragrecord* fragPtrP, Uint32 pageId)
{
  /* Don't use jam's here unless a jamBuf is sent in */
  DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);

  Uint32 expectedPrev = FREE_PAGE_RNIL;
  for (Uint32 cursor = fragPtrP->m_free_page_id_list;
       cursor != FREE_PAGE_RNIL; )
  {
    const Uint32 *prevSlot = map.get(2 * cursor + 1);
    ndbrequire(prevSlot != 0);
    ndbrequire(expectedPrev == ((*prevSlot) & PAGE_BIT_MASK));
    ndbrequire(((*prevSlot) & FREE_PAGE_BIT) == FREE_PAGE_BIT);

    Uint32 *nextSlot = map.get(2 * cursor);
    ndbrequire(nextSlot != 0);
    ndbrequire(((*nextSlot) & FREE_PAGE_BIT) == FREE_PAGE_BIT);

    if (cursor == pageId)
    {
      return true;
    }

    expectedPrev = cursor;
    cursor = (*nextSlot) & PAGE_BIT_MASK;
  }
  return false;
}

/**
 * Consistency check of the page map for all page ids below
 * m_max_page_cnt: page ids without an entry, with an RNIL entry or with a
 * mapped page must not be on the free list, page ids whose entry has
 * FREE_PAGE_BIT set must be on it, and a mapped page must agree with the
 * map about its frag_page_id and physical_page_id.
 */
void
Dbtup::check_page_map(Fragrecord* fragPtrP)
{
  /* Don't use jam's here unless a jamBuf is sent in */
  const Uint32 pageCount = fragPtrP->m_max_page_cnt;
  DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);

  for (Uint32 pageId = 0; pageId < pageCount; pageId++)
  {
    const Uint32 *slot = map.get(2 * pageId);
    if (slot == 0 || (*slot) == RNIL)
    {
      ndbrequire(find_page_id_in_list(fragPtrP, pageId) == false);
      continue;
    }

    const Uint32 realpid = ((*slot) & (Uint32)~LCP_SCANNED_BIT);
    if (realpid & FREE_PAGE_BIT)
    {
      ndbrequire(find_page_id_in_list(fragPtrP, pageId) == true);
      continue;
    }

    PagePtr pagePtr;
    c_page_pool.getPtr(pagePtr, realpid);
    ndbrequire(pagePtr.p->frag_page_id == pageId);
    ndbrequire(pagePtr.p->physical_page_id == realpid);
    ndbrequire(find_page_id_in_list(fragPtrP, pageId) == false);
  }
}
#else
/* Page map checking is compiled out unless DBUG_PAGE_MAP is set. */
void Dbtup::check_page_map(Fragrecord*) {}
#endif
#else
#define do_check_page_map(x)
#endif
337
338 Uint32
getRealpidScan(Fragrecord * regFragPtr,Uint32 logicalPageId,Uint32 ** next_ptr,Uint32 ** prev_ptr)339 Dbtup::getRealpidScan(Fragrecord* regFragPtr,
340 Uint32 logicalPageId,
341 Uint32 **next_ptr,
342 Uint32 **prev_ptr)
343 {
344 DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
345 Uint32 * ptr = map.get_dirty(2 * logicalPageId);
346 if (ptr == 0 || (*ptr) == RNIL)
347 {
348 jam();
349 ptr = init_page_map_entry(regFragPtr, logicalPageId);
350 if (ptr == 0)
351 {
352 /**
353 * This logical page id doesn't have any reference at all in the page
354 * map. This means that it cannot have been used since the data node
355 * was started or since the fragment was created. So it can definitely
356 * not have any LCP_SCANNED_BIT set since this only happens when a
357 * page is being dropped, to be dropped a page has to be mapped and once
358 * it is mapped the map isn't removed.
359 */
360 jam();
361 *next_ptr = *prev_ptr = 0;
362 return RNIL;
363 }
364 }
365 ndbrequire(ptr != 0);
366 *next_ptr = ptr;
367 *prev_ptr = map.get_dirty(2 * logicalPageId + 1);
368 Uint32 val = *ptr;
369 ndbassert(val != RNIL);
370 if ((val & FREE_PAGE_BIT) != 0)
371 {
372 jam();
373 return RNIL;
374 }
375 else
376 {
377 jam();
378 return (val & PAGE_BIT_MASK);
379 }
380 }
381
382 void
set_last_lcp_state(Fragrecord * regFragPtr,Uint32 logicalPageId,bool is_new_state_D)383 Dbtup::set_last_lcp_state(Fragrecord *regFragPtr,
384 Uint32 logicalPageId,
385 bool is_new_state_D)
386 {
387 DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
388 Uint32 *ptr = map.set(2 * logicalPageId + 1);
389 ndbrequire(ptr != (Uint32*)0);
390 ndbassert((*ptr) != RNIL);
391 set_last_lcp_state(ptr, is_new_state_D);
392 do_check_page_map(regFragPtr);
393 }
394
395 void
set_last_lcp_state(Uint32 * ptr,bool is_new_state_D)396 Dbtup::set_last_lcp_state(Uint32 *ptr, bool is_new_state_D)
397 {
398 if (unlikely(ptr == 0))
399 {
400 jam();
401 return;
402 }
403 Uint32 val = *ptr;
404 ndbassert((val & FREE_PAGE_BIT) == FREE_PAGE_BIT);
405 Uint32 new_last_lcp_state =
406 is_new_state_D ? LAST_LCP_FREE_BIT : 0;
407 val &= (Uint32)~LAST_LCP_FREE_BIT;
408 val |= new_last_lcp_state;
409 *ptr = val;
410 }
411
412 bool
get_lcp_scanned_bit(Uint32 * next_ptr)413 Dbtup::get_lcp_scanned_bit(Uint32 *next_ptr)
414 {
415 if (next_ptr == 0)
416 {
417 jam();
418 return true;
419 }
420 if (((*next_ptr) & LCP_SCANNED_BIT) != 0)
421 {
422 jam();
423 return true;
424 }
425 jam();
426 return false;
427 }
428
429 bool
get_lcp_scanned_bit(Fragrecord * regFragPtr,Uint32 logicalPageId)430 Dbtup::get_lcp_scanned_bit(Fragrecord *regFragPtr, Uint32 logicalPageId)
431 {
432 DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
433 Uint32 *ptr = map.set(2 * logicalPageId);
434 return get_lcp_scanned_bit(ptr);
435 }
436
437 /**
438 * Currently not used code, can be activated when we can decrease
439 * m_max_page_cnt.
440 *
441 *void
442 *Dbtup::reset_lcp_scanned_bit(Fragrecord *regFragPtr, Uint32 logicalPageId)
443 *{
444 * DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
445 * Uint32 *ptr = map.set(2 * logicalPageId);
446 * ndbassert(ptr != 0);
447 * ndbassert((*ptr) != RNIL);
448 *#ifdef DEBUG_LCP_SCANNED_BIT
449 * if ((*ptr) & LCP_SCANNED_BIT)
450 * {
451 * g_eventLogger->info("(%u)tab(%u,%u):%u reset_lcp_scanned_bit",
452 * instance(),
453 * regFragPtr->fragTableId,
454 * regFragPtr->fragmentId,
455 * logicalPageId);
456 * }
457 *#endif
458 * *ptr = (*ptr) & (Uint32)~LCP_SCANNED_BIT;
459 * do_check_page_map(regFragPtr);
460 *}
461 */
462
463 void
reset_lcp_scanned_bit(Uint32 * next_ptr)464 Dbtup::reset_lcp_scanned_bit(Uint32 *next_ptr)
465 {
466 if (next_ptr == 0)
467 {
468 jam();
469 return;
470 }
471 *next_ptr = (*next_ptr) & (Uint32)~LCP_SCANNED_BIT;
472 }
473
474 bool
get_last_lcp_state(Uint32 * prev_ptr)475 Dbtup::get_last_lcp_state(Uint32 *prev_ptr)
476 {
477 if (prev_ptr == 0)
478 {
479 jam();
480 /**
481 * If getRealpidScan returned a NULL pointer then the page
482 * definitely didn't exist at the last LCP.
483 */
484 return true;
485 }
486 if (((*prev_ptr) & LAST_LCP_FREE_BIT) != 0)
487 {
488 jam();
489 return true;
490 }
491 else
492 {
493 jam();
494 return false;
495 }
496 }
497
498 Uint32
insert_new_page_into_page_map(EmulatedJamBuffer * jamBuf,Fragrecord * regFragPtr,PagePtr pagePtr,Uint32 noOfPagesAllocated)499 Dbtup::insert_new_page_into_page_map(EmulatedJamBuffer *jamBuf,
500 Fragrecord *regFragPtr,
501 PagePtr pagePtr,
502 Uint32 noOfPagesAllocated)
503 {
504 DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
505 Uint32 pageId = regFragPtr->m_max_page_cnt;
506 Uint32 *ptr;
507 Uint32 *prev_ptr = 0;
508 if (pageId >= MAX_PAGES_IN_DYN_ARRAY ||
509 ((prev_ptr = map.set(2 * pageId + 1)) == 0) ||
510 ((ptr = map.set(2 * pageId)) == 0))
511 {
512 thrjam(jamBuf);
513 if (prev_ptr != 0)
514 {
515 jam();
516 *prev_ptr = FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
517 }
518 returnCommonArea(pagePtr.i, noOfPagesAllocated);
519 return RNIL;
520 }
521 /**
522 * This should always get a new entry and this always is set initialised
523 * to RNIL.
524 */
525 ndbrequire(*ptr == RNIL);
526 *ptr = pagePtr.i;
527 /* Ensure LAST_LCP_FREE_BIT is initialised to 1 */
528 *prev_ptr = FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
529 regFragPtr->m_max_page_cnt = pageId + 1;
530 if (DBUG_PAGE_MAP)
531 {
532 g_eventLogger->info("(%u)tab(%u,%u), new maxII: %u",
533 instance(),
534 regFragPtr->fragTableId,
535 regFragPtr->fragmentId,
536 regFragPtr->m_max_page_cnt);
537 }
538 return pageId;
539 }
540
541 Uint32
remove_first_free_from_page_map(EmulatedJamBuffer * jamBuf,Fragrecord * regFragPtr,PagePtr pagePtr)542 Dbtup::remove_first_free_from_page_map(EmulatedJamBuffer *jamBuf,
543 Fragrecord *regFragPtr,
544 PagePtr pagePtr)
545 {
546 Uint32 pageId = regFragPtr->m_free_page_id_list;
547 DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
548 Uint32 *ptr = map.set(2 * pageId);
549 ndbrequire(ptr != 0);
550 ndbassert((*ptr) != RNIL);
551 Uint32 ptr_val = *ptr;
552 ndbrequire((ptr_val & FREE_PAGE_BIT) != 0);
553 Uint32 lcp_scanned_bit = ptr_val & LCP_SCANNED_BIT;
554 Uint32 next = ptr_val & PAGE_BIT_MASK;
555 *ptr = (pagePtr.i | lcp_scanned_bit);
556
557 #ifdef DEBUG_LCP_SCANNED_BIT
558 if (lcp_scanned_bit)
559 {
560 g_eventLogger->info("(%u)tab(%u,%u):%u remove_first_free_from_page_map",
561 instance(),
562 regFragPtr->fragTableId,
563 regFragPtr->fragmentId,
564 pageId);
565 }
566 #endif
567
568 if (next != FREE_PAGE_RNIL)
569 {
570 thrjam(jamBuf);
571 Uint32 * nextPrevPtr = map.set(2 * next + 1);
572 ndbrequire(nextPrevPtr != 0);
573 ndbassert((*nextPrevPtr) != RNIL);
574 ndbassert(((*nextPrevPtr) & FREE_PAGE_BIT) == FREE_PAGE_BIT);
575 Uint32 last_lcp_free_bit = (*nextPrevPtr) & LAST_LCP_FREE_BIT;
576 *nextPrevPtr = FREE_PAGE_RNIL | FREE_PAGE_BIT | last_lcp_free_bit;
577 }
578 regFragPtr->m_free_page_id_list = next;
579 DEB_LCP_FREE(("(%u)m_free_page_id_list(1), tab(%u,%u):%u",
580 instance(),
581 regFragPtr->fragTableId,
582 regFragPtr->fragmentId,
583 next));
584 return pageId;
585 }
586
587 void
remove_page_id_from_dll(Fragrecord * fragPtrP,Uint32 page_no,Uint32 pagePtrI,Uint32 * ptr)588 Dbtup::remove_page_id_from_dll(Fragrecord *fragPtrP,
589 Uint32 page_no,
590 Uint32 pagePtrI,
591 Uint32 *ptr)
592 {
593 DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
594 const Uint32 *prevPtr = map.set(2 * page_no + 1);
595 ndbrequire(prevPtr != 0);
596 ndbassert((*prevPtr) != RNIL);
597 ndbassert(((*prevPtr) & FREE_PAGE_BIT) == FREE_PAGE_BIT);
598 Uint32 next = *ptr;
599 Uint32 prev = *prevPtr;
600 {
601 /**
602 * Set new entry in DynArray before list manipulations, ensure that
603 * we don't forget the LCP_SCANNED_BIT.
604 */
605 Uint32 lcp_scanned_bit = next & LCP_SCANNED_BIT;
606 *ptr = pagePtrI | lcp_scanned_bit;
607 #ifdef DEBUG_LCP_SCANNED_BIT
608 if (lcp_scanned_bit)
609 {
610 g_eventLogger->info("(%u)tab(%u,%u):%u remove_page_id_from_dll",
611 instance(),
612 fragPtrP->fragTableId,
613 fragPtrP->fragmentId,
614 page_no);
615 }
616 #endif
617 }
618 next &= PAGE_BIT_MASK;
619 prev &= PAGE_BIT_MASK;
620 if (next == FREE_PAGE_RNIL)
621 {
622 jam();
623 // This should be end of list...
624 if (prev == FREE_PAGE_RNIL)
625 {
626 jam();
627 /* page_no is both head and tail */
628 if (fragPtrP->m_free_page_id_list != page_no)
629 {
630 g_eventLogger->info("(%u)m_free_page_id_list = %u,"
631 " tab(%u,%u):%u",
632 instance(),
633 fragPtrP->m_free_page_id_list,
634 fragPtrP->fragTableId,
635 fragPtrP->fragmentId,
636 page_no);
637 ndbrequire(fragPtrP->m_free_page_id_list == page_no);
638 }
639 fragPtrP->m_free_page_id_list = FREE_PAGE_RNIL;
640 DEB_LCP_FREE(("(%u)m_free_page_id_list(2), tab(%u,%u):FREE_PAGE_RNIL",
641 instance(),
642 fragPtrP->fragTableId,
643 fragPtrP->fragmentId));
644 }
645 else
646 {
647 jam();
648 /* page_no is tail, but not head */
649 Uint32 *prevNextPtr = map.set(2 * prev);
650 ndbrequire(prevNextPtr != 0);
651 ndbassert((*prevNextPtr) != RNIL);
652 Uint32 prevNext = *prevNextPtr;
653 ndbrequire(prevNext & FREE_PAGE_BIT);
654 Uint32 lcp_scanned_bit = prevNext & LCP_SCANNED_BIT;
655 ndbrequire((prevNext & PAGE_BIT_MASK) == page_no);
656 *prevNextPtr = FREE_PAGE_RNIL | FREE_PAGE_BIT | lcp_scanned_bit;
657 }
658 }
659 else
660 {
661 jam();
662 Uint32 *nextPrevPtr = map.set(2 * next + 1);
663 ndbrequire(nextPrevPtr != 0);
664 ndbassert((*nextPrevPtr) != RNIL);
665 ndbassert(((*nextPrevPtr) & FREE_PAGE_BIT) == FREE_PAGE_BIT);
666 Uint32 nextPrev = (*nextPrevPtr) & PAGE_BIT_MASK;
667 Uint32 last_lcp_free_bit = (*nextPrevPtr) & LAST_LCP_FREE_BIT;
668 ndbrequire(nextPrev == page_no);
669 *nextPrevPtr = prev | last_lcp_free_bit | FREE_PAGE_BIT;
670 if (prev == FREE_PAGE_RNIL)
671 {
672 jam();
673 /* page_no is head but not tail */
674 ndbrequire(fragPtrP->m_free_page_id_list == page_no);
675 fragPtrP->m_free_page_id_list = next;
676 DEB_LCP_FREE(("(%u)m_free_page_id_list(3), tab(%u,%u):%u",
677 instance(),
678 fragPtrP->fragTableId,
679 fragPtrP->fragmentId,
680 next));
681 }
682 else
683 {
684 jam();
685 /* page_no is neither head nor tail */
686 Uint32 *prevNextPtr = map.get(2 * prev);
687 ndbrequire(prevNextPtr != 0);
688 Uint32 prevNext = *prevNextPtr;
689 Uint32 lcp_scanned_bit = prevNext & LCP_SCANNED_BIT;
690 ndbrequire(prevNext & FREE_PAGE_BIT);
691 prevNext &= PAGE_BIT_MASK;
692 ndbrequire(prevNext == page_no);
693 *prevNextPtr = next | FREE_PAGE_BIT | lcp_scanned_bit;
694 }
695 }
696 }
697
698 void
handle_lcp_skip_bit(EmulatedJamBuffer * jamBuf,Fragrecord * fragPtrP,PagePtr pagePtr,Uint32 page_no)699 Dbtup::handle_lcp_skip_bit(EmulatedJamBuffer *jamBuf,
700 Fragrecord *fragPtrP,
701 PagePtr pagePtr,
702 Uint32 page_no)
703 {
704 Uint32 lcp_scan_ptr_i = fragPtrP->m_lcp_scan_op;
705 if (lcp_scan_ptr_i != RNIL)
706 {
707 thrjam(jamBuf);
708 DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
709 const Uint32 *ptr = map.set(2 * page_no);
710 ndbrequire(ptr != 0);
711 ndbassert((*ptr) != RNIL);
712 Uint32 lcp_scanned_bit = (*ptr) & LCP_SCANNED_BIT;
713 ScanOpPtr scanOp;
714 scanOp.i = lcp_scan_ptr_i;
715 ndbrequire(c_scanOpPool.getValidPtr(scanOp));
716 Local_key key;
717 key.m_page_no = page_no;
718 key.m_page_idx = ZNIL;
719 if (is_rowid_in_remaining_lcp_set(pagePtr.p,
720 fragPtrP,
721 key,
722 *scanOp.p,
723 2 /* Debug for LCP skip bit */))
724 {
725 thrjam(jamBuf);
726 if (lcp_scanned_bit == 0)
727 {
728 thrjam(jamBuf);
729 /**
730 * We allocated a page during an LCP, it was within the pages that
731 * will be checked during the LCP scan. The page has also not yet
732 * been scanned by the LCP. Given that we know that the page will
733 * only contain rows that would set the LCP_SKIP bit we will
734 * set the LCP skip on the page level instead to speed up LCP
735 * processing.
736 *
737 * We use this bit both for ALL ROWS pages and CHANGED ROWS pages.
738 * When we come to the scanning of this page we will decide what
739 * to do with the page whether to skip or record it as DELETE by
740 * PAGEID.
741 */
742 /* Coverage tested */
743 DEB_LCP_SKIP(("(%u)LCP_SKIP in tab(%u,%u):%u",
744 instance(),
745 fragPtrP->fragTableId,
746 fragPtrP->fragmentId,
747 page_no));
748 pagePtr.p->set_page_to_skip_lcp();
749 c_backup->alloc_page_after_lcp_start(page_no);
750 }
751 else
752 {
753 jam();
754 /* Coverage tested */
755 /**
756 * The page had already been handled since it had been dropped
757 * after LCP start and is now allocated again still before the
758 * LCP scan reached it. No need to do anything since its LCP
759 * scanning was handled at drop time.
760 */
761 }
762 }
763 else
764 {
765 if (lcp_scanned_bit)
766 {
767 g_eventLogger->info("(%u):lcp_scanned_bit crash on tab(%u,%u):%u",
768 instance(),
769 fragPtrP->fragTableId,
770 fragPtrP->fragmentId,
771 page_no);
772 }
773 ndbrequire(lcp_scanned_bit == 0);
774 }
775 }
776 }
777
778 void
handle_new_page(EmulatedJamBuffer * jamBuf,Fragrecord * fragPtrP,Tablerec * tabPtrP,PagePtr pagePtr,Uint32 page_no)779 Dbtup::handle_new_page(EmulatedJamBuffer *jamBuf,
780 Fragrecord *fragPtrP,
781 Tablerec* tabPtrP,
782 PagePtr pagePtr,
783 Uint32 page_no)
784 {
785 DEB_LCP_ALLOC(("(%u)allocFragPage: tab(%u,%u) page(%u)",
786 instance(),
787 fragPtrP->fragTableId,
788 fragPtrP->fragmentId,
789 page_no));
790 c_page_pool.getPtr(pagePtr);
791 init_page(fragPtrP, pagePtr, page_no);
792 handle_lcp_skip_bit(jamBuf, fragPtrP, pagePtr, page_no);
793 convertThPage((Fix_page*)pagePtr.p, tabPtrP, MM);
794 {
795 LocalDLFifoList<Page_pool> free_pages(c_page_pool, fragPtrP->thFreeFirst);
796 pagePtr.p->page_state = ZTH_MM_FREE;
797 free_pages.addFirst(pagePtr);
798 }
799 if (DBUG_PAGE_MAP)
800 {
801 g_eventLogger->info("(%u)tab(%u,%u):%u alloc -> (%u max: %u)",
802 instance(),
803 fragPtrP->fragTableId,
804 fragPtrP->fragmentId,
805 page_no,
806 pagePtr.i,
807 fragPtrP->m_max_page_cnt);
808 }
809
810 do_check_page_map(fragPtrP);
811 }
812
813 Uint32
allocFragPage(EmulatedJamBuffer * jamBuf,Uint32 * err,Fragrecord * regFragPtr,Tablerec * regTabPtr)814 Dbtup::allocFragPage(EmulatedJamBuffer* jamBuf,
815 Uint32 * err,
816 Fragrecord* regFragPtr,
817 Tablerec *regTabPtr)
818 {
819 PagePtr pagePtr;
820 Uint32 noOfPagesAllocated = 0;
821 Uint32 list = regFragPtr->m_free_page_id_list;
822
823 allocConsPages(jamBuf, 1, noOfPagesAllocated, pagePtr.i);
824 if (noOfPagesAllocated == 0)
825 {
826 thrjam(jamBuf);
827 * err = ZMEM_NOMEM_ERROR;
828 return RNIL;
829 }//if
830
831 Uint32 pageId;
832 if (list == FREE_PAGE_RNIL)
833 {
834 thrjam(jamBuf);
835 pageId = insert_new_page_into_page_map(jamBuf,
836 regFragPtr,
837 pagePtr,
838 noOfPagesAllocated);
839 DEB_LCP(("(%u)allocFragPage(1): tab(%u,%u):%u",
840 instance(),
841 regFragPtr->fragTableId,
842 regFragPtr->fragmentId,
843 pageId));
844 if (pageId == RNIL)
845 {
846 thrjam(jamBuf);
847 * err = ZMEM_NOMEM_ERROR;
848 return RNIL;
849 }
850 }
851 else
852 {
853 thrjam(jamBuf);
854 pageId = remove_first_free_from_page_map(jamBuf, regFragPtr, pagePtr);
855 DEB_LCP(("(%u)allocFragPage(2): tab(%u,%u):%u",
856 instance(),
857 regFragPtr->fragTableId,
858 regFragPtr->fragmentId,
859 pageId));
860 }
861 if (DBUG_PAGE_MAP)
862 {
863 DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
864 Uint32 *ptr = map.set(2 * pageId);
865 ndbrequire(ptr != 0);
866 ndbassert((*ptr) != RNIL);
867 g_eventLogger->info("(%u)tab(%u,%u) allocRI(%u %u max: %u next: %x)",
868 instance(),
869 regFragPtr->fragTableId,
870 regFragPtr->fragmentId,
871 pageId,
872 pagePtr.i,
873 regFragPtr->m_max_page_cnt,
874 *ptr);
875 }
876 regFragPtr->noOfPages++;
877 handle_new_page(jamBuf, regFragPtr, regTabPtr, pagePtr, pageId);
878 return pagePtr.i;
879 }//Dbtup::allocFragPage()
880
881 Uint32
allocFragPage(Uint32 * err,Tablerec * tabPtrP,Fragrecord * fragPtrP,Uint32 page_no)882 Dbtup::allocFragPage(Uint32 * err,
883 Tablerec* tabPtrP, Fragrecord* fragPtrP, Uint32 page_no)
884 {
885 PagePtr pagePtr;
886 ndbrequire(page_no < MAX_PAGES_IN_DYN_ARRAY);
887 DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
888 DEB_LCP(("(%u)allocFragPage(3): tab(%u,%u):%u",
889 instance(),
890 fragPtrP->fragTableId,
891 fragPtrP->fragmentId,
892 page_no));
893 Uint32 *prev_ptr = map.set(2 * page_no + 1);
894 if (unlikely(prev_ptr == 0))
895 {
896 jam();
897 *err = ZMEM_NOMEM_ERROR;
898 return RNIL;
899 }
900 Uint32 * ptr = map.set(2 * page_no);
901 if (unlikely(ptr == 0))
902 {
903 jam();
904 *prev_ptr = FREE_PAGE_RNIL | LAST_LCP_FREE_BIT;
905 * err = ZMEM_NOMEM_ERROR;
906 return RNIL;
907 }
908 pagePtr.i = * ptr;
909 if (likely(pagePtr.i != RNIL && (pagePtr.i & FREE_PAGE_BIT) == 0))
910 {
911 jam();
912 return (pagePtr.i & PAGE_BIT_MASK);
913 }
914
915 Uint32 noOfPagesAllocated = 0;
916 allocConsPages(jamBuffer(), 1, noOfPagesAllocated, pagePtr.i);
917 if (unlikely(noOfPagesAllocated == 0))
918 {
919 jam();
920 * err = ZMEM_NOMEM_ERROR;
921 return RNIL;
922 }
923
924 if ((*ptr) == RNIL)
925 {
926 /**
927 * DynArr256 delivered a fresh new entry, so no flags are initialised
928 * and we will treat it as if it returned FREE_PAGE_BIT set,
929 * LCP_SCANNED_BIT not set, but also not in any free list, so no need
930 * to remove it from the doubly linked list. We will make use of entry
931 * for a new page, so we don't set the FREE_PAGE_BIT either, we simply
932 * insert the page pointer.
933 *
934 * Since it is the first occurrence of the page we initialise that the
935 * page was free at the last LCP. We always need to set the FREE_PAGE_BIT
936 * also to ensure that drop fragment doesn't drop things from the
937 * prev_ptr position.
938 */
939 jam();
940 *ptr = pagePtr.i;
941 *prev_ptr = FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
942 }
943 else
944 {
945 jam();
946 /**
947 * This page id was in the doubly linked list free list, we need to remove
948 * it from this list.
949 */
950 remove_page_id_from_dll(fragPtrP, page_no, pagePtr.i, ptr);
951 }
952 if (DBUG_PAGE_MAP)
953 {
954 g_eventLogger->info("(%u)tab(%u,%u):%u alloc(%u max: %u next: %x)",
955 instance(),
956 fragPtrP->fragTableId,
957 fragPtrP->fragmentId,
958 page_no,
959 pagePtr.i,
960 fragPtrP->m_max_page_cnt,
961 *ptr);
962 }
963 Uint32 max = fragPtrP->m_max_page_cnt;
964 fragPtrP->noOfPages++;
965
966 if (page_no + 1 > max)
967 {
968 jam();
969 fragPtrP->m_max_page_cnt = page_no + 1;
970 if (DBUG_PAGE_MAP)
971 {
972 g_eventLogger->info("(%u)tab(%u,%u) new max: %u",
973 instance(),
974 fragPtrP->fragTableId,
975 fragPtrP->fragmentId,
976 fragPtrP->m_max_page_cnt);
977 }
978 }
979 handle_new_page(jamBuffer(), fragPtrP, tabPtrP, pagePtr, page_no);
980 return pagePtr.i;
981 }
982
983 void
releaseFragPage(Fragrecord * fragPtrP,Uint32 logicalPageId,PagePtr pagePtr)984 Dbtup::releaseFragPage(Fragrecord* fragPtrP,
985 Uint32 logicalPageId,
986 PagePtr pagePtr)
987 {
988 DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
989 DEB_LCP_REL(("(%u)releaseFragPage: tab(%u,%u) page(%u)",
990 instance(),
991 fragPtrP->fragTableId,
992 fragPtrP->fragmentId,
993 logicalPageId));
994 Uint32 *next = map.set(2 * logicalPageId);
995 Uint32 *prev = map.set(2 * logicalPageId + 1);
996 ndbrequire(next != 0 && prev != 0);
997 ndbassert(((*prev) & FREE_PAGE_BIT) == FREE_PAGE_BIT);
998
999 bool page_freed = false;
1000 Uint32 lcp_scanned_bit = (*next) & LCP_SCANNED_BIT;
1001 Uint32 last_lcp_state = (*prev) & LAST_LCP_FREE_BIT;
1002 Uint32 lcp_scan_ptr_i = fragPtrP->m_lcp_scan_op;
1003 bool lcp_to_scan = false;
1004 if (lcp_scan_ptr_i != RNIL)
1005 {
1006 /**
1007 * We use the method is_rowid_in_remaining_lcp_set. We set the
1008 * key to the page and beyond the last row in the page. This means
1009 * that if the page is not fully scanned yet we will set the
1010 * LCP_SCANNED_BIT, in this case we might have already handled
1011 * some of the rows, so there is a small probability that we will
1012 * duplicate some DELETE BY ROWID, but it should only have a minor
1013 * performance impact. Otherwise we will ignore it.
1014 */
1015 ScanOpPtr scanOp;
1016 Local_key key;
1017 scanOp.i = lcp_scan_ptr_i;
1018 ndbrequire(c_scanOpPool.getValidPtr(scanOp));
1019 key.m_page_no = logicalPageId;
1020 key.m_page_idx = ZNIL;
1021 if (is_rowid_in_remaining_lcp_set(pagePtr.p,
1022 fragPtrP,
1023 key,
1024 *scanOp.p,
1025 1 /* Debug for LCP scanned bit */) ||
1026 pagePtr.p->is_page_to_skip_lcp())
1027 {
1028 jam();
1029 lcp_to_scan = true;
1030 if (lcp_scanned_bit == 0)
1031 {
1032 jam();
1033 /**
1034 * Page is being dropped and it is part of LCP and has not
1035 * yet been scanned by LCP. This means we need to act right
1036 * now before we release the page and record the needed
1037 * information. Also we haven't already dropped the page
1038 * already before in this LCP scan.
1039 *
1040 * is_rowid_in_remaining_lcp_set is normally false when
1041 * is_page_to_skip_lcp is true. The problem however is that
1042 * this is a state on the page, and the page is being dropped,
1043 * so in this case we need to ensure that the page is not
1044 * containing any row at restore. We ensure this by using
1045 * a DELETE BY PAGEID in this case. We also flag that the
1046 * page have been scanned for LCP in the page map. It is
1047 * possible to arrive here after allocate, drop, allocate
1048 * and drop again. In this case the LCP scanned bit will
1049 * still be set and we can ignore the page.
1050 *
1051 * We will release the page during handle_lcp_drop_change
1052 * to ensure that we are certain to get the space we
1053 * need to store the space needed to store things into the
1054 * LCP keep list.
1055 *
1056 * If the SKIP_LCP flag was set on the page then this page
1057 * was added since the start of the LCP and in that case
1058 * we record the page as a DELETE by PAGEID and then sets
1059 * the LCP_SCANNED_BIT to ensure that any further allocation
1060 * and release of the fragment before LCP scan has passed it
1061 * is ignored.
1062 *
1063 * For ALL ROWS pages the LCP scan is always completed in
1064 * this state, all the rows existing at start of LCP has been
1065 * deleted and all of those were put into LCP through the LCP
1066 * keep list. So we ensure that the page is ignored for the
1067 * rest of the LCP scan by setting the LCP_SCANNED_BIT here
1068 * as well.
1069 *
1070 * No need to clear the page to skip lcp flag here since the
1071 * page is dropped immediately following this.
1072 *
1073 * If last page state was D the page will be empty at the
1074 * previous LCP, so this means that there is no need to
1075 * delete rows that won't be there. There is not really any
1076 * problem in performing deletes in this case, but it would
1077 * cause unnecessary work. The rows that was present in the
1078 * page at start of LCP have already been handled through
1079 * LCP keep list.
1080 *
1081 * If last state was A there was a set of rows installed by
1082 * the previous LCP, some of those rows will remain if we
1083 * don't ensure that they are removed. So in this case we
1084 * remove all rows on the page that hasn't got the LCP_SKIP
1085 * flag set. Those rows with this flag set have been handled
1086 * by the LCP keep list before arriving here.
1087 */
1088 bool is_change_part = c_backup->is_change_part_state(logicalPageId);
1089 DEB_LCP_SCANNED_BIT(("(%u)Set lcp_scanned_bit on tab(%u,%u):%u",
1090 instance(),
1091 fragPtrP->fragTableId,
1092 fragPtrP->fragmentId,
1093 logicalPageId));
1094 lcp_scanned_bit = LCP_SCANNED_BIT;
1095 bool page_to_skip_lcp = pagePtr.p->is_page_to_skip_lcp();
1096 Uint32 new_last_lcp_state;
1097 if (page_to_skip_lcp)
1098 {
1099 new_last_lcp_state = LAST_LCP_FREE_BIT;
1100 c_backup->alloc_dropped_page_after_lcp_start(is_change_part);
1101 }
1102 else
1103 {
1104 new_last_lcp_state = 0;
1105 c_backup->dropped_page_after_lcp_start(is_change_part,
1106 (last_lcp_state == 0));
1107 }
1108 if (is_change_part && (last_lcp_state == 0))
1109 {
1110 /**
1111 * Page is a change page and last LCP state was A.
1112 * We set page_freed to true, the reason is that we're
1113 * "stealing" the page to be deleted for use by the
1114 * LCP keep free list. This removes any possibility that
1115 * we will run out of memory for this operation.
1116 *
1117 * The page is removed later when the LCP keep list operation
1118 * is completed.
1119 */
1120 jam();
1121 /* Coverage tested */
1122 c_page_pool.getPtr(pagePtr);
1123 bool delete_by_pageid = page_to_skip_lcp;
1124 page_freed = true;
1125 ndbrequire(c_backup->is_partial_lcp_enabled());
1126 handle_lcp_drop_change_page(fragPtrP,
1127 logicalPageId,
1128 pagePtr,
1129 delete_by_pageid);
1130 }
1131 else
1132 {
1133 /* Coverage tested */
1134 DEB_LCP_REL(("(%u) change_part: %u, last_lcp_state: %u "
1135 "in tab(%u,%u) page(%u)",
1136 instance(),
1137 is_change_part,
1138 last_lcp_state,
1139 fragPtrP->fragTableId,
1140 fragPtrP->fragmentId,
1141 logicalPageId));
1142 }
1143 last_lcp_state = new_last_lcp_state;
1144 }
1145 else
1146 {
1147 DEB_LCP_REL(("(%u)lcp_scanned_bit already set when page released"
1148 "in tab(%u,%u) page(%u)",
1149 instance(),
1150 fragPtrP->fragTableId,
1151 fragPtrP->fragmentId,
1152 logicalPageId));
1153 /* Coverage tested */
1154 }
1155 }
1156 else
1157 {
1158 /* Coverage tested */
1159 }
1160 }
1161 if (!lcp_to_scan)
1162 {
1163 if (unlikely(lcp_scanned_bit != 0))
1164 {
1165 g_eventLogger->info("(%u)tab(%u,%u):%u crash lcp_scanned_bit set",
1166 instance(),
1167 fragPtrP->fragTableId,
1168 fragPtrP->fragmentId,
1169 logicalPageId);
1170 ndbrequire(lcp_scanned_bit == 0);
1171 }
1172 }
1173 if (!page_freed)
1174 {
1175 jam();
1176 returnCommonArea(pagePtr.i, 1);
1177 }
1178
1179 #ifdef DEBUG_LCP_SCANNED_BIT
1180 if (lcp_scanned_bit)
1181 {
1182 g_eventLogger->info("(%u)tab(%u,%u):%u set lcp_scanned_bit",
1183 instance(),
1184 fragPtrP->fragTableId,
1185 fragPtrP->fragmentId,
1186 logicalPageId);
1187
1188 }
1189 #endif
1190
1191 const char *where = insert_free_page_id_list(fragPtrP,
1192 logicalPageId,
1193 next,
1194 prev,
1195 lcp_scanned_bit,
1196 last_lcp_state);
1197 fragPtrP->noOfPages--;
1198 if (DBUG_PAGE_MAP)
1199 {
1200 g_eventLogger->info("(%u)tab(%u,%u):%u release(%u)@%s",
1201 instance(),
1202 fragPtrP->fragTableId,
1203 fragPtrP->fragmentId,
1204 logicalPageId,
1205 pagePtr.i,
1206 where);
1207 }
1208 do_check_page_map(fragPtrP);
1209 }
1210
1211 const char*
insert_free_page_id_list(Fragrecord * fragPtrP,Uint32 logicalPageId,Uint32 * next,Uint32 * prev,Uint32 lcp_scanned_bit,Uint32 last_lcp_state)1212 Dbtup::insert_free_page_id_list(Fragrecord *fragPtrP,
1213 Uint32 logicalPageId,
1214 Uint32 *next,
1215 Uint32 *prev,
1216 Uint32 lcp_scanned_bit,
1217 Uint32 last_lcp_state)
1218 {
1219 /**
1220 * Add to head or tail of list...
1221 */
1222 DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
1223 Uint32 list = fragPtrP->m_free_page_id_list;
1224 const char * where = 0;
1225 if (list == FREE_PAGE_RNIL)
1226 {
1227 jam();
1228 *next = FREE_PAGE_RNIL | FREE_PAGE_BIT | lcp_scanned_bit;
1229 *prev = FREE_PAGE_RNIL | FREE_PAGE_BIT | last_lcp_state;
1230 fragPtrP->m_free_page_id_list = logicalPageId;
1231 DEB_LCP_FREE(("(%u)m_free_page_id_list(4), tab(%u,%u):%u",
1232 instance(),
1233 fragPtrP->fragTableId,
1234 fragPtrP->fragmentId,
1235 logicalPageId));
1236 where = (const char*)"empty";
1237 }
1238 else
1239 {
1240 jam();
1241 *next = list | FREE_PAGE_BIT | lcp_scanned_bit;
1242 *prev = FREE_PAGE_RNIL | FREE_PAGE_BIT | last_lcp_state;
1243 fragPtrP->m_free_page_id_list = logicalPageId;
1244 DEB_LCP_FREE(("(%u)m_free_page_id_list(5), tab(%u,%u):%u",
1245 instance(),
1246 fragPtrP->fragTableId,
1247 fragPtrP->fragmentId,
1248 logicalPageId));
1249 Uint32 *nextPrevPtr = map.set(2 * list + 1);
1250 ndbrequire(nextPrevPtr != 0);
1251 ndbassert((*nextPrevPtr) != RNIL);
1252 Uint32 nextPrev = *nextPrevPtr;
1253 Uint32 this_last_lcp_state = nextPrev & LAST_LCP_FREE_BIT;
1254 ndbrequire(((*nextPrevPtr) & PAGE_BIT_MASK) == FREE_PAGE_RNIL);
1255 *nextPrevPtr = logicalPageId | FREE_PAGE_BIT | this_last_lcp_state;
1256 where = (const char*)"head";
1257 }
1258 return where;
1259 }
1260
errorHandler(Uint32 errorCode)1261 void Dbtup::errorHandler(Uint32 errorCode)
1262 {
1263 switch (errorCode) {
1264 case 0:
1265 jam();
1266 break;
1267 case 1:
1268 jam();
1269 break;
1270 case 2:
1271 jam();
1272 break;
1273 default:
1274 jam();
1275 }
1276 ndbabort();
1277 }//Dbtup::errorHandler()
1278
1279 void
rebuild_page_free_list(Signal * signal)1280 Dbtup::rebuild_page_free_list(Signal* signal)
1281 {
1282 Ptr<Fragoperrec> fragOpPtr;
1283 fragOpPtr.i = signal->theData[1];
1284 Uint32 pageId = signal->theData[2];
1285 Uint32 tail = signal->theData[3];
1286 ptrCheckGuard(fragOpPtr, cnoOfFragoprec, fragoperrec);
1287
1288 Ptr<Fragrecord> fragPtr;
1289 fragPtr.i= fragOpPtr.p->fragPointer;
1290 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
1291
1292 if (pageId == fragPtr.p->m_max_page_cnt)
1293 {
1294 do_check_page_map(fragPtr.p);
1295 RestoreLcpConf* conf = (RestoreLcpConf*)signal->getDataPtrSend();
1296 conf->senderRef = reference();
1297 conf->senderData = fragOpPtr.p->m_senderData;
1298 conf->restoredLcpId = fragOpPtr.p->m_restoredLcpId;
1299 conf->restoredLocalLcpId = fragOpPtr.p->m_restoredLocalLcpId;
1300 conf->maxGciCompleted = fragOpPtr.p->m_maxGciCompleted;
1301 conf->afterRestore = 1;
1302 sendSignal(fragOpPtr.p->m_senderRef,
1303 GSN_RESTORE_LCP_CONF, signal,
1304 RestoreLcpConf::SignalLength, JBB);
1305
1306 releaseFragoperrec(fragOpPtr);
1307 return;
1308 }
1309
1310 DynArr256 map(c_page_map_pool, fragPtr.p->m_page_map);
1311 Uint32 *nextPtr = map.set(2 * pageId);
1312 Uint32 *prevPtr = map.set(2 * pageId + 1);
1313
1314 // Out of memory ?? Should not be possible here/now
1315 ndbrequire(nextPtr != 0 && prevPtr != 0);
1316
1317 /**
1318 * This is called as part of restore, the pages that are defined here
1319 * should have the LAST_LCP_FREE_BIT initialised to 0, those that are
1320 * not yet allocated should have their state initialised to 1.
1321 *
1322 * LAST_LCP_FREE_BIT indicates state at last LCP, at restart the last
1323 * LCP is the restore and this has now been completed. So after this
1324 * we need to have a well defined state of LAST_LCP_FREE.
1325 * Later allocations of new page ids are always assumed to not be part
1326 * of the last LCP.
1327 *
1328 * Using Partial LCP the restore process can call releaseFragPage.
1329 * In this case *nextPtr isn't equal to RNIL, but FREE_PAGE_BIT of
1330 * *nextPagePtr is set instead. So need to check for either of those
1331 * two events.
1332 */
1333 if (*nextPtr == RNIL || ((((*nextPtr) & FREE_PAGE_BIT) != 0)))
1334 {
1335 jam();
1336 /**
1337 * An unallocated page id...put in free list
1338 */
1339
1340 #if DBUG_PAGE_MAP
1341 const char * where;
1342 #endif
1343 if (tail == RNIL)
1344 {
1345 jam();
1346 ndbrequire(fragPtr.p->m_free_page_id_list == FREE_PAGE_RNIL);
1347 fragPtr.p->m_free_page_id_list = pageId;
1348 DEB_LCP_FREE(("(%u)m_free_page_id_list(6), tab(%u,%u):%u",
1349 instance(),
1350 fragPtr.p->fragTableId,
1351 fragPtr.p->fragmentId,
1352 pageId));
1353 *nextPtr = FREE_PAGE_RNIL | FREE_PAGE_BIT;
1354 *prevPtr = FREE_PAGE_RNIL | FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
1355 #if DBUG_PAGE_MAP
1356 where = "head";
1357 #endif
1358 }
1359 else
1360 {
1361 jam();
1362 ndbrequire(fragPtr.p->m_free_page_id_list != FREE_PAGE_RNIL);
1363
1364 *nextPtr = FREE_PAGE_RNIL | FREE_PAGE_BIT;
1365 *prevPtr = tail | FREE_PAGE_BIT | LAST_LCP_FREE_BIT;
1366
1367 Uint32 * prevNextPtr = map.set(2 * tail);
1368 ndbrequire(prevNextPtr != 0);
1369 ndbrequire((*prevNextPtr) == (FREE_PAGE_RNIL | FREE_PAGE_BIT));
1370 *prevNextPtr = pageId | FREE_PAGE_BIT;
1371 #if DBUG_PAGE_MAP
1372 where = "tail";
1373 #endif
1374 DEB_LCP_FREE(("(%u)tab(%u,%u):%u into free page id list",
1375 instance(),
1376 fragPtr.p->fragTableId,
1377 fragPtr.p->fragmentId,
1378 pageId));
1379 }
1380 tail = pageId;
1381 #if DBUG_PAGE_MAP
1382 g_eventLogger->info("(%u)tab(%u,%u):%u adding page to free list @ %s",
1383 instance(),
1384 fragPtr.p->fragTableId,
1385 fragPtr.p->fragmentId,
1386 pageId,
1387 where);
1388 #endif
1389 }
1390 else
1391 {
1392 jam();
1393 /* Clear LAST_LCP_FREE_BIT and set FREE_PAGE_BIT */
1394 DEB_LCP_FREE(("(%u)tab(%u,%u):%u, next: %x, prev: %x",
1395 instance(),
1396 fragPtr.p->fragTableId,
1397 fragPtr.p->fragmentId,
1398 pageId,
1399 *nextPtr,
1400 *prevPtr));
1401 *prevPtr = (*prevPtr) & PAGE_BIT_MASK;
1402 *prevPtr = (*prevPtr) | FREE_PAGE_BIT;
1403 }
1404
1405 signal->theData[0] = ZREBUILD_FREE_PAGE_LIST;
1406 signal->theData[1] = fragOpPtr.i;
1407 signal->theData[2] = pageId + 1;
1408 signal->theData[3] = tail;
1409 sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB);
1410 }
1411